Coverage for  / dolfinx-env / lib / python3.12 / site-packages / io4dolfinx / backends / h5py / backend.py: 89%

356 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-26 18:16 +0000

1""" 

2H5py interface to io4dolfinx 

3 

4SPDX License identifier: MIT 

5 

6Copyright: Jørgen S. Dokken, Henrik N.T. Finsberg, Simula Research Laboratory 

7""" 

8 

9import contextlib 

10from pathlib import Path 

11from typing import Any 

12 

13from mpi4py import MPI 

14 

15import dolfinx 

16import h5py 

17import numpy as np 

18import numpy.typing as npt 

19from dolfinx.graph import adjacencylist 

20 

21from ...structures import ArrayData, FunctionData, MeshData, MeshTagsData, ReadMeshData 

22from ...utils import check_file_exists, compute_local_range 

23from .. import FileMode, ReadMode 

24 

# This backend reads datasets in parallel: each rank slices out its own
# contiguous range (see compute_local_range usage below).
read_mode = ReadMode.parallel

26 

# TODO: wrap the `import h5py` at the top of this module in a
# try/except ModuleNotFoundError that re-raises with the message
# "This backend requires h5py to be installed." for a clearer error.

30 

31 

@contextlib.contextmanager
def h5pyfile(h5name, filemode="r", force_serial: bool = False, comm=None):
    """Context manager that opens (and reliably closes) an HDF5 file with h5py.

    Uses the MPI ("mpio") driver when h5py was built with MPI support,
    unless ``force_serial`` is set.

    Args:
        h5name: The name of the HDF5 file.
        filemode: The file mode.
        force_serial: Force serial access to the file.
        comm: The MPI communicator

    Yields:
        The open ``h5py.File`` handle.
    """
    if comm is None:
        comm = MPI.COMM_WORLD

    use_mpi_driver = h5py.h5.get_config().mpi and not force_serial
    if use_mpi_driver:
        handle = h5py.File(h5name, filemode, driver="mpio", comm=comm)
    else:
        # Serial h5py with more than one rank is almost certainly a mistake
        # unless the caller explicitly opted in.
        if comm.size > 1 and not force_serial:
            raise ValueError(
                f"h5py is not installed with MPI support, while using {comm.size} processes.",
                "If you really want to do this, turn on the `force_serial` flag.",
            )
        handle = h5py.File(h5name, filemode)

    try:
        yield handle
    finally:
        # Only close if the low-level identifier is still valid.
        if handle.id:
            handle.close()

61 

62 

63def get_default_backend_args(arguments: dict[str, Any] | None) -> dict[str, Any]: 

64 args = arguments or {"legacy": False} # If meshtags is read from legacy 

65 return args 

66 

67 

def convert_file_mode(mode: FileMode) -> str:
    """Translate a :class:`FileMode` into the corresponding h5py mode string.

    Args:
        mode: The io4dolfinx file mode.

    Returns:
        ``"a"`` (append), ``"r"`` (read) or ``"w"`` (write).

    Raises:
        NotImplementedError: If the mode has no h5py equivalent.
    """
    if mode == FileMode.append:
        return "a"
    if mode == FileMode.read:
        return "r"
    if mode == FileMode.write:
        return "w"
    raise NotImplementedError(f"File mode {mode} not implemented")

78 

79 

def write_attributes(
    filename: Path | str,
    comm: MPI.Intracomm,
    name: str,
    attributes: dict[str, np.ndarray],
    backend_args: dict[str, Any] | None = None,
):
    """Write attributes to file using H5PY.

    Args:
        filename: Path to file to write to
        comm: MPI communicator used in storage
        name: Name of the group the attributes are attached to
        attributes: Dictionary of attributes to write to file
        backend_args: Arguments to backend (unused by this backend)
    """
    # Append when the file already exists so other content is preserved.
    mode = "a" if Path(filename).exists() else "w"
    with h5pyfile(filename, filemode=mode, comm=comm, force_serial=False) as h5file:
        if name not in h5file.keys():
            # track_order keeps attribute insertion order on disk
            h5file.create_group(name, track_order=True)
        target = h5file[name]

        for attr_name, attr_value in attributes.items():
            target.attrs[attr_name] = attr_value

104 

105 

