Coverage for  / dolfinx-env / lib / python3.12 / site-packages / io4dolfinx / utils.py: 99%

92 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-26 18:16 +0000

1# Copyright (C) 2023 Jørgen Schartum Dokken 

2# 

3# This file is part of io4dolfinx 

4# 

5# SPDX-License-Identifier: MIT 

6 

7""" 

8Vectorized numpy operations used internally in io4dolfinx 

9""" 

10 

11from __future__ import annotations 

12 

13from pathlib import Path 

14 

15from mpi4py import MPI 

16 

17import basix.ufl 

18import dolfinx 

19import numpy as np 

20import numpy.typing as npt 

21import ufl 

22from packaging.version import Version 

23 

24__all__ = [ 

25 "check_file_exists", 

26 "compute_local_range", 

27 "index_owner", 

28 "compute_dofmap_pos", 

29 "unroll_dofmap", 

30 "compute_insert_position", 

31 "unroll_insert_position", 

32 "reconstruct_mesh", 

33] 

34 

35 

36def check_file_exists(filename: Path | str): 

37 """Check if file exists.""" 

38 if not Path(filename).exists(): 

39 raise FileNotFoundError(f"{filename} not found") 

40 

41 

42valid_function_types = np.float32 | np.float64 | np.complex64 | np.complex128 

43valid_real_types = np.float32 | np.float64 

44 

45 

46def element_signature(V): 

47 if Version(dolfinx.__version__) > Version("0.9.0"): 

48 return V.element.signature 

49 else: 

50 return V.element.signature() 

51 

52 

53def compute_insert_position( 

54 data_owner: npt.NDArray[np.int32], 

55 destination_ranks: npt.NDArray[np.int32], 

56 out_size: npt.NDArray[np.int32], 

57) -> npt.NDArray[np.int32]: 

58 """ 

59 Giving a list of ranks, compute the local insert position for each rank in a list 

60 sorted by destination ranks. This function is used for packing data from a 

61 given process to its destination processes. 

62 

63 Example: 

64 

65 .. highlight:: python 

66 .. code-block:: python 

67 

68 data_owner = [0, 1, 1, 0, 2, 3] 

69 destination_ranks = [2,0,3,1] 

70 out_size = [1, 2, 1, 2] 

71 insert_position = compute_insert_position(data_owner, destination_ranks, out_size) 

72 

73 Insert position is then ``[1, 4, 5, 2, 0, 3]`` 

74 """ 

75 process_pos_indicator = data_owner.reshape(-1, 1) == destination_ranks 

76 

77 # Compute offsets for insertion based on input size 

78 send_offsets = np.zeros(len(out_size) + 1, dtype=np.intc) 

79 send_offsets[1:] = np.cumsum(out_size) 

80 assert send_offsets[-1] == len(data_owner) 

81 

82 # Compute local insert index on each process 

83 proc_row, proc_col = np.nonzero(process_pos_indicator) 

84 cum_pos = np.cumsum(process_pos_indicator, axis=0) 

85 insert_position = cum_pos[proc_row, proc_col] - 1 

86 

87 # Add process offset for each local index 

88 insert_position += send_offsets[proc_col] 

89 return insert_position 

90 

91 

92def unroll_insert_position( 

93 insert_position: npt.NDArray[np.int32], block_size: int 

94) -> npt.NDArray[np.int32]: 

95 """ 

96 Unroll insert position by a block size 

97 

98 Example: 

99 

100 

101 .. highlight:: python 

102 .. code-block:: python 

103 

104 insert_position = [1, 4, 5, 2, 0, 3] 

105 unrolled_ip = unroll_insert_position(insert_position, 3) 

106 

107 where ``unrolled_ip = [3, 4 ,5, 12, 13, 14, 15, 16, 17, 6, 7, 8, 0, 1, 2, 9, 10, 11]`` 

108 """ 

109 unrolled_ip = np.repeat(insert_position, block_size) * block_size 

110 unrolled_ip += np.tile(np.arange(block_size), len(insert_position)) 

111 return unrolled_ip 

112 

113 

114def compute_local_range(comm: MPI.Intracomm, N: np.int64): 

115 """ 

116 Divide a set of `N` objects into `M` partitions, where `M` is 

117 the size of the MPI communicator `comm`. 

118 

119 NOTE: If N is not divisible by the number of ranks, the first `r` 

120 processes gets an extra value 

121 

122 Returns the local range of values 

123 """ 

124 rank = comm.rank 

125 size = comm.size 

126 n = N // size 

127 r = N % size 

128 # First r processes has one extra value 

129 if rank < r: 

130 return [rank * (n + 1), (rank + 1) * (n + 1)] 

131 else: 

132 return [rank * n + r, (rank + 1) * n + r] 

133 

134 

135def index_owner( 

136 comm: MPI.Intracomm, indices: npt.NDArray[np.int64], N: np.int64 

137) -> npt.NDArray[np.int32]: 

138 """ 

139 Find which rank (local to comm) which owns an `index`, given that 

140 data of size `N` has been split equally among the ranks. 

141 

142 NOTE: If `N` is not divisible by the number of ranks, the first `r` 

143 processes gets an extra value. 

144 """ 

145 size = comm.size 

146 assert (indices < N).all() 

147 n = N // size 

148 r = N % size 

149 

150 owner = np.empty_like(indices, dtype=np.int32) 

