Coverage for  / dolfinx-env / lib / python3.12 / site-packages / io4dolfinx / backends / xdmf / backend.py: 75%

170 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-26 18:16 +0000

1""" 

2Module that uses DOLFINx/H5py to import XDMF files. 

3""" 

4 

5from pathlib import Path 

6from typing import Any 

7from xml.etree import ElementTree 

8 

9from mpi4py import MPI 

10 

11import basix 

12import dolfinx 

13import numpy as np 

14import numpy.typing as npt 

15 

16from io4dolfinx.structures import ArrayData, FunctionData, MeshData, MeshTagsData, ReadMeshData 

17from io4dolfinx.utils import check_file_exists, compute_local_range 

18 

19from .. import FileMode, ReadMode 

20from ..h5py.backend import h5pyfile 

21 

22read_mode = ReadMode.parallel 

23 

24 

25def extract_function_names_and_timesteps(filename: Path | str) -> dict[str, list[str]]: 

26 tree = ElementTree.parse(filename) 

27 root = tree.getroot() 

28 mesh_nodes = root.findall(".//Grid[@CollectionType='Temporal']") 

29 function_names = [] 

30 for mesh in mesh_nodes: 

31 function_names.append(mesh.attrib["Name"]) 

32 

33 time_stamps: dict[str, list[str]] = {name: [] for name in function_names} 

34 for name in function_names: 

35 time_steps = root.findall(f".//Grid[@Name='{name}']") 

36 for time in time_steps: 

37 step = time.find(".//Time") 

38 if step is not None: 

39 val = step.attrib["Value"] 

40 time_stamps[name].append(val) 

41 for name in function_names: 

42 float_steps = np.argsort(np.array(list(set(time_stamps[name])), dtype=np.float64)) 

43 time_stamps[name] = np.array(list(set(time_stamps[name])), dtype=str)[float_steps].tolist() 

44 return time_stamps 

45 

46 

47def get_default_backend_args(arguments: dict[str, Any] | None) -> dict[str, Any]: 

48 """Get default backend arguments given a set of input arguments. 

49 

50 Args: 

51 arguments: Input backend arguments 

52 

53 Returns: 

54 Updated backend arguments 

55 """ 

56 args = arguments or {} 

57 return args 

58 

59 

60def read_mesh_data( 

61 filename: Path | str, 

62 comm: MPI.Intracomm, 

63 time: str | float | None, 

64 read_from_partition: bool, 

65 backend_args: dict[str, Any] | None, 

66) -> ReadMeshData: 

67 """Read mesh data from file. 

68 

69 Args: 

70 filename: Path to file to read from 

71 comm: MPI communicator used in storage 

72 time: Time stamp associated with the mesh to read 

73 read_from_partition: Whether to read partition information 

74 backend_args: Arguments to backend 

75 

76 Returns: 

77 Internal data structure for the mesh data read from file 

78 """ 

79 assert not read_from_partition 

80 check_file_exists(filename) 

81 with dolfinx.io.XDMFFile(comm, filename, "r") as file: 

82 cell_shape, cell_degree = file.read_cell_type() 

83 cells = file.read_topology_data() 

84 x = file.read_geometry_data() 

85 return ReadMeshData( 

86 cells=cells, 

87 cell_type=cell_shape.name, 

88 x=x, 

89 lvar=int(basix.LagrangeVariant.equispaced), 

90 degree=cell_degree, 

91 ) 

92 

93 

94def read_point_data( 

95 filename: Path | str, 

96 name: str, 

97 comm: MPI.Intracomm, 

98 time: float | str | None, 

99 backend_args: dict[str, Any] | None, 

100) -> tuple[np.ndarray, int]: 

101 """Read data from the nodes of a mesh. 

102 

103 Args: 

104 filename: Path to file 

105 name: Name of point data 

106 comm: Communicator to launch IO on. 

107 time: The time stamp 

108 backend_args: The backend arguments 

109 

110 Returns: 

111 Data local to process (contiguous, no mpi comm) and local start range 

112 """ 

113 # Find function with name u in xml tree 

114 check_file_exists(filename) 

115 filename = Path(filename) 

116 

