Coverage for  / dolfinx-env / lib / python3.12 / site-packages / io4dolfinx / backends / pyvista / backend.py: 77%

194 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 11:21 +0000

1""" 

2Module that uses pyvista to import grids. 

3""" 

4 

5from typing import Any 

6 

7import numpy as np 

8import numpy.typing as npt 

9 

10try: 

11 import pyvista 

12except ImportError: 

13 raise ModuleNotFoundError("The PyVista-backend requires pyvista") 

14from pathlib import Path 

15 

16from mpi4py import MPI 

17 

18import basix 

19import dolfinx 

20 

21from io4dolfinx.structures import ArrayData, FunctionData, MeshData, MeshTagsData, ReadMeshData 

22from io4dolfinx.utils import check_file_exists 

23 

24from .. import FileMode, ReadMode 

25 

26# Cell types can be found at 

27# https://vtk.org/doc/nightly/html/vtkCellType_8h_source.html 

28_first_order_vtk = { 

29 1: "point", 

30 3: "interval", 

31 5: "triangle", 

32 9: "quadrilateral", 

33 10: "tetrahedron", 

34 12: "hexahedron", 

35} 

36 

37_arbitrary_lagrange_vtk = { 

38 68: "interval", 

39 69: "triangle", 

40 70: "quadrilateral", 

41 71: "tetrahedron", 

42 72: "hexahedron", 

43 73: "prism", 

44 74: "pyramid", 

45} 

46 

47_quadratric_vtk = { 

48 21: "interval", 

49 22: "triangle", 

50 23: "quadrilateral", 

51 24: "tetrahedron", 

52 25: "hexahedron", 

53} 

54 

55 

56read_mode = ReadMode.serial 

57 

58 

59def _cell_degree(ct: str, num_nodes: int) -> int: 

60 if ct == "point": 

61 return 1 

62 elif ct == "interval": 

63 return int(num_nodes - 1) 

64 elif ct == "triangle": 

65 n = (np.sqrt(1 + 8 * num_nodes) - 1) / 2 

66 if 2 * num_nodes != n * (n + 1): 

67 raise ValueError(f"Unknown triangle layout. Number of nodes: {num_nodes}") 

68 return int(n - 1) 

69 elif ct == "tetrahedron": 

70 n = 0 

71 while n * (n + 1) * (n + 2) < 6 * num_nodes: 

72 n += 1 

73 if n * (n + 1) * (n + 2) != 6 * num_nodes: 

74 raise ValueError(f"Unknown tetrahedron layout. Number of nodes: {num_nodes}") 

75 return int(n - 1) 

76 

77 elif ct == "quadrilateral": 

78 n = np.sqrt(num_nodes) 

79 if num_nodes != n * n: 

80 raise ValueError(f"Unknown quadrilateral layout. Number of nodes: {num_nodes}") 

81 return int(n - 1) 

82 elif ct == "hexahedron": 

83 n = np.cbrt(num_nodes) 

84 if num_nodes != n * n * n: 

85 raise ValueError(f"Unknown hexahedron layout. Number of nodes: {num_nodes}") 

86 return int(n - 1) 

87 elif ct == "prism": 

88 if num_nodes == 6: 

89 return 1 

90 elif num_nodes == 15: 

91 return 2 

92 else: 

93 raise ValueError(f"Unknown prism layout. Number of nodes: {num_nodes}") 

94 elif ct == "pyramid": 

95 if num_nodes == 5: 

96 return 1 

97 elif num_nodes == 13: 

98 return 2 

99 else: 

100 raise ValueError(f"Unknown pyramid layout. Number of nodes: {num_nodes}") 

101 else: 

102 raise ValueError(f"Unknown cell type {ct} with {num_nodes=}.") 

103 

104 

105def get_default_backend_args(arguments: dict[str, Any] | None) -> dict[str, Any]: 

