from collections import defaultdict
from typing import Any, Dict, Optional, Tuple, Union
import judo
from judo import dtype, random_state, tensor
from fragile.core.api_classes import WalkersAPI, WalkersMetric
from fragile.core.fractalai import fai_iteration, relativize
from fragile.core.typing import InputDict, StateData, StateDict, Tensor
[docs]class SimpleWalkers(WalkersAPI):
default_inputs = {
"observs": {},
"oobs": {"optional": True},
"is_won": {"optional": True},
"rewards": {},
"scores": {"clone": True},
}
default_param_dict = {"scores": {"dtype": dtype.float32}}
default_outputs = tuple(["scores"])
def __init__(
self,
accumulate_reward: bool = True,
score_scale: float = 1.0,
diversity_scale: float = 1.0,
minimize: bool = False,
**kwargs,
):
self.score_scale = score_scale
self.diversity_scale = diversity_scale
self.accumulate_reward = accumulate_reward
self.minimize = minimize
super(SimpleWalkers, self).__init__(**kwargs)
[docs] def run_epoch(
self,
observs,
rewards,
scores,
oobs=None,
inplace: bool = True,
**kwargs,
) -> StateData:
scores = rewards + scores if self.accumulate_reward else rewards
sign_scores = -1.0 * scores if self.minimize else scores
compas_ix, will_clone = fai_iteration(
observs=observs,
rewards=sign_scores,
oobs=oobs,
dist_coef=self.diversity_scale,
reward_coef=self.score_scale,
)
if inplace:
self.clone_walkers(compas_clone=compas_ix, will_clone=will_clone)
return {"scores": scores}
[docs]def l2_norm(x: Tensor, y: Tensor) -> Tensor:
"""
Compute the L2 norm between two tensors.
Args:
x (Tensor): The first tensor.
y (Tensor): The second tensor.
Returns:
Tensor: The L2 norm between the two input tensors.
"""
return judo.sqrt(judo.sum((x - y) ** 2, 1))
[docs]class ScoreMetric(WalkersMetric):
"""
A base class for score metrics in a swarm of walkers.
Attributes:
default_param_dict (dict): Default parameters dictionary for the score metric.
default_outputs (tuple): Tuple containing the default output keys.
"""
default_param_dict = {"scores": {"dtype": dtype.float32}}
default_outputs = tuple(["scores"])
[docs]class RewardScore(ScoreMetric):
"""
A class representing the reward score metric for a swarm of walkers.
Attributes:
default_inputs (dict): Default inputs dictionary for the reward score metric.
"""
default_inputs = {"rewards": {}}
def __init__(self, accumulate_reward: bool = True, keep_max_reward: bool = False, **kwargs):
self.accumulate_reward = accumulate_reward
self.keep_max_reward = keep_max_reward
super(RewardScore, self).__init__(**kwargs)
@property
def inputs(self) -> InputDict:
inputs = super(RewardScore, self).inputs
if self.accumulate_reward:
inputs["scores"] = {"clone": True}
return inputs
[docs] def calculate(
self,
rewards: Optional[Tensor] = None,
scores=None,
**kwargs,
) -> Dict[str, Tensor]:
"""
Calculate the scores for the walkers based on rewards.
Args:
rewards (array): Array of walkers' rewards.
scores (array, optional): Array of walkers' scores.
**kwargs: Additional keyword arguments.
Returns:
dict: A dictionary containing the scores.
"""
rewards = self.get("rewards", inactives=True) if rewards is None else rewards
actives = self.swarm.state.actives
if self.accumulate_reward:
values = self.get("scores", inactives=True)
values[actives] = values[actives] + rewards[actives]
else:
values = rewards
if self.keep_max_reward and scores is not None:
values = (
judo.minimum(values, scores)
if self.swarm.minimize
else judo.maximum(values, scores)
)
return {"scores": values}
[docs] def reset(
self,
inplace: bool = True,
root_walker: Optional[StateData] = None,
states: Optional[StateData] = None,
**kwargs,
) -> None:
"""
Reset the reward score metric.
Args:
inplace (bool, optional): Whether to reset in-place. Defaults to True.
root_walker (StateData, optional): The root walker to reset. Defaults to None.
states (Optional[StateData], optional): The state data to reset. Defaults to None.
**kwargs: Additional keyword arguments.
"""
if root_walker is None and not self.accumulate_reward:
rewards = self.get("rewards", inactives=True)
# _default_reward = numpy.inf if self.swarm.minimize else numpy.ninf
# rewards = rewards * _default_reward
self.update(scores=rewards, inactives=True)
[docs]class SonicScore(ScoreMetric):
accumulate_reward = False
default_inputs = {
"rewards": {"clone": True},
"infos": {"clone": True},
"scores": {"clone": True},
}
[docs] @staticmethod
def score_from_info(info):
not_in_bonus_level = not (info.get("in_bonus_level") or info.get("in_transition_screen"))
if info.get("in_boss_fight", False):
score_x = 1010
elif info.get("in_bonus_level"):
score_x = 1002
elif info.get("in_transition_screen"):
score_x = 1008
else:
score_x = 1000 * min(info.get("x", 0) / max(info.get("screen_x_end", 1), 1), 1)
score = (
info.get("score", 0)
+ score_x
+ 10 * info.get("rings", 0)
+ 1001
* (
info.get("act", 0)
if (not_in_bonus_level and not info.get("in_transition_screen"))
else info.get("act", 0) - 1
)
+ 5000 * info.get("zone", 0)
+ 1000 * info.get("lives", 0)
)
return int(score)
[docs] def calculate(self, rewards, scores=None, **kwargs):
scores = tensor([self.score_from_info(info) for info in self.get("infos", inactives=True)])
scores = judo.maximum(scores, self.get("scores", inactives=True))
return {"scores": scores}
[docs]class MarioScore(ScoreMetric):
name = "MarioScore"
accumulate_reward = False
default_inputs = {"rewards": {}, "infos": {}}
[docs] @staticmethod
def score_from_info(info):
score = (
(info.get("world", 0) * 25000)
+ (info.get("stage", 0) * 5000)
+ info.get("x_pos", 0)
+ 10 * int(bool(info.get("in_pipe", 0)))
+ 100 * int(bool(info.get("flag_get", 0)))
+ 10 * info.get("coins", 0)
+ info["life"] * 1000
# + (abs(info["x_pos"] - info["x_position_last"]))
)
return score
[docs] def calculate(self, rewards=None, scores=None, **kwargs):
scores = tensor([self.score_from_info(info) for info in self.get("infos")])
self.update(scores=scores)
scores = self.get("scores", inactives=True)
return {"scores": scores}
[docs]class DiversityMetric(WalkersMetric):
"""
A base class for diversity metrics in a swarm of walkers.
Attributes:
default_param_dict (dict): Default parameters dictionary for the diversity metric.
default_outputs (tuple): Tuple containing the default output keys.
"""
default_param_dict = {"diversities": {"dtype": dtype.float32}}
default_outputs = tuple(["diversities"])
[docs]class RandomDistance(DiversityMetric):
"""
A class representing the random distance diversity metric for a swarm of walkers.
Attributes:
default_inputs (dict): Default inputs dictionary for the random distance diversity metric.
"""
use_pbc = False
default_inputs = {"observs": {}, "oobs": {}}
[docs] def calculate(self, observs, oobs, **kwargs):
"""
Calculate the diversities for the walkers based on the L2 distance to another \
walker chosen at random.
Args:
observs (array): Array of walkers' observations.
oobs (array): Array of out-of-bounds flags for walkers.
**kwargs: Additional keyword arguments.
Returns:
dict: A dictionary containing the diversities.
"""
compas = self.swarm.walkers.get_in_bounds_compas(oobs=oobs)
obs = judo.astype(observs.reshape(observs.shape[0], -1), dtype.float32)
if self.use_pbc and hasattr(self.swarm.env, "bounds"):
deltas = self.swarm.env.bounds.pbc_distance(obs, obs[compas])
# return {"diversities": numpy.linalg.norm(deltas, axis=1).flatten()}
return {"diversities": l2_norm(deltas, 0).flatten()}
return {"diversities": l2_norm(obs, obs[compas]).flatten()}
[docs]class Walkers(WalkersAPI):
"""
Walkers class handles the walkers' dynamics, including scoring, diversity, cloning, \
and resetting.
Inherits from WalkersAPI.
"""
default_param_dict = {
"compas_clone": {"dtype": dtype.int64},
"virtual_rewards": {"dtype": dtype.float32},
"clone_probs": {"dtype": dtype.float32},
"will_clone": {"dtype": dtype.bool},
# "actives": {"dtype": dtype.bool},
}
default_outputs = (
"compas_clone",
"virtual_rewards",
"clone_probs",
"will_clone",
) # , "actives")
default_inputs = {"oobs": {}, "terminals": {"optional": True}}
[docs] def __init__(
self,
score: ScoreMetric = None,
diversity: DiversityMetric = None,
minimize: bool = False,
score_scale: float = 1.0,
diversity_scale: float = 1.0,
track_data=None,
accumulate_reward: bool = True,
keep_max_reward: bool = False,
clone_period: int = 1,
freeze_walkers=True,
**kwargs,
):
"""
Initialize a Walkers object.
Args:
score (ScoreMetric, optional): Scoring metric. Defaults to None.
diversity (DiversityMetric, optional): Diversity metric. Defaults to None.
minimize (bool, optional): Whether to minimize the score. Defaults to False.
score_scale (float, optional): Scaling factor for score. Defaults to 1.0.
diversity_scale (float, optional): Scaling factor for diversity. Defaults to 1.0.
track_data (iterable, optional): Data to track. Defaults to None.
accumulate_reward (bool, optional): Whether to accumulate rewards. Defaults to True.
keep_max_reward (bool, optional): Whether to keep maximum reward. Defaults to False.
clone_period (int, optional): Clone period. Defaults to 1.
freeze_walkers (bool, optional): Whether to freeze walkers. Defaults to True.
**kwargs: Additional keyword arguments.
"""
self.minimize = minimize
self.score_scale = score_scale
self.diversity_scale = diversity_scale
self.clone_period = clone_period
self.score = (
score
if score is not None
else RewardScore(accumulate_reward=accumulate_reward, keep_max_reward=keep_max_reward)
)
self.accumulate_reward = self.score.accumulate_reward
self.diversity = diversity if diversity is not None else RandomDistance()
self.track_data = set(track_data) if track_data is not None else set()
self.freeze_walkers = freeze_walkers
super(WalkersAPI, self).__init__(**kwargs)
@property
def param_dict(self) -> StateDict:
return {
**super(WalkersAPI, self).param_dict,
**self.diversity.param_dict,
**self.score.param_dict,
}
@property
def inputs(self) -> InputDict:
return {**super(WalkersAPI, self).inputs, **self.diversity.inputs, **self.score.inputs}
@property
def outputs(self) -> Tuple[str, ...]:
return super(WalkersAPI, self).outputs + self.score.outputs + self.diversity.outputs
[docs] def setup(self, swarm):
"""
Set up the Walkers object by initializing its swarm.
Args:
swarm: Swarm object to be initialized.
"""
super(Walkers, self).setup(swarm)
self.diversity.setup(swarm)
self.score.setup(swarm)
self.minimize = swarm.minimize
[docs] def balance(self, inplace: bool = True, **kwargs) -> Union[None, StateData]:
"""
Balance the walkers.
Args:
inplace (bool, optional): Whether to perform the operation inplace. Defaults to True.
**kwargs: Additional keyword arguments.
Returns:
Union[None, StateData]: None if inplace=True, otherwise a StateData object.
"""
if self.swarm.epoch % self.clone_period == 0 or self.swarm.epoch == 0:
return super(Walkers, self).balance(inplace=inplace, **kwargs)
[docs] def run_epoch(self, inplace: bool = True, oobs=None, **kwargs) -> Dict[str, Tensor]:
"""
Execute an epoch in the walkers' dynamics.
Args:
inplace (bool, optional): Whether to perform the operation inplace. Defaults to True.
oobs: Out of bounds flags.
**kwargs: Additional keyword arguments.
Returns:
StateData: State data after the epoch execution.
"""
scores = self.score(**kwargs)
diversities = self.diversity(oobs=oobs, **kwargs)
virtual_rewards = self.calculate_virtual_reward(**{**scores, **diversities})
clone_data = self.calculate_clones(oobs=oobs, **virtual_rewards)
actives = clone_data.get("actives")
if actives is not None and self.freeze_walkers:
self.swarm.state.update_actives(actives)
if inplace:
self.clone_walkers(**clone_data)
return {**scores, **diversities, **virtual_rewards, **clone_data}
[docs] def calculate_virtual_reward(self, scores, diversities, **kwargs):
"""
Calculate the virtual rewards for walkers based on scores and diversities.
Args:
scores (array): Array of walkers' scores.
diversities (array): Array of walkers' diversities.
**kwargs: Additional keyword arguments.
Returns:
dict: A dictionary containing the virtual rewards.
"""
scores = -1.0 * scores if self.minimize else scores
norm_scores = relativize(scores)
norm_diver = relativize(diversities)
virtual_rewards = norm_scores**self.score_scale * norm_diver**self.diversity_scale
return {"virtual_rewards": virtual_rewards}
[docs] def calculate_clones(self, virtual_rewards, oobs=None) -> Dict[str, Tensor]:
"""
Calculate the walkers that will clone and their target companions.
Args:
virtual_rewards (array): Array of walkers' virtual rewards.
oobs (array, optional): Array of out-of-bounds flags for walkers. Defaults to None.
Returns:
dict: A dictionary containing clone probabilities, will_clone flags, \
companion clones, and active flags.
"""
n_walkers = len(virtual_rewards)
all_virtual_rewards_are_equal = (virtual_rewards == virtual_rewards[0]).all()
terminals = self.swarm.get("terminals")
if all_virtual_rewards_are_equal:
clone_probs = judo.zeros(n_walkers, dtype=dtype.float)
compas_clone = judo.arange(n_walkers)
else:
compas_clone = self.get_in_bounds_compas(oobs)
# This value can be negative!!
clone_probs = (virtual_rewards[compas_clone] - virtual_rewards) / virtual_rewards
prob_trigger = judo.abs(clone_probs) > random_state.random_sample(n_walkers)
will_clone = judo.logical_and(prob_trigger, clone_probs > 0)
actives = judo.logical_not(judo.logical_and(prob_trigger, clone_probs < 0))
if oobs is not None:
will_clone[oobs] = True # Out of bounds walkers always clone
if terminals is not None:
will_clone[terminals] = False # Terminal walkers do not clone
actives[terminals] = False
return dict(
clone_probs=clone_probs,
will_clone=will_clone,
compas_clone=compas_clone,
actives=actives,
)
[docs] def reset(self, inplace: bool = True, **kwargs) -> None:
"""
Reset the Walkers object and its score and diversity components.
Args:
inplace (bool, optional): Whether to perform the operation inplace. Defaults to True.
**kwargs: Additional keyword arguments.
"""
super(Walkers, self).reset(inplace=inplace, **kwargs)
self.score.reset(**kwargs)
self.diversity.reset(**kwargs)
[docs]class ExplorationWalkers(Walkers):
def __init__(self, exploration_scale: float = 1.0, **kwargs):
super(ExplorationWalkers, self).__init__(**kwargs)
self._explore_counts = defaultdict(0)
self.exploration_scale = exploration_scale
@property
def explore_counts(self) -> Dict[Any, int]:
return self._explore_counts
[docs] def calculate_virtual_reward(self, scores, diversities, **kwargs):
"""Apply the virtual reward formula to account for all the different goal scores."""
vr_dict = super(ExplorationWalkers, self).calculate_virtual_reward(
scores,
diversities,
**kwargs,
)
explore_reward = relativize(self.get_explore_rewards())
virtual_rewards = vr_dict["virtual_rewards"] * explore_reward**self.exploration_scale
return {"virtual_rewards": virtual_rewards}
[docs] def get_explore_rewards(self):
coords_keys = self.get_coords_keys()
for key in coords_keys:
self._explore_counts[key] += 1
return 1 / judo.tensor([self.explore_counts[k] for k in coords_keys])
[docs] def get_coords_keys(self):
keys = []
for info in self.get("infos"):
in_bonus_level = info.get("screen_x_end", 1) == 0 and info.get("x", -1) != 0
act = info.get("act") if not in_bonus_level else 0.5
key = (info.get("zone"), act, int(info.get("x", 0) / 100), int(info.get("y", 0) / 100))
keys.append(key)
return keys
[docs]class NoBalance(Walkers):
"""
A class representing walkers with no balancing behavior in a swarm.
Inherits from the Walkers class and modifies the properties and methods to disable balancing.
"""
@property
def param_dict(self) -> StateDict:
return self.score.param_dict
@property
def inputs(self) -> InputDict:
return self.score.inputs
@property
def outputs(self) -> Tuple[str, ...]:
return self.score.outputs
[docs] def run_epoch(self, inplace: bool = True, oobs=None, **kwargs) -> Dict[str, Tensor]:
"""
Run an epoch for the walkers without balancing.
Args:
inplace (bool, optional): Whether to run the epoch in-place. Defaults to True.
oobs (array, optional): Array of out-of-bounds walkers. Defaults to None.
**kwargs: Additional keyword arguments.
Returns:
dict: A dictionary containing the scores.
"""
scores = self.score(**kwargs)
return scores