Coverage for  / dolfinx-env / lib / python3.12 / site-packages / io4dolfinx / backends / pyvista / backend.py: 78%

190 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-26 18:16 +0000

1""" 

2Module that uses pyvista to import grids. 

3""" 

4 

5from typing import Any 

6 

7import numpy as np 

8import numpy.typing as npt 

9 

10try: 

11 import pyvista 

12except ImportError: 

13 raise ModuleNotFoundError("The PyVista-backend requires pyvista") 

14from pathlib import Path 

15 

16from mpi4py import MPI 

17 

18import basix 

19import dolfinx 

20 

21from io4dolfinx.structures import ArrayData, FunctionData, MeshData, MeshTagsData, ReadMeshData 

22from io4dolfinx.utils import check_file_exists 

23 

24from .. import FileMode, ReadMode 

25 

26# Cell types can be found at 

27# https://vtk.org/doc/nightly/html/vtkCellType_8h_source.html 

28_first_order_vtk = { 

29 1: "point", 

30 3: "interval", 

31 5: "triangle", 

32 9: "quadrilateral", 

33 10: "tetrahedron", 

34 12: "hexahedron", 

35} 

36 

37_arbitrary_lagrange_vtk = { 

38 68: "interval", 

39 69: "triangle", 

40 70: "quadrilateral", 

41 71: "tetrahedron", 

42 72: "hexahedron", 

43 73: "prism", 

44 74: "pyramid", 

45} 

46 

47 

48read_mode = ReadMode.serial 

49 

50 

51def _cell_degree(ct: str, num_nodes: int) -> int: 

52 if ct == "point": 

53 return 1 

54 elif ct == "interval": 

55 return int(num_nodes - 1) 

56 elif ct == "triangle": 

57 n = (np.sqrt(1 + 8 * num_nodes) - 1) / 2 

58 if 2 * num_nodes != n * (n + 1): 

59 raise ValueError(f"Unknown triangle layout. Number of nodes: {num_nodes}") 

60 return int(n - 1) 

61 elif ct == "tetrahedron": 

62 n = 0 

63 while n * (n + 1) * (n + 2) < 6 * num_nodes: 

64 n += 1 

65 if n * (n + 1) * (n + 2) != 6 * num_nodes: 

66 raise ValueError(f"Unknown tetrahedron layout. Number of nodes: {num_nodes}") 

67 return int(n - 1) 

68 

69 elif ct == "quadrilateral": 

70 n = np.sqrt(num_nodes) 

71 if num_nodes != n * n: 

72 raise ValueError(f"Unknown quadrilateral layout. Number of nodes: {num_nodes}") 

73 return int(n - 1) 

74 elif ct == "hexahedron": 

75 n = np.cbrt(num_nodes) 

76 if num_nodes != n * n * n: 

77 raise ValueError(f"Unknown hexahedron layout. Number of nodes: {num_nodes}") 

78 return int(n - 1) 

79 elif ct == "prism": 

80 if num_nodes == 6: 

81 return 1 

82 elif num_nodes == 15: 

83 return 2 

84 else: 

85 raise ValueError(f"Unknown prism layout. Number of nodes: {num_nodes}") 

86 elif ct == "pyramid": 

87 if num_nodes == 5: 

88 return 1 

89 elif num_nodes == 13: 

90 return 2 

91 else: 

92 raise ValueError(f"Unknown pyramid layout. Number of nodes: {num_nodes}") 

93 else: 

94 raise ValueError(f"Unknown cell type {ct} with {num_nodes=}.") 

95 

96 

97def get_default_backend_args(arguments: dict[str, Any] | None) -> dict[str, Any]: 

98 """Get default backend arguments given a set of input arguments. 

99 

100 Args: 

101 arguments: Input backend arguments 

102 

103 Returns: 

104 Updated backend arguments 

105 """ 

106 args = arguments or {} 

107 return args 

108 

109 

110def read_mesh_data( 

111 filename: Path | str, 

112 comm: MPI.Intracomm, 

113 time: str | float | None = None, 

114 read_from_partition: bool = False, 

115 backend_args: dict[str, Any] | None = None, 

116) -> ReadMeshData: 

117 """Read mesh data from file. 

118 

119 Args: 

120 filename: Path to file to read from 

121 comm: MPI communicator used in storage 

122 time: Time stamp associated with the mesh to read 

123 read_from_partition: Whether to read partition information 

124 backend_args: Arguments to backend 

125 

126 Returns: 

127 Internal data structure for the mesh data read from file 

128 """ 

129 backend_args = get_default_backend_args(backend_args) 

