# Source code for XLEMOO.LEMOO

from dataclasses import dataclass
from abc import ABC, abstractmethod
from typing import List, Callable, Optional, Union, Dict

from desdeo_emo.population import (
    SurrogatePopulation,
    create_new_individuals,
    Population,
)
from desdeo_emo.recombination import SBX_xover, BP_mutation
from desdeo_emo.selection import TournamentSelection
from desdeo_problem.problem import MOProblem

import numpy as np

from sklearn.tree import DecisionTreeClassifier
from imodels import SlipperClassifier, BoostedRulesClassifier, SkopeRulesClassifier

from XLEMOO.tree_interpreter import find_all_paths, instantiate_tree_rules
from XLEMOO.ruleset_interpreter import (
    extract_slipper_rules,
    instantiate_ruleset_rules,
    extract_skoped_rules,
)
from XLEMOO.selection import SelectNBest


class CrossOverOP(ABC):
    """Abstract interface that every crossover operator must implement."""

    @abstractmethod
    def do(self, population: Population, mating_pop_ids: List[int]) -> np.ndarray:
        """Apply the crossover operator to the selected members of a population.

        Concrete crossover operators must provide a ``do`` method with this
        signature.

        Args:
            population (Population): A population of solutions.
            mating_pop_ids (List[int]): The indices selected for mating.

        Returns:
            np.ndarray: The offspring individuals resulting from applying the
                mating operator.
        """
# Register SBX_xover as a virtual subclass so that isinstance/issubclass checks
# against CrossOverOP accept it without SBX_xover inheriting from CrossOverOP.
CrossOverOP.register(SBX_xover)
class MutationOP(ABC):
    """Abstract interface that every mutation operator must implement."""

    @abstractmethod
    def do(self, offsprings: np.ndarray) -> np.ndarray:
        """Mutate the given individuals.

        Concrete mutation operators must provide a ``do`` method with this
        signature.

        Args:
            offsprings (np.ndarray): The offspring (or individuals from a
                population) to be mutated.

        Returns:
            np.ndarray: The mutated offsprings.
        """
# Register BP_mutation as a virtual subclass so that isinstance/issubclass checks
# against MutationOP accept it without BP_mutation inheriting from MutationOP.
MutationOP.register(BP_mutation)
class SelectionOP(ABC):
    """Abstract interface that every selection operator must implement."""

    @abstractmethod
    def do(self, pop: Population, fitness: np.ndarray) -> List[int]:
        """Select individuals from a population based on their fitness.

        Concrete selection operators must provide a ``do`` method with this
        signature.

        Args:
            pop (Population): The population the selection operator is applied to.
            fitness (np.ndarray): The fitness of the individuals in the
                population. It is assumed the fitnesses are related to the
                individuals in the population by index.

        Returns:
            List[int]: A list of indices indicating which individuals in the
                population have been selected.
        """
# Register the supported selection operators as virtual subclasses of SelectionOP.
# Each registration must be its own statement; two calls on one physical line
# without a separator are a syntax error.
SelectionOP.register(TournamentSelection)
SelectionOP.register(SelectNBest)
class ElitismOP(ABC):
    """Abstract interface that every elitism operator must implement."""

    @abstractmethod
    def do(
        self,
        pop1: Population,
        pop1_fitness: np.ndarray,
        pop2: Population,
        pop2_fitness: np.ndarray,
    ) -> np.ndarray:
        """Pick the elite individuals out of two populations.

        Concrete elitism operators must provide a ``do`` method with this
        signature. The operator compares two populations and returns the best
        individuals according to their fitness values.

        Args:
            pop1 (Population): The first population.
            pop1_fitness (np.ndarray): The fitness values in the first
                population. It is assumed the fitnesses are related to the
                individuals in the population by index.
            pop2 (Population): The second population.
            pop2_fitness (np.ndarray): The fitness values in the second
                population. It is assumed the fitnesses are related to the
                individuals in the population by index.

        Returns:
            np.ndarray: The elite individuals resulting from applying the
                elitism operator.
        """
