from typing import Optional, Union
from judo import random_state
import numpy
from fragile.core.policy import Gaussian
from fragile.core.typing import StateData, StateDict, Tensor
[docs]class ParticleIntegration(Gaussian):
default_outputs = ("actions", "velocities")
default_inputs = {"velocities": {"clone": True}, "dt": {"optional": True}}
@property
def param_dict(self) -> StateDict:
params = super(ParticleIntegration, self).param_dict
if self.swarm is None:
return params
return {**{"velocities": self.swarm.env.param_dict["observs"]}, **params}
[docs] def act(self, inplace: bool = True, **kwargs) -> Union[None, StateData]:
"""Calculate SwarmState containing the data needed to interact with the environment."""
action_input = self._prepare_tensors(**kwargs)
actions_data = self.select_actions(**action_input)
if not isinstance(actions_data, dict):
actions_data = {"actions": actions_data}
actions = actions_data["actions"]
actions_data["actions"] = self.env_bounds.clip(actions)
if inplace:
self.update(**actions_data)
else:
return actions_data
[docs] def select_actions(self, **kwargs) -> Union[Tensor, StateData]:
actions = super(ParticleIntegration, self).select_actions(**kwargs)
dt = self.get("dt", default=1.0, raise_error=False)
velocity_0 = self.get("velocities")
velocities = velocity_0 + actions * dt
actions = velocity_0 + velocities * dt
return {"velocities": velocities, "actions": actions}
[docs] def reset(
self,
inplace: bool = True,
root_walker: Optional[StateData] = None,
states: Optional[StateData] = None,
**kwargs,
) -> Union[None, StateData]:
data = self.select_actions(**kwargs)
if inplace:
self.update(velocities=data["velocities"])
return data
[docs]class ESModel(Gaussian):
"""
The ESModel implements an evolutionary strategy policy.
It mutates randomly some of the coordinates of the best solution found by \
substituting them with a proposal solution. This proposal solution is the \
difference between two random permutations of the best solution found.
It applies a gaussian normal perturbation with a probability given by ``mutation``.
"""
def __init__(
self,
mutation: float = 0.5,
recombination: float = 0.7,
random_step_prob: float = 0.1,
*args,
**kwargs,
):
"""
Initialize a :class:`ESModel`.
Args:
mutation: Probability of mutating a coordinate of the solution vector.
recombination: Step size of the update applied to the best solution found.
random_step_prob: Probability of applying a random normal perturbation.
*args: Passed to the parent :class:`NormalContinuous`.
**kwargs: Passed to the parent :class:`NormalContinuous`.
"""
super(ESModel, self).__init__(*args, **kwargs)
self.mutation = mutation
self.recombination = recombination
self.random_step_prob = random_step_prob
[docs] def sample(
self,
batch_size: int,
model_states=None,
env_states=None,
walkers_states=None,
**kwargs,
):
"""
Calculate the corresponding data to interact with the Environment and \
store it in model states.
Args:
batch_size: Number of new points to the sampled.
model_states: SwarmState corresponding to the environment data.
env_states: SwarmState corresponding to the model data.
walkers_states: SwarmState corresponding to the walkers data.
kwargs: Passed to the :class:`Critic` if any.
Returns:
Tuple containing a tensor with the sampled actions and the new model states variable.
"""
# There is a chance of performing a gaussian perturbation
if random_state.random() < self.random_step_prob:
return super(ESModel, self).sample(
batch_size=batch_size,
env_states=env_states,
model_states=model_states,
**kwargs,
)
observs = (
env_states.observs
if env_states is not None
else numpy.zeros(((batch_size,) + self.shape))
)
has_best = walkers_states is not None and walkers_states.best_state is not None
best = walkers_states.best_state if has_best else observs
# Choose 2 random indices
indexes = numpy.arange(observs.shape[0])
a_rand = self.random_state.permutation(indexes)
b_rand = self.random_state.permutation(indexes)
proposal = best + self.recombination * (observs[a_rand] - observs[b_rand])
# Randomly mutate each coordinate of the original vector
assert observs.shape == proposal.shape
rands = random_state.random(observs.shape)
perturbations = numpy.where(rands < self.mutation, observs, proposal)
# The Environment will sum the observations to perform the step
new_states = perturbations - observs
actions = self.bounds.clip(new_states)
return self.update_states_with_critic(
actions=actions,
batch_size=batch_size,
model_states=model_states,
env_states=env_states,
walkers_states=walkers_states,
**kwargs,
)
[docs]class CMAES(Gaussian):
"""Implementation of CMAES algorithm from https://en.wikipedia.org/wiki/CMA-ES."""
def __init__(self, sigma: float, virtual_reward_fitness: bool = False, *args, **kwargs):
"""
Initialize a :class:`CMAES`.
Args:
sigma: Coordinate-wise standard deviation.
virtual_reward_fitness: Use the virtual reward instead of the environment \
reward to as a fitness measure.
*args: Passed to :class:`NormalContinuous`.__init__.
**kwargs: Passed to :class:`NormalContinuous`.__init__.
"""
super(CMAES, self).__init__(*args, **kwargs)
# Parameters for selection
self.virtual_reward_fitness = virtual_reward_fitness
self.pop_size = None
self.sigma = sigma
self._count_eval = 0 # Number of function evaluations performed
self.x_mean = None # Objective variables initial point
self.old_x_mean = None # x_mean corresponding to the last iteration
# Parameters for adaptation that are kept constant
self.mu_const = None # Number of parents/points for recombination
self.weights_const = None # Array for weighted recombination
self.mu_eff_const = None # variance-effectiveness of sum w_i x_i
self.cum_covm_const = None # Time constant for accumulation for cov_matrix
self.cum_sigma_const = None # Time constant for accumulation for sigma control
self.lr_covrank1_const = None # Learning rate for rank-one update of cov_matrix
self.lr_mu_const = None # Learning rate for rank-mu_const update
self.damp_sigma_const = None # Damping for sigma. Usually close to one.
self.chi_norm_const = None # expectation of ||N(0,I)|| == norm(randn(N,1))
# Dynamic (internal) strategy parameters and constants
self.paths_covm = None # Evolution paths for cov_matrix
self.paths_sigma = None # Evolution paths for sigma
self.coords_matrix = None # Defines the coordinate system
self.scaling_diag = None # Diagonal matrix that defines the scaling
self.cov_matrix = None # Covariance matrix
self.invsqrtC = None # cov_matrix^-1/2
self.n_eigen_eval = None # Tracks update of coords_matrix and scaling_diag
self.artmp = None # Temporal array that contains terms for updating the covariance matrix
self.hsig = None
[docs] def sample(
self,
batch_size: int,
model_states=None,
env_states=None,
walkers_states=None,
**kwargs,
):
"""
Calculate the corresponding data to interact with the Environment and \
store it in model states.
Args:
batch_size: Number of new points to the sampled.
model_states: SwarmState corresponding to the environment data.
env_states: SwarmState corresponding to the model data.
walkers_states: SwarmState corresponding to the walkers data.
kwargs: Passed to the :class:`Critic` if any.
Returns:
Tuple containing a tensor with the sampled actions and the new model states variable.
"""
if model_states is None or walkers_states is None:
return super(CMAES, self).sample(
batch_size=batch_size,
model_states=model_states,
env_states=env_states,
walkers_states=walkers_states,
**kwargs,
)
actions = (
env_states.get("observs")
if self._count_eval > self.pop_size * 2
else model_states.get("actions")
)
fitness = (
walkers_states.get("virtual_rewards")
if self.virtual_reward_fitness
else walkers_states.get("cum_rewards")
)
sorted_fitness = numpy.argsort(fitness)[: self.mu_const]
selected_actions = actions[sorted_fitness].T
self._update_evolution_paths(selected_actions)
self._adapt_covariance_matrix(selected_actions)
self._adapt_sigma()
self._cov_matrix_diagonalization()
actions = self._sample_actions()
return self.update_states_with_critic(
actions=actions,
batch_size=batch_size,
model_states=model_states,
**kwargs,
)
[docs] def _update_evolution_paths(self, actions):
self.old_x_mean = self.x_mean
self.x_mean = numpy.matmul(actions, self.weights_const)
# Update evolution paths
self.paths_sigma = (1 - self.cum_sigma_const) * self.paths_sigma + numpy.sqrt(
self.cum_sigma_const * (2 - self.cum_sigma_const) * self.mu_eff_const,
) * numpy.matmul(self.invsqrtC, (self.x_mean - self.old_x_mean)) / self.sigma
sqrt_term = numpy.sqrt(
1 - (1 - self.cum_sigma_const) ** (2 * self._count_eval / self.pop_size),
)
self.hsig = float(
numpy.linalg.norm(self.paths_sigma) / sqrt_term / self.chi_norm_const
< 1.4 + 2 / (self.n_dims + 1),
)
self.paths_covm = (1 - self.cum_covm_const) * self.paths_covm + self.hsig * numpy.sqrt(
self.cum_covm_const * (2 - self.cum_covm_const) * self.mu_eff_const,
) * (self.x_mean - self.old_x_mean) / self.sigma
[docs] def _adapt_covariance_matrix(self, actions):
self.artmp = (1 / self.sigma) * (actions - numpy.tile(self.old_x_mean, (1, self.mu_const)))
self.cov_matrix = (
(1 - self.lr_covrank1_const - self.lr_mu_const) * self.cov_matrix
+ self.lr_covrank1_const * numpy.matmul(self.paths_covm, self.paths_covm.T)
+ (1 - self.hsig) * self.cum_covm_const * (2 - self.cum_covm_const) * self.cov_matrix
+ self.lr_mu_const
* numpy.matmul(
numpy.matmul(self.artmp, numpy.diag(self.weights_const.flatten())),
self.artmp.T,
)
)
[docs] def _adapt_sigma(self):
self.sigma = self.sigma * numpy.exp(
(self.cum_sigma_const / self.damp_sigma_const)
* (numpy.linalg.norm(self.paths_sigma) / self.chi_norm_const - 1),
)
[docs] def _cov_matrix_diagonalization(self):
# Decomposition of cov_matrix into coords_matrix*diag(scaling_diag.^2)*coords_matrix'
# (diagonalization)
if (
self._count_eval - self.n_eigen_eval
> self.pop_size / (self.lr_covrank1_const + self.lr_mu_const) / self.n_dims / 10
):
self.n_eigen_eval = self._count_eval
self.cov_matrix = numpy.triu(self.cov_matrix) + numpy.triu(self.cov_matrix, 1).T
eigvals, eigvects = numpy.linalg.eig(self.cov_matrix)
self.scaling_diag = numpy.diag(eigvals) # [::-1])
self.coords_matrix = eigvects # [:, ::-1]
self.scaling_diag = numpy.sqrt(numpy.diag(self.scaling_diag)).reshape(-1, 1)
self.invsqrtC = numpy.matmul(
numpy.matmul(self.coords_matrix, numpy.diag(self.scaling_diag.flatten() ** -1)),
self.coords_matrix.T,
)
[docs] def reset(
self,
batch_size: int = 1,
model_states=None,
env_states=None,
*args,
**kwargs,
):
"""
Return a new blank State for a `DiscreteUniform` instance, and a valid \
prediction based on that new state.
Args:
batch_size: Number of walkers that the new model `State`.
model_states: :class:`StatesModel` corresponding to the model data.
env_states: :class:`StatesEnv` containing the environment data.
*args: Passed to `predict`.
**kwargs: Passed to `predict`.
Returns:
New model states containing sampled data.
"""
self.pop_size = batch_size
self._count_eval = 0
self._init_algorithm_params(batch_size)
# Take the first sample from a random uniform distribution
if batch_size is None and env_states is None:
raise ValueError("env_states and batch_size cannot be both None.")
batch_size = batch_size or env_states.n
model_states = model_states or self.create_new_states(batch_size=batch_size)
init_actions = self.random_state.randn(self.mu_const)
self.x_mean = numpy.matmul(init_actions.T, self.weights_const)
actions = self._sample_actions()
model_states.update(actions=actions)
return model_states
[docs] def _sample_actions(self) -> numpy.ndarray:
actions = numpy.empty((self.pop_size, self.n_dims))
for i in range(self.pop_size):
normal_noise = self.random_state.randn(self.n_dims).reshape(-1, 1)
action = self.x_mean + self.sigma * numpy.matmul(
self.coords_matrix,
self.scaling_diag * normal_noise,
)
actions[i, :] = action.flatten()
self._count_eval += 1
return actions
[docs] def _init_algorithm_params(self, batch_size):
self.pop_size = batch_size
self.mu_const = self.pop_size / 2 # Number of parents/points for recombination
self.weights_const = numpy.log(self.mu_const + 0.5) - numpy.log(
numpy.arange(1, self.mu_const + 1),
).reshape((-1, 1))
self.mu_const = int(numpy.floor(self.mu_const))
self.weights_const = self.weights_const / numpy.sum(self.weights_const)
self.mu_eff_const = numpy.sum(self.weights_const) ** 2 / numpy.sum(self.weights_const**2)
# Parameters for adaptation
self.cum_covm_const = (4 + self.mu_eff_const / self.n_dims) / (
self.n_dims + 4 + 2 * self.mu_eff_const / self.n_dims
)
self.cum_sigma_const = (self.mu_eff_const + 2) / (self.n_dims + self.mu_eff_const + 5)
self.lr_covrank1_const = 2 / ((self.n_dims + 1.3) ** 2 + self.mu_eff_const)
self.lr_mu_const = numpy.minimum(
1 - self.lr_covrank1_const,
2
* (self.mu_eff_const - 2 + 1 / self.mu_eff_const)
/ ((self.n_dims + 2) ** 2 + self.mu_eff_const),
)
self.damp_sigma_const = (
1
+ 2 * numpy.maximum(0, numpy.sqrt((self.mu_eff_const - 1) / (self.n_dims + 1)) - 1)
+ self.cum_sigma_const
)
# Dynamic (internal) strategy parameters and constants
self.paths_covm = numpy.zeros((self.n_dims, 1))
self.paths_sigma = numpy.zeros((self.n_dims, 1))
self.coords_matrix = numpy.eye(self.n_dims) # Defines the coordinate system
self.scaling_diag = numpy.ones(
(self.n_dims, 1),
) # Diagonal matrix that defines the scaling
self.cov_matrix = (
self.coords_matrix * numpy.diag(self.scaling_diag**2) * self.coords_matrix.T
) # Covariance matrix
self.invsqrtC = (
self.coords_matrix * numpy.diag(1 / self.scaling_diag.flatten()) * self.coords_matrix.T
)
self.n_eigen_eval = 0 # Tracks update of coords_matrix and scaling_diag
self.chi_norm_const = self.n_dims**0.5 * (
1 - 1 / (4 * self.n_dims) + 1 / (21 * self.n_dims**2)
)