130 check_file_exists(filename) 

131 if read_from_partition: 

132 raise RuntimeError("Cannot read partition data with Pyvista") 

133 cells: npt.NDArray[np.int64] 

134 geom: npt.NDArray[np.float64 | np.float32] 

135 if comm.rank == 0: 

136 in_data = pyvista.read(filename) 

137 if isinstance(in_data, pyvista.UnstructuredGrid): 

138 grid = in_data 

139 elif isinstance(in_data, pyvista.core.composite.MultiBlock): 

140 # To handle multiblock like pvd 

141 if hasattr(pyvista, "_VTK_SNAKE_CASE_STATE"): 

142 pyvista._VTK_SNAKE_CASE_STATE = "allow" 

143 else: 

144 # Compatibility with 0.47 

145 pyvista.core.vtk_snake_case._state = "allow" 

146 number_of_blocks = in_data.number_of_blocks 

147 assert number_of_blocks == 1 

148 b0 = in_data.get_block(0) 

149 assert isinstance(b0, pyvista.UnstructuredGrid) 

150 grid = b0 

151 else: 

152 raise RuntimeError(f"Unknown data type {type(in_data)}") 

153 geom = grid.points 

154 num_cells_global = grid.number_of_cells 

155 cells = grid.cells.reshape(num_cells_global, -1).astype(np.int64) 

156 nodes_per_cell_type = cells[:, 0] 

157 assert np.allclose(nodes_per_cell_type, nodes_per_cell_type[0]), "Single celltype support" 

158 cells = cells[:, 1:].astype(np.int64) 

159 cell_types = grid.celltypes 

160 assert len(np.unique(cell_types)) == 1 

161 if (cell_type := cell_types[0]) in _first_order_vtk.keys(): 

162 cell_type = _first_order_vtk[cell_type] 

163 order = 1 

164 elif cell_type in _arbitrary_lagrange_vtk.keys(): 

165 cell_type = _arbitrary_lagrange_vtk[cell_type] 

166 order = _cell_degree(cell_type, cells.shape[1]) 

167 perm = dolfinx.cpp.io.perm_vtk(dolfinx.mesh.to_type(cell_type), cells.shape[1]) 

168 cells = cells[:, perm] 

169 lvar = int(basix.LagrangeVariant.equispaced) 

170 gtype = backend_args.get("dtype", geom.dtype) 

171 order, lvar, nodes_per_cell, cell_type, gtype, gdim = comm.bcast( 

172 (order, lvar, cells.shape[1], cell_type, gtype, geom.shape[1]), root=0 

173 ) 

174 else: 

175 order, lvar, nodes_per_cell, cell_type, gtype, gdim = comm.bcast(None, root=0) 

176 geom = np.zeros((0, gdim), dtype=gtype) 

177 cells = np.zeros((0, nodes_per_cell), dtype=np.int64) 

178 

179 return ReadMeshData( 

180 cells=cells, cell_type=cell_type, x=geom.astype(gtype), lvar=lvar, degree=order 

181 ) 

182 

183 

184def read_point_data( 

185 filename: Path | str, 

186 name: str, 

187 comm: MPI.Intracomm, 

188 time: float | str | None, 

189 backend_args: dict[str, Any] | None, 

190) -> tuple[np.ndarray, int]: 

191 """Read data from the nodes of a mesh. 

192 

193 Args: 

194 filename: Path to file 

195 name: Name of point data 

196 comm: Communicator to launch IO on. 

197 time: The time stamp 

198 backend_args: The backend arguments 

199 

200 Returns: 

201 Data local to process (contiguous, no mpi comm) and local start range 

202 """ 

203 dataset: np.ndarray 

204 if MPI.COMM_WORLD.rank == 0: 

205 in_data = pyvista.read(filename) 

206 if isinstance(in_data, pyvista.UnstructuredGrid): 

207 grid = in_data 

208 elif isinstance(in_data, pyvista.core.composite.MultiBlock): 

209 # To handle multiblock like pvd 

210 if hasattr(pyvista, "_VTK_SNAKE_CASE_STATE"): 

211 pyvista._VTK_SNAKE_CASE_STATE = "allow" 

212 else: 

213 # Compatibility with 0.47 

214 pyvista.core.vtk_snake_case._state = "allow" 

215 number_of_blocks = in_data.number_of_blocks 

216 assert number_of_blocks == 1 

217 b0 = in_data.get_block(0) 

218 assert isinstance(b0, pyvista.UnstructuredGrid) 

219 grid = b0 

220 

221 dataset = grid.point_data[name] 

222 if len(dataset.shape) == 1: 