class MLModel(ABC):
    """Abstract interface for a machine learning model used in the learning
    mode of a LEMOO method.
    """

    @abstractmethod
    def fit(self, X: np.ndarray, Y: np.ndarray):
        """Train the model on data.

        Concrete models must provide a ``fit`` method with this signature.

        Args:
            X (np.ndarray): Training samples. E.g., n-dimensional vectors of
                real valued floats.
            Y (np.ndarray): The targets. E.g., a 1-dimensional vector of 0s and
                1s for binary classification.
        """

    @abstractmethod
    def predict(self, X: np.ndarray) -> np.ndarray:
        """Predict, e.g., the class in binary classification of one or more samples.

        Concrete models must provide a ``predict`` method with this signature.

        Args:
            X (np.ndarray): The sample or samples a class should be predicted for.

        Returns:
            np.ndarray: The predicted classes for the samples.
        """
# Register supported ML models here.
# The registrations must be real statements (not part of the comment line):
# the learning mode rejects any model for which isinstance(model, MLModel) is False.
MLModel.register(DecisionTreeClassifier)
MLModel.register(SlipperClassifier)
MLModel.register(BoostedRulesClassifier)
MLModel.register(SkopeRulesClassifier)
class DummyPopulation:
    """Minimal stand-in for a population; used for testing.

    Only stores the two values it is constructed with.
    """

    def __init__(self, individuals: np.ndarray, problem: MOProblem):
        self.individuals, self.problem = individuals, problem
@dataclass
class EAParams:
    """A data class to store and pass parameter values related to the Darwinian mode of a
    LEMOO method.

    Args:
        population_size (int): The size of the population to be evolved.
        cross_over_op (CrossOverOP): The crossover operator.
        mutation_op (MutationOP): The mutation operator.
        selection_op (SelectionOP): The selection operator.
        population_init_design (str): Initialization strategy of the population. Should be
            'Random' for random or 'LHSDesign' for Latin hypercube sampling.
        iterations_per_cycle (int): How many times a population is evolved in a Darwinian
            mode before switching to a learning mode. Only relevant when a LEMOO method is
            run using the ``run_iterations`` method.
    """

    population_size: int
    cross_over_op: CrossOverOP
    mutation_op: MutationOP
    selection_op: SelectionOP
    population_init_design: str
    iterations_per_cycle: int
@dataclass
class MLParams:
    """A data class to store and pass parameter values related to the learning mode of a
    LEMOO method.

    Args:
        H_split (float): The splitting ratio of 'high performing' population individuals.
            E.g., an H_split of 0.10 means that 10% of the best performing population
            individuals are labeled as high performing during a learning mode. A value of
            1.0 or greater is treated by the learning mode as an absolute number of
            individuals, capped at half of the collected individuals.
        L_split (float): Same as H_split, but for the 'low performing' population
            individuals.
        ml_model (MLModel): The machine learning model to be used in a learning mode.
        instantiation_factor (float): A multiplier used to determine how many new
            population individuals are instantiated in a learning mode after hypothesis
            forming. The learning mode instantiates
            ``int(n_collected * instantiation_factor)`` individuals, where ``n_collected``
            is the number of individuals collected from the considered past generations.
        generation_lookback (int): How many older generations to consider in a learning
            mode. E.g., a lookback of 5 means that the 5 most recent populations are
            considered when forming a hypothesis. A value of 0 or less means the whole
            history is considered.
        ancestral_recall (int): This is like generation_lookback, but considers a specific
            number of the oldest populations. E.g., a recall of 5 will consider the five
            first populations.
        unique_only (bool): Whether to consider unique population individuals only when
            learning a hypothesis.
        iterations_per_cycle (int): How many times a population is ''evolved'' in a
            learning mode before switching to a Darwinian mode. A good default is 1. Only
            relevant when a LEMOO method is run using the ``run_iterations`` method.
    """

    H_split: float
    L_split: float
    ml_model: MLModel
    instantiation_factor: float
    generation_lookback: int
    ancestral_recall: int
    unique_only: bool
    iterations_per_cycle: int
