From 9e9f7be17e3e2cb536cd457aee83b19ffce23099 Mon Sep 17 00:00:00 2001 From: "Joshua E. Jodesty" Date: Wed, 27 Feb 2019 11:36:10 -0500 Subject: [PATCH] wrapping up type stuff --- .../utils/depreciationHandler.py | 4 +- .../configuration/utils/policyAggregation.py | 2 +- cadCAD/engine/__init__.py | 79 ++++++++++++++----- cadCAD/engine/simulation.py | 8 +- simulations/multi_config_run.py | 6 +- simulations/param_sweep_run.py | 6 +- simulations/single_config_run.py | 9 ++- 7 files changed, 78 insertions(+), 36 deletions(-) diff --git a/cadCAD/configuration/utils/depreciationHandler.py b/cadCAD/configuration/utils/depreciationHandler.py index 8997771..330823b 100644 --- a/cadCAD/configuration/utils/depreciationHandler.py +++ b/cadCAD/configuration/utils/depreciationHandler.py @@ -30,10 +30,10 @@ def sanitize_partial_state_updates(partial_state_updates): # Also for backwards compatibility, we accept partial state update blocks both as list or dict # No need for a deprecation warning as it's already raised by cadCAD.utils.key_filter - if (type(new_partial_state_updates)==list): + if isinstance(new_partial_state_updates, list): for v in new_partial_state_updates: rename_keys(v) - elif (type(new_partial_state_updates)==dict): + elif isinstance(new_partial_state_updates, dict): for k, v in new_partial_state_updates.items(): rename_keys(v) diff --git a/cadCAD/configuration/utils/policyAggregation.py b/cadCAD/configuration/utils/policyAggregation.py index eac845d..68535a3 100644 --- a/cadCAD/configuration/utils/policyAggregation.py +++ b/cadCAD/configuration/utils/policyAggregation.py @@ -14,7 +14,7 @@ def get_base_value(x): def policy_to_dict(v): - return dict(list(zip(map(lambda n: 'b' + str(n + 1), list(range(len(v)))), v))) + return dict(list(zip(map(lambda n: 'p' + str(n + 1), list(range(len(v)))), v))) add = lambda a, b: a + b diff --git a/cadCAD/engine/__init__.py b/cadCAD/engine/__init__.py index 6ba51e9..ee36088 100644 --- a/cadCAD/engine/__init__.py +++ b/cadCAD/engine/__init__.py @@ -1,33 +1,72 @@ +from typing import Callable, Dict, List, Any, Tuple from pathos.multiprocessing import ProcessingPool as Pool +from pandas.core.frame import DataFrame from cadCAD.utils import flatten -from cadCAD.configuration import Processor +from cadCAD.configuration import Configuration, Processor from cadCAD.configuration.utils import TensorFieldReport from cadCAD.engine.simulation import Executor as SimExecutor +VarDictType = Dict[str, List[Any]] +StatesListsType = List[Dict[str, Any]] +ConfigsType = List[Tuple[List[Callable], List[Callable]]] +EnvProcessesType = Dict[str, Callable] + +SimulationType = Callable[ + [ + SimExecutor, + VarDictType, + StatesListsType, + ConfigsType, + EnvProcessesType, + range, + int + ], + List[List[Dict[str, Any]]] +] + class ExecutionMode: single_proc = 'single_proc' multi_proc = 'multi_proc' +def single_proc_exec( + simulation_execs: List[SimulationType], + var_dict_list: List[VarDictType], + states_lists: List[StatesListsType], + configs_structs: List[ConfigsType], + env_processes_list: List[EnvProcessesType], + Ts: List[range], + Ns: List[int] + ): + + l = [simulation_execs, states_lists, configs_structs, env_processes_list, Ts, Ns] + simulation_exec, states_list, config, env_processes, T, N = list(map(lambda x: x.pop(), l)) + result = simulation_exec(var_dict_list, states_list, config, env_processes, T, N) + return flatten(result) + + +def parallelize_simulations( + simulation_execs: List[SimulationType], + var_dict_list: List[VarDictType], + states_lists: List[StatesListsType], + configs_structs: List[ConfigsType], + env_processes_list: List[EnvProcessesType], + Ts: List[range], + Ns: List[int] + ): + l = list(zip(simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns)) + with Pool(len(configs_structs)) as p: + results = p.map(lambda t: t[0](t[1], t[2], t[3], t[4], t[5], t[6]), l) + return results + + class ExecutionContext: - def __init__(self, context=ExecutionMode.multi_proc): + def __init__(self, context: str = ExecutionMode.multi_proc) -> None: self.name = context self.method = None - def single_proc_exec(simulation_execs, var_dict, states_lists, configs_structs, env_processes_list, Ts, Ns): - l = [simulation_execs, states_lists, configs_structs, env_processes_list, Ts, Ns] - simulation, states_list, config, env_processes, T, N = list(map(lambda x: x.pop(), l)) - result = simulation(var_dict, states_list, config, env_processes, T, N) - return flatten(result) - - def parallelize_simulations(simulations, var_dict_list, states_list, configs, env_processes, Ts, Ns): - l = list(zip(simulations, var_dict_list, states_list, configs, env_processes, Ts, Ns)) - with Pool(len(configs)) as p: - results = p.map(lambda t: t[0](t[1], t[2], t[3], t[4], t[5], t[6]), l) - return results - if context == 'single_proc': self.method = single_proc_exec elif context == 'multi_proc': @@ -35,14 +74,14 @@ class ExecutionContext: class Executor: - def __init__(self, exec_context, configs): + def __init__(self, exec_context: ExecutionContext, configs: List[Configuration]) -> None: self.SimExecutor = SimExecutor self.exec_method = exec_context.method self.exec_context = exec_context.name self.configs = configs self.main = self.execute - def execute(self): + def execute(self) -> Tuple[List[Dict[str, Any]], DataFrame]: config_proc = Processor() create_tensor_field = TensorFieldReport(config_proc).create_tensor_field @@ -64,11 +103,13 @@ class Executor: config_idx += 1 + final_result = None + if self.exec_context == ExecutionMode.single_proc: # ToDO: Deprication Handler - "sanitize" in appropriate place tensor_field = create_tensor_field(partial_state_updates.pop(), eps.pop()) result = self.exec_method(simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns) - return result, tensor_field + final_result = result, tensor_field elif self.exec_context == ExecutionMode.multi_proc: if len(self.configs) > 1: simulations = self.exec_method(simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns) @@ -76,4 +117,6 @@ class Executor: for result, partial_state_updates, ep in list(zip(simulations, partial_state_updates, eps)): results.append((flatten(result), create_tensor_field(partial_state_updates, ep))) - return results \ No newline at end of file + final_result = results + + return final_result diff --git a/cadCAD/engine/simulation.py b/cadCAD/engine/simulation.py index 97af73d..ed0b466 100644 --- a/cadCAD/engine/simulation.py +++ b/cadCAD/engine/simulation.py @@ -6,8 +6,6 @@ from typing import Any, Callable, Dict, List, Tuple id_exception: Callable = engine_exception(KeyError, KeyError, None) -import pprint as pp - class Executor: @@ -51,9 +49,9 @@ class Executor: if state in list(env_processes.keys()): env_state: Callable = env_processes[state] if (env_state.__name__ == '_curried') or (env_state.__name__ == 'proc_trigger'): - state_dict[state]: Any = env_state(sub_step)(state_dict[state]) + state_dict[state] = env_state(sub_step)(state_dict[state]) else: - state_dict[state]: Any = env_state(state_dict[state]) + state_dict[state] = env_state(state_dict[state]) # mech_step def partial_state_update( @@ -81,7 +79,7 @@ class Executor: for k in last_in_obj: if k not in last_in_copy: - last_in_copy[k]: Any = last_in_obj[k] + last_in_copy[k] = last_in_obj[k] del last_in_obj diff --git a/simulations/multi_config_run.py b/simulations/multi_config_run.py index c81e1f0..28cee65 100644 --- a/simulations/multi_config_run.py +++ b/simulations/multi_config_run.py @@ -9,11 +9,11 @@ exec_mode = ExecutionMode() print("Simulation Execution: Concurrent Execution") multi_proc_ctx = ExecutionContext(context=exec_mode.multi_proc) -run2 = Executor(exec_context=multi_proc_ctx, configs=configs) +run = Executor(exec_context=multi_proc_ctx, configs=configs) i = 0 config_names = ['config1', 'config2'] -for raw_result, tensor_field in run2.main(): +for raw_result, tensor_field in run.main(): result = pd.DataFrame(raw_result) print() print("Tensor Field: " + config_names[i]) @@ -21,4 +21,4 @@ for raw_result, tensor_field in run2.main(): print("Output:") print(tabulate(result, headers='keys', tablefmt='psql')) print() - i += 1 \ No newline at end of file + i += 1 diff --git a/simulations/param_sweep_run.py b/simulations/param_sweep_run.py index ecf620f..328e774 100644 --- a/simulations/param_sweep_run.py +++ b/simulations/param_sweep_run.py @@ -9,11 +9,11 @@ exec_mode = ExecutionMode() print("Simulation Execution: Concurrent Execution") multi_proc_ctx = ExecutionContext(context=exec_mode.multi_proc) -run2 = Executor(exec_context=multi_proc_ctx, configs=configs) +run = Executor(exec_context=multi_proc_ctx, configs=configs) i = 0 config_names = ['sweep_config_A', 'sweep_config_B'] -for raw_result, tensor_field in run2.main(): +for raw_result, tensor_field in run.main(): result = pd.DataFrame(raw_result) print() print("Tensor Field: " + config_names[i]) @@ -21,4 +21,4 @@ for raw_result, tensor_field in run2.main(): print("Output:") print(tabulate(result, headers='keys', tablefmt='psql')) print() - i += 1 \ No newline at end of file + i += 1 diff --git a/simulations/single_config_run.py b/simulations/single_config_run.py index 38b1307..6326c52 100644 --- a/simulations/single_config_run.py +++ b/simulations/single_config_run.py @@ -11,12 +11,13 @@ print("Simulation Execution: Single Configuration") print() first_config = configs # only contains config1 single_proc_ctx = ExecutionContext(context=exec_mode.single_proc) -run1 = Executor(exec_context=single_proc_ctx, configs=first_config) -run1_raw_result, tensor_field = run1.main() -result = pd.DataFrame(run1_raw_result) +run = Executor(exec_context=single_proc_ctx, configs=first_config) + +raw_result, tensor_field = run.main() +result = pd.DataFrame(raw_result) print() print("Tensor Field: config1") print(tabulate(tensor_field, headers='keys', tablefmt='psql')) print("Output:") print(tabulate(result, headers='keys', tablefmt='psql')) -print() \ No newline at end of file +print()