def read_attributes(
    filename: Path | str,
    comm: MPI.Intracomm,
    name: str,
    backend_args: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Read attributes from file using H5PY.

    Args:
        filename: Path to file to read from
        comm: MPI communicator used in storage
        name: Name of the group the attributes are attached to
        backend_args: Arguments to backend (unused by this backend)

    Returns:
        The attributes as a plain dictionary.
    """
    check_file_exists(filename)
    with h5pyfile(filename, filemode="r", comm=comm, force_serial=False) as h5file:
        # Materialize into a plain dict before the file is closed.
        return dict(h5file[name].attrs.items())

127 

128 

def read_timestamps(
    filename: Path | str,
    comm: MPI.Intracomm,
    function_name: str,
    backend_args: dict[str, Any] | None = None,
) -> npt.NDArray[np.float64 | str]:  # type: ignore[type-var]
    """Read time-stamps from a checkpoint file.

    Args:
        filename: Path to file
        comm: MPI communicator
        function_name: Name of the function to read time-stamps for
        backend_args: Arguments for backend, for instance file type.

    Returns:
        The time-stamps
    """
    check_file_exists(filename)
    with h5pyfile(filename, filemode="r", comm=comm, force_serial=False) as h5file:
        # Functions live under /mesh/functions/<name>; time stamps are stored
        # as a group attribute.
        function_group = h5file["mesh"]["functions"][function_name]
        return function_group.attrs["timestamps"]

154 

155 

def write_mesh(
    filename: Path | str,
    comm: MPI.Intracomm,
    mesh: MeshData,
    backend_args: dict[str, Any] | None = None,
    mode: FileMode = FileMode.write,
    time: float = 0.0,
):
    """Write a mesh to file using H5PY

    Args:
        filename: Path to file to write to.
        comm: MPI communicator used in storage
        mesh: Internal data structure for the mesh data to save to file
        backend_args: Arguments to backend
        mode: Mode to use (write or append)
        time: Time stamp

    Raises:
        ValueError: If a mesh is already stored at ``time``.
    """
    backend_args = get_default_backend_args(backend_args)
    h5_mode = convert_file_mode(mode)
    mesh_name = "mesh"
    with h5pyfile(filename, filemode=h5_mode, comm=comm, force_serial=False) as h5file:
        if mesh_name in h5file.keys() and h5_mode == "a":
            # Appending: only the (time-dependent) geometry is added.
            mesh_directory = h5file[mesh_name]
            timestamps = mesh_directory.attrs["timestamps"]
            if np.isclose(time, timestamps).any():
                # BUGFIX: message was a plain string with an undefined
                # {time_stamp} placeholder; use an f-string with `time`.
                raise ValueError(f"Mesh has already been stored at time={time}.")
            mesh_directory.attrs["timestamps"] = np.append(
                mesh_directory.attrs["timestamps"], time
            )
            idx = len(mesh_directory.attrs["timestamps"]) - 1
            write_topology = False
        else:
            mesh_directory = h5file.create_group(mesh_name)
            mesh_directory.attrs["timestamps"] = np.array([time], dtype=np.float64)
            idx = 0
            write_topology = True

        # One geometry group per time step, named by its index.
        geometry_group = mesh_directory.create_group(f"{idx}")

        # Write geometry data; each rank fills its contiguous slice.
        gdim = mesh.local_geometry.shape[1]
        geometry_dataset = geometry_group.create_dataset(
            "Points",
            [mesh.num_nodes_global, gdim],
            dtype=mesh.local_geometry.dtype,
        )
        geometry_dataset[slice(*mesh.local_geometry_pos), :] = mesh.local_geometry

        # Write static partitioning data (only once per file)
        if "PartitioningData" not in mesh_directory.keys() and mesh.store_partition:
            assert mesh.partition_range is not None
            assert mesh.ownership_array is not None
            par_dataset = mesh_directory.create_dataset(
                "PartitioningData", [mesh.partition_global], dtype=mesh.ownership_array.dtype
            )
            par_dataset[slice(*mesh.partition_range)] = mesh.ownership_array

        if "PartitioningOffset" not in mesh_directory.keys() and mesh.store_partition:
            assert mesh.ownership_offset is not None
            par_dataset = mesh_directory.create_dataset(
                "PartitioningOffset", [mesh.num_cells_global + 1], dtype=np.int64
            )
            # Offsets have num_cells_global + 1 entries; neighbouring ranks
            # overlap by one entry.
            par_dataset[mesh.local_topology_pos[0] : mesh.local_topology_pos[1] + 1] = (
                mesh.ownership_offset
            )

        if "PartitionProcesses" not in mesh_directory.attrs.keys() and mesh.store_partition:
            mesh_directory.attrs["PartitionProcesses"] = mesh.partition_processes

        # Write static data (topology and its metadata, first write only)
        if write_topology:
            mesh_directory.attrs["CellType"] = mesh.cell_type
            mesh_directory.attrs["Degree"] = mesh.degree
            mesh_directory.attrs["LagrangeVariant"] = mesh.lagrange_variant
            num_dofs_per_cell = mesh.local_topology.shape[1]
            topology_dataset = mesh_directory.create_dataset(
                "Topology", [mesh.num_cells_global, num_dofs_per_cell], dtype=np.int64
            )
            topology_dataset[slice(*mesh.local_topology_pos), :] = mesh.local_topology

236 

237 

def read_mesh_data(
    filename: Path | str,
    comm: MPI.Intracomm,
    time: str | float | None = 0.0,
    read_from_partition: bool = False,
    backend_args: dict[str, Any] | None = None,
) -> ReadMeshData:
    """Read mesh data from h5py based checkpoint files.

    Args:
        filename: Path to input file
        comm: The MPI communicator to distribute the mesh over
        time: Time stamp associated with mesh
        read_from_partition: Read mesh with partition from file
        backend_args: Arguments to backend

    Returns:
        The mesh topology, geometry, UFL domain and partition function

    Raises:
        KeyError: If no mesh (or requested partitioning data) is in the file.
        RuntimeError: If no mesh is stored at ``time``.
        ValueError: If the stored partitioning does not match ``comm.size``.
    """
    backend_args = get_default_backend_args(backend_args)

    with h5pyfile(filename, filemode="r", comm=comm, force_serial=False) as h5file:
        if "mesh" not in h5file.keys():
            raise KeyError("Could not find mesh in file")
        mesh_group = h5file["mesh"]
        timestamps = mesh_group.attrs["timestamps"]
        assert time is not None
        parent_group = np.flatnonzero(np.isclose(timestamps, time))
        if len(parent_group) != 1:
            raise RuntimeError(
                f"Time step {time} not found in file, available steps is {timestamps}"
            )
        time_group = f"{parent_group[0]:d}"

        # Get mesh topology (distributed)
        topology = mesh_group["Topology"]
        local_range = compute_local_range(comm, topology.shape[0])
        mesh_topology = topology[slice(*local_range), :]

        cell_type = mesh_group.attrs["CellType"]
        lvar = mesh_group.attrs["LagrangeVariant"]
        degree = mesh_group.attrs["Degree"]

        # Geometry is stored per time step under a group named by its index.
        geometry_group = mesh_group[time_group]
        geometry_dataset = geometry_group["Points"]
        x_shape = geometry_dataset.shape

        geometry_range = compute_local_range(comm, x_shape[0])
        mesh_geometry = geometry_dataset[slice(*geometry_range), :]

        # Check validity of partitioning information
        if read_from_partition:
            # BUGFIX: these messages were f-strings without placeholders
            # ("... in (unknown)"); report which file lacks the data.
            if "PartitionProcesses" not in mesh_group.attrs.keys():
                raise KeyError(f"Partitioning information not found in {filename}")
            par_keys = ("PartitioningData", "PartitioningOffset")
            for key in par_keys:
                if key not in mesh_group.keys():
                    raise KeyError(f"Partitioning information not found in {filename}")

            par_num_procs = mesh_group.attrs["PartitionProcesses"]
            if par_num_procs != comm.size:
                raise ValueError(f"Number of processes in file ({par_num_procs})!=({comm.size=})")

            # First read in offsets based on the number of cells [0, num_cells_local]
            par_offsets = mesh_group["PartitioningOffset"][local_range[0] : local_range[1] + 1]
            # Then read the data based on the offsets
            par_data = mesh_group["PartitioningData"][par_offsets[0] : par_offsets[-1]]
            # Then make offsets local
            par_offsets[:] -= par_offsets[0]
            partition_graph = adjacencylist(par_data, par_offsets.astype(np.int32))
        else:
            partition_graph = None

        return ReadMeshData(
            cells=mesh_topology,
            cell_type=cell_type,
            x=mesh_geometry,
            degree=degree,
            lvar=lvar,
            partition_graph=partition_graph,
        )

318 

319 

def write_meshtags(
    filename: str | Path,
    comm: MPI.Intracomm,
    data: MeshTagsData,
    backend_args: dict[str, Any] | None = None,
):
    """Write mesh tags to file.

    Tags are stored under ``/mesh/tags/<data.name>`` with datasets
    ``Topology`` (entity connectivity, shape
    ``(num_entities_global, num_dofs_per_entity)``) and ``Values``, plus
    ``CellType`` and ``dim`` attributes.

    Args:
        filename: Path to file to write to
        comm: MPI communicator used in storage
        data: Internal data structure for the mesh tags to save to file
        backend_args: Arguments to backend

    Raises:
        KeyError: If no mesh exists in the file, or tags with the same
            name are already stored.
    """
    backend_args = get_default_backend_args(backend_args)

    with h5pyfile(filename, filemode="a", comm=comm, force_serial=False) as h5file:
        if "mesh" not in h5file.keys():
            raise KeyError("Could not find mesh in file")
        mesh_group = h5file["mesh"]
        if "tags" not in mesh_group.keys():
            tags = mesh_group.create_group("tags")
        else:
            tags = mesh_group["tags"]
        if data.name in tags.keys():
            raise KeyError(f"MeshTags with {data.name=} already exists in this file")
        tag = tags.create_group(data.name)

        # Add topology; each rank writes its contiguous slice starting at
        # data.local_start.
        topology = tag.create_dataset(
            "Topology", shape=[data.num_entities_global, data.num_dofs_per_entity], dtype=np.int64
        )
        assert data.local_start is not None
        topology[data.local_start : data.local_start + len(data.indices), :] = data.indices

        # Add cell_type attribute
        tag.attrs["CellType"] = data.cell_type

        # Add values (same per-rank slice as the topology)
        values = tag.create_dataset(
            "Values", shape=[data.num_entities_global], dtype=data.values.dtype
        )
        values[data.local_start : data.local_start + len(data.indices)] = data.values

        # Add dimension of the tagged entities
        tag.attrs["dim"] = data.dim

366 

367 

def read_meshtags_data(
    filename: str | Path, comm: MPI.Intracomm, name: str, backend_args: dict[str, Any] | None = None
) -> MeshTagsData:
    """Read mesh tags from file.

    Args:
        filename: Path to file to read from
        comm: MPI communicator used in storage
        name: Name of the mesh tags to read
        backend_args: Arguments to backend. If "legacy" is set the HDF5 file
            is assumed to have been made with (legacy) DOLFIN, where tags
            live under ``/<name>/{topology,values}``; otherwise they are
            read from ``/mesh/tags/<name>``.

    Returns:
        Internal data structure for the mesh tags read from file

    Raises:
        RuntimeError: If a legacy tag with the given name is not found.
        KeyError: If the mesh, the tags group or the named tag is missing.
    """
    backend_args = get_default_backend_args(backend_args)
    legacy = backend_args["legacy"]
    with h5pyfile(filename, filemode="r", comm=comm, force_serial=False) as h5file:
        if legacy:
            if name not in h5file.keys():
                raise RuntimeError(f"MeshTag {name} not found in (unknown).")
            mesh = h5file[name]
            topology = mesh["topology"]
            # Legacy DOLFIN stores the cell type as bytes; decode to str.
            cell_type = topology.attrs["celltype"]
            if isinstance(cell_type, np.bytes_):
                cell_type = cell_type.decode("utf-8")
            # Entity dimension is derived from the cell type in legacy files.
            dim = dolfinx.mesh.cell_dim(dolfinx.mesh.to_type(cell_type))
            values = mesh["values"]
        else:
            if "mesh" not in h5file.keys():
                raise KeyError("No mesh found")
            mesh = h5file["mesh"]
            if "tags" not in mesh.keys():
                raise KeyError("Could not find 'tags' in file, are you sure this is a checkpoint?")
            tags = mesh["tags"]
            if name not in tags.keys():
                raise KeyError(f"Could not find {name} in '/mesh/tags/' in (unknown)")
            tag = tags[name]

            dim = tag.attrs["dim"]
            topology = tag["Topology"]
            values = tag["Values"]
        # Both branches leave `topology`/`values` as HDF5 datasets; each rank
        # reads its contiguous slice of the entities.
        num_entities_global = topology.shape[0]
        topology_range = compute_local_range(comm, num_entities_global)
        indices = topology[slice(*topology_range), :].astype(np.int64)
        # Values are narrowed to int32 regardless of the stored dtype.
        vals = values[slice(*topology_range)].astype(np.int32)
        return MeshTagsData(name=name, values=vals, indices=indices, dim=dim)

415 

416 

def read_dofmap(
    filename: str | Path, comm: MPI.Intracomm, name: str, backend_args: dict[str, Any] | None
) -> dolfinx.graph.AdjacencyList:
    """Read the dofmap of a function with a given name.

    Args:
        filename: Path to file to read from
        comm: MPI communicator used in storage
        name: Name of the function to read the dofmap for
        backend_args: Arguments to backend. May supply explicit dataset
            paths via the "dofmap" and "offsets" keys; otherwise the
            datasets under ``/mesh/functions/<name>`` are used.

    Returns:
        Dofmap as an {py:class}`dolfinx.graph.AdjacencyList`

    Raises:
        KeyError: If the mesh, functions group or named function is missing.
    """
    backend_args = {} if backend_args is None else backend_args
    with h5pyfile(filename, filemode="r", comm=comm, force_serial=False) as h5file:
        # If dofmap is read with full path, it is passed through backend_args
        dofmap_key = backend_args.get("dofmap", None)
        if dofmap_key is None:
            mesh_name = "mesh"  # Prepare for multiple meshes
            if mesh_name not in h5file.keys():
                raise KeyError(f"No mesh '{mesh_name}' found in (unknown)")
            mesh = h5file[mesh_name]
            if "functions" not in mesh.keys():
                raise KeyError(f"No functions stored in '{mesh_name}' in (unknown)")
            functions = mesh["functions"]
            if name not in functions.keys():
                raise KeyError(
                    f"No function with name '{name}' on '{mesh_name}' stored in (unknown)"
                )
            function = functions[name]
            offset_key = "dofmap_offsets"
            dofmap_key = "dofmap"
            offsets = function[offset_key]
            dofmap = function[dofmap_key]
        else:
            # Explicit dataset paths supplied by the caller.
            offset_key = backend_args["offsets"]
            dofmap = h5file[dofmap_key]
            offsets = h5file[offset_key]

        # Offsets have one more entry than there are cells.
        num_cells = offsets.shape[0] - 1
        local_range = compute_local_range(comm, num_cells)

        # First read in offsets based on the number of cells [0, num_cells_local]
        glob_offsets = offsets[local_range[0] : local_range[1] + 1].flatten().astype(np.int64)

        # Then read the data based of offsets
        dofmap_data = dofmap[glob_offsets[0] : glob_offsets[-1]].flatten()

        # Then make offsets local
        loc_offsets = (glob_offsets - glob_offsets[0]).astype(np.int32)
        return adjacencylist(dofmap_data, loc_offsets)

469 

470 

def read_dofs(
    filename: str | Path,
    comm: MPI.Intracomm,
    name: str,
    time: float,
    backend_args: dict[str, Any] | None,
) -> tuple[npt.NDArray[np.float32 | np.float64 | np.complex64 | np.complex128], int]:
    """Read the dofs (values) of a function with a given name from a given timestep.

    Args:
        filename: Path to file to read from
        comm: MPI communicator used in storage
        name: Name of the function to read the dofs for
        time: Time stamp associated with the function to read
        backend_args: Arguments to backend

    Returns:
        Contiguous sequence of degrees of freedom (with respect to input data)
        and the global starting point on the process.
        Process 0 has [0, M), process 1 [M, N), process 2 [N, O) etc.

    Raises:
        RuntimeError: If the mesh, the function, or the time step is missing.
    """
    with h5pyfile(filename, filemode="r", comm=comm, force_serial=False) as h5file:
        mesh_name = "mesh"  # Prepare for multiple meshes
        if mesh_name not in h5file.keys():
            raise RuntimeError(f"No mesh '{mesh_name}' found in {filename}")
        mesh = h5file[mesh_name]
        if "functions" not in mesh.keys():
            raise RuntimeError(f"No functions stored in '{mesh_name}' in {filename}")
        functions = mesh["functions"]
        if name not in functions.keys():
            raise RuntimeError(
                f"No function with name '{name}' on '{mesh_name}' stored in {filename}"
            )
        function = functions[name]
        timestamps = function.attrs["timestamps"]
        idx = np.flatnonzero(np.isclose(timestamps, time))
        if len(idx) != 1:
            # BUGFIX: message was missing the f-prefix, so the {name}/{time}
            # placeholders were printed literally.
            raise RuntimeError(
                f"Could not find {name}(t={time}) on grid {mesh_name} in {filename}."
            )
        # Arrays are stored per time step under a group named by its index.
        u_t = function[f"{idx[0]:d}"]
        data_group = u_t["array"]
        num_dofs_global = data_group.shape[0]
        local_range = compute_local_range(comm, num_dofs_global)
        local_array = data_group[slice(*local_range)]
        return local_array, local_range[0]

515 

516 

def read_cell_perms(
    comm: MPI.Intracomm, filename: Path | str, backend_args: dict[str, Any] | None
) -> npt.NDArray[np.uint32]:
    """
    Read cell permutation from file with given communicator,
    Split in continuous chunks based on number of cells in the input data.

    Args:
        comm: MPI communicator used in storage
        filename: Path to file to read from
        backend_args: Arguments to backend

    Returns:
        Contiguous sequence of permutations (with respect to input data)
        Process 0 has [0, M), process 1 [M, N), process 2 [N, O) etc.
    """
    with h5pyfile(filename, filemode="r", comm=comm, force_serial=False) as h5file:
        mesh_name = "mesh"  # Prepare for multiple meshes
        if mesh_name not in h5file.keys():
            raise RuntimeError(f"No mesh '{mesh_name}' found in (unknown)")
        perms = h5file[mesh_name]["CellPermutations"]
        # Each rank reads its contiguous chunk of the global dataset.
        start, stop = compute_local_range(comm, perms.shape[0])
        return perms[start:stop]

544 

545 

def write_function(
    filename: str | Path,
    comm: MPI.Intracomm,
    u: FunctionData,
    time: float,
    mode: FileMode,
    backend_args: dict[str, Any] | None = None,
):
    """Write a function to file.

    Args:
        filename: Path to file to write to
        comm: MPI communicator used in storage
        u: Internal data structure for the function data to save to file
        time: Time stamp associated with function
        mode: File-mode to store the function
        backend_args: Arguments to backend

    Raises:
        RuntimeError: If the function is already stored at ``time``.
    """
    mesh_name = "mesh"  # Prepare for multiple meshes
    backend_args = get_default_backend_args(backend_args)
    h5_mode = convert_file_mode(mode)
    with h5pyfile(filename, filemode=h5_mode, comm=comm, force_serial=False) as h5file:
        # BUGFIX: the group lookup/creation below was only performed for
        # append mode, leaving `mesh` and `function` undefined (NameError)
        # when called with FileMode.write. The `in keys()` checks are safe
        # on a fresh file, so run the block unconditionally.
        if mesh_name not in h5file.keys():
            mesh = h5file.create_group(mesh_name)
        else:
            mesh = h5file[mesh_name]

        cell_permutations_exist = "CellPermutations" in mesh.keys()

        if "functions" not in mesh.keys():
            functions = mesh.create_group("functions")
        else:
            functions = mesh["functions"]

        if u.name not in functions.keys():
            function = functions.create_group(u.name)
        else:
            function = functions[u.name]

        dofmap_exists = "dofmap" in function.keys()
        dofmap_offsets_exists = "dofmap_offsets" in function.keys()

        # Static data: written only the first time the function is stored.
        if not cell_permutations_exist:
            cell_perms = mesh.create_dataset(
                "CellPermutations", shape=[u.num_cells_global], dtype=np.uint32
            )
            cell_perms[slice(*u.local_cell_range)] = u.cell_permutations

        if not dofmap_exists:
            dofmap = function.create_dataset(
                "dofmap", shape=[u.global_dofs_in_dofmap], dtype=np.int64
            )
            dofmap[slice(*u.dofmap_range)] = u.dofmap_array

        if not dofmap_offsets_exists:
            dofmap_offsets = function.create_dataset(
                "dofmap_offsets", shape=[u.num_cells_global + 1], dtype=np.int64
            )
            # Offsets have one more entry than cells; neighbouring ranks
            # overlap by one entry.
            dofmap_offsets[u.local_cell_range[0] : u.local_cell_range[1] + 1] = u.dofmap_offsets

        # Write timestamp
        if "timestamps" in function.attrs.keys():
            timestamps = function.attrs["timestamps"]
            if np.isclose(time, timestamps).any():
                # BUGFIX: message had a typo ("FUnction"), was missing the
                # f-prefix, and referenced an undefined {time_stamp}.
                raise RuntimeError(f"Function has already been stored at time={time}.")
            function.attrs["timestamps"] = np.append(function.attrs["timestamps"], time)
        else:
            function.attrs["timestamps"] = np.array([time])
        idx = len(function.attrs["timestamps"]) - 1

        # Values are stored per time step under a group named by its index.
        data_group = function.create_group(f"{idx:d}")
        array = data_group.create_dataset("array", shape=[u.num_dofs_global], dtype=u.values.dtype)
        array[slice(*u.dof_range)] = u.values

625 

626 

def read_legacy_mesh(
    filename: Path | str, comm: MPI.Intracomm, group: str
) -> tuple[npt.NDArray[np.int64], npt.NDArray[np.floating], str | None]:
    """Read in the mesh topology, geometry and (optionally) cell type from a
    legacy DOLFIN HDF5-file.

    Args:
        filename: Path to file to read from
        comm: MPI communicator used in storage
        group: Group in HDF5 file where mesh is stored

    Returns:
        Tuple containing:
        - Topology as a (num_cells, num_vertices_per_cell) array of global vertex indices
        - Geometry as a (num_vertices, geometric_dimension) array of vertex coordinates
        - Cell type as a string (e.g. "tetrahedron") or None if not found

    Raises:
        KeyError: If the group, its topology or its geometry is missing.
    """
    with h5pyfile(filename, filemode="r", comm=comm, force_serial=False) as h5file:
        if group not in h5file.keys():
            raise KeyError(f"Could not find {group} in {filename}.")
        mesh = h5file[group]
        if "topology" not in mesh.keys():
            raise KeyError(f"Could not find '{group}/topology' in {filename}.")

        topology = mesh["topology"]
        shape = topology.shape
        local_range = compute_local_range(comm, shape[0])
        mesh_topology = topology[slice(*local_range)].astype(np.int64)

        # Get mesh cell type (legacy DOLFIN may store it as bytes)
        cell_type = None
        if "celltype" in topology.attrs.keys():
            cell_type = topology.attrs["celltype"]
            if isinstance(cell_type, np.bytes_):
                cell_type = cell_type.decode("utf-8")

        # Coordinates are stored under either key depending on the writer.
        for geometry_key in ["geometry", "coordinates"]:
            if geometry_key in mesh.keys():
                break
        else:
            # BUGFIX: message hard-coded '/mesh/...' even though the group is
            # a parameter, and used an f-string without placeholders.
            raise KeyError(
                f"Could not find geometry in '{group}/geometry' or '{group}/coordinates'"
                + f" in {filename}."
            )
        geometry = mesh[geometry_key]
        g_shape = geometry.shape
        local_g_range = compute_local_range(comm, g_shape[0])
        mesh_geometry = geometry[slice(*local_g_range)]

    return mesh_topology, mesh_geometry, cell_type

677 

678 

def read_hdf5_array(
    comm: MPI.Intracomm,
    filename: Path | str,
    group: str,
    backend_args: dict[str, Any] | None = None,
) -> tuple[np.ndarray, int]:
    """Read an array from an HDF5 file.

    Args:
        comm: MPI communicator used in storage
        filename: Path to file to read from
        group: Group in HDF5 file where array is stored
        backend_args: Arguments to backend

    Returns:
        Tuple containing:
        - Numpy array read from file
        - Global starting point on the process.
          Process 0 has [0, M), process 1 [M, N), process 2 [N, O) etc.
    """
    with h5pyfile(filename, filemode="r", comm=comm, force_serial=False) as h5file:
        dataset = h5file[group]
        # Each rank reads a contiguous chunk along the first axis.
        start, stop = compute_local_range(comm, dataset.shape[0])
        return dataset[start:stop].flatten(), start

705 

706 

def snapshot_checkpoint(
    filename: Path | str,
    mode: FileMode,
    u: dolfinx.fem.Function,
    backend_args: dict[str, Any] | None,
):
    """Create a snapshot checkpoint of a dolfinx function.

    A snapshot stores only the raw owned degrees of freedom under
    ``/snapshot/dofs``; in read mode the values are loaded back into ``u``
    and ghost values are updated via ``scatter_forward``.

    Args:
        filename: Path to file to read from
        mode: File-mode to store the function (write stores, read loads)
        u: dolfinx function to create a snapshot checkpoint for
        backend_args: Arguments to backend
    """
    comm = u.function_space.mesh.comm
    dofmap = u.function_space.dofmap
    # Owned dof range in block-expanded ("unrolled") indexing.
    local_range = np.array(dofmap.index_map.local_range) * dofmap.index_map_bs
    num_dofs_local = local_range[1] - local_range[0]
    num_dofs_global = dofmap.index_map.size_global * dofmap.index_map_bs
    h5mode = convert_file_mode(mode)
    # NOTE(review): modes other than "w"/"r" (e.g. append) fall through
    # silently — confirm whether that is intentional or should raise.
    if h5mode == "w":
        with h5pyfile(filename, filemode=h5mode, comm=comm, force_serial=False) as h5file:
            # Copy the first num_dofs_local entries (the owned dofs) only.
            local_dofs = u.x.array[:num_dofs_local].copy()
            data = h5file.create_group("snapshot")
            dataset = data.create_dataset("dofs", shape=num_dofs_global, dtype=local_dofs.dtype)
            dataset[slice(*local_range)] = local_dofs
    elif h5mode == "r":
        with h5pyfile(filename, filemode=h5mode, comm=comm, force_serial=False) as h5file:
            data = h5file["snapshot"]["dofs"]
            # The stored function space must match the one of `u`.
            assert data.shape[0] == num_dofs_global
            u.x.array[:num_dofs_local] = data[slice(*local_range)]
            u.x.scatter_forward()

739 

740 

def read_function_names(
    filename: Path | str, comm: MPI.Intracomm, backend_args: dict[str, Any] | None
) -> list[str]:
    """Read all function names from a file.

    Args:
        filename: Path to file
        comm: MPI communicator to launch IO on.
        backend_args: Arguments to backend

    Returns:
        A list of function names (empty if no functions are stored).
    """
    check_file_exists(filename)
    backend_args = get_default_backend_args(backend_args)
    with h5pyfile(filename, filemode="r", comm=comm, force_serial=False) as h5file:
        mesh_name = "mesh"  # Prepare for multiple meshes
        if mesh_name not in h5file.keys():
            raise RuntimeError(f"No mesh '{mesh_name}' found in (unknown)")
        mesh_group = h5file[mesh_name]
        if "functions" not in mesh_group.keys():
            # No functions stored yet on this mesh.
            return []
        return list(mesh_group["functions"].keys())

765 

766 

def read_point_data(
    filename: Path | str,
    name: str,
    comm: MPI.Intracomm,
    time: float | str | None,
    backend_args: dict[str, Any] | None,
) -> tuple[np.ndarray, int]:
    """Read data from the nodes of a mesh.

    Not supported by the h5py backend; always raises.

    Args:
        filename: Path to file
        name: Name of point data
        comm: Communicator to launch IO on.
        time: The time stamp
        backend_args: The backend arguments

    Returns:
        Data local to process (contiguous, no mpi comm) and local start range

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError("The h5py backend cannot read point data.")

787 

788 

def read_cell_data(
    filename: Path | str,
    name: str,
    comm: MPI.Intracomm,
    time: str | float | None,
    backend_args: dict[str, Any] | None,
) -> tuple[npt.NDArray[np.int64], np.ndarray]:
    """Read data from the cells of a mesh.

    Not supported by the h5py backend; always raises.

    Args:
        filename: Path to file
        name: Name of point data
        comm: Communicator to launch IO on.
        time: The time stamp
        backend_args: The backend arguments

    Returns:
        A tuple (topology, dofs) where topology contains the
        vertex indices of the cells, dofs the degrees of
        freedom within that cell.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError("The h5py backend does not support reading cell data.")

810 

811 

def write_data(
    filename: Path | str,
    array_data: ArrayData,
    comm: MPI.Intracomm,
    time: str | float | None,
    mode: FileMode,
    backend_args: dict[str, Any] | None,
):
    """Write a 2D-array to file (distributed across processes with MPI).

    Not yet supported by the h5py backend; always raises.

    Args:
        filename: Path to file
        array_data: Data to write to file
        comm: MPI communicator to open the file with
        time: Time-stamp for data.
        mode: Append or write
        backend_args: The backend arguments

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError("H5PY has not implemented this yet")