117 tree = ElementTree.parse(filename) 

118 root = tree.getroot() 

119 backend_args = get_default_backend_args(backend_args) 

120 if time is not None: 

121 time_steps = root.findall(f".//Grid[@Name='{name}']") 

122 time_found = False 

123 for time_node in time_steps: 

124 step_node = time_node.find(".//Time") 

125 assert isinstance(step_node, ElementTree.Element) 

126 if step_node.attrib["Value"] == time: 

127 time_found = True 

128 break 

129 func_node = time_node.find(f".//Attribute[@Name='{name}']") 

130 if not time_found: 

131 raise RuntimeError(f"Function {name} at time={time} not found in {filename}") 

132 else: 

133 func_node = root.find(f".//Attribute[@Name='{name}']") 

134 assert isinstance(func_node, ElementTree.Element) 

135 data_node = func_node.find(".//DataItem") 

136 assert isinstance(data_node, ElementTree.Element) 

137 global_shape = data_node.attrib["Dimensions"].split(" ") 

138 func_path = data_node.text 

139 assert isinstance(func_path, str) 

140 data_file, data_loc = func_path.split(":") 

141 data_path = filename.parent / data_file 

142 with h5pyfile(data_path, "r", comm=comm) as h5file: 

143 data = h5file[data_loc] 

144 for s1, s2 in zip(data.shape, global_shape, strict=True): 

145 assert int(s1) == int(s2) 

146 lr = compute_local_range(comm, data.shape[0]) 

147 local_range_start = lr[0] 

148 dataset = data[slice(*lr), :] 

149 return dataset, local_range_start 

150 

151 

152def read_attributes( 

153 filename: Path | str, 

154 comm: MPI.Intracomm, 

155 name: str, 

156 backend_args: dict[str, Any] | None, 

157) -> dict[str, Any]: 

158 """Read attributes from file. 

159 

160 Args: 

161 filename: Path to file to read from 

162 comm: MPI communicator used in storage 

163 name: Name of the attribute group 

164 backend_args: Arguments to backend 

165 

166 Returns: 

167 Dictionary of attributes read from file 

168 """ 

169 raise NotImplementedError("The XDMF backend cannot read attributes.") 

170 

171 

172def read_timestamps( 

173 filename: Path | str, 

174 comm: MPI.Intracomm, 

175 function_name: str, 

176 backend_args: dict[str, Any] | None, 

177) -> npt.NDArray[np.float64 | str]: # type: ignore[type-var] 

178 """Read timestamps from file. 

179 

180 Args: 

181 filename: Path to file to read from 

182 comm: MPI communicator used in storage 

183 function_name: Name of the function to read timestamps for 

184 backend_args: Arguments to backend 

185 

186 Returns: 

187 Numpy array of timestamps read from file 

188 """ 

189 tree = ElementTree.parse(filename) 

190 root = tree.getroot() 

191 time_stamps = [] 

192 time_steps = root.findall(f".//Grid[@Name='{function_name}']") 

193 for time in time_steps: 

194 step = time.find(".//Time") 

195 if step is not None: 

196 val = step.attrib["Value"] 

197 time_stamps.append(val) 

198 float_steps = np.argsort(np.array(list(set(time_stamps)), dtype=np.float64)) 

199 return np.array(list(set(time_stamps)), dtype=str)[float_steps].tolist() 

200 

201 

202def write_attributes( 

203 filename: Path | str, 

204 comm: MPI.Intracomm, 

205 name: str, 

206 attributes: dict[str, np.ndarray], 

207 backend_args: dict[str, Any] | None, 

208): 

209 """Write attributes to file. 

210 

211 Args: 

212 filename: Path to file to write to 

213 comm: MPI communicator used in storage 

214 name: Name of the attribute group 

215 attributes: Dictionary of attributes to write 

216 backend_args: Arguments to backend 

217 """ 

218 raise NotImplementedError("The XDMF backend cannot write attributes.") 

219 

220 

221def write_mesh( 

222 filename: Path | str, 

223 comm: MPI.Intracomm, 

224 mesh: MeshData, 

225 backend_args: dict[str, Any] | None, 

226 mode: FileMode, 

227 time: float, 

228): 

