diff --git a/cadCAD/engine/__init__.py b/cadCAD/engine/__init__.py
index 57c98a0..aeed31a 100644
--- a/cadCAD/engine/__init__.py
+++ b/cadCAD/engine/__init__.py
@@ -1,5 +1,5 @@
 from typing import Callable, Dict, List, Any, Tuple
-from pathos.multiprocessing import ProcessingPool as Pool
+from pathos.multiprocessing import ProcessingPool as PPool
 from pandas.core.frame import DataFrame
 
 from cadCAD.utils import flatten
@@ -44,7 +44,7 @@ def parallelize_simulations(
     Ns: List[int]
 ):
     l = list(zip(simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns))
-    with Pool(len(configs_structs)) as p:
+    with PPool(len(configs_structs)) as p:
         results = p.map(lambda t: t[0](t[1], t[2], t[3], t[4], t[5], t[6]), l)
 
     return results
diff --git a/cadCAD/engine/simulation.py b/cadCAD/engine/simulation.py
index 60acd66..4c65ba6 100644
--- a/cadCAD/engine/simulation.py
+++ b/cadCAD/engine/simulation.py
@@ -1,8 +1,10 @@
 from typing import Any, Callable, Dict, List, Tuple
+from pathos.pools import ThreadPool as TPool
 from copy import deepcopy
 from fn.op import foldr, call
 
 from cadCAD.engine.utils import engine_exception
+from cadCAD.utils import flatten
 
 id_exception: Callable = engine_exception(KeyError, KeyError, None)
 
@@ -142,7 +144,6 @@ class Executor:
 
         return simulation_list
 
-    # ToDo: Muiltithreaded Runs
     def simulation(
         self,
         var_dict: Dict[str, List[Any]],
@@ -153,8 +154,7 @@
         runs: int
    ) -> List[List[Dict[str, Any]]]:
 
-        pipe_run: List[List[Dict[str, Any]]] = []
-        for run in range(runs):
+        def execute_run(var_dict, states_list, configs, env_processes, time_seq, run) -> List[Dict[str, Any]]:
             run += 1
             states_list_copy: List[Dict[str, Any]] = deepcopy(states_list)
             head, *tail = self.run_pipeline(var_dict, states_list_copy, configs, env_processes, time_seq, run)
@@ -163,6 +163,13 @@ class Executor:
             genesis: Dict[str, Any] = head.pop()
             genesis['substep'], genesis['timestep'], genesis['run'] = 0, 0, run
             first_timestep_per_run: List[Dict[str, Any]] = [genesis] + tail.pop(0)
-            pipe_run += [first_timestep_per_run] + tail
+            return [first_timestep_per_run] + tail
+
+        pipe_run: List[List[Dict[str, Any]]] = flatten(
+            TPool().map(
+                lambda run: execute_run(var_dict, states_list, configs, env_processes, time_seq, run),
+                list(range(runs))
+            )
+        )
 
         return pipe_run
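
For reference, the per-run threading pattern this patch introduces in Executor.simulation can be sketched in isolation as below. This is a minimal sketch under stated assumptions: execute_run is a hypothetical stand-in for the closure defined inside Executor.simulation, and the local flatten only mimics cadCAD.utils.flatten for one level of nesting; the only piece taken directly from the patch is pathos.pools.ThreadPool.

# Minimal sketch of the thread-pool-per-run pattern from the Executor.simulation change.
# Assumes pathos is installed. execute_run is a simplified, hypothetical stand-in for
# the real closure, which runs the full state-update pipeline for a single run.
from pathos.pools import ThreadPool as TPool


def flatten(nested):
    # One-level flatten, standing in for cadCAD.utils.flatten.
    return [item for sublist in nested for item in sublist]


def execute_run(run):
    # Stand-in for one simulation run; returns that run's list of state records.
    run += 1  # runs are 1-indexed, as in the patch
    return [{'run': run, 'timestep': 0, 'substep': 0}]


runs = 3
# Map run indices over a thread pool and flatten the per-run result lists,
# mirroring how pipe_run is now built in simulation().
pipe_run = flatten(TPool().map(execute_run, list(range(runs))))

Two details of the patch are worth noting alongside this sketch: each run deep-copies states_list before executing, so concurrent runs do not mutate shared state, and because ThreadPool threads share one interpreter, wall-clock gains depend on how much of the per-run work releases the GIL.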