106 """Get default backend arguments given a set of input arguments. 

107 

108 Args: 

109 arguments: Input backend arguments 

110 

111 Returns: 

112 Updated backend arguments 

113 """ 

114 args = arguments or {} 

115 return args 

116 

117 

118def read_mesh_data( 

119 filename: Path | str, 

120 comm: MPI.Intracomm, 

121 time: str | float | None = None, 

122 read_from_partition: bool = False, 

123 backend_args: dict[str, Any] | None = None, 

124) -> ReadMeshData: 

125 """Read mesh data from file. 

126 

127 Args: 

128 filename: Path to file to read from 

129 comm: MPI communicator used in storage 

130 time: Time stamp associated with the mesh to read 

131 read_from_partition: Whether to read partition information 

132 backend_args: Arguments to backend 

133 

134 Returns: 

135 Internal data structure for the mesh data read from file 

136 """ 

137 backend_args = get_default_backend_args(backend_args) 

138 check_file_exists(filename) 

139 if read_from_partition: 

140 raise RuntimeError("Cannot read partition data with Pyvista") 

141 cells: npt.NDArray[np.int64] 

142 geom: npt.NDArray[np.float64 | np.float32] 

143 if comm.rank == 0: 

144 in_data = pyvista.read(filename) 

145 if isinstance(in_data, pyvista.UnstructuredGrid): 

146 grid = in_data 

147 elif isinstance(in_data, pyvista.core.composite.MultiBlock): 

148 # To handle multiblock like pvd 

149 if hasattr(pyvista, "_VTK_SNAKE_CASE_STATE"): 

150 pyvista._VTK_SNAKE_CASE_STATE = "allow" 

151 else: 

152 # Compatibility with 0.47 

153 pyvista.core.vtk_snake_case._state = "allow" 

154 number_of_blocks = in_data.number_of_blocks 

155 assert number_of_blocks == 1 

156 b0 = in_data.get_block(0) 

157 assert isinstance(b0, pyvista.UnstructuredGrid) 

158 grid = b0 

159 else: 

160 raise RuntimeError(f"Unknown data type {type(in_data)}") 

161 geom = grid.points 

162 num_cells_global = grid.number_of_cells 

163 cells = grid.cells.reshape(num_cells_global, -1).astype(np.int64) 

164 nodes_per_cell_type = cells[:, 0] 

165 assert np.allclose(nodes_per_cell_type, nodes_per_cell_type[0]), "Single celltype support" 

166 cells = cells[:, 1:].astype(np.int64) 

167 cell_types = grid.celltypes 

168 assert len(np.unique(cell_types)) == 1 

169 if (cell_type := cell_types[0]) in _first_order_vtk.keys(): 

170 cell_type = _first_order_vtk[cell_type] 

171 order = 1 

172 elif cell_type in _quadratric_vtk.keys(): 

173 cell_type = _quadratric_vtk[cell_type] 

174 order = 2 

175 elif cell_type in _arbitrary_lagrange_vtk.keys(): 

176 cell_type = _arbitrary_lagrange_vtk[cell_type] 

177 order = _cell_degree(cell_type, cells.shape[1]) 

178 else: 

179 raise NotImplementedError(f"Cell type {cell_type} not supported in Pyvista backend.") 

180 perm = dolfinx.cpp.io.perm_vtk(dolfinx.mesh.to_type(cell_type), cells.shape[1]) 

181 cells = cells[:, perm] 

182 lvar = int(basix.LagrangeVariant.equispaced) 

183 gtype = backend_args.get("dtype", geom.dtype) 

184 order, lvar, nodes_per_cell, cell_type, gtype, gdim = comm.bcast( 

185 (order, lvar, cells.shape[1], cell_type, gtype, geom.shape[1]), root=0 

186 ) 

187 else: 

188 order, lvar, nodes_per_cell, cell_type, gtype, gdim = comm.bcast(None, root=0) 

189 geom = np.zeros((0, gdim), dtype=gtype) 