223 num_components = 1 

224 dataset = dataset.reshape(-1, num_components) 

225 else: 

226 num_components = dataset.shape[1] 

227 if np.issubdtype(dataset.dtype, np.integer): 

228 gtype = grid.points.dtype 

229 dataset = dataset.astype(gtype) 

230 else: 

231 gtype = dataset.dtype 

232 num_components, gtype = comm.bcast((num_components, gtype), root=0) 

233 local_range_start = 0 

234 else: 

235 num_components, gtype = comm.bcast(None, root=0) 

236 dataset = np.zeros((0, num_components), dtype=gtype) 

237 local_range_start = 0 

238 

239 return dataset, int(local_range_start) 

240 

241 

242def read_cell_data( 

243 filename: Path | str, 

244 name: str, 

245 comm: MPI.Intracomm, 

246 time: str | float | None, 

247 backend_args: dict[str, Any] | None, 

248) -> tuple[npt.NDArray[np.int64], np.ndarray]: 

249 dataset: np.ndarray 

250 topology: np.ndarray 

251 if MPI.COMM_WORLD.rank == 0: 

252 in_data = pyvista.read(filename) 

253 if isinstance(in_data, pyvista.UnstructuredGrid): 

254 grid = in_data 

255 elif isinstance(in_data, pyvista.core.composite.MultiBlock): 

256 # To handle multiblock like pvd 

257 if hasattr(pyvista, "_VTK_SNAKE_CASE_STATE"): 

258 pyvista._VTK_SNAKE_CASE_STATE = "allow" 

259 else: 

260 # Compatibility with 0.47 

261 pyvista.core.vtk_snake_case._state = "allow" 

262 number_of_blocks = in_data.number_of_blocks 

263 assert number_of_blocks == 1 

264 b0 = in_data.get_block(0) 

265 assert isinstance(b0, pyvista.UnstructuredGrid) 

266 grid = b0 

267 

268 dataset = grid.cell_data[name] 

269 if len(dataset.shape) == 1: 

270 num_components = 1 

271 dataset = dataset.reshape(-1, num_components) 

272 else: 

273 num_components = dataset.shape[1] 

274 

275 if np.issubdtype(dataset.dtype, np.integer): 

276 gtype = in_data.points.dtype 

277 dataset = dataset.astype(gtype) 

278 else: 

279 gtype = dataset.dtype 

280 num_components, gtype = comm.bcast((num_components, gtype), root=0) 

281 else: 

282 num_components, gtype = comm.bcast(None, root=0) 

283 dataset = np.zeros((0, num_components), dtype=gtype) 

284 _time = float(time) if time is not None else None 

285 topology = read_mesh_data(filename, comm, _time, False, backend_args=None).cells 

286 return topology, dataset 

287 

288 

289def write_attributes( 

290 filename: Path | str, 

291 comm: MPI.Intracomm, 

292 name: str, 

293 attributes: dict[str, np.ndarray], 

294 backend_args: dict[str, Any] | None, 

295): 

296 """Write attributes to file. 

297 

298 Args: 

299 filename: Path to file to write to 

300 comm: MPI communicator used in storage 

301 name: Name of the attribute group 

302 attributes: Dictionary of attributes to write 

303 backend_args: Arguments to backend 

304 """ 

305 raise NotImplementedError("The Pyvista backend cannot write attributes.") 

306 

307 

308def read_attributes( 

309 filename: Path | str, 

310 comm: MPI.Intracomm, 

311 name: str, 

312 backend_args: dict[str, Any] | None, 

313) -> dict[str, Any]: 

314 """Read attributes from file. 

315 

316 Args: 

317 filename: Path to file to read from 

318 comm: MPI communicator used in storage 

319 name: Name of the attribute group 

320 backend_args: Arguments to backend 

321 

322 Returns: 

323 Dictionary of attributes read from file 

324 """ 

325 raise NotImplementedError("The Pyvista backend cannot read attributes.") 

326 

327 

328def read_timestamps( 

329 filename: Path | str, 

330 comm: MPI.Intracomm, 

331 function_name: str, 

332 backend_args: dict[str, Any] | None, 

333) -> npt.NDArray[np.float64 | str]: # type: ignore[type-var] 

334 """Read timestamps from file. 

335 

336 Args: 

337 filename: Path to file to read from 

338 comm: MPI communicator used in storage 

339 function_name: Name of the function to read timestamps for 

340 backend_args: Arguments to backend 

341 

342 Returns: 

343 Numpy array of timestamps read from file 

344 """ 

345 raise NotImplementedError("The Pyvista backend cannot read timestamps.") 

