Source code for io4dolfinx.backends.vtkhdf.backend

"""
Module that can read the VTKHDF format using h5py.
"""

from pathlib import Path
from typing import Any

from mpi4py import MPI

import basix
import dolfinx
import h5py
import numpy as np
import numpy.typing as npt

from io4dolfinx.structures import ArrayData, FunctionData, MeshData, MeshTagsData, ReadMeshData
from io4dolfinx.utils import check_file_exists, compute_local_range

from .. import FileMode, ReadMode
from ..h5py.backend import convert_file_mode, h5pyfile
from ..pyvista.backend import _arbitrary_lagrange_vtk, _cell_degree, _first_order_vtk

read_mode = ReadMode.parallel

_vtk_hdf_version = np.array([2, 1], dtype=np.int32)


[docs] def get_default_backend_args(arguments: dict[str, Any] | None) -> dict[str, Any]: """Get default backend arguments given a set of input arguments. Args: arguments: Input backend arguments Returns: Updated backend arguments """ args = arguments or {"name": "mesh"} return args
[docs] def find_all_unique_cell_types(comm, cell_types, num_nodes): """ Given a set of cell types and number of nodes per cell, find all unique cell types across all ranks. Args: comm: MPI communicator cell_types: Local cell types num_nodes: Number of nodes per cell Returns: A 2D array where each row corresponds to a cell type (vtk int) and the number of nodes. """ # Combine cell_types, num_nodes as tuple c_hash = np.zeros((2, len(cell_types)), dtype=np.int32) c_hash[0] = cell_types c_hash[1] = num_nodes indexes = np.unique(c_hash.T, axis=0, return_index=True)[1] local_unique_cells = c_hash.T[indexes] all_cell_types = np.vstack(comm.allgather(local_unique_cells)) indexes = np.unique(all_cell_types, axis=0, return_index=True)[1] all_unique_cell_types = all_cell_types[indexes] return all_unique_cell_types
def _decode_bytes_if_needed(value: bytes | str) -> str: """Decode bytes to string if necessary (for h5py compatibility)""" if isinstance(value, bytes): return value.decode("utf-8") return value def _get_vtk_group(h5file, name: str) -> h5py.Group: """ Navigates the VTKHDF group hierarchy to find the specific first UnstructuredGrid with a specific name. Handles both MultiBlockDataSet and direct UnstructuredGrid types. """ hdf = h5file["VTKHDF"] file_type = _decode_bytes_if_needed(hdf.attrs["Type"]) if file_type == "MultiBlockDataSet": ass = hdf["Assembly"] def visitor(path): mesh_group = path.rsplit("/", 1) mesh_name = mesh_group[0] if len(mesh_group) == 1 else mesh_group[1] if mesh_name == name: obj = ass.get(path) # Check attributes carefully if "Type" in obj.attrs.keys(): attr_type = obj.attrs["Type"] if isinstance(attr_type, bytes): attr_type = attr_type.decode("utf-8") if attr_type == "UnstructuredGrid": return path return None mesh_node = ass.visit_links(visitor) if mesh_node is None: raise RuntimeError(f"Could not find unique mesh named '{name}' in Assembly.") return ass[mesh_node] elif file_type == "UnstructuredGrid": return hdf else: raise RuntimeError(f"Not supported file type {file_type}") def _get_time_index(hdf: h5py.Group, time: float | str, filename: str | Path) -> int: """Finds the index of a specific time stamp.""" if "Steps" not in hdf.keys(): raise RuntimeError(f"No timestepping information found in {filename}.") stamps = hdf["Steps"]["Values"][:] pos = np.flatnonzero(np.isclose(stamps, time)) if len(pos) == 0: raise RuntimeError(f"Could not find mesh at t={time} in {filename}.") elif len(pos) > 1: raise RuntimeError(f"Multiple time steps for mesh at t={time} in {filename}") return pos[0]
[docs] def read_mesh_data( filename: Path | str, comm: MPI.Intracomm, time: str | float | None, read_from_partition: bool, backend_args: dict[str, Any] | None, ) -> ReadMeshData: """Read mesh data from file. Args: filename: Path to file to read from comm: MPI communicator used in storage time: Time stamp associated with the mesh to read read_from_partition: Whether to read partition information backend_args: Arguments to backend Returns: Internal data structure for the mesh data read from file """ backend_args = get_default_backend_args(backend_args) check_file_exists(filename) if read_from_partition: raise RuntimeError("Cannot read partition data with VTKHDF") with h5pyfile(filename, "r", comm=comm) as h5file: hdf = _get_vtk_group(h5file, backend_args["name"]) if time is None: num_cells_global = hdf["Types"].size local_cell_range = compute_local_range(comm, num_cells_global) cell_types_local = hdf["Types"][slice(*local_cell_range)] num_points_global = hdf["NumberOfPoints"][0] local_point_range = compute_local_range(comm, num_points_global) points_local = hdf["Points"][slice(*local_point_range)] # Connectivity read offsets = hdf["Offsets"] local_connectivity_offset = offsets[local_cell_range[0] : local_cell_range[1] + 1] topology = hdf["Connectivity"][ local_connectivity_offset[0] : local_connectivity_offset[-1] ] offset = local_connectivity_offset - local_connectivity_offset[0] else: time_index = _get_time_index(hdf, time, filename) stamps = hdf["Steps"]["Values"][:] # Get number of points point_node = hdf["Points"] step_start = hdf["Steps"]["PointOffsets"][time_index] # NOTE: currently, it doesn't seem like we follow: # https://docs.vtk.org/en/latest/vtk_file_formats/vtkhdf_file_format/vtkhdf_specifications.html#temporal-unstructuredgrid-and-polydata # As only one num_points is stored irregardless of time steps added. if hdf["NumberOfPoints"].shape[0] != len(stamps): num_pts = hdf["NumberOfPoints"][0] else: num_pts = hdf["NumberOfPoints"][time_index] lr = compute_local_range(comm, num_pts) points_local = point_node[step_start + lr[0] : step_start + lr[1]] # Get cell-types in step cell_start = hdf["Steps"]["CellOffsets"][time_index] if hdf["NumberOfCells"].shape[0] != len(stamps): num_cells = hdf["NumberOfCells"][0] else: num_cells = hdf["NumberOfCells"][time_index] local_cell_range = compute_local_range(comm, num_cells) cell_types_local = hdf["Types"][ cell_start + local_cell_range[0] : cell_start + local_cell_range[1] ] # Get connectivity in step connectivity_start = hdf["Steps"]["ConnectivityIdOffsets"][time_index] # Connectivity read offsets = hdf["Offsets"] local_connectivity_offset = offsets[ connectivity_start + local_cell_range[0] : connectivity_start + local_cell_range[1] + 1 ] topology = hdf["Connectivity"][ local_connectivity_offset[0] : local_connectivity_offset[-1] ] offset = local_connectivity_offset - local_connectivity_offset[0] # NOTE: Currently we limit ourselfs to a single celltype, as it makes life easier, # other things have to change in `MeshReadData` to support this. num_nodes_per_cell = offset[1:] - offset[:-1] unique_cells = find_all_unique_cell_types(MPI.COMM_WORLD, cell_types_local, num_nodes_per_cell) if unique_cells.shape[0] > 1: raise NotImplementedError("io4dolfinx does not support mixed celltype grids") topology = topology.reshape(-1, num_nodes_per_cell[0]) cell_type, number_of_nodes = unique_cells[0] gtype = backend_args.get("dtype", points_local.dtype) if cell_type in _first_order_vtk.keys(): ct = _first_order_vtk[cell_type] degree = 1 elif cell_type in _arbitrary_lagrange_vtk.keys(): ct = _arbitrary_lagrange_vtk[cell_type] degree = _cell_degree(ct, num_nodes=number_of_nodes) else: raise ValueError(f"Unknown VTK cell type {cell_type} in {filename}") perm = dolfinx.cpp.io.perm_vtk(dolfinx.mesh.to_type(ct), number_of_nodes) topology = topology[:, perm] lvar = int(basix.LagrangeVariant.equispaced) return ReadMeshData( cells=topology, cell_type=ct, x=points_local.astype(gtype), lvar=lvar, degree=degree )
[docs] def read_point_data( filename: Path | str, name: str, comm: MPI.Intracomm, time: float | str | None, backend_args: dict[str, Any] | None, ) -> tuple[np.ndarray, int]: """Read data from the nodes of a mesh. Args: filename: Path to file name: Name of point data comm: Communicator to launch IO on. time: The time stamp backend_args: The backend arguments Returns: Data local to process (contiguous, no mpi comm) and local start range """ backend_args = get_default_backend_args(backend_args) check_file_exists(filename) with h5pyfile(filename, "r", comm=comm) as h5file: hdf = _get_vtk_group(h5file, backend_args["name"]) point_data = hdf["PointData"] assert point_data is not None if name not in point_data.keys(): raise ValueError(f"No point data named {name} in {filename}.") func_node = point_data[name] if time is None: data_shape = func_node.shape[0] lr = compute_local_range(comm, data_shape) data = func_node[slice(*lr)] return data, lr[0] else: time_index = _get_time_index(hdf, time, filename) stamps = hdf["Steps"]["Values"][:] step_start = hdf["Steps"]["PointDataOffsets"][name][time_index] # NOTE: currently, it doesn't seem like we follow: # https://docs.vtk.org/en/latest/vtk_file_formats/vtkhdf_file_format/vtkhdf_specifications.html#temporal-unstructuredgrid-and-polydata # As only one num_points is stored irregardless of time steps added. if hdf["NumberOfPoints"].shape[0] != len(stamps): num_pts = hdf["NumberOfPoints"][0] else: num_pts = hdf["NumberOfPoints"][time_index] lr = compute_local_range(comm, num_pts) return func_node[step_start + lr[0] : step_start + lr[1]], lr[0]
def read_cell_data( filename: Path | str, name: str, comm: MPI.Intracomm, time: str | float | None, backend_args: dict[str, Any] | None, ) -> tuple[npt.NDArray[np.int64], np.ndarray]: backend_args = get_default_backend_args(backend_args) check_file_exists(filename) with h5pyfile(filename, "r", comm=comm) as h5file: hdf = _get_vtk_group(h5file, backend_args["name"]) if "CellData" not in hdf.keys(): raise RuntimeError(f"No cell data found in {filename}.") cell_data = hdf["CellData"] assert cell_data is not None if name not in cell_data.keys(): raise ValueError(f"No cell data with name {name} in {filename}") cell_data_node = cell_data[name] assert cell_data_node is not None if time is None: cell_data_shape = cell_data_node.shape num_cells_global = hdf["Types"].size assert num_cells_global == cell_data_shape[0] local_cell_range = compute_local_range(comm, num_cells_global) data = cell_data_node[slice(*local_cell_range)] else: time_index = _get_time_index(hdf, time, filename) stamps = hdf["Steps"]["Values"][:] cd_start = hdf["Steps"]["CellDataOffsets"][name][time_index] # NOTE: currently, it doesn't seem like we follow: # https://docs.vtk.org/en/latest/vtk_file_formats/vtkhdf_file_format/vtkhdf_specifications.html#temporal-unstructuredgrid-and-polydata # As only one num_points is stored irregardless of time steps added. if hdf["NumberOfCells"].shape[0] != len(stamps): number_of_cells = hdf["NumberOfCells"][0] else: number_of_cells = hdf["NumberOfCells"][time_index] lr = compute_local_range(comm, number_of_cells) data = cell_data_node[cd_start + lr[0] : cd_start + lr[1]] # NOTE: THis could be optimized by hand-coding some communication in # `read_cell_data` on the frontend side md = read_mesh_data( filename, comm, time=time, read_from_partition=False, backend_args=backend_args ) if len(data.shape) == 1: data = data.reshape(-1, 1) return md.cells, data def write_attributes( filename: Path | str, comm: MPI.Intracomm, name: str, attributes: dict[str, np.ndarray], backend_args: dict[str, Any] | None, ): """Write attributes to file. Args: filename: Path to file to write to comm: MPI communicator used in storage name: Name of the attribute group attributes: Dictionary of attributes to write backend_args: Arguments to backend """ raise NotImplementedError("The Pyvista backend cannot write attributes.") def read_attributes( filename: Path | str, comm: MPI.Intracomm, name: str, backend_args: dict[str, Any] | None, ) -> dict[str, Any]: """Read attributes from file. Args: filename: Path to file to read from comm: MPI communicator used in storage name: Name of the attribute group backend_args: Arguments to backend Returns: Dictionary of attributes read from file """ raise NotImplementedError("The Pyvista backend cannot read attributes.") def read_timestamps( filename: Path | str, comm: MPI.Intracomm, function_name: str, backend_args: dict[str, Any] | None, ) -> npt.NDArray[np.float64 | str]: # type: ignore[type-var] """Read timestamps from file. Args: filename: Path to file to read from comm: MPI communicator used in storage function_name: Name of the function to read timestamps for backend_args: Arguments to backend Returns: Numpy array of timestamps read from file """ backend_args = get_default_backend_args(backend_args) check_file_exists(filename) # Temporal data storage as described in # https://docs.vtk.org/en/latest/vtk_file_formats/vtkhdf_file_format/vtkhdf_specifications.html#temporal-data with h5pyfile(filename, "r", comm=comm) as h5file: hdf = h5file["VTKHDF"] if "Steps" in hdf.keys(): timestamps = hdf["Steps"]["Values"][:] # For either point data or cell data, check if function exists, # and check if offsets in time are changing between steps. if "CellData" in hdf.keys() and function_name in hdf["CellData"].keys(): offsets = hdf["Steps"]["CellDataOffsets"][function_name] step_offsets = offsets[:] step_diff = np.flatnonzero(step_offsets[1:] - step_offsets[:-1]) return timestamps[step_diff] elif "PointData" in hdf.keys() and function_name in hdf["PointData"].keys(): offsets = hdf["Steps"]["PointDataOffsets"][function_name] step_offsets = offsets[:] step_diff = np.flatnonzero(step_offsets[1:] - step_offsets[:-1]) # This only finds when the offset changes, does not capture first step return np.hstack([[timestamps[0]], timestamps[step_diff]]) else: raise RuntimeError(f"Function {function_name} is not assoicated with a time-stamp.") else: raise RuntimeError(f"{filename} does not contain time-stepping information.")
[docs] def read_function_names( filename: Path | str, comm: MPI.Intracomm, backend_args: dict[str, Any] | None ) -> list[str]: """Read all function names from a file. Args: filename: Path to file comm: MPI communicator to launch IO on. backend_args: Arguments to backend Returns: A list of function names. """ backend_args = get_default_backend_args(backend_args) check_file_exists(filename) with h5pyfile(filename, "r", comm=comm) as h5file: hdf = h5file["VTKHDF"] function_names = set() if "CellData" in hdf.keys(): for item in hdf["CellData"].keys(): function_names.add(item) if "PointData" in hdf.keys(): for item in hdf["PointData"].keys(): function_names.add(item) return list(function_names)
def _create_dataset( root: h5py.File | h5py.Group, name: str, shape: tuple[int, ...], dtype: npt.DTypeLike, chunks: bool, maxshape: tuple[int | None, ...], mode: str, resize: bool = True, ) -> h5py.Dataset: if name not in root.keys(): mode = "w" if mode == "w": dataset = root.create_dataset( name, shape=shape, dtype=dtype, chunks=chunks, maxshape=maxshape ) elif mode == "a": dataset = root[name] old_shape = dataset.shape # Only resize for dimension if resize: if len(old_shape) == 1: new_shape = (old_shape[0] + shape[0],) else: new_shape = (old_shape[0] + shape[0], *old_shape[1:]) dataset.resize(new_shape) else: raise ValueError(f"Unknown file mode '{mode}' when creating dataset {name} in {root}") return dataset def _create_group(root: h5py.File | h5py.Group, name: str, mode: str) -> h5py.Group: if name not in root.keys(): mode = "w" if mode == "w": # Track order has to be on to make multiblock work: # https://docs.vtk.org/en/latest/vtk_file_formats/vtkhdf_file_format/vtkhdf_specifications.html#partitioneddatasetcollection-and-multiblockdataset group = root.create_group(name, track_order=True) elif mode == "a": group = root[name] else: raise ValueError("Unknown file mode '{h5_mode}'") return group def _compute_append_slice( dataset: h5py.Dataset, input_size: int, original_slice: tuple[int, int] | np.ndarray, mode: str ) -> slice: append_offset = dataset.shape[0] - input_size if mode == "a" else 0 return slice(*(np.asarray(original_slice) + append_offset).astype(np.int64)) def write_mesh( filename: Path | str, comm: MPI.Intracomm, mesh: MeshData, backend_args: dict[str, Any] | None, mode: FileMode, time: float, ): """ Write a mesh to file. Args: comm: MPI communicator used in storage mesh: Internal data structure for the mesh data to save to file filename: Path to file to write to backend_args: Arguments to backend mode: File-mode to store the mesh time: Time stamp associated with the mesh """ h5_mode = convert_file_mode(mode) backend_args = get_default_backend_args(backend_args) name = backend_args["name"] with h5pyfile(filename, h5_mode, comm=comm) as h5file: hdf = _create_group(h5file, "/VTKHDF", h5_mode) hdf.attrs.create("Type", "MultiBlockDataSet") hdf.attrs["Version"] = _vtk_hdf_version mesh_group = _create_group(hdf, name, h5_mode) mesh_group.attrs.create("Type", "UnstructuredGrid") mesh_group.attrs["Version"] = _vtk_hdf_version assembly = _create_group(hdf, "Assembly", h5_mode) mesh_assembly = _create_group(assembly, name, h5_mode) if name not in mesh_assembly.keys(): mesh_assembly[name] = h5py.SoftLink(f"/VTKHDF/{name}") # Write time dependent points number_of_points = _create_dataset( mesh_group, "NumberOfPoints", shape=(1,), dtype=np.int64, chunks=True, maxshape=(None,), mode=h5_mode, ) number_of_points[-1] = mesh.num_nodes_global # Store nodes points = _create_dataset( mesh_group, "Points", shape=(mesh.num_nodes_global, 3), dtype=mesh.local_geometry.dtype, chunks=True, maxshape=(None, 3), mode=h5_mode, ) insert_slice = _compute_append_slice( points, mesh.num_nodes_global, mesh.local_geometry_pos, h5_mode ) points[insert_slice, : mesh.local_geometry.shape[1]] = mesh.local_geometry # NOTE: VTKHDF currently does not support reading time dependent topology in # Paraview: https://gitlab.kitware.com/vtk/vtk/-/issues/19257 # Therefore the following data is not resized time_indep_datasets = {} for key in ["NumberOfCells", "NumberOfConnectivityIds"]: time_indep_datasets[key] = _create_dataset( mesh_group, key, shape=(1,), dtype=np.int64, chunks=True, maxshape=(None,), mode=h5_mode, resize=False, # Resize should really be True ) num_dofs_per_cell = mesh.local_topology.shape[1] time_indep_datasets["NumberOfCells"][-1] = mesh.num_cells_global time_indep_datasets["NumberOfConnectivityIds"][-1] = ( mesh.num_cells_global * num_dofs_per_cell ) # NOTE: The following offsets are currently overwriting the existing topology data. # This is due to the VTKHDF bug. When we switch resize=True this will automatically work # for time dependent topologies # Store topology offsets (single celltype assumption) offsets = _create_dataset( mesh_group, "Offsets", shape=(mesh.num_cells_global + 1,), dtype=np.int64, chunks=True, mode=h5_mode, maxshape=(None,), resize=False, # Resize should really be True ) offset_data = np.arange(0, mesh.local_topology.size + 1, mesh.local_topology.shape[1]) offset_data += num_dofs_per_cell * mesh.local_topology_pos[0] insert_slice = _compute_append_slice( offsets, mesh.num_cells_global + 1, (mesh.local_topology_pos[0], mesh.local_topology_pos[1] + 1), mode=h5_mode, ) offsets[insert_slice] = offset_data del offset_data # Permute and store topology data dx_ct = dolfinx.mesh.to_type(mesh.cell_type) top_perm = np.argsort(dolfinx.cpp.io.perm_vtk(dx_ct, num_dofs_per_cell)) topology_data = mesh.local_topology[:, top_perm].flatten() topology = _create_dataset( mesh_group, "Connectivity", shape=(mesh.num_cells_global * num_dofs_per_cell,), dtype=np.int64, chunks=True, maxshape=(None,), mode=h5_mode, resize=False, # Resize should really be True, see issue below ) # VTKHDFReader issue: https://gitlab.kitware.com/vtk/vtk/-/issues/19257 insert_slice = _compute_append_slice( topology, mesh.num_cells_global * num_dofs_per_cell, np.array(mesh.local_topology_pos) * num_dofs_per_cell, mode=h5_mode, ) topology[insert_slice] = topology_data del topology_data # Store celltypes cell_types = np.full( mesh.local_topology.shape[0], dolfinx.cpp.io.get_vtk_cell_type(dx_ct, dolfinx.mesh.cell_dim(dx_ct)), ) types = _create_dataset( mesh_group, "Types", shape=(mesh.num_cells_global,), dtype=np.uint8, maxshape=(None,), chunks=True, mode=h5_mode, resize=False, # Resize should really be True, see issue below ) # VTKHDFReader issue: https://gitlab.kitware.com/vtk/vtk/-/issues/19257 insert_slice = _compute_append_slice( types, mesh.num_cells_global, mesh.local_topology_pos, h5_mode ) types[insert_slice] = cell_types del cell_types steps = _create_group(mesh_group, "Steps", mode=h5_mode) # First fetch time-steps to see if we have stored this timestep already values = _create_dataset( steps, "Values", shape=(1,), dtype=np.float64, chunks=True, maxshape=(None,), mode=h5_mode, resize=h5_mode == "a", ) if h5_mode == "w": values[0] = time else: existing_steps = values[:-1] if len(np.flatnonzero(np.isclose(existing_steps, time))) > 0: raise RuntimeError(f"Mesh already exists at time {time} in {filename}.") values[-1] = time steps.attrs.create("NSteps", np.int64(len(values)), dtype=np.int64) # Write offset data for current time-step all_parts = {} for key in [ "NumberOfParts", "PartOffsets", "PointOffsets", "CellOffsets", "ConnectivityIdOffsets", ]: all_parts[key] = _create_dataset( steps, key, shape=(1,), dtype=np.int64, chunks=True, maxshape=(None,), mode=h5_mode, ) all_parts["NumberOfParts"][-1] = 1 all_parts["PartOffsets"][-1] = 0 all_parts["PointOffsets"][-1] = points.shape[0] - mesh.num_nodes_global all_parts["CellOffsets"][-1] = types.shape[0] - mesh.num_cells_global all_parts["ConnectivityIdOffsets"][-1] = ( topology.shape[0] - mesh.num_cells_global * num_dofs_per_cell ) # Update cell-data and point-data offsets by copying over data from previous step for key in ["CellDataOffsets", "PointDataOffsets"]: group = _create_group(steps, key, mode=h5_mode) for cd in group.keys(): group[cd].resize(steps.attrs["NSteps"], axis=0) group[cd][-1] = group[cd][-2] # Update Steps in all other parts of the mesh as well for key in mesh_assembly.keys(): if key == name: continue # Copy time-dependent geometry info (NumberOfPoints) from mesh to tag sub_group = mesh_assembly[key] sub_step = sub_group["Steps"] sub_step.attrs["NSteps"] = steps.attrs["NSteps"] # Copy time dependent and partition info from mesh to tag step_copy_keys = ["Values", "PartOffsets", "NumberOfParts"] for key in step_copy_keys: if key in sub_step.keys(): sub_step[key].resize(steps[key].shape) sub_step[key][:] = steps[key][:] else: raise RuntimeError(f"{sub_step.name} should have {key}/") # Append value from previous step for meshtags as they are time-independent append_keys = ["CellOffsets", "ConnectivityIdOffsets"] for key in append_keys: if key in sub_step.keys(): sub_step[key].resize(sub_step[key].size + 1, axis=0) sub_step[key][-1] = sub_step[key][-2] else: raise RuntimeError(f"{sub_step.name} should have {key}/") # Append value from previous step for meshtags celldata for key in ["CellDataOffsets", "PointDataOffsets"]: group = _create_group(sub_step, key, mode=h5_mode) for cd in group.keys(): group[cd].resize(sub_step.attrs["NSteps"], axis=0) group[cd][-1] = group[cd][-2] def write_meshtags( filename: str | Path, comm: MPI.Intracomm, data: MeshTagsData, backend_args: dict[str, Any] | None, ): """Write mesh tags to file. Args: filename: Path to file to write to comm: MPI communicator used in storage data: Internal data structure for the mesh tags to save to file backend_args: Arguments to backend """ backend_args = get_default_backend_args(backend_args) name = backend_args["name"] h5_mode = "a" with h5pyfile(filename, h5_mode, comm=comm) as h5file: hdf = h5file["VTKHDF"] file_type = _decode_bytes_if_needed(hdf.attrs["Type"]) # Check for type of VTKHDF file if file_type != "MultiBlockDataSet": raise ValueError(f"Cannot write meshtags to {filename} with VTK type {file_type}") # We place the mesh-tags in the same subgroup of assembly as the mesh parent_mesh_group = _get_vtk_group(h5file, name) block = parent_mesh_group.parent tag_path = f"/VTKHDF/{name}_{data.name}" if data.name not in block.keys(): block[data.name] = h5py.SoftLink(tag_path) mesh_group = _create_group(hdf, tag_path, mode=h5_mode) mesh_group.attrs.create("Type", "UnstructuredGrid") mesh_group.attrs["Version"] = _vtk_hdf_version cell_data = _create_group(mesh_group, "CellData", mode=h5_mode) assert data.num_entities_global is not None dataset = _create_dataset( cell_data, data.name, shape=(data.num_entities_global,), dtype=data.values.dtype, chunks=True, maxshape=(None,), mode=h5_mode, ) assert data.local_start is not None insert_slice = _compute_append_slice( dataset, data.num_entities_global, np.array([data.local_start, data.local_start + data.indices.shape[0]]), mode=h5_mode, ) dataset[insert_slice] = data.values # NOTE: The following is more or less a copy from write_mesh, # except that we pull out the point storage and use a softlink num_cells = _create_dataset( mesh_group, "NumberOfCells", shape=(1,), dtype=np.int64, chunks=True, maxshape=(None,), mode=h5_mode, resize=False, # Resize should really be True, see issue below ) # VTKHDFReader issue: https://gitlab.kitware.com/vtk/vtk/-/issues/19257 num_cells[-1] = data.num_entities_global # Hardlink data should also follow hardlink for numbering for key in ["Points", "NumberOfPoints"]: if key not in mesh_group.keys(): mesh_group[key] = parent_mesh_group[key] # Single celltype assumption num_dofs_per_cell = data.num_dofs_per_entity number_of_connectivities = _create_dataset( mesh_group, "NumberOfConnectivityIds", shape=(1,), dtype=np.int64, chunks=True, maxshape=(None,), mode=h5_mode, resize=False, # Resize should really be True, see issue below ) # VTKHDFReader issue: https://gitlab.kitware.com/vtk/vtk/-/issues/19257 assert data.num_entities_global is not None and num_dofs_per_cell is not None number_of_connectivities[-1] = data.num_entities_global * num_dofs_per_cell # Store topology offsets (single celltype assumption) offsets = _create_dataset( mesh_group, "Offsets", shape=(data.num_entities_global + 1,), dtype=np.int64, chunks=True, mode=h5_mode, maxshape=(None,), resize=False, # Resize should really be True, see issue below ) # VTKHDFReader issue: https://gitlab.kitware.com/vtk/vtk/-/issues/19257 offset_data = np.arange(0, data.indices.size + 1, data.indices.shape[1]) assert data.local_start is not None offset_data += num_dofs_per_cell * data.local_start insert_slice = _compute_append_slice( offsets, data.num_entities_global + 1, (data.local_start, data.local_start + data.indices.shape[0] + 1), mode=h5_mode, ) offsets[insert_slice] = offset_data del offset_data # Permute and store topology data dx_ct = dolfinx.mesh.to_type(data.cell_type) top_perm = np.argsort(dolfinx.cpp.io.perm_vtk(dx_ct, num_dofs_per_cell)) topology_data = data.indices[:, top_perm].flatten() topology = _create_dataset( mesh_group, "Connectivity", shape=(data.num_entities_global * num_dofs_per_cell,), dtype=np.int64, chunks=True, maxshape=(None,), mode=h5_mode, resize=False, # Resize should really be True, see issue below ) # VTKHDFReader issue: https://gitlab.kitware.com/vtk/vtk/-/issues/19257 insert_slice = _compute_append_slice( topology, data.num_entities_global * num_dofs_per_cell, np.array([data.local_start, data.local_start + data.indices.shape[0]]) * num_dofs_per_cell, mode=h5_mode, ) topology[insert_slice] = topology_data del topology_data # Store celltypes cell_types = np.full( data.indices.shape[0], dolfinx.cpp.io.get_vtk_cell_type(dx_ct, dolfinx.mesh.cell_dim(dx_ct)), ) types = _create_dataset( mesh_group, "Types", shape=(data.num_entities_global,), dtype=np.uint8, maxshape=(None,), chunks=True, mode=h5_mode, resize=False, # Resize should really be True, see issue below ) # VTKHDFReader issue: https://gitlab.kitware.com/vtk/vtk/-/issues/19257 insert_slice = _compute_append_slice( types, data.num_entities_global, (data.local_start, data.local_start + data.indices.shape[0]), h5_mode, ) types[insert_slice] = cell_types del cell_types steps = _create_group(mesh_group, "Steps", mode=h5_mode) # Copy n-step counter steps.attrs.create("NSteps", parent_mesh_group["Steps"].attrs["NSteps"]) # Hardlink data that we know is the same across meshes hardlink_keys = ["NumberOfParts", "PartOffsets", "Values", "PointOffsets"] for key in hardlink_keys: steps[key] = parent_mesh_group["Steps"][key] # Write offset data for current time-step all_parts = {} for key in ["CellOffsets", "ConnectivityIdOffsets"]: all_parts[key] = _create_dataset( steps, key, shape=(1,), dtype=np.int64, chunks=True, maxshape=(None,), mode=h5_mode, ) all_parts["CellOffsets"][-1] = types.shape[0] - data.num_entities_global all_parts["ConnectivityIdOffsets"][-1] = offsets.shape[0] - (data.num_entities_global + 1) # CellData requires an offset cd_off = _create_group(steps, "CellDataOffsets", mode=h5_mode) cd_data = _create_dataset( cd_off, data.name, shape=parent_mesh_group["Steps"]["CellOffsets"].shape, dtype=np.int64, chunks=True, maxshape=(None,), mode=h5_mode, ) cd_data[-1] = types.shape[0] - data.num_entities_global def read_meshtags_data( filename: str | Path, comm: MPI.Intracomm, name: str, backend_args: dict[str, Any] | None, ) -> MeshTagsData: """Read mesh tags from file. Args: filename: Path to file to read from comm: MPI communicator used in storage name: Name of the mesh tags to read backend_args: Arguments to backend Returns: Internal data structure for the mesh tags read from file """ backend_args = get_default_backend_args(backend_args) backend_args.update({"name": name}) # Reuse reading cell-data indices, values = read_cell_data(filename, name, comm, None, backend_args=backend_args) # Read cell-type of grid to get topological dimension with h5pyfile(filename, "r", comm=comm) as h5file: hdf = _get_vtk_group(h5file, backend_args["name"]) num_cells_global = hdf["Types"].size local_cell_range = compute_local_range(comm, num_cells_global) cell_types_local = hdf["Types"][slice(*local_cell_range)] unique_cells = find_all_unique_cell_types(comm, cell_types_local, indices.shape[1]) if unique_cells.shape[0] > 1: raise NotImplementedError("io4dolfinx does not support mixed celltype grids") vtk_cell_type = unique_cells[0][0] if vtk_cell_type in _first_order_vtk.keys(): ct = _first_order_vtk[vtk_cell_type] elif vtk_cell_type in _arbitrary_lagrange_vtk.keys(): ct = _arbitrary_lagrange_vtk[vtk_cell_type] dim = dolfinx.mesh.cell_dim(dolfinx.mesh.to_type(ct)) return MeshTagsData(name=name, values=values.flatten(), indices=indices, dim=dim) def read_dofmap( filename: str | Path, comm: MPI.Intracomm, name: str, backend_args: dict[str, Any] | None, ) -> dolfinx.graph.AdjacencyList: """Read the dofmap of a function with a given name. Args: filename: Path to file to read from comm: MPI communicator used in storage name: Name of the function to read the dofmap for backend_args: Arguments to backend Returns: Dofmap as an AdjacencyList """ raise NotImplementedError("The Pyvista backend cannot make checkpoints.") def read_dofs( filename: str | Path, comm: MPI.Intracomm, name: str, time: float, backend_args: dict[str, Any] | None, ) -> tuple[npt.NDArray[np.float32 | np.float64 | np.complex64 | np.complex128], int]: """Read the dofs (values) of a function with a given name from a given timestep. Args: filename: Path to file to read from comm: MPI communicator used in storage name: Name of the function to read the dofs for time: Time stamp associated with the function to read backend_args: Arguments to backend Returns: Contiguous sequence of degrees of freedom (with respect to input data) and the global starting point on the process. Process 0 has [0, M), process 1 [M, N), process 2 [N, O) etc. """ raise NotImplementedError("The Pyvista backend cannot make checkpoints.") def read_cell_perms( comm: MPI.Intracomm, filename: Path | str, backend_args: dict[str, Any] | None ) -> npt.NDArray[np.uint32]: """ Read cell permutation from file with given communicator, Split in continuous chunks based on number of cells in the input data. Args: comm: MPI communicator used in storage filename: Path to file to read from backend_args: Arguments to backend Returns: Contiguous sequence of permutations (with respect to input data) Process 0 has [0, M), process 1 [M, N), process 2 [N, O) etc. """ raise NotImplementedError("The VTKHDF backend cannot make checkpoints.") def write_function( filename: Path, comm: MPI.Intracomm, u: FunctionData, time: float, mode: FileMode, backend_args: dict[str, Any] | None, ): """ Write a function to file. Args: comm: MPI communicator used in storage u: Internal data structure for the function data to save to file filename: Path to file to write to time: Time stamp associated with function mode: File-mode to store the function backend_args: Arguments to backend """ raise NotImplementedError("The VTKHDF backend cannot make checkpoints.") def read_legacy_mesh( filename: Path | str, comm: MPI.Intracomm, group: str ) -> tuple[npt.NDArray[np.int64], npt.NDArray[np.floating], str | None]: """Read in the mesh topology, geometry and (optionally) cell type from a legacy DOLFIN HDF5-file. Args: filename: Path to file to read from comm: MPI communicator used in storage group: Group in HDF5 file where mesh is stored Returns: Tuple containing: - Topology as a (num_cells, num_vertices_per_cell) array of global vertex indices - Geometry as a (num_vertices, geometric_dimension) array of vertex coordinates - Cell type as a string (e.g. "tetrahedron") or None if not found """ raise NotImplementedError("The VTKHDF backend cannot read legacy DOLFIN meshes.") def snapshot_checkpoint( filename: Path | str, mode: FileMode, u: dolfinx.fem.Function, backend_args: dict[str, Any] | None, ): """Create a snapshot checkpoint of a dolfinx function. Args: filename: Path to file to read from mode: File-mode to store the function u: dolfinx function to create a snapshot checkpoint for backend_args: Arguments to backend """ raise NotImplementedError("The VTKHDF backend cannot make checkpoints.") def read_hdf5_array( comm: MPI.Intracomm, filename: Path | str, group: str, backend_args: dict[str, Any] | None, ) -> tuple[np.ndarray, int]: """Read an array from an HDF5 file. Args: comm: MPI communicator used in storage filename: Path to file to read from group: Group in HDF5 file where array is stored backend_args: Arguments to backend Returns: Tuple containing: - Numpy array read from file - Global starting point on the process. Process 0 has [0, M), process 1 [M, N), process 2 [N, O) etc. """ raise NotImplementedError("The VTKHDF backend cannot read HDF5 arrays")
[docs] def write_data( filename: Path | str, array_data: ArrayData, comm: MPI.Intracomm, time: str | float | None, mode: FileMode, backend_args: dict[str, Any] | None, ): """Write function to file by interpolating into geometry nodes. Args: filename: Path to file array_data: Data to write to file time: Time stamp mode: Append or write backend_args: The backend arguments """ h5_mode = convert_file_mode(mode) assert h5_mode == "a" backend_args = get_default_backend_args(backend_args) mesh_name = backend_args["name"] extension = array_data.type with h5pyfile(filename, h5_mode, comm=comm) as h5file: hdf = h5file["VTKHDF"] file_type = _decode_bytes_if_needed(hdf.attrs["Type"]) # Check for type of VTKHDF file if file_type != "MultiBlockDataSet": raise ValueError(f"Cannot write meshtags to {filename} with VTK type {file_type}") # Find mesh block to add data to block = _get_vtk_group(h5file, mesh_name) data_group = _create_group(block, f"{extension}Data", mode=h5_mode) dataset = _create_dataset( data_group, array_data.name, shape=array_data.global_shape, dtype=array_data.values.dtype, chunks=True, maxshape=(None, array_data.values.shape[1]), mode=h5_mode, ) insert_slice = _compute_append_slice( dataset, array_data.global_shape[0], np.array( [ array_data.local_range[0], array_data.local_range[0] + array_data.values.shape[0], ] ), mode=h5_mode, ) dataset[insert_slice] = array_data.values steps = _create_group(block, "Steps", mode=h5_mode) pdo = _create_group(steps, f"{extension}DataOffsets", mode=h5_mode) # Check if time step is already in time-stepping of mesh timestamps = steps["Values"][:] assert isinstance(timestamps, np.ndarray) assert isinstance(time, float) time_exists = np.flatnonzero(np.isclose(timestamps, time)) if len(time_exists) > 1: raise ValueError("Time-step has been written multiple times") pdo_u = _create_dataset( pdo, array_data.name, shape=(1,), dtype=np.int64, chunks=True, maxshape=(None,), mode=h5_mode, resize=False, ) if len(time_exists) == 1: idx = time_exists[0] elif len(time_exists) == 0: # No mesh written at step, update mesh offsets # Geometry has to be time-dependent num_points = block["NumberOfPoints"] num_points.resize(len(timestamps) + 1, axis=0) num_points[-1] = num_points[-2] # Even if topology cannot be time dep at the moment, number of cells # must be updated num_cells = block["NumberOfCells"] num_cells.resize(len(timestamps) + 1, axis=0) num_cells[-1] = num_cells[-2] steps.attrs.create("NSteps", block["Steps"].attrs["NSteps"] + 1) step_vals = _create_dataset( steps, "Values", shape=(1,), dtype=np.float64, chunks=True, maxshape=(None,), mode=h5_mode, resize=True, ) step_vals[-1] = time idx = step_vals.size - 1 for key in [ "PartOffsets", "NumberOfParts", "PointOffsets", "CellOffsets", "ConnectivityIdOffsets", ]: comp = steps[key] comp.resize(steps.attrs["NSteps"], axis=0) if comp.size > 1: comp[-1] = comp[-2] else: raise ValueError(f"Time step found multiple times in {filename}") # Update offsets for current variable pdo_u.resize(steps.attrs["NSteps"], axis=0) pdo_u[idx] = dataset.shape[0] - array_data.global_shape[0] # Point and cell data of all other variables has to be updated to be synced for key in [ "CellDataOffsets", "PointDataOffsets", ]: comp = steps[key] num_steps = steps.attrs["NSteps"] for dname, dset in comp.items(): # Only resize other data arrays if dname != array_data.name: # Only resize if it hasn't already been resized if dset.size != num_steps: dset.resize(num_steps, axis=0) # Only update data if we are further than first time step if dset.size > 1: dset[-1] = dset[-2]