190 cells = np.zeros((0, nodes_per_cell), dtype=np.int64) 

191 

192 return ReadMeshData( 

193 cells=cells, cell_type=cell_type, x=geom.astype(gtype), lvar=lvar, degree=order 

194 ) 

195 

196 

197def read_point_data( 

198 filename: Path | str, 

199 name: str, 

200 comm: MPI.Intracomm, 

201 time: float | str | None, 

202 backend_args: dict[str, Any] | None, 

203) -> tuple[np.ndarray, int]: 

204 """Read data from the nodes of a mesh. 

205 

206 Args: 

207 filename: Path to file 

208 name: Name of point data 

209 comm: Communicator to launch IO on. 

210 time: The time stamp 

211 backend_args: The backend arguments 

212 

213 Returns: 

214 Data local to process (contiguous, no mpi comm) and local start range 

215 """ 

216 dataset: np.ndarray 

217 if MPI.COMM_WORLD.rank == 0: 

218 in_data = pyvista.read(filename) 

219 if isinstance(in_data, pyvista.UnstructuredGrid): 

220 grid = in_data 

221 elif isinstance(in_data, pyvista.core.composite.MultiBlock): 

222 # To handle multiblock like pvd 

223 if hasattr(pyvista, "_VTK_SNAKE_CASE_STATE"): 

224 pyvista._VTK_SNAKE_CASE_STATE = "allow" 

225 else: 

226 # Compatibility with 0.47 

227 pyvista.core.vtk_snake_case._state = "allow" 

228 number_of_blocks = in_data.number_of_blocks 

229 assert number_of_blocks == 1 

230 b0 = in_data.get_block(0) 

231 assert isinstance(b0, pyvista.UnstructuredGrid) 

232 grid = b0 

233 

234 dataset = grid.point_data[name] 

235 if len(dataset.shape) == 1: 

236 num_components = 1 

237 dataset = dataset.reshape(-1, num_components) 

238 else: 

239 num_components = dataset.shape[1] 

240 if np.issubdtype(dataset.dtype, np.integer): 

241 gtype = grid.points.dtype 

242 dataset = dataset.astype(gtype) 

243 else: 

244 gtype = dataset.dtype 

245 num_components, gtype = comm.bcast((num_components, gtype), root=0) 

246 local_range_start = 0 

247 else: 

248 num_components, gtype = comm.bcast(None, root=0) 

249 dataset = np.zeros((0, num_components), dtype=gtype) 

250 local_range_start = 0 

251 

252 return dataset, int(local_range_start) 

253 

254 

255def read_cell_data( 

256 filename: Path | str, 

257 name: str, 

258 comm: MPI.Intracomm, 

259 time: str | float | None, 

260 backend_args: dict[str, Any] | None, 

261) -> tuple[npt.NDArray[np.int64], np.ndarray]: 

262 dataset: np.ndarray 

263 topology: np.ndarray 

264 if MPI.COMM_WORLD.rank == 0: 

265 in_data = pyvista.read(filename) 

266 if isinstance(in_data, pyvista.UnstructuredGrid): 

267 grid = in_data 

268 elif isinstance(in_data, pyvista.core.composite.MultiBlock): 

269 # To handle multiblock like pvd 

270 if hasattr(pyvista, "_VTK_SNAKE_CASE_STATE"): 

271 pyvista._VTK_SNAKE_CASE_STATE = "allow" 

272 else: 

273 # Compatibility with 0.47 

274 pyvista.core.vtk_snake_case._state = "allow" 

275 number_of_blocks = in_data.number_of_blocks 

276 assert number_of_blocks == 1 

277 b0 = in_data.get_block(0) 

278 assert isinstance(b0, pyvista.UnstructuredGrid) 

279 grid = b0 

280 

281 dataset = grid.cell_data[name] 

282 if len(dataset.shape) == 1: 

283 num_components = 1 

284 dataset = dataset.reshape(-1, num_components) 

