from pyadjoint.tape import no_annotations
from pyadjoint.enlisting import Enlist
from pyadjoint_utils import ReducedFunction
from pyadjoint_utils.control import Control
import numpy
import pyadjoint_utils.numpy_adjoint
from pyadjoint.overloaded_type import create_overloaded_object as coo
[docs]class ReducedFunctionNumPy(ReducedFunction):
"""This class implements the reduced function for given function and
controls based on numpy data structures.
This "NumPy version" of the ReducedFunction is created from
an existing ReducedFunction object:
rf_np = ReducedFunctionNumPy(rf = rf)
"""
def __init__(self, reduced_function):
if not isinstance(reduced_function, ReducedFunction):
raise TypeError("reduced_function should be a ReducedFunction")
self.rf = reduced_function
def __getattr__(self, item):
return getattr(self.rf, item)
[docs] def __call__(self, m_array):
"""An implementation of the reduced function evaluation
that accepts the control values as an array of scalars
"""
output = self.rf(self.get_rf_input(m_array))
return self.get_outputs_array(output)
def get_rf_input(self, m_array):
m_copies = [control.copy_data() for control in self.controls]
return self.set_local(m_copies, m_array)
def get_rf_adj_input(self, adj_array):
adj_controls = [Control(bv.output) for bv in self.outputs]
adj_copies = [control.copy_data() for control in adj_controls]
return self.set_adj_local(adj_controls, adj_copies, adj_array)
def set_adj_local(self, adj_controls, adj, adj_array):
"""Use the given numpy array to set the values of the given list of control
values."""
offset = 0
for i, control in enumerate(adj_controls):
adj[i], offset = control.assign_numpy(adj[i], adj_array, offset)
return adj
def set_local(self, m, m_array):
"""Use the given numpy array to set the values of the given list of control
values."""
offset = 0
for i, control in enumerate(self.controls):
m[i], offset = control.assign_numpy(m[i], m_array, offset)
return m
def get_global(self, m):
"""Converts the given list of control values m to a single numpy array."""
m_global = []
for i, v in enumerate(Enlist(m)):
if isinstance(v, Control):
# TODO: Consider if you want this design.
m_global += v.fetch_numpy(v.control)
elif hasattr(v, "_ad_to_list"):
m_global += v._ad_to_list(v)
else:
m_global += self.controls[i].control._ad_to_list(v)
return coo(numpy.array(m_global, dtype="d"))
[docs] @no_annotations
def jac_action(self, m_array): # , forget=True, project=False):
"""An implementation of the reduced function jac_action evaluation
that accepts the controls as an array of scalars.
"""
dJdm = self.rf.jac_action(self.get_rf_input(m_array))
return self.get_outputs_array(dJdm)
# Untested
[docs] @no_annotations
def adj_jac_action(self, adj_inputs): # , forget=True, project=False):
"""An implementation of the reduced functional adjoint evaluation
that returns the derivatives as an array of scalars.
"""
dJdm = self.rf.adj_jac_action(self.get_rf_adj_input(adj_inputs))
dJdm = Enlist(dJdm)
m_global = []
for i, control in enumerate(self.controls):
# This is a little ugly, but we need to go through the control to get to the OverloadedType.
# There is no guarantee that dJdm[i] is an OverloadedType and not a backend type.
m_global += control.fetch_numpy(dJdm[i])
return coo(numpy.array(m_global, dtype="d"))
# Untested
@no_annotations
def hessian(self, m_array, m_dot_array):
"""An implementation of the reduced function hessian action evaluation
that accepts the controls as an array of scalars. If m_array is None,
the Hessian action at the latest forward run is returned."""
# Calling derivative is needed, see i.e. examples/stokes-shape-opt
self.jac_action(m_array)
Hm = self.rf.hessian(m_array, self.get_rf_input(m_dot_array))
Hm = Enlist(Hm)
m_global = []
for i, control in enumerate(self.controls):
# This is a little ugly, but we need to go through the control to get to the OverloadedType.
# There is no guarantee that dJdm[i] is an OverloadedType and not a backend type.
m_global += control.fetch_numpy(Hm[i])
self.rf.tape.reset_variables()
return coo(numpy.array(m_global, dtype="d"))
@staticmethod
def get_identities(controls, pointwise=True):
# Each identity is a tensor with twice the rank of the corresponding control.
identities = []
controls = Enlist(controls)
if not isinstance(pointwise, (list, tuple)):
pointwise = [pointwise] * len(controls)
for i, control in enumerate(controls):
if hasattr(control, "value_shape"):
shape = control.value_shape()
if len(shape) == 0:
shape = (1,)
elif hasattr(control, "shape"):
shape = control.shape
else:
shape = (control._ad_dim(),)
if pointwise[i] and len(shape) > 0:
n = shape[0]
shape = tuple(shape[1:])
identity = numpy.zeros((n,) + shape * 2)
indices = numpy.indices(shape)
diagonal = [idx for idx in indices] * 2
identity[(...,) + tuple(diagonal)] = 1
else:
identity = numpy.zeros(shape * 2)
indices = numpy.indices(shape)
diagonal = [idx for idx in indices] * 2
identity[tuple(diagonal)] = 1
c_identities = [None] * len(controls)
c_identities[i] = identity
identities.append(c_identities)
return controls.delist(identities)
def obj_to_array(self, obj):
return self.get_global(obj)
def get_controls(self, controls=None):
if controls is None:
controls = self.controls
m = [p.tape_value() for p in controls]
return self.obj_to_array(m)
def set_controls(self, array):
m = [p.tape_value() for p in self.controls]
m = self.set_local(m, array)
for control, m_i in zip(self.controls, m):
control.update(m_i)
return m
def get_outputs_array(self, values):
m_global = []
values = Enlist(values)
for i, bv in enumerate(self.outputs):
if values[i] is not None:
m_global += bv.output._ad_to_list(values[i])
else:
m_global += [0] * bv.output._ad_dim()
return coo(numpy.array(m_global, dtype="d"))