@dataclass
class LEMParams:
    """A data class to store and pass general parameter values of a LEMOO method.

    Args:
        use_ml (bool): Whether to engage in a learning mode or not.
        use_darwin (bool): Whether to engage in a Darwinian mode or not.
        fitness_indicator (Callable[[np.ndarray, Optional[np.ndarray]], np.ndarray]): A
            fitness function that accepts a 2d numpy array that represents the population
            individuals in the objective space of the problem. Optionally, the decision
            variable values may also be passed for each population individual.
        ml_probe (int): The maximum number of iterations a learning mode is executed when
            a threshold is not reached. Only relevant when a LEMOO method is executed
            using the ``run`` method.
        darwin_probe (int): Like ``ml_probe`` but for a Darwinian mode.
        ml_threshold (float): The relative improvement of the best population
            individual's fitness expected before switching out of a learning mode. E.g., a
            threshold of 1.05 means that executing a learning mode stops when the best
            population individual has improved by 5% when compared to the previous
            population's best individual.
        darwin_threshold (float): Like ``ml_threshold`` but for a Darwinian mode.
        total_iterations (int): Overall maximum number of iterations to be run. Only
            relevant when the ``run_iterations`` method of a LEMOO model is used.
    """

    use_ml: bool
    use_darwin: bool
    fitness_indicator: Callable[[np.ndarray, Optional[np.ndarray]], np.ndarray]
    ml_probe: int
    darwin_probe: int
    ml_threshold: float
    darwin_threshold: float
    total_iterations: int
@dataclass
class PastGeneration:
    """A helper data class representing a past generation: its individuals (decision
    space), their corresponding objective function values, and their fitness function
    values.

    Args:
        individuals (np.ndarray): The individuals of a population in the decision
            variable space.
        objectives_fitnesses (np.ndarray): The individuals of a population in the
            objective function space.
        fitness_fun_values (np.ndarray): The fitness function values of each individual.
    """

    individuals: np.ndarray
    objectives_fitnesses: np.ndarray
    fitness_fun_values: np.ndarray