285 else: 

286 num_components = dataset.shape[1] 

287 

288 if np.issubdtype(dataset.dtype, np.integer): 

289 gtype = in_data.points.dtype 

290 dataset = dataset.astype(gtype) 

291 else: 

292 gtype = dataset.dtype 

293 num_components, gtype = comm.bcast((num_components, gtype), root=0) 

294 else: 

295 num_components, gtype = comm.bcast(None, root=0) 

296 dataset = np.zeros((0, num_components), dtype=gtype) 

297 _time = float(time) if time is not None else None 

298 topology = read_mesh_data(filename, comm, _time, False, backend_args=None).cells 

299 return topology, dataset 

300 

301 

302def write_attributes( 

303 filename: Path | str, 

304 comm: MPI.Intracomm, 

305 name: str, 

306 attributes: dict[str, np.ndarray], 

307 backend_args: dict[str, Any] | None, 

308): 

309 """Write attributes to file. 

310 

311 Args: 

312 filename: Path to file to write to 

313 comm: MPI communicator used in storage 

314 name: Name of the attribute group 

315 attributes: Dictionary of attributes to write 

316 backend_args: Arguments to backend 

317 """ 

318 raise NotImplementedError("The Pyvista backend cannot write attributes.") 

319 

320 

321def read_attributes( 

322 filename: Path | str, 

323 comm: MPI.Intracomm, 

324 name: str, 

325 backend_args: dict[str, Any] | None, 

326) -> dict[str, Any]: 

327 """Read attributes from file. 

328 

329 Args: 

330 filename: Path to file to read from 

331 comm: MPI communicator used in storage 

332 name: Name of the attribute group 

333 backend_args: Arguments to backend 

334 

335 Returns: 

336 Dictionary of attributes read from file 

337 """ 

338 raise NotImplementedError("The Pyvista backend cannot read attributes.") 

339 

340 

341def read_timestamps( 

342 filename: Path | str, 

343 comm: MPI.Intracomm, 

344 function_name: str, 

345 backend_args: dict[str, Any] | None, 

346) -> npt.NDArray[np.float64 | str]: # type: ignore[type-var] 

347 """Read timestamps from file. 

348 

349 Args: 

350 filename: Path to file to read from 

351 comm: MPI communicator used in storage 

352 function_name: Name of the function to read timestamps for 

353 backend_args: Arguments to backend 

354 

355 Returns: 

356 Numpy array of timestamps read from file 

357 """ 

358 raise NotImplementedError("The Pyvista backend cannot read timestamps.") 

359 

360 

361def read_function_names( 

362 filename: Path | str, comm: MPI.Intracomm, backend_args: dict[str, Any] | None 

363) -> list[str]: 

364 """Read all function names from a file. 

365 

366 Args: 

367 filename: Path to file 

368 comm: MPI communicator to launch IO on. 

369 backend_args: Arguments to backend 

370 

371 Returns: 

372 A list of function names. 

373 """ 

374 in_data = pyvista.read(filename) 

375 if isinstance(in_data, pyvista.UnstructuredGrid): 

376 grid = in_data 

377 elif isinstance(in_data, pyvista.core.composite.MultiBlock): 

378 # To handle multiblock like pvd 

379 if hasattr(pyvista, "_VTK_SNAKE_CASE_STATE"): 

380 pyvista._VTK_SNAKE_CASE_STATE = "allow" 

381 else: 

382 # Compatibility with 0.47 

383 pyvista.core.vtk_snake_case._state = "allow" 

384 number_of_blocks = in_data.number_of_blocks 

385 assert number_of_blocks == 1 

386 b0 = in_data.get_block(0) 

387 assert isinstance(b0, pyvista.UnstructuredGrid) 

388 grid = b0 

389 

390 point_data = list(grid.point_data.keys()) 

391 cell_data = list(grid.cell_data.keys()) 

392 return point_data + cell_data 

393 

