Source code for animate.interpolation

"""
Driver functions for mesh-to-mesh data transfer.
"""

import firedrake
import numpy as np
import ufl
from firedrake.functionspaceimpl import FiredrakeDualSpace, WithGeometry
from firedrake.petsc import PETSc
from firedrake.supermeshing import assemble_mixed_mass_matrix
from petsc4py import PETSc as petsc4py
from pyop2 import op2

from animate.quality import QualityMeasure
from animate.utility import (
    assemble_mass_matrix,
    cofunction2function,
    function2cofunction,
)

__all__ = ["transfer", "interpolate", "project", "clement_interpolant"]


[docs] @PETSc.Log.EventDecorator() def transfer(source, target_space, transfer_method="project", **kwargs): r""" Overload functions :func:`firedrake.__future__.interpolate` and :func:`firedrake.projection.project` to account for the case of two mixed function spaces defined on different meshes and for the adjoint interpolation operator when applied to :class:`firedrake.cofunction.Cofunction`\s. Note that the "interpolate" option works straightforwardly with MPI parallelism, whereas the "project" option can be difficult to set up to make use of this. :arg source: the function to be transferred :type source: :class:`firedrake.function.Function` or :class:`firedrake.cofunction.Cofunction` :arg target_space: the function space which we seek to transfer onto, or the function or cofunction to use as the target :type target_space: :class:`firedrake.functionspaceimpl.FunctionSpace`, :class:`firedrake.function.Function` or :class:`firedrake.cofunction.Cofunction` :kwarg transfer_method: the method to use for the transfer. Options are "interpolate" (default) and "project" :type transfer_method: str :returns: the transferred function :rtype: :class:`firedrake.function.Function` or :class:`firedrake.cofunction.Cofunction` Extra keyword arguments are passed to :func:`firedrake.__future__.interpolate` or :func:`firedrake.projection.project`. """ if transfer_method not in ("interpolate", "project"): raise ValueError( f"Invalid transfer method: {transfer_method}." " Options are 'interpolate' and 'project'." ) if not isinstance(source, (firedrake.Function, firedrake.Cofunction)): raise NotImplementedError( f"Can only currently {transfer_method} Functions and Cofunctions." ) if isinstance(target_space, WithGeometry): target = firedrake.Function(target_space) elif isinstance(target_space, (firedrake.Cofunction, firedrake.Function)): target = target_space else: raise TypeError( "Second argument must be a FunctionSpace, Function, or Cofunction." ) if isinstance(source, firedrake.Cofunction): return _transfer_adjoint(source, target, transfer_method, **kwargs) elif source.function_space() == target.function_space(): return target.assign(source) else: return _transfer_forward(source, target, transfer_method, **kwargs)
[docs] @PETSc.Log.EventDecorator() def interpolate(source, target_space, **kwargs): r""" Overload function :func:`firedrake.__future__.interpolate` to account for the case of two mixed function spaces defined on different meshes and for the adjoint interpolation operator when applied to :class:`firedrake.cofunction.Cofunction`\s. :arg source: the function to be transferred :type source: :class:`firedrake.function.Function` or :class:`firedrake.cofunction.Cofunction` :arg target_space: the function space which we seek to transfer onto, or the function or cofunction to use as the target :type target_space: :class:`firedrake.functionspaceimpl.FunctionSpace`, :class:`firedrake.function.Function` or :class:`firedrake.cofunction.Cofunction` :returns: the transferred function :rtype: :class:`firedrake.function.Function` or :class:`firedrake.cofunction.Cofunction` Extra keyword arguments are passed to :func:`firedrake.__future__.interpolate` """ return transfer(source, target_space, transfer_method="interpolate", **kwargs)
[docs] @PETSc.Log.EventDecorator() def project(source, target_space, **kwargs): r""" Overload function :func:`firedrake.projection.project` to account for the case of two mixed function spaces defined on different meshes and for the adjoint projection operator when applied to :class:`firedrake.cofunction.Cofunction`\s. For details on the approach for achieving boundedness through mass lumping and post-processing, see :cite:`Farrell:2009`. :arg source: the function to be transferred :type source: :class:`firedrake.function.Function` or :class:`firedrake.cofunction.Cofunction` :arg target_space: the function space which we seek to transfer onto, or the function or cofunction to use as the target :type target_space: :class:`firedrake.functionspaceimpl.FunctionSpace`, :class:`firedrake.function.Function` or :class:`firedrake.cofunction.Cofunction` :returns: the transferred function :rtype: :class:`firedrake.function.Function` or :class:`firedrake.cofunction.Cofunction` :kwarg bounded: apply mass lumping to the mass matrix to ensure boundedness :type bounded: :class:`bool` Extra keyword arguments are passed to :func:`firedrake.projection.project`. """ return transfer(source, target_space, transfer_method="project", **kwargs)
# TODO: Reimplement by introducing a LumpedSupermeshProjector subclass of # firedrake.projection.SupermeshProjector (#123) # TODO: Implement minimal diffusion correction (#124) def _supermesh_project(source, target, bounded=False): Vs = source.function_space() Vt = target.function_space() element_t = Vt.ufl_element() if bounded and (element_t.family(), element_t.degree()) != ("Lagrange", 1): raise ValueError("Mass lumping is not recommended for spaces other than P1.") # Create a linear system using the lumped mass matrix for the target space mixed_mass = assemble_mixed_mass_matrix(Vs, Vt) ksp = petsc4py.KSP().create() ksp.setOperators(assemble_mass_matrix(Vt, lumped=bounded)) # Solve the linear system with source.dat.vec_ro as s, target.dat.vec_wo as t: rhs = t.copy() mixed_mass.mult(s, rhs) ksp.solve(rhs, t) @PETSc.Log.EventDecorator() def _transfer_forward(source, target, transfer_method, **kwargs): """ Apply mesh-to-mesh transfer operator to a Function. This function extends the functionality of :func:`firedrake.__future__.interpolate` and :func:`firedrake.projection.project` to account for mixed spaces. :arg source: the Function to be transferred :type source: :class:`firedrake.function.Function` :arg target: the Function which we seek to transfer onto :type target: :class:`firedrake.function.Function` :kwarg transfer_method: the method to use for the transfer. Options are "interpolate" (default) and "project" :type transfer_method: str :kwarg bounded: apply mass lumping to the mass matrix to ensure boundedness (project only) :type bounded: :class:`bool` :returns: the transferred Function :rtype: :class:`firedrake.function.Function` Extra keyword arguments are passed to :func:`firedrake.__future__.interpolate` or :func:`firedrake.projection.project`. """ is_project = transfer_method == "project" bounded = is_project and kwargs.pop("bounded", False) Vs = source.function_space() Vt = target.function_space() _validate_matching_spaces(Vs, Vt) assert isinstance(target, firedrake.Function) if hasattr(Vt, "num_sub_spaces"): for s, t in zip(source.subfunctions, target.subfunctions): if transfer_method == "interpolate": t.interpolate(s, **kwargs) elif transfer_method == "project": if bounded: _supermesh_project(s, t, bounded=True) else: t.project(s, **kwargs) else: raise ValueError( f"Invalid transfer method: {transfer_method}." " Options are 'interpolate' and 'project'." ) else: if transfer_method == "interpolate": target.interpolate(source, **kwargs) elif transfer_method == "project": if bounded: _supermesh_project(source, target, bounded=True) else: target.project(source, **kwargs) else: raise ValueError( f"Invalid transfer method: {transfer_method}." " Options are 'interpolate' and 'project'." ) return target @PETSc.Log.EventDecorator() def _transfer_adjoint(target_b, source_b, transfer_method, **kwargs): """ Apply an adjoint mesh-to-mesh transfer operator to a Cofunction. :arg target_b: seed Cofunction from the target space of the forward projection :type target_b: :class:`firedrake.cofunction.Cofunction` :arg source_b: output Cofunction from the source space of the forward projection :type source_b: :class:`firedrake.cofunction.Cofunction` :kwarg transfer_method: the method to use for the transfer. Options are "interpolate" (default) and "project" :type transfer_method: str :kwarg bounded: apply mass lumping to the mass matrix to ensure boundedness (project only) :type bounded: :class:`bool` :returns: the transferred Cofunction :rtype: :class:`firedrake.cofunction.Cofunction` Extra keyword arguments are passed to :func:`firedrake.__future__.interpolate` or :func:`firedrake.projection.project`. """ is_project = transfer_method == "project" bounded = is_project and kwargs.pop("bounded", False) # Map to Functions to apply the adjoint transfer if not isinstance(target_b, firedrake.Function): target_b = cofunction2function(target_b) if not isinstance(source_b, firedrake.Function): source_b = cofunction2function(source_b) Vt = target_b.function_space() Vs = source_b.function_space() if Vs == Vt: source_b.assign(target_b) return function2cofunction(source_b) _validate_matching_spaces(Vs, Vt) if hasattr(Vs, "num_sub_spaces"): target_b_split = target_b.subfunctions source_b_split = source_b.subfunctions else: target_b_split = [target_b] source_b_split = [source_b] # Apply adjoint transfer operator to each component for i, (t_b, s_b) in enumerate(zip(target_b_split, source_b_split)): if transfer_method == "interpolate": raise NotImplementedError( "Adjoint of interpolation operator not implemented." ) # TODO (#113) elif transfer_method == "project": ksp = petsc4py.KSP().create() ksp.setOperators(assemble_mass_matrix(t_b.function_space(), lumped=bounded)) mixed_mass = assemble_mixed_mass_matrix(Vt[i], Vs[i]) with t_b.dat.vec_ro as tb, s_b.dat.vec_wo as sb: residual = tb.copy() ksp.solveTranspose(tb, residual) mixed_mass.mult(residual, sb) # NOTE: already transposed above else: raise ValueError( f"Invalid transfer method: {transfer_method}." " Options are 'interpolate' and 'project'." ) # Map back to a Cofunction return function2cofunction(source_b) def _validate_matching_spaces(Vs, Vt): if hasattr(Vs, "num_sub_spaces"): if not hasattr(Vt, "num_sub_spaces"): raise ValueError( "Source space has multiple components but target space does not." ) if Vs.num_sub_spaces() != Vt.num_sub_spaces(): raise ValueError( "Inconsistent numbers of components in source and target spaces:" f" {Vs.num_sub_spaces()} vs. {Vt.num_sub_spaces()}." ) elif hasattr(Vt, "num_sub_spaces"): raise ValueError( "Target space has multiple components but source space does not." )
[docs] @PETSc.Log.EventDecorator() def clement_interpolant(source, target_space=None, boundary=False): r""" Compute the Clement interpolant of a :math:`\mathbb P0` source field, i.e. take the volume average over neighbouring cells at each vertex. See :cite:`Clement:1975`. :arg source: the :math:`\mathbb P0` source field :kwarg target_space: the :math:`\mathbb P1` space to interpolate into :kwarg boundary: interpolate over boundary facets or cells? """ if not isinstance(source, (firedrake.Cofunction, firedrake.Function)): raise TypeError(f"Expected Cofunction or Function, got '{type(source)}'.") # Map Cofunctions to Functions for the interpolation is_cofunction = isinstance(source, firedrake.Cofunction) if is_cofunction: data = source.dat.data_with_halos source = firedrake.Function(source.function_space().dual()) source.dat.data_with_halos[:] = data # Process source space Vs = source.function_space() Vs_e = Vs.ufl_element() if not (Vs_e.family() == "Discontinuous Lagrange" and Vs_e.degree() == 0): raise ValueError("Source function provided must be from a P0 space.") rank = len(Vs.value_shape) if rank not in (0, 1, 2): raise ValueError(f"Rank-{rank + 1} tensors are not supported.") mesh = Vs.mesh() dim = mesh.topological_dimension() # Process target space Vt = target_space if Vt is None: Vt = { 0: firedrake.FunctionSpace, 1: firedrake.VectorFunctionSpace, 2: firedrake.TensorFunctionSpace, }[rank](mesh, "CG", 1) elif isinstance(Vt, FiredrakeDualSpace): Vt = Vt.dual() else: is_cofunction = False Vt_e = Vt.ufl_element() if not (Vt_e.family() == "Lagrange" and Vt_e.degree() == 1): raise ValueError("Target space provided must be P1.") target = firedrake.Function(Vt) # Scalar P0 and P1 spaces to hold volumes, etc. P0 = firedrake.FunctionSpace(mesh, "DG", 0) P1 = firedrake.FunctionSpace(mesh, "CG", 1) # Determine target domain tdomain = { 0: "{[i]: 0 <= i < t.dofs}", 1: f"{{[i, j]: 0 <= i < t.dofs and 0 <= j < {dim}}}", 2: f"{{[i, j, k]: 0 <= i < t.dofs and 0 <= j < {dim} and 0 <= k < {dim}}}", }[rank] # Compute the patch volume at each vertex if not boundary: dX = ufl.dx(domain=mesh) volume = QualityMeasure(mesh, python=True)("volume") # Compute patch volume patch_volume = firedrake.Function(P1) domain = "{[i]: 0 <= i < patch.dofs}" instructions = "patch[i] = patch[i] + vol[0]" keys = {"vol": (volume, op2.READ), "patch": (patch_volume, op2.RW)} firedrake.par_loop((domain, instructions), dX, keys) # Take weighted average instructions = { 0: "t[i] = t[i] + v[0] * s[0]", 1: "t[i, j] = t[i, j] + v[0] * s[0, j]", 2: ( f"t[i, {dim} * j + k] =" f" t[i, {dim} * j + k] + v[0] * s[0, {dim} * j + k]" ), }[rank] keys = { "s": (source, op2.READ), "v": (volume, op2.READ), "t": (target, op2.RW), } firedrake.par_loop((tdomain, instructions), dX, keys) else: dX = ufl.ds(domain=mesh) # Indicate appropriate boundary bnd_indicator = firedrake.Function(P1) firedrake.DirichletBC(P1, 1, "on_boundary").apply(bnd_indicator) # Determine facet area for boundary edges v = firedrake.TestFunction(P0) u = firedrake.TrialFunction(P0) bnd_volume = firedrake.Function(P0) mass_term = v * u * dX rhs = v * ufl.FacetArea(mesh) * dX sp = {"snes_type": "ksponly", "ksp_type": "preonly", "pc_type": "jacobi"} firedrake.solve(mass_term == rhs, bnd_volume, solver_parameters=sp) # Compute patch volume patch_volume = firedrake.Function(P1) domain = "{[i]: 0 <= i < patch.dofs}" instructions = "patch[i] = patch[i] + indicator[i] * bnd_vol[0]" keys = { "bnd_vol": (bnd_volume, op2.READ), "indicator": (bnd_indicator, op2.READ), "patch": (patch_volume, op2.RW), } firedrake.par_loop((domain, instructions), dX, keys) # Take weighted average instructions = { 0: "t[i] = t[i] + v[0] * b[i] * s[0]", 1: "t[i, j] = t[i, j] + v[0] * b[i] * s[0, j]", 2: ( f"t[i, {dim} * j + k] = " f"t[i, {dim} * j + k] + v[0] * b[i] * s[0, {dim} * j + k]" ), }[rank] keys = { "s": (source, op2.READ), "v": (bnd_volume, op2.READ), "b": (bnd_indicator, op2.READ), "t": (target, op2.RW), } firedrake.par_loop((tdomain, instructions), dX, keys) # Divide by patch volume and ensure finite target.interpolate(target / patch_volume) target.dat.data_with_halos[:] = np.nan_to_num( target.dat.data_with_halos, posinf=0, neginf=0 ) # Map back to Cofunction, if one was passed originally if is_cofunction: data = target.dat.data_with_halos target = firedrake.Cofunction(target.function_space().dual()) target.dat.data_with_halos[:] = data return target