Coverage for / dolfinx-env / lib / python3.12 / site-packages / io4dolfinx / utils.py: 99%
92 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-26 18:16 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-26 18:16 +0000
1# Copyright (C) 2023 Jørgen Schartum Dokken
2#
3# This file is part of io4dolfinx
4#
5# SPDX-License-Identifier: MIT
7"""
8Vectorized numpy operations used internally in io4dolfinx
9"""
11from __future__ import annotations
13from pathlib import Path
15from mpi4py import MPI
17import basix.ufl
18import dolfinx
19import numpy as np
20import numpy.typing as npt
21import ufl
22from packaging.version import Version
24__all__ = [
25 "check_file_exists",
26 "compute_local_range",
27 "index_owner",
28 "compute_dofmap_pos",
29 "unroll_dofmap",
30 "compute_insert_position",
31 "unroll_insert_position",
32 "reconstruct_mesh",
33]
36def check_file_exists(filename: Path | str):
37 """Check if file exists."""
38 if not Path(filename).exists():
39 raise FileNotFoundError(f"{filename} not found")
42valid_function_types = np.float32 | np.float64 | np.complex64 | np.complex128
43valid_real_types = np.float32 | np.float64
46def element_signature(V):
47 if Version(dolfinx.__version__) > Version("0.9.0"):
48 return V.element.signature
49 else:
50 return V.element.signature()
53def compute_insert_position(
54 data_owner: npt.NDArray[np.int32],
55 destination_ranks: npt.NDArray[np.int32],
56 out_size: npt.NDArray[np.int32],
57) -> npt.NDArray[np.int32]:
58 """
59 Giving a list of ranks, compute the local insert position for each rank in a list
60 sorted by destination ranks. This function is used for packing data from a
61 given process to its destination processes.
63 Example:
65 .. highlight:: python
66 .. code-block:: python
68 data_owner = [0, 1, 1, 0, 2, 3]
69 destination_ranks = [2,0,3,1]
70 out_size = [1, 2, 1, 2]
71 insert_position = compute_insert_position(data_owner, destination_ranks, out_size)
73 Insert position is then ``[1, 4, 5, 2, 0, 3]``
74 """
75 process_pos_indicator = data_owner.reshape(-1, 1) == destination_ranks
77 # Compute offsets for insertion based on input size
78 send_offsets = np.zeros(len(out_size) + 1, dtype=np.intc)
79 send_offsets[1:] = np.cumsum(out_size)
80 assert send_offsets[-1] == len(data_owner)
82 # Compute local insert index on each process
83 proc_row, proc_col = np.nonzero(process_pos_indicator)
84 cum_pos = np.cumsum(process_pos_indicator, axis=0)
85 insert_position = cum_pos[proc_row, proc_col] - 1
87 # Add process offset for each local index
88 insert_position += send_offsets[proc_col]
89 return insert_position
92def unroll_insert_position(
93 insert_position: npt.NDArray[np.int32], block_size: int
94) -> npt.NDArray[np.int32]:
95 """
96 Unroll insert position by a block size
98 Example:
101 .. highlight:: python
102 .. code-block:: python
104 insert_position = [1, 4, 5, 2, 0, 3]
105 unrolled_ip = unroll_insert_position(insert_position, 3)
107 where ``unrolled_ip = [3, 4 ,5, 12, 13, 14, 15, 16, 17, 6, 7, 8, 0, 1, 2, 9, 10, 11]``
108 """
109 unrolled_ip = np.repeat(insert_position, block_size) * block_size
110 unrolled_ip += np.tile(np.arange(block_size), len(insert_position))
111 return unrolled_ip
114def compute_local_range(comm: MPI.Intracomm, N: np.int64):
115 """
116 Divide a set of `N` objects into `M` partitions, where `M` is
117 the size of the MPI communicator `comm`.
119 NOTE: If N is not divisible by the number of ranks, the first `r`
120 processes gets an extra value
122 Returns the local range of values
123 """
124 rank = comm.rank
125 size = comm.size
126 n = N // size
127 r = N % size
128 # First r processes has one extra value
129 if rank < r:
130 return [rank * (n + 1), (rank + 1) * (n + 1)]
131 else:
132 return [rank * n + r, (rank + 1) * n + r]
135def index_owner(
136 comm: MPI.Intracomm, indices: npt.NDArray[np.int64], N: np.int64
137) -> npt.NDArray[np.int32]:
138 """
139 Find which rank (local to comm) which owns an `index`, given that
140 data of size `N` has been split equally among the ranks.
142 NOTE: If `N` is not divisible by the number of ranks, the first `r`
143 processes gets an extra value.
144 """
145 size = comm.size
146 assert (indices < N).all()
147 n = N // size
148 r = N % size
150 owner = np.empty_like(indices, dtype=np.int32)
151 inc_remainder = indices < (n + 1) * r
152 owner[inc_remainder] = indices[inc_remainder] // (n + 1)
153 owner[~inc_remainder] = r + (indices[~inc_remainder] - r * (n + 1)) // n
154 return owner
157def unroll_dofmap(dofs: npt.NDArray[np.int32], bs: int) -> npt.NDArray[np.int32]:
158 """
159 Given a two-dimensional dofmap of size `(num_cells, num_dofs_per_cell)`
160 Expand the dofmap by its block size such that the resulting array
161 is of size `(num_cells, bs*num_dofs_per_cell)`
162 """
163 num_cells, num_dofs_per_cell = dofs.shape
164 unrolled_dofmap = np.repeat(dofs, bs).reshape(num_cells, num_dofs_per_cell * bs) * bs
165 unrolled_dofmap += np.tile(np.arange(bs), num_dofs_per_cell)
166 return unrolled_dofmap
169def compute_dofmap_pos(
170 V: dolfinx.fem.FunctionSpace,
171) -> tuple[npt.NDArray[np.int32], npt.NDArray[np.int32]]:
172 """
173 Compute a map from each owned dof in the dofmap to a single cell owned by the
174 process, and the relative position of the dof.
176 :param V: The function space
177 :returns: The tuple (`cells`, `dof_pos`) where each array is the size of the
178 number of owned dofs (unrolled for block size)
179 """
180 dofs = V.dofmap.list
181 mesh = V.mesh
182 num_owned_cells = mesh.topology.index_map(mesh.topology.dim).size_local
183 dofmap_bs = V.dofmap.bs
184 num_owned_dofs = V.dofmap.index_map.size_local * V.dofmap.index_map_bs
186 local_cell = np.empty(
187 num_owned_dofs, dtype=np.int32
188 ) # Local cell index for each dof owned by process
189 dof_pos = np.empty(num_owned_dofs, dtype=np.int32) # Position in dofmap for said dof
191 unrolled_dofmap = unroll_dofmap(dofs[:num_owned_cells, :], dofmap_bs)
192 markers = unrolled_dofmap < num_owned_dofs
193 local_indices = np.broadcast_to(np.arange(markers.shape[1]), markers.shape)
194 cell_indicator = np.broadcast_to(
195 np.arange(num_owned_cells, dtype=np.int32).reshape(-1, 1),
196 (num_owned_cells, markers.shape[1]),
197 )
198 indicator = unrolled_dofmap[markers].reshape(-1)
199 local_cell[indicator] = cell_indicator[markers].reshape(-1)
200 dof_pos[indicator] = local_indices[markers].reshape(-1)
201 return local_cell, dof_pos
204def reconstruct_mesh(mesh: dolfinx.mesh.Mesh, coordinate_element_degree: int) -> dolfinx.mesh.Mesh:
205 """
206 Make a copy of a mesh and potentially change the element of the coordinate element.
208 Note:
209 The topology is shared with the original mesh but the geometry is reconstructed.
211 Args:
212 mesh: Mesh to reconstruct
213 coordinate_element_degree: Degree to use for coordinate element
215 Returns:
216 The new mesh
218 """
219 # Extract cell properties
220 ud = mesh.ufl_domain()
221 assert ud is not None
222 c_el = ud.ufl_coordinate_element()
223 family = c_el.family_name
224 lvar = c_el.lagrange_variant
225 ct = c_el.cell_type
227 # Create new UFL element
228 new_c_el = basix.ufl.element(
229 family,
230 ct,
231 coordinate_element_degree,
232 shape=(mesh.geometry.dim,),
233 lagrange_variant=lvar,
234 dtype=mesh.geometry.x.dtype,
235 )
237 # Extract new node coordinates
238 V_tmp = dolfinx.fem.functionspace(mesh, new_c_el)
239 gdim = mesh.geometry.dim
240 x = V_tmp.tabulate_dof_coordinates()[:, :gdim]
242 # Create new geoemtry
243 geom_imap = V_tmp.dofmap.index_map
244 geom_dofmap = V_tmp.dofmap.list
245 num_nodes_local = geom_imap.size_local + geom_imap.num_ghosts
246 original_input_indices = geom_imap.local_to_global(np.arange(num_nodes_local, dtype=np.int32))
247 coordinate_element = dolfinx.fem.coordinate_element(
248 mesh.topology.cell_type, coordinate_element_degree, lvar, dtype=mesh.geometry.x.dtype
249 )
250 # Could use create_geometry here when things are fixed
251 geom = dolfinx.mesh.Geometry(
252 type(mesh.geometry._cpp_object)(
253 geom_imap, geom_dofmap, coordinate_element._cpp_object, x, original_input_indices
254 )
255 )
257 # Create new mesh
258 new_top = mesh.topology
259 cpp_mesh = type(mesh._cpp_object)(mesh.comm, new_top._cpp_object, geom._cpp_object)
260 return dolfinx.mesh.Mesh(cpp_mesh, ufl.Mesh(new_c_el))