229 """ 

230 Write a mesh to file. 

231 

232 Args: 

233 comm: MPI communicator used in storage 

234 mesh: Internal data structure for the mesh data to save to file 

235 filename: Path to file to write to 

236 backend_args: Arguments to backend 

237 mode: File-mode to store the mesh 

238 time: Time stamp associated with the mesh 

239 """ 

240 raise NotImplementedError("The XDMF backend cannot write meshes.") 

241 

242 

243def write_meshtags( 

244 filename: str | Path, 

245 comm: MPI.Intracomm, 

246 data: MeshTagsData, 

247 backend_args: dict[str, Any] | None, 

248): 

249 """Write mesh tags to file. 

250 

251 Args: 

252 filename: Path to file to write to 

253 comm: MPI communicator used in storage 

254 data: Internal data structure for the mesh tags to save to file 

255 backend_args: Arguments to backend 

256 """ 

257 raise NotImplementedError("The XDMF backend cannot write meshtags.") 

258 

259 

260def read_meshtags_data( 

261 filename: str | Path, 

262 comm: MPI.Intracomm, 

263 name: str, 

264 backend_args: dict[str, Any] | None, 

265) -> MeshTagsData: 

266 """Read mesh tags from file. 

267 

268 Args: 

269 filename: Path to file to read from 

270 comm: MPI communicator used in storage 

271 name: Name of the mesh tags to read 

272 backend_args: Arguments to backend 

273 

274 Returns: 

275 Internal data structure for the mesh tags read from file 

276 """ 

277 raise NotImplementedError("The XDMF backend cannot read meshtags.") 

278 

279 

280def read_dofmap( 

281 filename: str | Path, 

282 comm: MPI.Intracomm, 

283 name: str, 

284 backend_args: dict[str, Any] | None, 

285) -> dolfinx.graph.AdjacencyList: 

286 """Read the dofmap of a function with a given name. 

287 

288 Args: 

289 filename: Path to file to read from 

290 comm: MPI communicator used in storage 

291 name: Name of the function to read the dofmap for 

292 backend_args: Arguments to backend 

293 

294 Returns: 

295 Dofmap as an AdjacencyList 

296 """ 

297 raise NotImplementedError("The XDMF backend cannot make checkpoints.") 

298 

299 

300def read_dofs( 

301 filename: str | Path, 

302 comm: MPI.Intracomm, 

303 name: str, 

304 time: float, 

305 backend_args: dict[str, Any] | None, 

306) -> tuple[npt.NDArray[np.float32 | np.float64 | np.complex64 | np.complex128], int]: 

307 """Read the dofs (values) of a function with a given name from a given timestep. 

308 

309 Args: 

310 filename: Path to file to read from 

311 comm: MPI communicator used in storage 

312 name: Name of the function to read the dofs for 

313 time: Time stamp associated with the function to read 

314 backend_args: Arguments to backend 

315 

316 Returns: 

317 Contiguous sequence of degrees of freedom (with respect to input data) 

318 and the global starting point on the process. 

319 Process 0 has [0, M), process 1 [M, N), process 2 [N, O) etc. 

320 """ 

321 raise NotImplementedError("The XDMF backend cannot make checkpoints.") 

322 

323 

324def read_cell_perms( 

325 comm: MPI.Intracomm, filename: Path | str, backend_args: dict[str, Any] | None 

326) -> npt.NDArray[np.uint32]: 

327 """ 

328 Read cell permutation from file with given communicator, 

329 Split in continuous chunks based on number of cells in the input data. 

330 

331 Args: 

332 comm: MPI communicator used in storage 

333 filename: Path to file to read from 

334 backend_args: Arguments to backend 

335 

336 Returns: 

337 Contiguous sequence of permutations (with respect to input data) 

338 Process 0 has [0, M), process 1 [M, N), process 2 [N, O) etc. 

339 """ 

340 raise NotImplementedError("The XDMF backend cannot make checkpoints.") 

341 

342 

343def write_function( 

344 filename: Path, 

345 comm: MPI.Intracomm, 

346 u: FunctionData, 

347 time: float, 

348 mode: FileMode, 

349 backend_args: dict[str, Any] | None, 

350): 

