Source code for toughio._mesh._helpers

from copy import deepcopy

import meshio
import numpy

from .._common import filetype_from_filename, register_format
from ._mesh import from_meshio

__all__ = [
    "read",
    "write",
    "read_time_series",
    "write_time_series",
]


_extension_to_filetype = {}
_reader_map = {}
_writer_map = {}
_materials = ["material", "gmsh:physical", "medit:ref"]


def register(file_format, extensions, reader, writer=None, material=None):
    """Register a new format."""
    register_format(
        fmt=file_format,
        ext_to_fmt=_extension_to_filetype,
        reader_map=_reader_map,
        writer_map=_writer_map,
        extensions=extensions,
        reader=reader,
        writer=writer,
    )
    if material:
        _materials.append(material)


def get_material_key(cell_data):
    """Get key of material data in cell_data."""
    for k in cell_data.keys():
        if k in _materials:
            return k
    return None


def read(filename, file_format=None, **kwargs):
    """
    Read unstructured mesh from file.

    Parameters
    ----------
    filename : str
        Input file name.
    file_format : str or None, optional, default None
        Input file format.

    Other Parameters
    ----------------
    label_length : int or None, optional, default None
        Only if ``file_format = "tough"``. Number of characters in cell labels.

    Returns
    -------
    toughio.Mesh
        Imported mesh.

    """
    # Check file format
    if not isinstance(filename, str):
        raise TypeError()
    fmt = (
        file_format
        if file_format
        else filetype_from_filename(filename, _extension_to_filetype)
    )

    # Call custom readers
    if fmt in _reader_map.keys():
        mesh = _reader_map[fmt](filename, **kwargs)
        if fmt not in {"tough", "pickle"}:
            mesh.cell_data = {
                k: numpy.concatenate(v) for k, v in mesh.cell_data.items()
            }
            key = get_material_key(mesh.cell_data)
            if key:
                mesh.cell_data["material"] = mesh.cell_data.pop(key)
    else:
        mesh = meshio.read(filename, file_format)
        mesh = from_meshio(mesh)
    return mesh


def write(filename, mesh, file_format=None, **kwargs):
    """
    Write unstructured mesh to file.

    Parameters
    ----------
    filename : str
        Output file name.
    mesh : toughio.Mesh
        Mesh to export.
    file_format : str or None, optional, default None
        Output file format.

    Other Parameters
    ----------------
    nodal_distance : str ('line' or 'orthogonal'), optional, default 'line'
        Only if ``file_format = "tough"``. Method to calculate connection nodal distances:
        - 'line': distance between node and common face along connecting line (distance is not normal),
        - 'orthogonal' : distance between node and its orthogonal projection onto common face (shortest distance).
    material_name : dict or None, default None
        Only if ``file_format = "tough"``. Rename cell material.
    material_end : str, array_like or None, default None
        Only if ``file_format = "tough"``. Move cells to bottom of block 'ELEME' if their materials is in `material_end`.
    incon : bool, optional, default False
        Only if ``file_format = "tough"``. If `True`, initial conditions will be written in file `INCON`.
    protocol : integer, optional, default `pickle.HIGHEST_PROTOCOL`
        Only if ``file_format = "pickle"``. :mod:`pickle` protocol version.

    """
    # Check file format
    if not isinstance(filename, str):
        raise TypeError()
    fmt = (
        file_format
        if file_format
        else filetype_from_filename(filename, _extension_to_filetype)
    )

    # Call custom writer
    if fmt in _writer_map.keys():
        if fmt not in {"tough", "pickle"}:
            mesh = deepcopy(mesh)
            mesh.cell_data = {k: mesh.split(v) for k, v in mesh.cell_data.items()}
        _writer_map[fmt](filename, mesh, **kwargs)
    else:
        mesh = mesh.to_meshio()
        meshio.write(filename, mesh, file_format=file_format, **kwargs)