346 

347 

348def read_function_names( 

349 filename: Path | str, comm: MPI.Intracomm, backend_args: dict[str, Any] | None 

350) -> list[str]: 

351 """Read all function names from a file. 

352 

353 Args: 

354 filename: Path to file 

355 comm: MPI communicator to launch IO on. 

356 backend_args: Arguments to backend 

357 

358 Returns: 

359 A list of function names. 

360 """ 

361 in_data = pyvista.read(filename) 

362 if isinstance(in_data, pyvista.UnstructuredGrid): 

363 grid = in_data 

364 elif isinstance(in_data, pyvista.core.composite.MultiBlock): 

365 # To handle multiblock like pvd 

366 if hasattr(pyvista, "_VTK_SNAKE_CASE_STATE"): 

367 pyvista._VTK_SNAKE_CASE_STATE = "allow" 

368 else: 

369 # Compatibility with 0.47 

370 pyvista.core.vtk_snake_case._state = "allow" 

371 number_of_blocks = in_data.number_of_blocks 

372 assert number_of_blocks == 1 

373 b0 = in_data.get_block(0) 

374 assert isinstance(b0, pyvista.UnstructuredGrid) 

375 grid = b0 

376 

377 point_data = list(grid.point_data.keys()) 

378 cell_data = list(grid.cell_data.keys()) 

379 return point_data + cell_data 

380 

381 

382def write_mesh( 

383 filename: Path | str, 

384 comm: MPI.Intracomm, 

385 mesh: MeshData, 

386 backend_args: dict[str, Any] | None, 

387 mode: FileMode, 

388 time: float, 

389): 

390 """ 

391 Write a mesh to file. 

392 

393 Args: 

394 comm: MPI communicator used in storage 

395 mesh: Internal data structure for the mesh data to save to file 

396 filename: Path to file to write to 

397 backend_args: Arguments to backend 

398 mode: File-mode to store the mesh 

399 time: Time stamp associated with the mesh 

400 """ 

401 raise NotImplementedError("The Pyvista backend cannot write meshes.") 

402 

403 

404def write_meshtags( 

405 filename: str | Path, 

406 comm: MPI.Intracomm, 

407 data: MeshTagsData, 

408 backend_args: dict[str, Any] | None, 

409): 

410 """Write mesh tags to file. 

411 

412 Args: 

413 filename: Path to file to write to 

414 comm: MPI communicator used in storage 

415 data: Internal data structure for the mesh tags to save to file 

416 backend_args: Arguments to backend 

417 """ 

418 raise NotImplementedError("The Pyvista backend cannot write meshtags.") 

419 

420 

421def read_meshtags_data( 

422 filename: str | Path, 

423 comm: MPI.Intracomm, 

424 name: str, 

425 backend_args: dict[str, Any] | None, 

426) -> MeshTagsData: 

427 """Read mesh tags from file. 

428 

429 Args: 

430 filename: Path to file to read from 

431 comm: MPI communicator used in storage 

432 name: Name of the mesh tags to read 

433 backend_args: Arguments to backend 

434 

435 Returns: 

436 Internal data structure for the mesh tags read from file 

437 """ 

438 raise NotImplementedError("The Pyvista backend cannot read meshtags.") 

439 

440 

441def read_dofmap( 

442 filename: str | Path, 

443 comm: MPI.Intracomm, 

444 name: str, 

445 backend_args: dict[str, Any] | None, 

446) -> dolfinx.graph.AdjacencyList: 

447 """Read the dofmap of a function with a given name. 

448 

449 Args: 

450 filename: Path to file to read from 

451 comm: MPI communicator used in storage 

452 name: Name of the function to read the dofmap for 

453 backend_args: Arguments to backend 

454 

455 Returns: 

456 Dofmap as an AdjacencyList 

457 """ 

458 raise NotImplementedError("The Pyvista backend cannot make checkpoints.") 

459 

460 

461def read_dofs( 

462 filename: str | Path, 

463 comm: MPI.Intracomm, 

464 name: str, 

465 time: float, 

466 backend_args: dict[str, Any] | None, 

467) -> tuple[npt.NDArray[np.float32 | np.float64 | np.complex64 | np.complex128], int]: 

468 """Read the dofs (values) of a function with a given name from a given timestep. 

469 

470 Args: 

471 filename: Path to file to read from 

472 comm: MPI communicator used in storage 

473 name: Name of the function to read the dofs for 

474 time: Time stamp associated with the function to read 

475 backend_args: Arguments to backend 

476 

477 Returns: 

478 Contiguous sequence of degrees of freedom (with respect to input data) 

479 and the global starting point on the process. 

480 Process 0 has [0, M), process 1 [M, N), process 2 [N, O) etc. 

481 """ 