351 """ 

352 Write a function to file. 

353 

354 Args: 

355 comm: MPI communicator used in storage 

356 u: Internal data structure for the function data to save to file 

357 filename: Path to file to write to 

358 time: Time stamp associated with function 

359 mode: File-mode to store the function 

360 backend_args: Arguments to backend 

361 """ 

362 raise NotImplementedError("The XDMF backend cannot make checkpoints.") 

363 

364 

365def read_legacy_mesh( 

366 filename: Path | str, comm: MPI.Intracomm, group: str 

367) -> tuple[npt.NDArray[np.int64], npt.NDArray[np.floating], str | None]: 

368 """Read in the mesh topology, geometry and (optionally) cell type from a 

369 legacy DOLFIN HDF5-file. 

370 

371 Args: 

372 filename: Path to file to read from 

373 comm: MPI communicator used in storage 

374 group: Group in HDF5 file where mesh is stored 

375 

376 Returns: 

377 Tuple containing: 

378 - Topology as a (num_cells, num_vertices_per_cell) array of global vertex indices 

379 - Geometry as a (num_vertices, geometric_dimension) array of vertex coordinates 

380 - Cell type as a string (e.g. "tetrahedron") or None if not found 

381 """ 

382 raise NotImplementedError("The XDMF backend cannot read legacy DOLFIN meshes.") 

383 

384 

385def snapshot_checkpoint( 

386 filename: Path | str, 

387 mode: FileMode, 

388 u: dolfinx.fem.Function, 

389 backend_args: dict[str, Any] | None, 

390): 

391 """Create a snapshot checkpoint of a dolfinx function. 

392 

393 Args: 

394 filename: Path to file to read from 

395 mode: File-mode to store the function 

396 u: dolfinx function to create a snapshot checkpoint for 

397 backend_args: Arguments to backend 

398 """ 

399 raise NotImplementedError("The XDMF backend cannot make checkpoints.") 

400 

401 

402def read_hdf5_array( 

403 comm: MPI.Intracomm, 

404 filename: Path | str, 

405 group: str, 

406 backend_args: dict[str, Any] | None, 

407) -> tuple[np.ndarray, int]: 

408 """Read an array from an HDF5 file. 

409 

410 Args: 

411 comm: MPI communicator used in storage 

412 filename: Path to file to read from 

413 group: Group in HDF5 file where array is stored 

414 backend_args: Arguments to backend 

415 

416 Returns: 

417 Tuple containing: 

418 - Numpy array read from file 

419 - Global starting point on the process. 

420 Process 0 has [0, M), process 1 [M, N), process 2 [N, O) etc. 

421 """ 

422 raise NotImplementedError("The XDMF backend cannot read HDF5 arrays") 

423 

424 

425def read_function_names( 

426 filename: Path | str, comm: MPI.Intracomm, backend_args: dict[str, Any] | None 

427) -> list[str]: 

428 """Read all function names from a file. 

429 

430 Args: 

431 filename: Path to file 

432 comm: MPI communicator to launch IO on. 

433 backend_args: Arguments to backend 

434 

435 Returns: 

436 A list of function names. 

437 """ 

438 # Find all functions in xml tree 

439 check_file_exists(filename) 

440 filename = Path(filename) 

441 

442 tree = ElementTree.parse(filename) 

443 root = tree.getroot() 

444 backend_args = get_default_backend_args(backend_args) 

445 # Functions in checkpoint format 

446 checkpoint_funcs = root.findall(".//Attribute[@ItemType='FiniteElementFunction']") 

447 names = [func.attrib["Name"] for func in checkpoint_funcs] 

448 # Temporal funcs 

449 temporal_funcs = root.findall(".//Grid[@GridType='Collection']") 

450 for func in temporal_funcs: 

451 names.append(func.attrib["Name"]) 

452 return list(set(names)) 

453 

454 

455def read_cell_data( 

456 filename: Path | str, 

457 name: str, 

458 comm: MPI.Intracomm, 

459 time: str | float | None, 

460 backend_args: dict[str, Any] | None, 

461) -> tuple[npt.NDArray[np.int64], np.ndarray]: 

