wrapping up type stuff

Joshua E. Jodesty 2019-02-27 11:36:10 -05:00
parent cb6acce3d9
commit 9e9f7be17e
7 changed files with 78 additions and 36 deletions

View File

@@ -30,10 +30,10 @@ def sanitize_partial_state_updates(partial_state_updates):
 # Also for backwards compatibility, we accept partial state update blocks both as list or dict
 # No need for a deprecation warning as it's already raised by cadCAD.utils.key_filter
-if (type(new_partial_state_updates)==list):
+if isinstance(new_partial_state_updates, list):
     for v in new_partial_state_updates:
         rename_keys(v)
-elif (type(new_partial_state_updates)==dict):
+elif isinstance(new_partial_state_updates, dict):
     for k, v in new_partial_state_updates.items():
         rename_keys(v)
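
Note on the change above: isinstance() also accepts subclasses of list and dict, whereas a direct type() comparison only matches the exact type, so the new check is more permissive toward user-supplied partial state update blocks. A minimal standalone sketch of the difference (the PartialStateUpdates subclass is hypothetical, used only for illustration):

# isinstance() accepts subclasses, a direct type() comparison does not.
class PartialStateUpdates(list):  # hypothetical subclass, not part of cadCAD
    pass

blocks = PartialStateUpdates([{}])
print(type(blocks) == list)      # False: exact type is PartialStateUpdates
print(isinstance(blocks, list))  # True: list subclasses pass the new check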

View File

@@ -14,7 +14,7 @@ def get_base_value(x):
 def policy_to_dict(v):
-    return dict(list(zip(map(lambda n: 'b' + str(n + 1), list(range(len(v)))), v)))
+    return dict(list(zip(map(lambda n: 'p' + str(n + 1), list(range(len(v)))), v)))
 add = lambda a, b: a + b
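
The key prefix in policy_to_dict changes from 'b' to 'p', so aggregated policy inputs are now keyed p1, p2, ... rather than b1, b2, ... A quick sketch of the same zip/map expression applied to a toy list of policy values:

def policy_to_dict(v):
    return dict(list(zip(map(lambda n: 'p' + str(n + 1), list(range(len(v)))), v)))

print(policy_to_dict([10, 20, 30]))  # {'p1': 10, 'p2': 20, 'p3': 30}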

View File

@@ -1,33 +1,72 @@
+from typing import Callable, Dict, List, Any, Tuple
 from pathos.multiprocessing import ProcessingPool as Pool
+from pandas.core.frame import DataFrame
 from cadCAD.utils import flatten
-from cadCAD.configuration import Processor
+from cadCAD.configuration import Configuration, Processor
 from cadCAD.configuration.utils import TensorFieldReport
 from cadCAD.engine.simulation import Executor as SimExecutor
+VarDictType = Dict[str, List[Any]]
+StatesListsType = List[Dict[str, Any]]
+ConfigsType = List[Tuple[List[Callable], List[Callable]]]
+EnvProcessesType = Dict[str, Callable]
+SimulationType = Callable[
+    [
+        SimExecutor,
+        VarDictType,
+        StatesListsType,
+        ConfigsType,
+        EnvProcessesType,
+        range,
+        int
+    ],
+    List[List[Dict[str, Any]]]
+]
 class ExecutionMode:
     single_proc = 'single_proc'
     multi_proc = 'multi_proc'
+def single_proc_exec(
+    simulation_execs: List[SimulationType],
+    var_dict_list: List[VarDictType],
+    states_lists: List[StatesListsType],
+    configs_structs: List[ConfigsType],
+    env_processes_list: List[EnvProcessesType],
+    Ts: List[range],
+    Ns: List[int]
+):
+    l = [simulation_execs, states_lists, configs_structs, env_processes_list, Ts, Ns]
+    simulation_exec, states_list, config, env_processes, T, N = list(map(lambda x: x.pop(), l))
+    result = simulation_exec(var_dict_list, states_list, config, env_processes, T, N)
+    return flatten(result)
+def parallelize_simulations(
+    simulation_execs: List[SimulationType],
+    var_dict_list: List[VarDictType],
+    states_lists: List[StatesListsType],
+    configs_structs: List[ConfigsType],
+    env_processes_list: List[EnvProcessesType],
+    Ts: List[range],
+    Ns: List[int]
+):
+    l = list(zip(simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns))
+    with Pool(len(configs_structs)) as p:
+        results = p.map(lambda t: t[0](t[1], t[2], t[3], t[4], t[5], t[6]), l)
+    return results
 class ExecutionContext:
-    def __init__(self, context=ExecutionMode.multi_proc):
+    def __init__(self, context: str = ExecutionMode.multi_proc) -> None:
         self.name = context
         self.method = None
-        def single_proc_exec(simulation_execs, var_dict, states_lists, configs_structs, env_processes_list, Ts, Ns):
-            l = [simulation_execs, states_lists, configs_structs, env_processes_list, Ts, Ns]
-            simulation, states_list, config, env_processes, T, N = list(map(lambda x: x.pop(), l))
-            result = simulation(var_dict, states_list, config, env_processes, T, N)
-            return flatten(result)
-        def parallelize_simulations(simulations, var_dict_list, states_list, configs, env_processes, Ts, Ns):
-            l = list(zip(simulations, var_dict_list, states_list, configs, env_processes, Ts, Ns))
-            with Pool(len(configs)) as p:
-                results = p.map(lambda t: t[0](t[1], t[2], t[3], t[4], t[5], t[6]), l)
-            return results
         if context == 'single_proc':
             self.method = single_proc_exec
         elif context == 'multi_proc':
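
With single_proc_exec and parallelize_simulations promoted to module level, ExecutionContext.__init__ now only selects one of them by name. The aliases above describe the per-configuration argument lists those functions receive; note that SimulationType also lists a leading SimExecutor parameter, while single_proc_exec ends up calling the simulation with six arguments. A rough, self-contained sketch of that call shape using a dummy simulation function (all names below are illustrative, not cadCAD API):

from typing import Any, Callable, Dict, List, Tuple

# Aliases with the same shapes as above, re-declared so the sketch stands alone.
VarDictType = Dict[str, List[Any]]
StatesListsType = List[Dict[str, Any]]
ConfigsType = List[Tuple[List[Callable], List[Callable]]]
EnvProcessesType = Dict[str, Callable]

# Dummy simulation with the six-argument call shape used by single_proc_exec.
def dummy_simulation(var_dict: VarDictType, states_list: StatesListsType,
                     configs: ConfigsType, env_processes: EnvProcessesType,
                     T: range, N: int) -> List[List[Dict[str, Any]]]:
    return [[{'timestep': t, 'run': N} for t in T]]

# single_proc_exec-style unpacking: pop the sole element out of each per-config list.
per_config = [[dummy_simulation], [[{'s': 0}]], [[]], [{}], [range(3)], [1]]
sim, states_list, config, env_processes, T, N = map(lambda x: x.pop(), per_config)
print(sim({'s': [0]}, states_list, config, env_processes, T, N))
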
@@ -35,14 +74,14 @@ class Executor:
 class Executor:
-    def __init__(self, exec_context, configs):
+    def __init__(self, exec_context: ExecutionContext, configs: List[Configuration]) -> None:
         self.SimExecutor = SimExecutor
         self.exec_method = exec_context.method
         self.exec_context = exec_context.name
         self.configs = configs
         self.main = self.execute
-    def execute(self):
+    def execute(self) -> Tuple[List[Dict[str, Any]], DataFrame]:
         config_proc = Processor()
         create_tensor_field = TensorFieldReport(config_proc).create_tensor_field
@@ -64,11 +103,13 @@ class Executor:
             config_idx += 1
+        final_result = None
         if self.exec_context == ExecutionMode.single_proc:
+            # TODO: deprecation handler - call "sanitize" in the appropriate place
             tensor_field = create_tensor_field(partial_state_updates.pop(), eps.pop())
             result = self.exec_method(simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns)
-            return result, tensor_field
+            final_result = result, tensor_field
         elif self.exec_context == ExecutionMode.multi_proc:
             if len(self.configs) > 1:
                 simulations = self.exec_method(simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns)
@@ -76,4 +117,6 @@ class Executor:
                 for result, partial_state_updates, ep in list(zip(simulations, partial_state_updates, eps)):
                     results.append((flatten(result), create_tensor_field(partial_state_updates, ep)))
-                return results
+                final_result = results
+        return final_result
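
With the early returns replaced by assignments to final_result, execute() now has a single exit point: in single_proc mode it yields one (result, tensor_field) pair, and in multi_proc mode a list of (flattened_result, tensor_field) pairs, which is what the example scripts below iterate over. A hedged sketch of a caller that normalizes the two shapes (normalize_results is illustrative, not part of cadCAD):

from typing import Any, List, Tuple

# Illustrative helper: fold Executor.execute()'s two possible return shapes into a
# list of (raw_result, tensor_field) pairs. Shapes inferred from the diff above.
def normalize_results(final_result: Any) -> List[Tuple[Any, Any]]:
    if isinstance(final_result, tuple):   # single_proc: one (result, tensor_field) pair
        return [final_result]
    return list(final_result)             # multi_proc: already a list of pairs

print(normalize_results(([{'run': 1}], 'tensor_field')))
print(normalize_results([([{'run': 1}], 'tf_1'), ([{'run': 2}], 'tf_2')]))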

View File

@@ -6,8 +6,6 @@ from typing import Any, Callable, Dict, List, Tuple
 id_exception: Callable = engine_exception(KeyError, KeyError, None)
-import pprint as pp
 class Executor:
@@ -51,9 +49,9 @@ class Executor:
 if state in list(env_processes.keys()):
     env_state: Callable = env_processes[state]
     if (env_state.__name__ == '_curried') or (env_state.__name__ == 'proc_trigger'):
-        state_dict[state]: Any = env_state(sub_step)(state_dict[state])
+        state_dict[state] = env_state(sub_step)(state_dict[state])
     else:
-        state_dict[state]: Any = env_state(state_dict[state])
+        state_dict[state] = env_state(state_dict[state])
 # mech_step
 def partial_state_update(
@@ -81,7 +79,7 @@ class Executor:
 for k in last_in_obj:
     if k not in last_in_copy:
-        last_in_copy[k]: Any = last_in_obj[k]
+        last_in_copy[k] = last_in_obj[k]
 del last_in_obj
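
The removed ": Any" annotations in this file sat on subscripted targets. PEP 526 allows that syntax, but an annotation on a non-simple target is evaluated and then discarded rather than stored anywhere, so the plain assignments behave identically and read more cleanly. A tiny sketch of the equivalence:

from typing import Any

state_dict = {'x': 0, 'y': 0}

# Annotating a subscripted target is legal, but the annotation is thrown away.
state_dict['x']: Any = 1

# The plain assignment the commit switches to has exactly the same runtime effect.
state_dict['y'] = 2

print(state_dict)  # {'x': 1, 'y': 2}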

View File

@@ -9,11 +9,11 @@ exec_mode = ExecutionMode()
 print("Simulation Execution: Concurrent Execution")
 multi_proc_ctx = ExecutionContext(context=exec_mode.multi_proc)
-run2 = Executor(exec_context=multi_proc_ctx, configs=configs)
+run = Executor(exec_context=multi_proc_ctx, configs=configs)
 i = 0
 config_names = ['config1', 'config2']
-for raw_result, tensor_field in run2.main():
+for raw_result, tensor_field in run.main():
     result = pd.DataFrame(raw_result)
     print()
     print("Tensor Field: " + config_names[i])
@@ -21,4 +21,4 @@ for raw_result, tensor_field in run2.main():
     print("Output:")
     print(tabulate(result, headers='keys', tablefmt='psql'))
     print()
-    i += 1
+    i += 1

View File

@@ -9,11 +9,11 @@ exec_mode = ExecutionMode()
 print("Simulation Execution: Concurrent Execution")
 multi_proc_ctx = ExecutionContext(context=exec_mode.multi_proc)
-run2 = Executor(exec_context=multi_proc_ctx, configs=configs)
+run = Executor(exec_context=multi_proc_ctx, configs=configs)
 i = 0
 config_names = ['sweep_config_A', 'sweep_config_B']
-for raw_result, tensor_field in run2.main():
+for raw_result, tensor_field in run.main():
     result = pd.DataFrame(raw_result)
     print()
     print("Tensor Field: " + config_names[i])
@@ -21,4 +21,4 @@ for raw_result, tensor_field in run2.main():
     print("Output:")
     print(tabulate(result, headers='keys', tablefmt='psql'))
     print()
-    i += 1
+    i += 1

View File

@@ -11,12 +11,13 @@ print("Simulation Execution: Single Configuration")
 print()
 first_config = configs # only contains config1
 single_proc_ctx = ExecutionContext(context=exec_mode.single_proc)
-run1 = Executor(exec_context=single_proc_ctx, configs=first_config)
-run1_raw_result, tensor_field = run1.main()
-result = pd.DataFrame(run1_raw_result)
+run = Executor(exec_context=single_proc_ctx, configs=first_config)
+raw_result, tensor_field = run.main()
+result = pd.DataFrame(raw_result)
 print()
 print("Tensor Field: config1")
 print(tabulate(tensor_field, headers='keys', tablefmt='psql'))
 print("Output:")
 print(tabulate(result, headers='keys', tablefmt='psql'))
 print()
+print()