482 raise NotImplementedError("The Pyvista backend cannot make checkpoints.") 

483 

484 

485def read_cell_perms( 

486 comm: MPI.Intracomm, filename: Path | str, backend_args: dict[str, Any] | None 

487) -> npt.NDArray[np.uint32]: 

488 """ 

489 Read cell permutation from file with given communicator, 

490 Split in continuous chunks based on number of cells in the input data. 

491 

492 Args: 

493 comm: MPI communicator used in storage 

494 filename: Path to file to read from 

495 backend_args: Arguments to backend 

496 

497 Returns: 

498 Contiguous sequence of permutations (with respect to input data) 

499 Process 0 has [0, M), process 1 [M, N), process 2 [N, O) etc. 

500 """ 

501 raise NotImplementedError("The Pyvista backend cannot make checkpoints.") 

502 

503 

504def write_function( 

505 filename: Path, 

506 comm: MPI.Intracomm, 

507 u: FunctionData, 

508 time: float, 

509 mode: FileMode, 

510 backend_args: dict[str, Any] | None, 

511): 

512 """ 

513 Write a function to file. 

514 

515 Args: 

516 comm: MPI communicator used in storage 

517 u: Internal data structure for the function data to save to file 

518 filename: Path to file to write to 

519 time: Time stamp associated with function 

520 mode: File-mode to store the function 

521 backend_args: Arguments to backend 

522 """ 

523 raise NotImplementedError("The Pyvista backend cannot make checkpoints.") 

524 

525 

526def read_legacy_mesh( 

527 filename: Path | str, comm: MPI.Intracomm, group: str 

528) -> tuple[npt.NDArray[np.int64], npt.NDArray[np.floating], str | None]: 

529 """Read in the mesh topology, geometry and (optionally) cell type from a 

530 legacy DOLFIN HDF5-file. 

531 

532 Args: 

533 filename: Path to file to read from 

534 comm: MPI communicator used in storage 

535 group: Group in HDF5 file where mesh is stored 

536 

537 Returns: 

538 Tuple containing: 

539 - Topology as a (num_cells, num_vertices_per_cell) array of global vertex indices 

540 - Geometry as a (num_vertices, geometric_dimension) array of vertex coordinates 

541 - Cell type as a string (e.g. "tetrahedron") or None if not found 

542 """ 

543 raise NotImplementedError("The Pyvista backend cannot read legacy DOLFIN meshes.") 

544 

545 

546def snapshot_checkpoint( 

547 filename: Path | str, 

548 mode: FileMode, 

549 u: dolfinx.fem.Function, 

550 backend_args: dict[str, Any] | None, 

551): 

552 """Create a snapshot checkpoint of a dolfinx function. 

553 

554 Args: 

555 filename: Path to file to read from 

556 mode: File-mode to store the function 

557 u: dolfinx function to create a snapshot checkpoint for 

558 backend_args: Arguments to backend 

559 """ 

560 raise NotImplementedError("The Pyvista backend cannot make checkpoints.") 

561 

562 

563def read_hdf5_array( 

564 comm: MPI.Intracomm, 

565 filename: Path | str, 

566 group: str, 

567 backend_args: dict[str, Any] | None, 

568) -> tuple[np.ndarray, int]: 

569 """Read an array from an HDF5 file. 

570 

571 Args: 

572 comm: MPI communicator used in storage 

573 filename: Path to file to read from 

574 group: Group in HDF5 file where array is stored 

575 backend_args: Arguments to backend 

576 

577 Returns: 

578 Tuple containing: 

579 - Numpy array read from file 

580 - Global starting point on the process. 

581 Process 0 has [0, M), process 1 [M, N), process 2 [N, O) etc. 

582 """ 

583 raise NotImplementedError("The Pyvista backend cannot read HDF5 arrays") 

584 

585 

586def write_data( 

587 filename: Path | str, 

588 array_data: ArrayData, 

589 comm: MPI.Intracomm, 

590 time: str | float | None, 

591 mode: FileMode, 

592 backend_args: dict[str, Any] | None, 

593): 

594 """Write a 2D-array to file (distributed across proceses with MPI). 

595 

596 Args: 

597 filename: Path to file 

598 array_data: Data to write to file 

599 comm: MPI communicator to open the file with 

600 time: Time stamp 

601 mode: Append or write 

602 backend_args: The backend arguments 

603 """ 

604 raise NotImplementedError("The pyvista backend does not support writing point data")