462 """Read data from the cells of a mesh. 

463 

464 Args: 

465 filename: Path to file 

466 name: Name of point data 

467 comm: Communicator to launch IO on. 

468 time: The time stamp 

469 backend_args: The backend arguments 

470 Returns: 

471 A tuple (topology, dofs) where topology contains the 

472 vertex indices of the cells, dofs the degrees of 

473 freedom within that cell. 

474 """ 

475 # Find function with name u in xml tree 

476 check_file_exists(filename) 

477 filename = Path(filename) 

478 

479 tree = ElementTree.parse(filename) 

480 root = tree.getroot() 

481 backend_args = get_default_backend_args(backend_args) 

482 if time is not None: 

483 time_steps = root.findall(f".//Grid[@Name='{name}']") 

484 time_found = False 

485 for time_node in time_steps: 

486 step_node = time_node.find(".//Time") 

487 assert isinstance(step_node, ElementTree.Element) 

488 if np.isclose(float(step_node.attrib["Value"]), time): 

489 time_found = True 

490 break 

491 func_node = time_node.find(f".//Attribute[@Name='{name}']") 

492 if not time_found: 

493 raise RuntimeError(f"Function {name} at time={time} not found in {filename}") 

494 else: 

495 func_node = root.find(f".//Attribute[@Name='{name}']") 

496 assert func_node is not None 

497 if func_node.attrib["ItemType"] == "FiniteElementFunction": 

498 if (family := func_node.attrib["ElementFamily"]) != "DG" or ( 

499 degree := int(func_node.attrib["ElementDegree"]) 

500 ) != 0: 

501 raise ValueError( 

502 f"Cannot read in finite element function ({family}, {degree}) as cell data." 

503 ) 

504 # Get vector sub-element 

505 vec_el = None 

506 for node in func_node.iter(): 

507 comp = node.text 

508 assert comp is not None 

509 if "vector" in comp: 

510 vec_el = node 

511 break 

512 assert vec_el is not None 

513 dof_dimensions = np.array(vec_el.attrib["Dimensions"].split(" "), dtype=np.int32) 

514 vtxt = vec_el.text 

515 assert vtxt is not None 

516 vector_file, vector_h5path = vtxt.split(":") 

517 

518 grid_node = root.find(f".//Attribute[@Name='{name}']/..") 

519 assert grid_node is not None 

520 topology = grid_node.find("./Topology") 

521 assert topology is not None 

522 ttext = topology[0].text 

523 assert ttext is not None 

524 topology_file, topology_h5path = ttext.split(":") 

525 

526 with h5pyfile(filename.parent / vector_file, "r", comm=comm) as h5_mesh: 

527 data_loc = h5_mesh[vector_h5path] 

528 data_shape = data_loc.shape[0] 

529 assert int(np.prod(data_shape)) == int(np.prod(dof_dimensions)) 

530 local_range = compute_local_range(comm, data_shape) 

531 vec_dofs = data_loc[slice(*local_range)] 

532 

533 with h5pyfile(filename.parent / topology_file, "r", comm=comm) as h5_top: 

534 data_loc = h5_top[topology_h5path] 

535 top_data_shape = data_loc.shape[0] 

536 assert dof_dimensions[0] == top_data_shape 

537 local_range = compute_local_range(comm, data_shape) 

538 top_dofs = data_loc[slice(*local_range)].astype(np.int64) 

539 

540 return top_dofs, vec_dofs 

541 else: 

542 raise NotImplementedError("Not implemented yet.") 

543 

544 

545def write_data( 

546 filename: Path | str, 

547 point_data: ArrayData, 

548 comm: MPI.Intracomm, 

549 time: str | float | None, 

550 mode: FileMode, 

551 backend_args: dict[str, Any] | None, 

552): 

553 """Write a 2D-array to file (distributed across proceses with MPI). 

554 

555 

556 Args: 

557 filename: Path to file 

558 point_data: Data to write to file 

559 time: Time stamp 

560 mode: Append or write 

561 backend_args: The backend arguments 

562 """ 

563 raise NotImplementedError("XDMF has not implemented this yet")