394 

395def write_mesh( 

396 filename: Path | str, 

397 comm: MPI.Intracomm, 

398 mesh: MeshData, 

399 backend_args: dict[str, Any] | None, 

400 mode: FileMode, 

401 time: float, 

402): 

403 """ 

404 Write a mesh to file. 

405 

406 Args: 

407 comm: MPI communicator used in storage 

408 mesh: Internal data structure for the mesh data to save to file 

409 filename: Path to file to write to 

410 backend_args: Arguments to backend 

411 mode: File-mode to store the mesh 

412 time: Time stamp associated with the mesh 

413 """ 

414 raise NotImplementedError("The Pyvista backend cannot write meshes.") 

415 

416 

417def write_meshtags( 

418 filename: str | Path, 

419 comm: MPI.Intracomm, 

420 data: MeshTagsData, 

421 backend_args: dict[str, Any] | None, 

422): 

423 """Write mesh tags to file. 

424 

425 Args: 

426 filename: Path to file to write to 

427 comm: MPI communicator used in storage 

428 data: Internal data structure for the mesh tags to save to file 

429 backend_args: Arguments to backend 

430 """ 

431 raise NotImplementedError("The Pyvista backend cannot write meshtags.") 

432 

433 

434def read_meshtags_data( 

435 filename: str | Path, 

436 comm: MPI.Intracomm, 

437 name: str, 

438 backend_args: dict[str, Any] | None, 

439) -> MeshTagsData: 

440 """Read mesh tags from file. 

441 

442 Args: 

443 filename: Path to file to read from 

444 comm: MPI communicator used in storage 

445 name: Name of the mesh tags to read 

446 backend_args: Arguments to backend 

447 

448 Returns: 

449 Internal data structure for the mesh tags read from file 

450 """ 

451 raise NotImplementedError("The Pyvista backend cannot read meshtags.") 

452 

453 

454def read_dofmap( 

455 filename: str | Path, 

456 comm: MPI.Intracomm, 

457 name: str, 

458 backend_args: dict[str, Any] | None, 

459) -> dolfinx.graph.AdjacencyList: 

460 """Read the dofmap of a function with a given name. 

461 

462 Args: 

463 filename: Path to file to read from 

464 comm: MPI communicator used in storage 

465 name: Name of the function to read the dofmap for 

466 backend_args: Arguments to backend 

467 

468 Returns: 

469 Dofmap as an AdjacencyList 

470 """ 

471 raise NotImplementedError("The Pyvista backend cannot make checkpoints.") 

472 

473 

474def read_dofs( 

475 filename: str | Path, 

476 comm: MPI.Intracomm, 

477 name: str, 

478 time: float, 

479 backend_args: dict[str, Any] | None, 

480) -> tuple[npt.NDArray[np.float32 | np.float64 | np.complex64 | np.complex128], int]: 

481 """Read the dofs (values) of a function with a given name from a given timestep. 

482 

483 Args: 

484 filename: Path to file to read from 

485 comm: MPI communicator used in storage 

486 name: Name of the function to read the dofs for 

487 time: Time stamp associated with the function to read 

488 backend_args: Arguments to backend 

489 

490 Returns: 

491 Contiguous sequence of degrees of freedom (with respect to input data) 

492 and the global starting point on the process. 

493 Process 0 has [0, M), process 1 [M, N), process 2 [N, O) etc. 

494 """ 

495 raise NotImplementedError("The Pyvista backend cannot make checkpoints.") 

496 

497 

498def read_cell_perms( 

499 comm: MPI.Intracomm, filename: Path | str, backend_args: dict[str, Any] | None 

500) -> npt.NDArray[np.uint32]: 

501 """ 

502 Read cell permutation from file with given communicator, 

503 Split in continuous chunks based on number of cells in the input data. 

504 

505 Args: 

506 comm: MPI communicator used in storage 

507 filename: Path to file to read from 

508 backend_args: Arguments to backend 

509 

510 Returns: 

511 Contiguous sequence of permutations (with respect to input data) 

512 Process 0 has [0, M), process 1 [M, N), process 2 [N, O) etc. 

513 """ 

