Coverage for / dolfinx-env / lib / python3.12 / site-packages / io4dolfinx / backends / pyvista / backend.py: 78%
190 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-26 18:16 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-26 18:16 +0000
1"""
2Module that uses pyvista to import grids.
3"""
5from typing import Any
7import numpy as np
8import numpy.typing as npt
10try:
11 import pyvista
12except ImportError:
13 raise ModuleNotFoundError("The PyVista-backend requires pyvista")
14from pathlib import Path
16from mpi4py import MPI
18import basix
19import dolfinx
21from io4dolfinx.structures import ArrayData, FunctionData, MeshData, MeshTagsData, ReadMeshData
22from io4dolfinx.utils import check_file_exists
24from .. import FileMode, ReadMode
26# Cell types can be found at
27# https://vtk.org/doc/nightly/html/vtkCellType_8h_source.html
28_first_order_vtk = {
29 1: "point",
30 3: "interval",
31 5: "triangle",
32 9: "quadrilateral",
33 10: "tetrahedron",
34 12: "hexahedron",
35}
37_arbitrary_lagrange_vtk = {
38 68: "interval",
39 69: "triangle",
40 70: "quadrilateral",
41 71: "tetrahedron",
42 72: "hexahedron",
43 73: "prism",
44 74: "pyramid",
45}
48read_mode = ReadMode.serial
51def _cell_degree(ct: str, num_nodes: int) -> int:
52 if ct == "point":
53 return 1
54 elif ct == "interval":
55 return int(num_nodes - 1)
56 elif ct == "triangle":
57 n = (np.sqrt(1 + 8 * num_nodes) - 1) / 2
58 if 2 * num_nodes != n * (n + 1):
59 raise ValueError(f"Unknown triangle layout. Number of nodes: {num_nodes}")
60 return int(n - 1)
61 elif ct == "tetrahedron":
62 n = 0
63 while n * (n + 1) * (n + 2) < 6 * num_nodes:
64 n += 1
65 if n * (n + 1) * (n + 2) != 6 * num_nodes:
66 raise ValueError(f"Unknown tetrahedron layout. Number of nodes: {num_nodes}")
67 return int(n - 1)
69 elif ct == "quadrilateral":
70 n = np.sqrt(num_nodes)
71 if num_nodes != n * n:
72 raise ValueError(f"Unknown quadrilateral layout. Number of nodes: {num_nodes}")
73 return int(n - 1)
74 elif ct == "hexahedron":
75 n = np.cbrt(num_nodes)
76 if num_nodes != n * n * n:
77 raise ValueError(f"Unknown hexahedron layout. Number of nodes: {num_nodes}")
78 return int(n - 1)
79 elif ct == "prism":
80 if num_nodes == 6:
81 return 1
82 elif num_nodes == 15:
83 return 2
84 else:
85 raise ValueError(f"Unknown prism layout. Number of nodes: {num_nodes}")
86 elif ct == "pyramid":
87 if num_nodes == 5:
88 return 1
89 elif num_nodes == 13:
90 return 2
91 else:
92 raise ValueError(f"Unknown pyramid layout. Number of nodes: {num_nodes}")
93 else:
94 raise ValueError(f"Unknown cell type {ct} with {num_nodes=}.")
97def get_default_backend_args(arguments: dict[str, Any] | None) -> dict[str, Any]:
98 """Get default backend arguments given a set of input arguments.
100 Args:
101 arguments: Input backend arguments
103 Returns:
104 Updated backend arguments
105 """
106 args = arguments or {}
107 return args
110def read_mesh_data(
111 filename: Path | str,
112 comm: MPI.Intracomm,
113 time: str | float | None = None,
114 read_from_partition: bool = False,
115 backend_args: dict[str, Any] | None = None,
116) -> ReadMeshData:
117 """Read mesh data from file.
119 Args:
120 filename: Path to file to read from
121 comm: MPI communicator used in storage
122 time: Time stamp associated with the mesh to read
123 read_from_partition: Whether to read partition information
124 backend_args: Arguments to backend
126 Returns:
127 Internal data structure for the mesh data read from file
128 """
129 backend_args = get_default_backend_args(backend_args)
130 check_file_exists(filename)
131 if read_from_partition:
132 raise RuntimeError("Cannot read partition data with Pyvista")
133 cells: npt.NDArray[np.int64]
134 geom: npt.NDArray[np.float64 | np.float32]
135 if comm.rank == 0:
136 in_data = pyvista.read(filename)
137 if isinstance(in_data, pyvista.UnstructuredGrid):
138 grid = in_data
139 elif isinstance(in_data, pyvista.core.composite.MultiBlock):
140 # To handle multiblock like pvd
141 if hasattr(pyvista, "_VTK_SNAKE_CASE_STATE"):
142 pyvista._VTK_SNAKE_CASE_STATE = "allow"
143 else:
144 # Compatibility with 0.47
145 pyvista.core.vtk_snake_case._state = "allow"
146 number_of_blocks = in_data.number_of_blocks
147 assert number_of_blocks == 1
148 b0 = in_data.get_block(0)
149 assert isinstance(b0, pyvista.UnstructuredGrid)
150 grid = b0
151 else:
152 raise RuntimeError(f"Unknown data type {type(in_data)}")
153 geom = grid.points
154 num_cells_global = grid.number_of_cells
155 cells = grid.cells.reshape(num_cells_global, -1).astype(np.int64)
156 nodes_per_cell_type = cells[:, 0]
157 assert np.allclose(nodes_per_cell_type, nodes_per_cell_type[0]), "Single celltype support"
158 cells = cells[:, 1:].astype(np.int64)
159 cell_types = grid.celltypes
160 assert len(np.unique(cell_types)) == 1
161 if (cell_type := cell_types[0]) in _first_order_vtk.keys():
162 cell_type = _first_order_vtk[cell_type]
163 order = 1
164 elif cell_type in _arbitrary_lagrange_vtk.keys():
165 cell_type = _arbitrary_lagrange_vtk[cell_type]
166 order = _cell_degree(cell_type, cells.shape[1])
167 perm = dolfinx.cpp.io.perm_vtk(dolfinx.mesh.to_type(cell_type), cells.shape[1])
168 cells = cells[:, perm]
169 lvar = int(basix.LagrangeVariant.equispaced)
170 gtype = backend_args.get("dtype", geom.dtype)
171 order, lvar, nodes_per_cell, cell_type, gtype, gdim = comm.bcast(
172 (order, lvar, cells.shape[1], cell_type, gtype, geom.shape[1]), root=0
173 )
174 else:
175 order, lvar, nodes_per_cell, cell_type, gtype, gdim = comm.bcast(None, root=0)
176 geom = np.zeros((0, gdim), dtype=gtype)
177 cells = np.zeros((0, nodes_per_cell), dtype=np.int64)
179 return ReadMeshData(
180 cells=cells, cell_type=cell_type, x=geom.astype(gtype), lvar=lvar, degree=order
181 )
184def read_point_data(
185 filename: Path | str,
186 name: str,
187 comm: MPI.Intracomm,
188 time: float | str | None,
189 backend_args: dict[str, Any] | None,
190) -> tuple[np.ndarray, int]:
191 """Read data from the nodes of a mesh.
193 Args:
194 filename: Path to file
195 name: Name of point data
196 comm: Communicator to launch IO on.
197 time: The time stamp
198 backend_args: The backend arguments
200 Returns:
201 Data local to process (contiguous, no mpi comm) and local start range
202 """
203 dataset: np.ndarray
204 if MPI.COMM_WORLD.rank == 0:
205 in_data = pyvista.read(filename)
206 if isinstance(in_data, pyvista.UnstructuredGrid):
207 grid = in_data
208 elif isinstance(in_data, pyvista.core.composite.MultiBlock):
209 # To handle multiblock like pvd
210 if hasattr(pyvista, "_VTK_SNAKE_CASE_STATE"):
211 pyvista._VTK_SNAKE_CASE_STATE = "allow"
212 else:
213 # Compatibility with 0.47
214 pyvista.core.vtk_snake_case._state = "allow"
215 number_of_blocks = in_data.number_of_blocks
216 assert number_of_blocks == 1
217 b0 = in_data.get_block(0)
218 assert isinstance(b0, pyvista.UnstructuredGrid)
219 grid = b0
221 dataset = grid.point_data[name]
222 if len(dataset.shape) == 1:
223 num_components = 1
224 dataset = dataset.reshape(-1, num_components)
225 else:
226 num_components = dataset.shape[1]
227 if np.issubdtype(dataset.dtype, np.integer):
228 gtype = grid.points.dtype
229 dataset = dataset.astype(gtype)
230 else:
231 gtype = dataset.dtype
232 num_components, gtype = comm.bcast((num_components, gtype), root=0)
233 local_range_start = 0
234 else:
235 num_components, gtype = comm.bcast(None, root=0)
236 dataset = np.zeros((0, num_components), dtype=gtype)
237 local_range_start = 0
239 return dataset, int(local_range_start)
242def read_cell_data(
243 filename: Path | str,
244 name: str,
245 comm: MPI.Intracomm,
246 time: str | float | None,
247 backend_args: dict[str, Any] | None,
248) -> tuple[npt.NDArray[np.int64], np.ndarray]:
249 dataset: np.ndarray
250 topology: np.ndarray
251 if MPI.COMM_WORLD.rank == 0:
252 in_data = pyvista.read(filename)
253 if isinstance(in_data, pyvista.UnstructuredGrid):
254 grid = in_data
255 elif isinstance(in_data, pyvista.core.composite.MultiBlock):
256 # To handle multiblock like pvd
257 if hasattr(pyvista, "_VTK_SNAKE_CASE_STATE"):
258 pyvista._VTK_SNAKE_CASE_STATE = "allow"
259 else:
260 # Compatibility with 0.47
261 pyvista.core.vtk_snake_case._state = "allow"
262 number_of_blocks = in_data.number_of_blocks
263 assert number_of_blocks == 1
264 b0 = in_data.get_block(0)
265 assert isinstance(b0, pyvista.UnstructuredGrid)
266 grid = b0
268 dataset = grid.cell_data[name]
269 if len(dataset.shape) == 1:
270 num_components = 1
271 dataset = dataset.reshape(-1, num_components)
272 else:
273 num_components = dataset.shape[1]
275 if np.issubdtype(dataset.dtype, np.integer):
276 gtype = in_data.points.dtype
277 dataset = dataset.astype(gtype)
278 else:
279 gtype = dataset.dtype
280 num_components, gtype = comm.bcast((num_components, gtype), root=0)
281 else:
282 num_components, gtype = comm.bcast(None, root=0)
283 dataset = np.zeros((0, num_components), dtype=gtype)
284 _time = float(time) if time is not None else None
285 topology = read_mesh_data(filename, comm, _time, False, backend_args=None).cells
286 return topology, dataset
289def write_attributes(
290 filename: Path | str,
291 comm: MPI.Intracomm,
292 name: str,
293 attributes: dict[str, np.ndarray],
294 backend_args: dict[str, Any] | None,
295):
296 """Write attributes to file.
298 Args:
299 filename: Path to file to write to
300 comm: MPI communicator used in storage
301 name: Name of the attribute group
302 attributes: Dictionary of attributes to write
303 backend_args: Arguments to backend
304 """
305 raise NotImplementedError("The Pyvista backend cannot write attributes.")
308def read_attributes(
309 filename: Path | str,
310 comm: MPI.Intracomm,
311 name: str,
312 backend_args: dict[str, Any] | None,
313) -> dict[str, Any]:
314 """Read attributes from file.
316 Args:
317 filename: Path to file to read from
318 comm: MPI communicator used in storage
319 name: Name of the attribute group
320 backend_args: Arguments to backend
322 Returns:
323 Dictionary of attributes read from file
324 """
325 raise NotImplementedError("The Pyvista backend cannot read attributes.")
328def read_timestamps(
329 filename: Path | str,
330 comm: MPI.Intracomm,
331 function_name: str,
332 backend_args: dict[str, Any] | None,
333) -> npt.NDArray[np.float64 | str]: # type: ignore[type-var]
334 """Read timestamps from file.
336 Args:
337 filename: Path to file to read from
338 comm: MPI communicator used in storage
339 function_name: Name of the function to read timestamps for
340 backend_args: Arguments to backend
342 Returns:
343 Numpy array of timestamps read from file
344 """
345 raise NotImplementedError("The Pyvista backend cannot read timestamps.")
348def read_function_names(
349 filename: Path | str, comm: MPI.Intracomm, backend_args: dict[str, Any] | None
350) -> list[str]:
351 """Read all function names from a file.
353 Args:
354 filename: Path to file
355 comm: MPI communicator to launch IO on.
356 backend_args: Arguments to backend
358 Returns:
359 A list of function names.
360 """
361 in_data = pyvista.read(filename)
362 if isinstance(in_data, pyvista.UnstructuredGrid):
363 grid = in_data
364 elif isinstance(in_data, pyvista.core.composite.MultiBlock):
365 # To handle multiblock like pvd
366 if hasattr(pyvista, "_VTK_SNAKE_CASE_STATE"):
367 pyvista._VTK_SNAKE_CASE_STATE = "allow"
368 else:
369 # Compatibility with 0.47
370 pyvista.core.vtk_snake_case._state = "allow"
371 number_of_blocks = in_data.number_of_blocks
372 assert number_of_blocks == 1
373 b0 = in_data.get_block(0)
374 assert isinstance(b0, pyvista.UnstructuredGrid)
375 grid = b0
377 point_data = list(grid.point_data.keys())
378 cell_data = list(grid.cell_data.keys())
379 return point_data + cell_data
382def write_mesh(
383 filename: Path | str,
384 comm: MPI.Intracomm,
385 mesh: MeshData,
386 backend_args: dict[str, Any] | None,
387 mode: FileMode,
388 time: float,
389):
390 """
391 Write a mesh to file.
393 Args:
394 comm: MPI communicator used in storage
395 mesh: Internal data structure for the mesh data to save to file
396 filename: Path to file to write to
397 backend_args: Arguments to backend
398 mode: File-mode to store the mesh
399 time: Time stamp associated with the mesh
400 """
401 raise NotImplementedError("The Pyvista backend cannot write meshes.")
404def write_meshtags(
405 filename: str | Path,
406 comm: MPI.Intracomm,
407 data: MeshTagsData,
408 backend_args: dict[str, Any] | None,
409):
410 """Write mesh tags to file.
412 Args:
413 filename: Path to file to write to
414 comm: MPI communicator used in storage
415 data: Internal data structure for the mesh tags to save to file
416 backend_args: Arguments to backend
417 """
418 raise NotImplementedError("The Pyvista backend cannot write meshtags.")
421def read_meshtags_data(
422 filename: str | Path,
423 comm: MPI.Intracomm,
424 name: str,
425 backend_args: dict[str, Any] | None,
426) -> MeshTagsData:
427 """Read mesh tags from file.
429 Args:
430 filename: Path to file to read from
431 comm: MPI communicator used in storage
432 name: Name of the mesh tags to read
433 backend_args: Arguments to backend
435 Returns:
436 Internal data structure for the mesh tags read from file
437 """
438 raise NotImplementedError("The Pyvista backend cannot read meshtags.")
441def read_dofmap(
442 filename: str | Path,
443 comm: MPI.Intracomm,
444 name: str,
445 backend_args: dict[str, Any] | None,
446) -> dolfinx.graph.AdjacencyList:
447 """Read the dofmap of a function with a given name.
449 Args:
450 filename: Path to file to read from
451 comm: MPI communicator used in storage
452 name: Name of the function to read the dofmap for
453 backend_args: Arguments to backend
455 Returns:
456 Dofmap as an AdjacencyList
457 """
458 raise NotImplementedError("The Pyvista backend cannot make checkpoints.")
461def read_dofs(
462 filename: str | Path,
463 comm: MPI.Intracomm,
464 name: str,
465 time: float,
466 backend_args: dict[str, Any] | None,
467) -> tuple[npt.NDArray[np.float32 | np.float64 | np.complex64 | np.complex128], int]:
468 """Read the dofs (values) of a function with a given name from a given timestep.
470 Args:
471 filename: Path to file to read from
472 comm: MPI communicator used in storage
473 name: Name of the function to read the dofs for
474 time: Time stamp associated with the function to read
475 backend_args: Arguments to backend
477 Returns:
478 Contiguous sequence of degrees of freedom (with respect to input data)
479 and the global starting point on the process.
480 Process 0 has [0, M), process 1 [M, N), process 2 [N, O) etc.
481 """
482 raise NotImplementedError("The Pyvista backend cannot make checkpoints.")
485def read_cell_perms(
486 comm: MPI.Intracomm, filename: Path | str, backend_args: dict[str, Any] | None
487) -> npt.NDArray[np.uint32]:
488 """
489 Read cell permutation from file with given communicator,
490 Split in continuous chunks based on number of cells in the input data.
492 Args:
493 comm: MPI communicator used in storage
494 filename: Path to file to read from
495 backend_args: Arguments to backend
497 Returns:
498 Contiguous sequence of permutations (with respect to input data)
499 Process 0 has [0, M), process 1 [M, N), process 2 [N, O) etc.
500 """
501 raise NotImplementedError("The Pyvista backend cannot make checkpoints.")
504def write_function(
505 filename: Path,
506 comm: MPI.Intracomm,
507 u: FunctionData,
508 time: float,
509 mode: FileMode,
510 backend_args: dict[str, Any] | None,
511):
512 """
513 Write a function to file.
515 Args:
516 comm: MPI communicator used in storage
517 u: Internal data structure for the function data to save to file
518 filename: Path to file to write to
519 time: Time stamp associated with function
520 mode: File-mode to store the function
521 backend_args: Arguments to backend
522 """
523 raise NotImplementedError("The Pyvista backend cannot make checkpoints.")
526def read_legacy_mesh(
527 filename: Path | str, comm: MPI.Intracomm, group: str
528) -> tuple[npt.NDArray[np.int64], npt.NDArray[np.floating], str | None]:
529 """Read in the mesh topology, geometry and (optionally) cell type from a
530 legacy DOLFIN HDF5-file.
532 Args:
533 filename: Path to file to read from
534 comm: MPI communicator used in storage
535 group: Group in HDF5 file where mesh is stored
537 Returns:
538 Tuple containing:
539 - Topology as a (num_cells, num_vertices_per_cell) array of global vertex indices
540 - Geometry as a (num_vertices, geometric_dimension) array of vertex coordinates
541 - Cell type as a string (e.g. "tetrahedron") or None if not found
542 """
543 raise NotImplementedError("The Pyvista backend cannot read legacy DOLFIN meshes.")
546def snapshot_checkpoint(
547 filename: Path | str,
548 mode: FileMode,
549 u: dolfinx.fem.Function,
550 backend_args: dict[str, Any] | None,
551):
552 """Create a snapshot checkpoint of a dolfinx function.
554 Args:
555 filename: Path to file to read from
556 mode: File-mode to store the function
557 u: dolfinx function to create a snapshot checkpoint for
558 backend_args: Arguments to backend
559 """
560 raise NotImplementedError("The Pyvista backend cannot make checkpoints.")
563def read_hdf5_array(
564 comm: MPI.Intracomm,
565 filename: Path | str,
566 group: str,
567 backend_args: dict[str, Any] | None,
568) -> tuple[np.ndarray, int]:
569 """Read an array from an HDF5 file.
571 Args:
572 comm: MPI communicator used in storage
573 filename: Path to file to read from
574 group: Group in HDF5 file where array is stored
575 backend_args: Arguments to backend
577 Returns:
578 Tuple containing:
579 - Numpy array read from file
580 - Global starting point on the process.
581 Process 0 has [0, M), process 1 [M, N), process 2 [N, O) etc.
582 """
583 raise NotImplementedError("The Pyvista backend cannot read HDF5 arrays")
586def write_data(
587 filename: Path | str,
588 array_data: ArrayData,
589 comm: MPI.Intracomm,
590 time: str | float | None,
591 mode: FileMode,
592 backend_args: dict[str, Any] | None,
593):
594 """Write a 2D-array to file (distributed across proceses with MPI).
596 Args:
597 filename: Path to file
598 array_data: Data to write to file
599 comm: MPI communicator to open the file with
600 time: Time stamp
601 mode: Append or write
602 backend_args: The backend arguments
603 """
604 raise NotImplementedError("The pyvista backend does not support writing point data")