151 inc_remainder = indices < (n + 1) * r 

152 owner[inc_remainder] = indices[inc_remainder] // (n + 1) 

153 owner[~inc_remainder] = r + (indices[~inc_remainder] - r * (n + 1)) // n 

154 return owner 

155 

156 

157def unroll_dofmap(dofs: npt.NDArray[np.int32], bs: int) -> npt.NDArray[np.int32]: 

158 """ 

159 Given a two-dimensional dofmap of size `(num_cells, num_dofs_per_cell)` 

160 Expand the dofmap by its block size such that the resulting array 

161 is of size `(num_cells, bs*num_dofs_per_cell)` 

162 """ 

163 num_cells, num_dofs_per_cell = dofs.shape 

164 unrolled_dofmap = np.repeat(dofs, bs).reshape(num_cells, num_dofs_per_cell * bs) * bs 

165 unrolled_dofmap += np.tile(np.arange(bs), num_dofs_per_cell) 

166 return unrolled_dofmap 

167 

168 

169def compute_dofmap_pos( 

170 V: dolfinx.fem.FunctionSpace, 

171) -> tuple[npt.NDArray[np.int32], npt.NDArray[np.int32]]: 

172 """ 

173 Compute a map from each owned dof in the dofmap to a single cell owned by the 

174 process, and the relative position of the dof. 

175 

176 :param V: The function space 

177 :returns: The tuple (`cells`, `dof_pos`) where each array is the size of the 

178 number of owned dofs (unrolled for block size) 

179 """ 

180 dofs = V.dofmap.list 

181 mesh = V.mesh 

182 num_owned_cells = mesh.topology.index_map(mesh.topology.dim).size_local 

183 dofmap_bs = V.dofmap.bs 

184 num_owned_dofs = V.dofmap.index_map.size_local * V.dofmap.index_map_bs 

185 

186 local_cell = np.empty( 

187 num_owned_dofs, dtype=np.int32 

188 ) # Local cell index for each dof owned by process 

189 dof_pos = np.empty(num_owned_dofs, dtype=np.int32) # Position in dofmap for said dof 

190 

191 unrolled_dofmap = unroll_dofmap(dofs[:num_owned_cells, :], dofmap_bs) 

192 markers = unrolled_dofmap < num_owned_dofs 

193 local_indices = np.broadcast_to(np.arange(markers.shape[1]), markers.shape) 

194 cell_indicator = np.broadcast_to( 

195 np.arange(num_owned_cells, dtype=np.int32).reshape(-1, 1), 

196 (num_owned_cells, markers.shape[1]), 

197 ) 

198 indicator = unrolled_dofmap[markers].reshape(-1) 

199 local_cell[indicator] = cell_indicator[markers].reshape(-1) 

200 dof_pos[indicator] = local_indices[markers].reshape(-1) 

201 return local_cell, dof_pos 

202 

203 

204def reconstruct_mesh(mesh: dolfinx.mesh.Mesh, coordinate_element_degree: int) -> dolfinx.mesh.Mesh: 

205 """ 

206 Make a copy of a mesh and potentially change the element of the coordinate element. 

207 

208 Note: 

209 The topology is shared with the original mesh but the geometry is reconstructed. 

210 

211 Args: 

212 mesh: Mesh to reconstruct 

213 coordinate_element_degree: Degree to use for coordinate element 

214 

215 Returns: 

216 The new mesh 

217 

218 """ 

219 # Extract cell properties 

220 ud = mesh.ufl_domain() 

221 assert ud is not None 

222 c_el = ud.ufl_coordinate_element() 

223 family = c_el.family_name 

224 lvar = c_el.lagrange_variant 

225 ct = c_el.cell_type 

226 

227 # Create new UFL element 

228 new_c_el = basix.ufl.element( 

229 family, 

230 ct, 

231 coordinate_element_degree, 

232 shape=(mesh.geometry.dim,), 

233 lagrange_variant=lvar, 

234 dtype=mesh.geometry.x.dtype, 

235 ) 

236 

237 # Extract new node coordinates 

238 V_tmp = dolfinx.fem.functionspace(mesh, new_c_el) 

239 gdim = mesh.geometry.dim 

240 x = V_tmp.tabulate_dof_coordinates()[:, :gdim] 

241 

242 # Create new geoemtry 

243 geom_imap = V_tmp.dofmap.index_map 

244 geom_dofmap = V_tmp.dofmap.list 

245 num_nodes_local = geom_imap.size_local + geom_imap.num_ghosts 

246 original_input_indices = geom_imap.local_to_global(np.arange(num_nodes_local, dtype=np.int32)) 

247 coordinate_element = dolfinx.fem.coordinate_element( 

248 mesh.topology.cell_type, coordinate_element_degree, lvar, dtype=mesh.geometry.x.dtype 

249 ) 

250 # Could use create_geometry here when things are fixed 

251 geom = dolfinx.mesh.Geometry( 

252 type(mesh.geometry._cpp_object)( 

253 geom_imap, geom_dofmap, coordinate_element._cpp_object, x, original_input_indices 

254 ) 

255 ) 

256 

257 # Create new mesh 

258 new_top = mesh.topology 

259 cpp_mesh = type(mesh._cpp_object)(mesh.comm, new_top._cpp_object, geom._cpp_object) 

260 return dolfinx.mesh.Mesh(cpp_mesh, ufl.Mesh(new_c_el))