class LEMOO:
    """A class to define LEMOO models.

    Args:
        problem (MOProblem): The multiobjective optimization problem to be solved as
            defined in the DESDEO framework.
        lem_params (LEMParams): A dataclass with parameters relevant to the LEM part of
            the LEMOO method. See the dataclass' documentation for additional details.
        ea_params (EAParams): A dataclass with parameters relevant to the Darwin mode of
            the LEMOO method. See the dataclass' documentation for additional details.
        ml_params (MLParams): A dataclass with parameters relevant to the learning mode of
            the LEMOO method. See the dataclass' documentation for additional details.

    Attributes:
        current_ml_model (MLModel): The current machine learning model employed in the
            learning mode.
        _population (Union[None, SurrogatePopulation]): The current population of
            solutions.
        _best_fitness_fun_value (float): The current best fitness value found.
        _generation_history (List[PastGeneration]): A list to keep track of the
            population histories during the execution of the LEMOO model.
    """

    def __init__(
        self,
        problem: MOProblem,
        lem_params: LEMParams,
        ea_params: EAParams,
        ml_params: MLParams,
    ):
        self._problem: MOProblem = problem
        self._lem_params: LEMParams = lem_params
        self._ea_params: EAParams = ea_params
        self._ml_params: MLParams = ml_params
        self.current_ml_model: MLModel = ml_params.ml_model
        self._population: Union[None, SurrogatePopulation] = None
        self._best_fitness_fun_value: float = np.inf  # lower is better!

        # The history must exist before the population is initialized because
        # initialize_population() appends the initial population to it.
        self._generation_history: List[PastGeneration] = []
        self.initialize_population()

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------
    def _create_population(self) -> SurrogatePopulation:
        """Build a fresh population from the configured initialization design."""
        initial_population = create_new_individuals(
            self._ea_params.population_init_design,
            self._problem,
            pop_size=self._ea_params.population_size,
        )
        return SurrogatePopulation(
            self._problem,
            self._ea_params.population_size,
            initial_population,
            self._ea_params.cross_over_op,
            self._ea_params.mutation_op,
            None,
        )

    def _current_fitness_fun_values(self) -> np.ndarray:
        """Evaluate the fitness indicator on the current population."""
        return self._lem_params.fitness_indicator(
            self._population.fitness, self._population.individuals
        )

    @staticmethod
    def _group_size(split: float, n_individuals: int) -> int:
        """Translate an H/L split parameter into a group size of at least one.

        A split below 1.0 is a fraction of ``n_individuals``; otherwise it is an
        absolute count that is capped at half of ``n_individuals``.
        """
        if split < 1.0:
            # the cast to int truncates, so the fraction is not matched exactly
            size = int(split * n_individuals)
        elif split > (n_individuals / 2):
            # default to half if the split is larger than half of the individuals
            size = int(n_individuals / 2)
        else:
            size = int(split)
        # An empty group would leave the classifier with a single class to learn.
        return max(1, size)

    def _execute_mode(self, mode: Callable[[], None], counter_key: str, counters: Dict) -> None:
        """Run one iteration of a mode, save the population, and count it."""
        # we assume the previous population has already been saved to the history
        mode()
        self.add_population_to_history()
        counters[counter_key] += 1

    def _probe_mode(
        self,
        mode: Callable[[], None],
        counter_key: str,
        probe: int,
        threshold: float,
        counters: Dict,
    ) -> bool:
        """Iterate a mode at most ``probe`` times, stopping early once
        ``check_condition_best`` reports that the threshold is met.

        Returns True if the threshold was met, False if the probe was exhausted.
        """
        iterations = 0
        while iterations < probe:
            self._execute_mode(mode, counter_key, counters)
            iterations += 1
            # only the generations produced in this probe are checked, hence the counter
            if self.check_condition_best(iterations, threshold):
                return True
        return False

    # ------------------------------------------------------------------
    # population and history management
    # ------------------------------------------------------------------
    def initialize_population(self) -> None:
        """Use the defined initialization design to init a new population, add the
        initial population to the history of populations, and update the best fitness
        value."""
        self._population = self._create_population()
        self.add_population_to_history()
        self.update_best_fitness()

    def reset_population(self) -> None:
        """Reset the current population. Do not add the new population to history."""
        self._population = self._create_population()

    def reset_generation_history(self) -> None:
        """Reset the population history."""
        self._generation_history = []

    def update_best_fitness(self) -> bool:
        """Find the best fitness value in the current population and update the value
        stored in self._best_fitness_fun_value if the found fitness is better.

        Returns:
            bool: True if the best fitness was updated, otherwise False.
        """
        min_value = np.min(self._current_fitness_fun_values())
        if min_value < self._best_fitness_fun_value:
            self._best_fitness_fun_value = min_value
            return True
        return False

    def add_population_to_history(
        self,
        individuals: Optional[np.ndarray] = None,
        objectives_fitnesses: Optional[np.ndarray] = None,
    ) -> None:
        """Add a population to the history of the LEMOO model.

        Args:
            individuals (Optional[np.ndarray], optional): The decision variable values
                of the population individuals. Defaults to None.
            objectives_fitnesses (Optional[np.ndarray], optional): The corresponding
                objective function values of the population individuals. Defaults to
                None.

        Note:
            The supplied arrays are used only when BOTH are given. If either argument
            is ``None``, the current population in the LEMOO model is added to the
            history instead.
        """
        if individuals is None or objectives_fitnesses is None:
            # fall back to the current population
            individuals = self._population.individuals
            objectives_fitnesses = self._population.fitness

        fitness_fun_values = self._lem_params.fitness_indicator(
            objectives_fitnesses, individuals
        )
        self._generation_history.append(
            PastGeneration(individuals, objectives_fitnesses, fitness_fun_values)
        )

    def update_population(self, new_individuals: np.ndarray) -> None:
        """Replace the current population of the LEMOO model with a new one.

        Args:
            new_individuals (np.ndarray): The new population individuals in the
                decision variable space.
        """
        # remove every current individual, then insert the replacements
        self._population.delete(np.arange(len(self._population.individuals)))
        self._population.add(new_individuals)

    def collect_n_past_generations(
        self, n: int, ancestral_recall: int = 0, unique_only=False
    ):
        """Collect the n past generations into single numpy arrays for easier
        handling.

        Args:
            n (int): Number of most recent generations to collect. A value of 0 or
                less, or a value larger than the history, collects the whole history.
            ancestral_recall (int, optional): Number of the oldest generations to
                collect in addition to the ``n`` most recent ones. Generations that
                are already part of the recent window are not collected twice.
                Defaults to 0.
            unique_only (bool, optional): If True, keep only the first occurrence of
                each distinct individual (rows are then ordered as returned by
                ``np.unique``). Defaults to False.

        Returns:
            Tuple[np.ndarray, np.ndarray, np.ndarray]: A tuple with the collected
                individuals, objective fitness values, and fitness function values.
                Each array is 2-dimensional.

        Raises:
            ValueError: If the generation history is empty.
        """
        history = self._generation_history
        if not history:
            raise ValueError(
                "Cannot collect past generations: the generation history is empty."
            )

        n_available = len(history)
        if n <= 0 or n > n_available:
            # history[-0:] would be the whole list anyway; make that explicit
            n = n_available
        recent = history[-n:]

        # Only recall ancestors that are older than the recent window, otherwise the
        # same generation would be collected (and later weighted) twice.
        n_ancestral = min(max(ancestral_recall, 0), n_available - n)
        collected = history[:n_ancestral] + recent

        individuals = np.concatenate([gen.individuals for gen in collected])
        objectives_fitnesses = np.concatenate(
            [gen.objectives_fitnesses for gen in collected]
        )
        fitness_fun_values = np.concatenate(
            [gen.fitness_fun_values for gen in collected]
        )

        if unique_only:
            # indices of the first occurrence of every distinct individual
            _, unique_inds = np.unique(individuals, return_index=True, axis=0)
            individuals = individuals[unique_inds]
            objectives_fitnesses = objectives_fitnesses[unique_inds]
            fitness_fun_values = fitness_fun_values[unique_inds]

        return (
            np.atleast_2d(individuals),
            np.atleast_2d(objectives_fitnesses),
            np.atleast_2d(fitness_fun_values),
        )

    # ------------------------------------------------------------------
    # modes
    # ------------------------------------------------------------------
    def darwinian_mode(self) -> None:
        """Execute Darwinian mode. The size of the population can vary."""
        # mate and add the offspring to the population
        offspring = self._population.mate()
        # NOTE(review): the second positional argument is passed through unchanged;
        # see desdeo_emo's Population.add for its meaning.
        self._population.add(offspring, False)

        # let the selection operator decide which individuals survive
        fitness_fun_values = self._current_fitness_fun_values()
        selected = self._ea_params.selection_op.do(self._population, fitness_fun_values)
        self._population.keep(selected)

    def learning_mode(self) -> None:
        """Execute learning mode.

        Past individuals are collected and split into a high performing (H) and a low
        performing (L) group, the configured ML model is trained to tell them apart,
        new individuals are instantiated from the learned description of the H group,
        and the best of the H group plus the new individuals replace the current
        population.

        Raises:
            TypeError: If the configured ML model is not supported.
        """
        ml_model = self._ml_params.ml_model
        is_tree = isinstance(ml_model, DecisionTreeClassifier)
        is_slipper_like = isinstance(
            ml_model, (SlipperClassifier, BoostedRulesClassifier)
        )
        is_skoped = isinstance(ml_model, SkopeRulesClassifier)

        # fail before any work is done if the model cannot be handled
        if not isinstance(ml_model, MLModel) or not (
            is_tree or is_slipper_like or is_skoped
        ):
            raise TypeError(
                f"MLModel of type {type(self._ml_params.ml_model)} is not supported in learning mode."
            )

        # a lookback of 0 or less means the whole history; otherwise use at most
        # that many past generations counted from the current generation
        n_history = len(self._generation_history)
        lookback = self._ml_params.generation_lookback
        lookback_n = n_history if lookback <= 0 else min(lookback, n_history)

        (
            all_individuals,
            _all_objectives_fitnesses,
            all_fitness_fun_values,
        ) = self.collect_n_past_generations(
            lookback_n,
            # tolerate None for configurations that never set a recall
            ancestral_recall=self._ml_params.ancestral_recall or 0,
            unique_only=self._ml_params.unique_only,
        )

        # ravel (rather than squeeze) keeps a 1-d array even for a single individual
        sorted_indices = np.argsort(np.ravel(all_fitness_fun_values))
        n_individuals = all_individuals.shape[0]

        # formulate the H and L groups
        h_split_id = self._group_size(self._ml_params.H_split, n_individuals)
        l_split_id = self._group_size(self._ml_params.L_split, n_individuals)

        # because the indices are sorted in ascending fitness (lower is better), the
        # H group is the head and the L group is the tail. The tail is sliced from an
        # explicit start index: a negative stop of -0 would select everything.
        h_indices = sorted_indices[:h_split_id]
        l_indices = sorted_indices[len(sorted_indices) - l_split_id :][
            ::-1
        ]  # reversing might not really be needed here

        # pick the individuals according to the calculated indices
        h_group = all_individuals[h_indices]
        l_group = all_individuals[l_indices]

        # create training data to train the ML model
        x_train = np.vstack((h_group, l_group))
        h_labels = np.ones(len(h_group), dtype=int)
        if is_tree:
            # for tree, 1 good, -1 bad
            l_labels = -1 * np.ones(len(l_group), dtype=int)
        else:
            # for rule sets, 1: target, 0: other
            l_labels = np.zeros(len(l_group), dtype=int)
        y_train = np.hstack((h_labels, l_labels))

        # the number of new individuals scales with the number of collected ones
        n_to_instantiate = int(
            n_individuals * self._ml_params.instantiation_factor
        )

        classifier = ml_model.fit(x_train, y_train)
        self.current_ml_model = classifier

        # after training the model, it is used to instantiate new individuals
        # according to the H-group description, i.e., individuals classified as 1
        if is_tree:
            paths = find_all_paths(classifier)
            instantiated = instantiate_tree_rules(
                paths,
                self._problem.n_of_variables,
                self._problem.get_variable_bounds(),
                n_to_instantiate,
                1,
            )
            # reshape to have just a list of new individuals
            instantiated = instantiated.reshape((-1, instantiated.shape[2]))
        else:
            if is_slipper_like:
                # Slipper rules
                ruleset, weights = extract_slipper_rules(classifier)
            else:
                # Skoped rules
                ruleset, weights = extract_skoped_rules(classifier)

            instantiated = instantiate_ruleset_rules(
                ruleset,
                weights,
                self._problem.n_of_variables,
                self._problem.get_variable_bounds(),
                n_to_instantiate,
            )

        # mix with the existing H-group
        instantiated_and_h = np.vstack((h_group, instantiated))

        # compute objective values and fitness function values of the candidates
        objective_fitnesses_new = self._problem.evaluate(instantiated_and_h).fitness
        fitness_fun_values_new = self._lem_params.fitness_indicator(
            objective_fitnesses_new, instantiated_and_h
        )

        # pick the best candidates; as many as there are individuals in the
        # current population
        sorted_indices_new = np.argsort(np.ravel(fitness_fun_values_new))
        selected_indices = sorted_indices_new[: self._population.individuals.shape[0]]
        selected_individuals = instantiated_and_h[selected_indices]

        # replace the current population with the new selected individuals
        self.update_population(selected_individuals)

    # ------------------------------------------------------------------
    # termination and drivers
    # ------------------------------------------------------------------
    def check_condition_best(self, n_lookback: int, threshold: float) -> bool:
        """Check whether the termination criterion of a mode is met in the past
        n_lookback generations. Update the stored best fitness value and return True
        if the condition is met, just return False otherwise.

        Args:
            n_lookback (int): How many generations to look back to.
            threshold (float): The relative improvement expected in regard to the best
                fitness value. E.g., a threshold of 1.05 means a 5% improvement is
                expected.

        Returns:
            bool: True if the threshold is met. False otherwise.
        """
        _, _, past_fitness_fun_values = self.collect_n_past_generations(n_lookback)

        # best (lowest) fitness value regardless of how the indicator shapes its
        # output (one column per individual or a flat vector)
        best_past_value = np.min(past_fitness_fun_values)

        # NOTE(review): the test is ``new_best / stored_best < threshold``. With a
        # lower-is-better fitness and a threshold such as 1.05 this also passes when
        # the new best is up to 5% worse than the stored one, and it is undefined for
        # a stored best of 0. Confirm the intended direction of the comparison.
        if (best_past_value / self._best_fitness_fun_value) < threshold:
            self._best_fitness_fun_value = best_past_value
            return True
        return False

    def run(self) -> Dict:
        """Run the LEMOO model. Switching between the learning mode and the Darwinian
        mode happens when the termination condition of a mode is met or when the
        maximum number of iterations (probe) has been executed in a mode. The run
        stops after a cycle in which neither mode met its condition.

        Returns:
            Dict: A dictionary with counters indicating how many times the Darwinian
                ("darwin_mode") and learning ("learning_mode") modes have been
                executed.
        """
        counters = {"darwin_mode": 0, "learning_mode": 0}

        keep_running = True
        while keep_running:
            improved_in_learning = False
            improved_in_darwin = False

            # start in ML mode
            if self._lem_params.use_ml:
                improved_in_learning = self._probe_mode(
                    self.learning_mode,
                    "learning_mode",
                    self._lem_params.ml_probe,
                    self._lem_params.ml_threshold,
                    counters,
                )

            # do Darwinian mode
            if self._lem_params.use_darwin:
                improved_in_darwin = self._probe_mode(
                    self.darwinian_mode,
                    "darwin_mode",
                    self._lem_params.darwin_probe,
                    self._lem_params.darwin_threshold,
                    counters,
                )

            keep_running = improved_in_learning or improved_in_darwin

        return counters

    def run_iterations(self) -> Dict:
        """Run the LEMOO model. The Darwinian mode and learning mode are always
        executed for a set number of iterations. Thresholds are ignored.

        Returns:
            Dict: A dictionary with counters indicating how many times the Darwinian
                ("darwin_mode") and learning ("learning_mode") modes have been
                executed, and the number of completed cycles ("total_iterations").
        """
        counters = {"darwin_mode": 0, "learning_mode": 0}

        total_iterations: int = 0
        while total_iterations < self._lem_params.total_iterations:
            # each cycle begins with the Darwinian (ea) mode
            if self._lem_params.use_darwin:
                iterations_in_ea: int = 0
                while iterations_in_ea < self._ea_params.iterations_per_cycle:
                    self._execute_mode(self.darwinian_mode, "darwin_mode", counters)
                    iterations_in_ea += 1

            # followed by the learning (ML) mode
            if self._lem_params.use_ml:
                iterations_in_ml: int = 0
                while iterations_in_ml < self._ml_params.iterations_per_cycle:
                    self._execute_mode(self.learning_mode, "learning_mode", counters)
                    iterations_in_ml += 1

            total_iterations += 1

        counters["total_iterations"] = total_iterations
        return counters
if __name__ == "__main__":
    # Quick sanity check when run as a script: prints whether SBX_xover is
    # recognized as a (virtual) subclass of CrossOverOP.
    print(issubclass(SBX_xover, CrossOverOP))