514 raise NotImplementedError("The Pyvista backend cannot make checkpoints.") 

515 

516 

517def write_function( 

518 filename: Path, 

519 comm: MPI.Intracomm, 

520 u: FunctionData, 

521 time: float, 

522 mode: FileMode, 

523 backend_args: dict[str, Any] | None, 

524): 

525 """ 

526 Write a function to file. 

527 

528 Args: 

529 comm: MPI communicator used in storage 

530 u: Internal data structure for the function data to save to file 

531 filename: Path to file to write to 

532 time: Time stamp associated with function 

533 mode: File-mode to store the function 

534 backend_args: Arguments to backend 

535 """ 

536 raise NotImplementedError("The Pyvista backend cannot make checkpoints.") 

537 

538 

539def read_legacy_mesh( 

540 filename: Path | str, comm: MPI.Intracomm, group: str 

541) -> tuple[npt.NDArray[np.int64], npt.NDArray[np.floating], str | None]: 

542 """Read in the mesh topology, geometry and (optionally) cell type from a 

543 legacy DOLFIN HDF5-file. 

544 

545 Args: 

546 filename: Path to file to read from 

547 comm: MPI communicator used in storage 

548 group: Group in HDF5 file where mesh is stored 

549 

550 Returns: 

551 Tuple containing: 

552 - Topology as a (num_cells, num_vertices_per_cell) array of global vertex indices 

553 - Geometry as a (num_vertices, geometric_dimension) array of vertex coordinates 

554 - Cell type as a string (e.g. "tetrahedron") or None if not found 

555 """ 

556 raise NotImplementedError("The Pyvista backend cannot read legacy DOLFIN meshes.") 

557 

558 

559def snapshot_checkpoint( 

560 filename: Path | str, 

561 mode: FileMode, 

562 u: dolfinx.fem.Function, 

563 backend_args: dict[str, Any] | None, 

564): 

565 """Create a snapshot checkpoint of a dolfinx function. 

566 

567 Args: 

568 filename: Path to file to read from 

569 mode: File-mode to store the function 

570 u: dolfinx function to create a snapshot checkpoint for 

571 backend_args: Arguments to backend 

572 """ 

573 raise NotImplementedError("The Pyvista backend cannot make checkpoints.") 

574 

575 

576def read_hdf5_array( 

577 comm: MPI.Intracomm, 

578 filename: Path | str, 

579 group: str, 

580 backend_args: dict[str, Any] | None, 

581) -> tuple[np.ndarray, int]: 

582 """Read an array from an HDF5 file. 

583 

584 Args: 

585 comm: MPI communicator used in storage 

586 filename: Path to file to read from 

587 group: Group in HDF5 file where array is stored 

588 backend_args: Arguments to backend 

589 

590 Returns: 

591 Tuple containing: 

592 - Numpy array read from file 

593 - Global starting point on the process. 

594 Process 0 has [0, M), process 1 [M, N), process 2 [N, O) etc. 

595 """ 

596 raise NotImplementedError("The Pyvista backend cannot read HDF5 arrays") 

597 

598 

599def write_data( 

600 filename: Path | str, 

601 array_data: ArrayData, 

602 comm: MPI.Intracomm, 

603 time: str | float | None, 

604 mode: FileMode, 

605 backend_args: dict[str, Any] | None, 

606): 

607 """Write a 2D-array to file (distributed across proceses with MPI). 

608 

609 Args: 

610 filename: Path to file 

611 array_data: Data to write to file 

612 comm: MPI communicator to open the file with 

613 time: Time stamp 

614 mode: Append or write 

615 backend_args: The backend arguments 

616 """ 

617 raise NotImplementedError("The pyvista backend does not support writing point data")