Source code for qnetsur.utils

"""
Central script containing objects for surrogate optimization and helper functions.
"""

import numpy as np
import time
import pandas as pd
from scipy.stats import truncnorm
from functools import partial
import multiprocessing as mp

from sklearn.model_selection import cross_val_score
from sklearn.svm import SVR
from sklearn.tree import DecisionTreeRegressor
from sklearn.multioutput import MultiOutputRegressor
from sklearn.metrics import mean_absolute_error, make_scorer

[docs] class Simulation: """ Object to run quantum network simulations. It allows for the specification of both fixed and variable simulation parameters, supports the generation of random parameter sets, and facilitates the running of simulations through a user-defined simulation function. Parameters ---------- sim_wrapper : function The wrapper function that preprocesses simulation parameters before execution and adapts outputs according to needs of use case. sim : function The actual simulation function to be executed. rng : class Random number generator object to be used. values : dict Fixed parameters for the simulation. variables : dict, optional Variable parameters for the simulation, allowing for dynamic adjustments. Methods ------- run_sim(x, vals=None) Executes a single simulation run with the given parameters. run_exhaustive(x, N=10, seed=42) Executes `N` simulations in parallel. get_random_x(n) Generates `n` sets of random parameters based on the specified variable parameters. """ def __init__(self, sim_wrapper, sim, rng, values, variables): self.rng = rng # specify fixed parameters self.vals = values # specify variable parameters self.vars = variables # simulation function handler self.sim_wrapper = sim_wrapper # simulation function self.sim = sim
[docs] def run_sim(self, x :dict) -> list: """ Runs the quantum network simulation with the provided parameters. Args: x (dict): Parameters for the simulation. Returns: list: Results of the simulation. """ xrun = {**self.vals, **x} res = self.sim_wrapper(self.sim, xrun) return res
[docs] def run_exhaustive(self, x :dict, N=10, seed=42) -> list: """ Runs the quantum network simulation with the provided parameters N times in parallel. """ xrun = {**x, **self.vals} task = partial(self.sim_wrapper, self.sim) with mp.Pool(processes=N) as pool: res = pool.map(task, [{**xrun, **{'seed': (i+1)*seed}} for i in range(N)]) pool.close() pool.join() return res
[docs] def get_random_x(self, n :int) -> dict: """ Generates random parameters for the simulation. """ assert all(isinstance(val, tuple) for val in self.vars['range'].values()) and n > 0,\ f"Dimension types must be a tuple (sample-list, dataype) and n must be greater zero." x = {} for dim, par in self.vars['range'].items(): vals = par[0] if par[1] == 'int': x[dim] = self.rng.integers(vals[0], vals[1], n) if n > 1\ else self.rng.integers(vals[0], vals[1]) elif par[1] == 'float': x[dim] = self.rng.uniform(vals[0], vals[1], n) if n > 1\ else self.rng.uniform(vals[0], vals[1]) else: raise Exception('Datatype must be "int" or "float".') for dim, vals in self.vars['ordinal'].items(): x[dim] = self.rng.choice(vals, size=n) if n > 1\ else self.rng.choice(vals) for dim, vals in self.vars['choice'].items(): x[dim] = self.rng.choice(vals, n) if n > 1\ else self.rng.choice(vals) return x
[docs] class Surrogate(Simulation): """ Initializes the Surrogate model with a simulation wrapper, the actual simulation function, and a set of fixed and variable parameters. It sets up the environment for running surrogate model-based optimizations, including the multiprocessing setup for parallel simulations. Parameters ---------- sim_wrapper : function A function that wraps around the actual simulation to perform pre-processing of simulation parameters. sim : function The actual simulation function to be optimized. initial_training_size : int The number of configurations to use for the initial surrogate model training. ntop : int The number of configurations to use in acquisition process degree : int, optional Exponent in acquisition transition function (exploitation degree). Attributes ---------- X : dict Training set parameters. X_df : pandas.DataFrame DataFrame version of `X` for easier manipulation and model training. y : list Simulation results corresponding to each set of parameters in `X`. y_std : list Standard deviation of simulation results, providing insight into result variability. model : class Regression model used for surrogate modeling. Default is SVR (Support Vector Regression) from scikit-learn. mmodel : class Multi-output wrapper for the regression model, allowing it to output multiple predictions. mmodel_std : class Separate multi-output model for predicting the standard deviation of simulation results. build_time : list Time taken to build or update the surrogate model. sim_time : list Time taken for simulation runs. optimize_time : list Time taken for optimization processes. procs : int Number of processes to use for parallel simulations, based on CPU count. Methods ------- get_neighbour(max_time, current_time, x) Generates a neighbouring set of parameters based on current optimization state. acquisition(max_T, current_t) Selects new parameters for evaluation by balancing exploration and exploitation. update() Updates the surrogate model with new data points collected from simulation runs. optimize(max_T) Conducts the optimization process to find optimal simulation parameters. """ def __init__(self, sim_wrapper, sim, rng, values, variables, initial_training_size, ntop, degree=4, max_data_size=100000): super().__init__(sim_wrapper, sim, rng, values, variables) assert initial_training_size>=5, f"Sample size must be at least 5 (requirement for 5-fold cross validation)." assert max_data_size>=100, f"Less than 100 values in the dataset does not allow for meaningful optimization." # storage for time profiling self.sim_time = [] self.build_time = [] self.acquisition_time = [] self.optimize_time = [] # set multiprocessing self.procs = min(mp.cpu_count(),ntop) print('Available processors for parallel execution: ', self.procs) self.init_size = initial_training_size self.ntop = ntop # storage target value self.y = [] self.y_std = [] # storage raw values self.y_raw = [] # storage model declaration self.model = SVR() self.model_scores = {'SVR': [], 'DecisionTree': []} self.degree = degree # coefficient in neighbour selection self.max_data_size = max_data_size
[docs] def get_neighbour(self, x :dict) -> dict: """ Generates most promising parameters in limited neighbouring region according to current knowledge of surrogate model and depending on the time left. """ if self.isscore: f_svr = (1-np.log(1+(self.model_scores['SVR'][0]-self.model_scores['SVR'][-1])/self.model_scores['SVR'][0])**2)**self.degree f_tree = (1-np.log(1+(self.model_scores['DecisionTree'][0]-self.model_scores['DecisionTree'][-1])/self.model_scores['DecisionTree'][0])**2)**self.degree f = f_svr if f_svr < f_tree else f_tree size = int((1-f)*10000 + 10) elif self.ntop > 500: f = 1-np.log(1+self.current_time_counter/self.limit)**2 size = int(self.current_time_counter/self.limit*250 + 10) elif self.ntop > 100: f = 1-np.log(1+self.current_time_counter/self.limit)**2 size = int(self.current_time_counter/self.limit*500 + 10) else: f = (1-np.log(1+self.current_time_counter/self.limit)**2)**self.degree size = int(self.current_time_counter/self.limit*10000 + 10) x_n = {} for dim, par in self.vars['range'].items(): vals = par[0] if par[1] == 'int': std = f * (vals[1] - vals[0])/2 x_n[dim] = truncnorm.rvs((vals[0] - x[dim]) / std, (vals[1] - x[dim]) / std, loc=x[dim], scale=std, size=size, random_state=self.rng).astype(int) elif par[1] == 'float': std = f * (vals[1] - vals[0])/2 x_n[dim] = truncnorm.rvs((vals[0] - x[dim]) / std, (vals[1] - x[dim]) / std, loc=x[dim], scale=std, size=size, random_state=self.rng) else: raise Exception('Datatype must be "int" or "float".') for dim, vals in self.vars['ordinal'].items(): pos = x[dim] # current position loc = vals.index(pos)/len(vals) # corresponding location between 0 and 1 pval = np.linspace(0,1,len(vals)) std = f/2 probs = truncnorm.pdf(pval, (0-loc)/std, (1-loc)/std, scale=std, loc=loc, random_state=self.rng)/len(x) # corresponding weights probs /= probs.sum() # normalize probabilities x_n[dim] = self.rng.random.choice(vals, size=size , p=probs) for dim, vals in self.vars['choice'].items(): x_n[dim] = self.rng.random.choice(vals, size) # select best prediction as next neighbour samples_x = pd.DataFrame(x_n).astype(object) samples_y = self.mmodel.predict(samples_x.values) fittest_neighbour_index = np.argsort(np.array(samples_y).sum(axis=1))[-1] x_fittest = samples_x.iloc[fittest_neighbour_index].to_dict() return x_fittest
[docs] def acquisition(self) -> None: """ Computes n new data points according to estimated improvement and degree of exploration and adds the data to the training sample. """ start = time.time() y_obj_vec = np.sum(self.y, axis=1) newx = [] if self.verbose: print('Start sort ...') top_selection = self.X_df.iloc[np.argsort(y_obj_vec)[-self.ntop:]] # select top n candidates if self.verbose: print('Sort done. Retrieve promising neighbours ...') for x in top_selection.iloc: # get most promising neighbour according to surrogate neighbour = self.get_neighbour(x=x.to_dict()) newx.append(neighbour) if self.verbose: print('Neighbours retrieved.') self.X_df_add = pd.DataFrame.from_records(newx).astype(object) self.acquisition_time.append(time.time()-start)
[docs] def run_multiple_and_add_target_values(self, X) -> None: start = time.time() if self.issequential: y_temp = [] for x in X: y_temp.append(self.run_sim(x)) else: with mp.Pool(processes=self.procs) as pool: y_temp = pool.map(self.run_sim, X) pool.close() pool.join() self.sim_time.append(time.time() - start) # measure simulation time # add new data for y_i in y_temp: yi, yi_std, *yi_raw = y_i self.y.append(yi) self.y_std.append(yi_std) self.y_raw += yi_raw
[docs] def train_models(self) -> None: """ Trains the machine lerning models on current data set and chooses the one with smaller error to use for prediction. """ # train/update surrogate models start = time.time() current_min = np.nanmin(self.y) # nan value handling self.y = np.nan_to_num(self.y, copy=True, nan=current_min, posinf=current_min, neginf=current_min).tolist() self.y_std = np.nan_to_num(self.y_std, copy=True, nan=0, posinf=0, neginf=0).tolist() if len(self.X_df) > self.max_data_size: # train models on top max_data_size samples (these are simple models best used with small to medium data set sizes) y_obj = np.sum(self.y, axis=1) top_indices = np.argsort(y_obj)[-self.max_data_size:] self.X_df = self.X_df.iloc[top_indices] self.y = list(np.array(self.y)[top_indices]) # Test which model performs better in this iterations X_df_cv = self.X_df.drop('Iteration', axis=1).values y_cv = self.y score_svr = cross_val_score(MultiOutputRegressor(SVR()), # get current error of models X_df_cv, y_cv, scoring=make_scorer(mean_absolute_error)).mean() score_tree = cross_val_score(MultiOutputRegressor(DecisionTreeRegressor(random_state=42)), X_df_cv, y_cv, scoring=make_scorer(mean_absolute_error)).mean() if score_svr < score_tree: # set model that currently performs best (smaller error) self.model = SVR() else: self.model = DecisionTreeRegressor(random_state=42) if self.verbose: print(f'MAE: svr = {score_svr} and tree={score_tree}') self.model_scores['SVR'].append(score_svr) self.model_scores['DecisionTree'].append(score_tree) self.mmodel = MultiOutputRegressor(self.model) #self.mmodel_std = MultiOutputRegressor(self.model) self.mmodel.fit(self.X_df.drop('Iteration', axis=1).values, self.y) #self.mmodel_std.fit(self.X_df.drop('Iteration', axis=1).values, self.y_std) self.build_time.append(time.time()-start)
[docs] def update(self, counter) -> None: """ Updates the model with new data. Executes simulation on most promising points found and adds simulation results to training dataset. """ # execute simulation and add target values self.run_multiple_and_add_target_values(X=self.X_df_add.iloc) # add parameter set self.X_df_add['Iteration'] = counter self.X_df = pd.concat([self.X_df, self.X_df_add], axis=0, ignore_index=True) # train models self.train_models()
[docs] def gen_initial_set(self) -> None: """ Generates the initial training set. """ if self.verbose: print("Start optimization ...") optimize_start = time.time() # generate initial training set X start = time.time() X = self.get_random_x(self.init_size) self.X_df = pd.DataFrame(X).astype(object) self.build_time.append(time.time() - start) self.run_multiple_and_add_target_values(X=self.X_df.iloc) self.X_df['Iteration'] = 0 # train models self.train_models() self.initial_optimize_time = time.time()-optimize_start self.optimize_time.append(self.initial_optimize_time) if self.verbose and isinstance(self.limit, int): print(f'Iteration 0/{self.limit}')
[docs] def optimize_with_timer(self) -> None: """ Optimization until a set maximum number of seconds. """ current_times = [] self.current_time_counter = 0 self.max_optimize_time = self.limit - self.initial_optimize_time delta = 0 counter = 1 while (self.current_time_counter + delta) < self.max_optimize_time: start = time.time() self.acquisition() self.update(counter) self.optimize_time.append(time.time()-start) current_times.append(time.time()-start) self.current_time_counter = np.sum(current_times) delta = np.mean(current_times) counter +=1 if self.verbose: print(f'Time left for optimization: {self.max_optimize_time-self.current_time_counter:.2f}s')
[docs] def optimize_with_iteration(self) -> None: """ Optimization until a set maximum number of iterations. """ self.current_time_counter = 1 while self.current_time_counter <= self.limit: start = time.time() self.acquisition() self.update(self.current_time_counter) self.optimize_time.append(time.time()-start) if self.verbose: print(f'Iteration {self.current_time_counter}/{self.limit} done.') self.current_time_counter +=1
[docs] def optimize(self, limit, isscore=False, issequential=False, verbose=False) -> None: """ Conducts the optimization process to find optimal simulation parameters. """ self.limit = limit self.verbose = verbose self.isscore = isscore self.issequential = issequential if isinstance(limit, float): self.limit *= 3600 # to seconds if self.verbose: print(f'Optimize with timer. LIMIT: {self.limit} seconds.') self.gen_initial_set() self.optimize_with_timer() else: if self.verbose: print(f'Optimize with iterator. LIMIT: {self.limit} cycles.') self.gen_initial_set() self.optimize_with_iteration() if self.verbose: print('Optimization finished.')