Source code for movement.mover

import abc
from warnings import warn

import firedrake
import firedrake.exceptions as fexc
import numpy as np
import ufl
from firedrake.cython.dmcommon import create_section
from firedrake.petsc import PETSc

from movement.tangling import MeshTanglingChecker

__all__ = ["PrimeMover"]


[docs] class PrimeMover(abc.ABC): """ Base class for all mesh movers. """ def __init__( self, mesh, monitor_function=None, raise_convergence_errors=True, tangling_check=True, quadrature_degree=None, ): r""" :arg mesh: the physical mesh :type mesh: :class:`firedrake.mesh.MeshGeometry` :arg monitor_function: a Python function which takes a mesh as input :type monitor_function: :class:`~.Callable` :kwarg raise_convergence_errors: convergence error handling behaviour: if `True` then :class:`~.ConvergenceError`\s are raised, else warnings are raised and the program is allowed to continue :type raise_convergence_errors: :class:`bool` :kwarg tangling_check: check whether the mesh has tangled elements (on by default) :type tangling_check: :class:`bool` :kwarg quadrature_degree: quadrature degree to be passed to Firedrakes measures :type quadrature_degree: :class:`int` """ self.mesh = firedrake.Mesh(mesh.coordinates.copy(deepcopy=True)) self.monitor_function = monitor_function if not raise_convergence_errors: warn( f"{type(self)}.move called with raise_convergence_errors=False." " Beware: this option can produce poor quality meshes!", stacklevel=1, ) self.raise_convergence_errors = raise_convergence_errors self.dim = self.mesh.topological_dimension() self.gdim = self.mesh.geometric_dimension() # DMPlex setup self.plex = self.mesh.topology_dm self.vertex_indices = self.plex.getDepthStratum(0) self.edge_indices = self.plex.getDepthStratum(1) entity_dofs = np.zeros(self.dim + 1, dtype=np.int32) entity_dofs[0] = self.gdim self._coordinate_section = create_section(self.mesh, entity_dofs)[0] dm_coords = self.plex.getCoordinateDM() dm_coords.setDefaultSection(self._coordinate_section) try: self._local_coordinates_vec = dm_coords.createLocalVec() self._update_plex_coordinates() except ValueError: warn("Cannot update DMPlex coordinates for periodic meshes.", stacklevel=1) self._local_coordinates_vec = None self.dx = firedrake.dx(domain=self.mesh, degree=quadrature_degree) self.ds = firedrake.ds(domain=self.mesh, degree=quadrature_degree) self.dS = firedrake.dS(domain=self.mesh, degree=quadrature_degree) self._create_function_spaces() self._create_functions() self._all_boundary_segments = self.mesh.exterior_facets.unique_markers # Utilities if tangling_check: self.tangling_checker = MeshTanglingChecker( self.mesh, raise_error=raise_convergence_errors ) def _create_function_spaces(self): self.coord_space = self.mesh.coordinates.function_space() self.P0 = firedrake.FunctionSpace(self.mesh, "DG", 0) def _create_functions(self): self.x = firedrake.Function(self.mesh.coordinates, name="Physical coordinates") self.xi = firedrake.Function( self.mesh.coordinates, name="Computational coordinates" ) self.v = firedrake.Function(self.coord_space, name="Mesh velocity") self.volume = firedrake.Function(self.P0, name="Mesh volume") self.volume.interpolate(ufl.CellVolume(self.mesh)) def _convergence_message(self, iterations=None): """ Report solver convergence. :kwarg iterations: number of iterations before reaching convergence :type iterations: :class:`int` """ msg = "Solver converged" if iterations: msg += f" in {iterations} iteration{plural(iterations)}" PETSc.Sys.Print(f"{msg}.") def _exception(self, msg, exception=None, error_type=fexc.ConvergenceError): """ Raise an error or warning as indicated by the :attr:`raise_convergence_error` option. :arg msg: message for the error/warning report :type msg: :class:`str` :kwarg exception: original exception that was triggered :type exception: :class:`~.Exception` object :kwarg error_type: error class to use :type error_type: :class:`~.Exception` class """ exc_type = error_type if self.raise_convergence_errors else Warning if exception: raise exc_type(msg) from exception else: raise exc_type(msg) def _convergence_error(self, iterations=None, exception=None): """ Raise an error or warning for a solver fail as indicated by the :attr:`raise_convergence_error` option. :kwarg iterations: number of iterations before failure :type iterations: :class:`int` :kwarg exception: original exception that was triggered :type exception: :class:`~.Exception` """ msg = "Solver failed to converge" if iterations: msg += f" in {iterations} iteration{plural(iterations)}" self._exception(f"{msg}.", exception=exception) def _divergence_error(self, iterations=None, exception=None): """ Raise an error or warning for a solver divergence as indicated by the :attr:`raise_convergence_error` option. :kwarg iterations: number of iterations before failure :type iterations: :class:`int` :kwarg exception: original exception that was triggered :type exception: :class:`~.Exception` """ msg = "Solver diverged" if iterations: msg += f" after {iterations} iteration{plural(iterations)}" self._exception(f"{msg}.", exception=exception) def _update_plex_coordinates(self): """ Update the underlying DMPlex coordinates with the coordinates of the Firedrake mesh. """ if self._local_coordinates_vec is None: raise ValueError("Cannot update DMPlex coordinates for periodic meshes.") self._local_coordinates_vec.array[:] = np.reshape( self.mesh.coordinates.dat.data_with_halos, self._local_coordinates_vec.array.shape, ) self.plex.setCoordinatesLocal(self._local_coordinates_vec) def _coordinate_offset(self, index): """ Map the index of a DMPlex coordinate to the coordinate index in Firedrake. :arg index: DMPlex coordinate index :type index: :class:`int` """ return self._coordinate_section.getOffset(index) // self.dim def _edge_offset(self, index): """ Map the index of a DMPlex edge to the edge index in Firedrake. :arg index: DMPlex edge index :type index: :class:`int` """ if not hasattr(self, "_edge_vector_section"): entity_dofs = np.zeros(self.dim + 1, dtype=np.int32) entity_dofs[1] = 1 self._edge_vector_section = create_section(self.mesh, entity_dofs)[0] return self._edge_vector_section.getOffset(index) @property def volume_ratio(self): """ :return: the ratio of the largest and smallest element volumes. :rtype: :class:`float` """ volume_array = self.volume.vector().gather() return volume_array.max() / volume_array.min() @property def coefficient_of_variation(self): """ :return: the coefficient of variation (σ/μ) of element volumes. :rtype: :class:`float` """ volume_array = self.volume.vector().gather() mean = volume_array.sum() / volume_array.size return np.sqrt(np.sum((volume_array - mean) ** 2) / volume_array.size) / mean
[docs] @abc.abstractmethod def move(self): """ Move the mesh according to the method of choice. """ pass # pragma: no cover
[docs] def to_physical_coordinates(self): r""" Switch coordinates to correspond to the physical mesh :class:`\mathcal{H}_P`. """ self.mesh.coordinates.assign(self.x)
[docs] def to_computational_coordinates(self): r""" Switch coordinates to correspond to the computational mesh :class:`\mathcal{H}_C`. """ self.mesh.coordinates.assign(self.xi)
[docs] def plural(iterations): """ :return: 's' if `iterations` should be referred to in the plural sense :rtype: :class:`str` """ return "s" if iterations != 1 else ""