[docs]def read_time_series(filename): """ Read time series from XDMF file. Parameters ---------- filename : str Input file name. Returns ------- list of namedtuple (type, data) Grid cell data. list of dict Data associated to grid points for each time step. list of dict Data associated to grid cells for each time step. array_like Time step values. """ from ._common import get_meshio_version, get_new_meshio_cells if not isinstance(filename, str): raise ValueError() point_data, cell_data, time_steps = [], [], [] if get_meshio_version() < (3,): reader = meshio.XdmfTimeSeriesReader(filename) points, cells = reader.read_points_cells() for k in range(reader.num_steps): t, pdata, cdata = reader.read_data(k) _, cdata = get_new_meshio_cells(cells, cdata) point_data.append(pdata) cell_data.append(cdata) time_steps.append(t) cells = get_new_meshio_cells(cells) else: with meshio.xdmf.TimeSeriesReader(filename) as reader: points, cells = reader.read_points_cells() for k in range(reader.num_steps): t, pdata, cdata = reader.read_data(k) point_data.append(pdata) cell_data.append(cdata) time_steps.append(t) # Concatenate cell data arrays for cdata in cell_data: for k in cdata.keys(): cdata[k] = numpy.concatenate(cdata[k]) return points, cells, point_data, cell_data, time_steps
[docs]def write_time_series( filename, points, cells, point_data=None, cell_data=None, time_steps=None, ): """ Write time series given points and cells data. Parameters ---------- filename : str Output file name. points : ndarray Grid points array. cells : list of namedtuple (type, data) Grid cell data. point_data : list of dict or None, optional, default None Data associated to grid points for each time step. cell_data : list of dict or None, optional, default None Data associated to grid cells for each time step. time_steps : array_like, optional, default None Time step values. """ from ._common import get_meshio_version, get_old_meshio_cells if not isinstance(filename, str): raise TypeError() if point_data is not None and not isinstance(point_data, (list, tuple)): raise TypeError() if cell_data is not None and not isinstance(cell_data, (list, tuple)): raise TypeError() if time_steps is not None and not isinstance( time_steps, (list, tuple, numpy.ndarray) ): raise TypeError() if not (point_data or cell_data): raise ValueError("Provide at least point_data or cell_data.") else: nt = len(point_data) if point_data else len(cell_data) if point_data and len(point_data) != nt: raise ValueError("Inconsistent number of point data.") if cell_data and len(cell_data) != nt: raise ValueError("Inconsistent number of cell data.") if time_steps is not None and len(time_steps) != nt: raise ValueError("Inconsistent number of time steps.") point_data = point_data if point_data else [{}] * nt cell_data = cell_data if cell_data else [{}] * nt time_steps = time_steps if time_steps is not None else list(range(nt)) # Split cell data arrays sizes = numpy.cumsum([len(c.data) for c in cells[:-1]]) cell_data = [ {k: numpy.split(v, sizes) for k, v in cdata.items()} for cdata in cell_data ] # Sort data with time steps idx = numpy.argsort(time_steps) point_data = [point_data[i] for i in idx] cell_data = [cell_data[i] for i in idx] time_steps = [time_steps[i] for i in idx] # Write XDMF def write_data(writer, points, cells, point_data, cell_data, time_steps): writer.write_points_cells(points, cells) for tstep, pdata, cdata in zip(time_steps, point_data, cell_data): writer.write_data(tstep, point_data=pdata, cell_data=cdata) if get_meshio_version() < (3,): writer = meshio.XdmfTimeSeriesWriter(filename) tmp = [get_old_meshio_cells(cells, cdata) for cdata in cell_data] cells = tmp[0][0] cell_data = [cell[1] for cell in tmp] write_data(writer, points, cells, point_data, cell_data, time_steps) else: with meshio.xdmf.TimeSeriesWriter(filename) as writer: write_data(writer, points, cells, point_data, cell_data, time_steps)