Source code for fragile.optimize.env

from typing import Callable, Tuple

import judo
from judo import Backend, Bounds, tensor, typing
import numpy
from scipy.optimize import Bounds as ScipyBounds, minimize

from fragile.core.env import Function
from fragile.core.fractalai import relativize
from fragile.core.typing import StateData, StateDict
from fragile.optimize.utils import GradFuncWrapper


class Minimizer:
    """Apply ``scipy.optimize.minimize`` to a :class:`Function`."""

    def __init__(self, function: Function, bounds=None, **kwargs):
        """
        Initialize a :class:`Minimizer`.

        Args:
            function: :class:`Function` that will be minimized.
            bounds: :class:`Bounds` defining the domain of the minimization \
                process. If it is ``None`` the :class:`Bounds` of the \
                :class:`Function` will be used.
            **kwargs: Passed to ``scipy.optimize.minimize``.

        """
        self.env = function
        self.function = function.function
        self.bounds = self.env.bounds if bounds is None else bounds
        self.kwargs = kwargs

    def minimize(self, x: typing.Tensor):
        """
        Apply ``scipy.optimize.minimize`` to a single point.

        Args:
            x: Array representing a single point of the function to be minimized.

        Returns:
            Optimization result object returned by ``scipy.optimize.minimize``.

        """

        def _optimize(_x):
            try:
                _x = _x.reshape((1,) + _x.shape)
                y = self.function(_x)
            except (ZeroDivisionError, RuntimeError):
                y = numpy.inf
            return y

        bounds = ScipyBounds(
            ub=judo.to_numpy(self.bounds.high) if self.bounds is not None else None,
            lb=judo.to_numpy(self.bounds.low) if self.bounds is not None else None,
        )
        return minimize(_optimize, x, bounds=bounds, **self.kwargs)

    def minimize_point(self, x: typing.Tensor) -> Tuple[typing.Tensor, typing.Scalar]:
        """
        Minimize the target function passing one starting point.

        Args:
            x: Array representing a single point of the function to be minimized.

        Returns:
            Tuple containing a numpy array representing the best solution found, \
            and the numerical value of the function at that point.

        """
        optim_result = self.minimize(x)
        point = tensor(optim_result["x"])
        reward = tensor(float(optim_result["fun"]))
        return point, reward

    def minimize_batch(self, x: typing.Tensor) -> Tuple[typing.Tensor, typing.Tensor]:
        """
        Minimize a batch of points.

        Args:
            x: Array representing a batch of points to be optimized, stacked \
               across the first dimension.

        Returns:
            Tuple of arrays containing the local optimum found for each point, \
            and an array with the values assigned to each of the points found.

        """
        x = judo.to_numpy(judo.copy(x))
        with Backend.use_backend("numpy"):
            result = judo.zeros_like(x)
            rewards = judo.zeros((x.shape[0], 1))
            for i in range(x.shape[0]):
                new_x, reward = self.minimize_point(x[i, :])
                result[i, :] = new_x
                rewards[i, :] = float(reward)
        self.bounds.high = tensor(self.bounds.high)
        self.bounds.low = tensor(self.bounds.low)
        result, rewards = tensor(result), tensor(rewards)
        return result, rewards

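# Usage sketch (not part of the original module; assumes the ``Function``
# constructor accepts ``function`` and ``bounds`` keyword arguments, and that
# extra keyword arguments are forwarded to ``scipy.optimize.minimize``):
#
#     import numpy
#     from judo import Bounds
#     from fragile.core.env import Function
#
#     def sphere(x):
#         # Batched sphere benchmark: one scalar per row of x.
#         return numpy.sum(x ** 2, axis=1)
#
#     env = Function(function=sphere, bounds=Bounds(low=-10, high=10, shape=(2,)))
#     minimizer = Minimizer(env, method="L-BFGS-B")
#     point, reward = minimizer.minimize_point(numpy.array([3.0, -2.0]))
#     points, rewards = minimizer.minimize_batch(numpy.zeros((4, 2)) + 5.0)
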
class MinimizerWrapper(Function):
    """
    Wrapper that applies a local minimization process to the observations \
    returned by a :class:`Function`.
    """

    def __init__(self, function: Function, **kwargs):
        """
        Initialize a :class:`MinimizerWrapper`.

        Args:
            function: :class:`Function` to be minimized after each step.
            **kwargs: Passed to the internal :class:`Minimizer`.

        """
        self.unwrapped = function
        self.minimizer = Minimizer(function=self.unwrapped, **kwargs)

    @property
    def shape(self) -> Tuple[int, ...]:
        """Return the shape of the wrapped environment."""
        return self.unwrapped.shape

    @property
    def function(self) -> Callable:
        """Return the function of the wrapped environment."""
        return self.unwrapped.function

    @property
    def bounds(self) -> Bounds:
        """Return the bounds of the wrapped environment."""
        return self.unwrapped.bounds

    @property
    def custom_domain_check(self) -> Callable:
        """Return the custom_domain_check of the wrapped environment."""
        return self.unwrapped.custom_domain_check

    def __getattr__(self, item):
        return getattr(self.unwrapped, item)

    def __repr__(self):
        return self.unwrapped.__repr__()

    def setup(self, swarm):
        self.unwrapped.setup(swarm)

    def make_transitions(self, inplace: bool = True, **kwargs):
        """
        Perform a local optimization process on the observations returned after \
        calling ``make_transitions`` on the wrapped :class:`Function`.
        """
        function_data = self.unwrapped.make_transitions(inplace=False, **kwargs)
        new_points, rewards = self.minimizer.minimize_batch(function_data.get("observs"))
        oobs = self.calculate_oobs(new_points, rewards)
        new_data = dict(observs=new_points, rewards=rewards.flatten(), oobs=oobs)
        if inplace:
            self.update(**new_data)
        else:
            return new_data

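# Usage sketch (not part of the original module; reuses the hypothetical
# ``env`` built in the ``Minimizer`` example above). The wrapper behaves like
# the wrapped ``Function``, but every transition is refined locally:
#
#     wrapped = MinimizerWrapper(env, method="L-BFGS-B")
#     wrapped.bounds is env.bounds  # -> True; properties delegate to the wrapped env
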
class ParticleSimulation(Function):
    """Simulate walkers as particles that follow leapfrog dynamics driven by \
    the gradient of the target function."""

    default_inputs = {
        "actions": {},
        "observs": {"clone": True},
        "velocities": {"clone": True},
        "oobs": {"optional": True},
        "time": {"clone": True},
    }
    default_outputs = "observs", "rewards", "oobs", "velocities"

    def __init__(self, step_size: float = 1.0, steps: int = 10, **kwargs):
        self.step_size = step_size
        self.steps = steps
        super(ParticleSimulation, self).__init__(**kwargs)
        self.grad_func = GradFuncWrapper(self.function)

    @classmethod
    def from_instance(
        cls,
        function: Function,
        step_size: float = 1.0,
        steps: int = 1,
        x0=None,
        start_same_pos: bool = False,
    ) -> "ParticleSimulation":
        """Create a :class:`ParticleSimulation` from an existing :class:`Function`."""
        return cls(
            function=function.function,
            bounds=function.bounds,
            custom_domain_check=function.custom_domain_check,
            actions_as_perturbations=True,
            step_size=step_size,
            steps=steps,
            x0=x0,
            start_same_pos=start_same_pos,
        )

    @property
    def param_dict(self) -> StateDict:
        param_dict = super(ParticleSimulation, self).param_dict
        param_dict["velocities"] = dict(param_dict["observs"])
        param_dict["kinetic_energy"] = dict(param_dict["rewards"])
        param_dict["total_energy"] = dict(param_dict["rewards"])
        param_dict["time"] = dict(param_dict["rewards"])
        return param_dict

    def calculate_forces(self, observs, **kwargs):
        """Compute a Lennard-Jones-style interaction between each walker and \
        a randomly paired companion walker."""
        # Pair every walker with a random companion and measure the
        # periodic-boundary distance between them.
        compas = observs[numpy.random.permutation(numpy.arange(len(observs)))]
        deltas = self.bounds.pbc_distance(observs, compas)
        distances = numpy.linalg.norm(deltas, axis=1)
        vectors = deltas / numpy.clip(distances.reshape(-1, 1), 1e-4, numpy.inf)
        # Evaluate a 12-6 potential on the relativized distances.
        r2 = relativize(distances)
        r2i = 1.0 / (r2 ** 2)
        r6i = r2i * r2i * r2i
        epot = 4 * r6i * (r6i - 1.0)
        forces = epot.reshape(-1, 1) * vectors
        return forces

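# The pair interaction above is the standard Lennard-Jones 12-6 form,
#     V(r) = 4 * (r**-12 - r**-6),
# but evaluated on ``relativize``-transformed distances rather than raw
# Euclidean ones, so each interaction depends on a walker's separation
# relative to the rest of the batch.
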
    def fai_leapfrog(self, init_pos, init_momentum, grad, step_size, function, force=0.0):
        """
        Perform a leapfrog jump in the Hamiltonian space.

        Args:
            init_pos: Initial parameter position, ``ndarray[float, ndim=1]``.
            init_momentum: Initial momentum, ``ndarray[float, ndim=1]``.
            grad: Initial gradient value. The gradient is recomputed from \
                ``init_pos``, so the passed value is not used.
            step_size: Step size of the integration.
            function: Callable that returns the potential and the gradient \
                evaluated at ``theta``: ``potential, grad = function(theta)``.
            force: External force added to both momentum half steps.

        Returns:
            Tuple of ``(new_position, new_momentum, gradprime, new_potential)``.
        """
        potential, grad = function(init_pos)
        lj_force = 0  # self.calculate_forces(init_pos)
        pot = relativize(potential).reshape(-1, 1)
        grad = (grad / numpy.linalg.norm(grad, axis=1).reshape(-1, 1)) * pot
        # Make a half step in the momentum.
        new_momentum = init_momentum + 0.5 * step_size * (lj_force - grad + force)
        # Make a full step in theta.
        new_position = init_pos + step_size * new_momentum
        new_position = self.bounds.pbc(new_position)
        # Compute the new gradient.
        new_potential, gradprime = function(new_position)
        new_pot = relativize(new_potential).reshape(-1, 1)
        gradprime = (gradprime / numpy.linalg.norm(gradprime, axis=1).reshape(-1, 1)) * new_pot
        # Make another half step in the momentum.
        new_force = 0  # self.calculate_forces(new_position)
        new_momentum = new_momentum + 0.5 * step_size * (new_force - gradprime + force)
        return new_position, new_momentum, gradprime, new_potential

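# The integrator above is the standard leapfrog (kick-drift-kick) scheme,
#     p_{t+1/2} = p_t + (eps / 2) * F(q_t)
#     q_{t+1}   = q_t + eps * p_{t+1/2}
#     p_{t+1}   = p_{t+1/2} + (eps / 2) * F(q_{t+1})
# with F(q) = force - grad(q), where the gradient is normalized to unit length
# and rescaled by the relativized potential before each kick.
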
    def step(self, actions, observs, velocities, **kwargs) -> StateData:
        """
        Run ``self.steps`` leapfrog iterations starting from the current \
        observations and velocities, and evaluate the reward and boundary \
        conditions at the new points.

        Returns:
            Dictionary containing the information of the new points evaluated.

            ``{"observs": new_points, "velocities": array, \
            "rewards": typing.Scalar array, "oobs": boolean array}``

        """
        for _ in range(self.steps):
            _, grad = self.grad_func(observs)
            data = self.fai_leapfrog(
                init_pos=observs,
                init_momentum=velocities,
                grad=grad,
                step_size=self.step_size,
                function=self.grad_func,
                force=0,
            )
            new_position, velocities, gradprime, new_potential = data
            observs = self.bounds.pbc(judo.tensor(new_position))
        return dict(
            observs=observs,
            velocities=judo.tensor(velocities),
            rewards=judo.tensor(new_potential),
            oobs=judo.zeros(len(new_potential), dtype=judo.bool),
        )
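
# Usage sketch (not part of the original module; reuses the hypothetical
# ``env`` from the ``Minimizer`` example and assumes ``GradFuncWrapper`` can
# differentiate the wrapped callable). ``actions`` is ignored by ``step``:
#
#     sim = ParticleSimulation.from_instance(env, step_size=0.01, steps=5)
#     state = sim.step(
#         actions=None,
#         observs=numpy.zeros((4, 2)),
#         velocities=numpy.random.standard_normal((4, 2)),
#     )
#     state["observs"].shape  # -> (4, 2)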