diff --git a/SimCAD/engine/simulation.py b/SimCAD/engine/simulation.py index e5bf89c..8144e6a 100644 --- a/SimCAD/engine/simulation.py +++ b/SimCAD/engine/simulation.py @@ -1,12 +1,7 @@ -from pathos.threading import ThreadPool from copy import deepcopy from fn.op import foldr, call -import numpy as np -import pprint -pp = pprint.PrettyPrinter(indent=4) - -from SimCAD.utils import groupByKey, flatten, drop_right +from SimCAD.utils import rename from SimCAD.engine.utils import engine_exception @@ -28,42 +23,27 @@ class Executor: return foldr(call, get_col_results(step, sL, s, funcs))(ops) - def xthreaded_env_proc(self, f, s_valx): - if isinstance(s_valx, list): - pool = ThreadPool(nodes=len(s_valx)) # ToDo: Optimize - return pool.map(lambda f: f(s_valx), s_valx) - else: - return f(s_valx) - def apply_env_proc(self, env_processes, state_dict, step): for state in state_dict.keys(): if state in list(env_processes.keys()): env_state = env_processes[state] if (env_state.__name__ == '_curried') or (env_state.__name__ == 'proc_trigger'): # might want to change - state_dict[state] = self.xthreaded_env_proc(env_state(step), state_dict[state]) + state_dict[state] = env_state(step)(state_dict[state]) else: - state_dict[state] = self.xthreaded_env_proc(env_state, state_dict[state]) + state_dict[state] = env_state(state_dict[state]) - def xthreaded_state_update(self, fs, m_step, sL, last_in_obj, _input): - if isinstance(fs, list): - pool = ThreadPool(nodes=len(fs)) # ToDo: Optimize - fx = pool.map(lambda f: f(m_step, sL, last_in_obj, _input), fs) - return groupByKey(fx) - else: - return fs(m_step, sL, last_in_obj, _input) - def mech_step(self, m_step, sL, state_funcs, behavior_funcs, env_processes, t_step, run): last_in_obj = sL[-1] _input = self.state_update_exception(self.get_behavior_input(m_step, sL, last_in_obj, behavior_funcs)) # ToDo: add env_proc generator to `last_in_copy` iterator as wrapper function - last_in_copy = dict([ - self.behavior_update_exception( - self.xthreaded_state_update(f, m_step, sL, last_in_obj, _input) - ) for f in state_funcs - ]) + # last_in_copy = dict([self.behavior_update_exception(f(m_step, sL, last_in_obj, _input)) for f in state_funcs]) + + for f in state_funcs: + print(f(1,2,3,4)) + exit() for k in last_in_obj: if k not in last_in_copy: @@ -74,32 +54,12 @@ class Executor: # make env proc trigger field agnostic self.apply_env_proc(env_processes, last_in_copy, last_in_copy['timestamp']) # mutating last_in_copy - print() - print(last_in_copy) - print() - - - def set_sys_metrics(state_dict, m_step, t_step, run): - state_dict["mech_step"], state_dict["time_step"], state_dict['run'] = m_step, t_step, run - - if any(isinstance(x, list) for x in last_in_copy.values()): - last_in_copies = flatten(last_in_copy) - for last_in_copy in last_in_copies: - set_sys_metrics(last_in_copy, m_step, t_step, run) - sL.append(last_in_copies) - else: - set_sys_metrics(last_in_copy, m_step, t_step, run) - sL.append(last_in_copy) - - print() - pp.pprint(last_in_copies) - print() - + last_in_copy["mech_step"], last_in_copy["time_step"], last_in_copy['run'] = m_step, t_step, run + sL.append(last_in_copy) del last_in_copy return sL - def mech_pipeline(self, states_list, configs, env_processes, t_step, run): m_step = 0 states_list_copy = deepcopy(states_list) @@ -108,57 +68,30 @@ class Executor: genesis_states = states_list_copy[-1] genesis_states['mech_step'], genesis_states['time_step'] = m_step, t_step states_list = [genesis_states] - # print(genesis_states) m_step += 1 for config in configs: s_conf, b_conf = config[0], config[1] - last_states = states_list[-1] - if isinstance(last_states, list): - pool = ThreadPool(nodes=len(last_states)) # ToDo: Optimize - dropped_right_sL = drop_right(states_list, 1) - - def multithreaded_mech_step(mod_states_list): - return self.mech_step(m_step, mod_states_list, s_conf, b_conf, env_processes, t_step, run) - - states_lists = pool.map( - lambda last_state_dict: dropped_right_sL + [last_state_dict], - last_states - ) - print() - # pp.pprint(configs) - else: - states_lists = self.mech_step(m_step, states_list, s_conf, b_conf, env_processes, t_step, run) - - + states_list = self.mech_step(m_step, states_list, s_conf, b_conf, env_processes, t_step, run) m_step += 1 t_step += 1 - exit() - return states_list - # rename pipe + # ToDo: Rename Run Pipeline def block_pipeline(self, states_list, configs, env_processes, time_seq, run): time_seq = [x + 1 for x in time_seq] simulation_list = [states_list] # print(len(configs)) for time_step in time_seq: - # print(simulation_list) - if len(simulation_list) == 1: - pipe_run = self.mech_pipeline(simulation_list[-1], configs, env_processes, time_step, run) - exit() - # elif np.array(pipe_run[-1]) == 2: - # pipe_run = self.mech_pipeline(simulation_list[-1], configs, env_processes, time_step, run) - # print(pipe_run) + pipe_run = self.mech_pipeline(simulation_list[-1], configs, env_processes, time_step, run) _, *pipe_run = pipe_run - # print(pipe_run) simulation_list.append(pipe_run) return simulation_list - # Del _ / head + # ToDo: Muiltithreaded Runs def simulation(self, states_list, configs, env_processes, time_seq, runs): pipe_run = [] for run in range(runs): diff --git a/simulations/sim_test.py b/simulations/sim_test.py index 161db56..3d982b2 100644 --- a/simulations/sim_test.py +++ b/simulations/sim_test.py @@ -3,7 +3,7 @@ from tabulate import tabulate # The following imports NEED to be in the exact same order from SimCAD.engine import ExecutionMode, ExecutionContext, Executor -from simulations.validation import config1, config2 +from simulations.validation import config1 #, config2 # from simulations.validation import base_config1, base_config2 # from simulations.barlin import config4 # from simulations.zx import config_zx @@ -15,7 +15,7 @@ from SimCAD import configs exec_mode = ExecutionMode() -print("Simulation Execution 1") +print("Simulation Execution 1: Config 1") print() first_config = [configs[0]] # from config1 single_proc_ctx = ExecutionContext(context=exec_mode.single_proc) @@ -29,9 +29,9 @@ print(tabulate(tensor_field, headers='keys', tablefmt='psql')) print("Output:") print(tabulate(result, headers='keys', tablefmt='psql')) print() -# + + # print("Simulation Execution 2: Pairwise Execution") -# print() # multi_proc_ctx = ExecutionContext(context=exec_mode.multi_proc) # run2 = Executor(exec_context=multi_proc_ctx, configs=configs) # for raw_result, tensor_field in run2.main(): diff --git a/simulations/validation/config1.py b/simulations/validation/config1.py index 4578dbf..aa5d52f 100644 --- a/simulations/validation/config1.py +++ b/simulations/validation/config1.py @@ -3,8 +3,9 @@ import numpy as np from datetime import timedelta from fn.func import curried import pprint +from copy import deepcopy from SimCAD import configs -from SimCAD.utils import flatMap +from SimCAD.utils import flatMap, rename from SimCAD.configuration import Configuration from SimCAD.configuration.utils import exo_update_per_ts, proc_trigger, bound_norm_random, \ ep_time_step, param_sweep @@ -12,6 +13,7 @@ from SimCAD.engine.utils import sweep pp = pprint.PrettyPrinter(indent=4) +# ToDo: handle single param sweep beta =[Decimal(1), Decimal(2)] seed = { @@ -28,12 +30,12 @@ def b1m1(step, sL, s): def b2m1(step, sL, s): return {'param2': 4} -# @curried -# def b1m2(param, step, sL, s): -# return {'param1': 'a', 'param2': param} - -def b1m2(step, sL, s): - return {'param1': 'a', 'param2': 2} +@curried +def b1m2(param, step, sL, s): + return {'param1': 'a', 'param2': param} +# +# def b1m2(step, sL, s): +# return {'param1': 'a', 'param2': 2} def b2m2(step, sL, s): return {'param1': 'b', 'param2': 4} @@ -52,18 +54,18 @@ def s1m1(step, sL, s, _input): return (y, x) -param = Decimal(11.0) -def s2m1(step, sL, s, _input): - y = 's2' - x = _input['param2'] + param - return (y, x) - -# @curried -# def s2m1(param, step, sL, s, _input): +# param = Decimal(11.0) +# def s2m1(step, sL, s, _input): # y = 's2' # x = _input['param2'] + param # return (y, x) +@curried +def s2m1(param, step, sL, s, _input): + y = 's2' + x = _input['param2'] + param + return (y, x) + def s1m2(step, sL, s, _input): y = 's1' x = _input['param1'] @@ -85,18 +87,18 @@ def s2m3(step, sL, s, _input): # Exogenous States proc_one_coef_A = 0.7 proc_one_coef_B = 1.3 - -def es3p1(step, sL, s, _input): - y = 's3' - x = s['s3'] * bound_norm_random(seed['a'], proc_one_coef_A, proc_one_coef_B) - return (y, x) - -# @curried -# def es3p1(param, step, sL, s, _input): +# +# def es3p1(step, sL, s, _input): # y = 's3' -# x = s['s3'] + param +# x = s['s3'] * bound_norm_random(seed['a'], proc_one_coef_A, proc_one_coef_B) # return (y, x) +@curried +def es3p1(param, step, sL, s, _input): + y = 's3' + x = s['s3'] + param + return (y, x) + def es4p2(step, sL, s, _input): y = 's4' x = s['s4'] * bound_norm_random(seed['b'], proc_one_coef_A, proc_one_coef_B) @@ -137,27 +139,25 @@ raw_exogenous_states = { "timestamp": es5p2 } exogenous_states = exo_update_per_ts(raw_exogenous_states) +exogenous_states['s3'] = rename('parameterized', es3p1) # ToDo: make env proc trigger field agnostic # ToDo: input json into function renaming __name__ triggered_env_b = proc_trigger('2018-10-01 15:16:25', env_b) env_processes = { "s3": env_a, #sweep(beta, env_a, 'env_a'), - "s4": sweep(beta, triggered_env_b, 'triggered_env_b') + "s4": rename('parameterized', triggered_env_b) #sweep(beta, triggered_env_b) } -# lambdas -# genesis Sites should always be there -# [1, 2] -# behavior_ops = [ foldr(_ + _), lambda x: x + 0 ] - -# [1, 2] = {'b1': ['a'], 'b2', [1]} = -# behavior_ops = [ behavior_to_dict, print_fwd, sum_dict_values ] -# behavior_ops = [foldr(dict_elemwise_sum())] -# behavior_ops = [foldr(lambda a, b: a + b)] +# ToDo: The number of values enteren in sweep should be the # of config objs created, +# not dependent on the # of times the sweep is applied +# sweep exo_state func and point to exo-state in every other funtion +# param sweep on genesis states # need at least 1 behaviour and 1 state function for the 1st mech with behaviors # mechanisms = {} + + mechanisms = { "m1": { "behaviors": { @@ -166,12 +166,12 @@ mechanisms = { }, "states": { # exclude only. TypeError: reduce() of empty sequence with no initial value "s1": s1m1, - "s2": s2m1 #sweep(beta, s2m1) + "s2": rename('parameterized', s2m1) #s2m1(1) #sweep(beta, s2m1) } }, "m2": { "behaviors": { - "b1": b1m2, #sweep(beta, b1m2), + "b1": rename('parameterized', b1m2), #b1m2(1) #sweep(beta, b1m2), "b2": b2m2 }, "states": { @@ -196,27 +196,91 @@ sim_config = { "T": range(5) } -c = Configuration( - sim_config=sim_config, - state_dict=genesis_states, - seed=seed, - env_processes=env_processes, - exogenous_states=exogenous_states, - mechanisms=mechanisms +# # print(rename('new', b2m2).__name__) +# +def parameterize_mechanism(mechanisms, param): + new_mechanisms = deepcopy(mechanisms) + for mech, update_types in new_mechanisms.items(): + for update_type, fkv in update_types.items(): + for sk, vf in fkv.items(): + if vf.__name__ == 'parameterized': + print(vf.__name__) + new_mechanisms[mech][update_type][sk] = vf(param) + + del mechanisms + return new_mechanisms + +def parameterize_states(states_dict, param): + new_states_dict = deepcopy(states_dict) + for sk, vf in new_states_dict.items(): + if vf.__name__ == 'parameterized': + print(vf.__name__) + new_states_dict[sk] = vf(param) + + del states_dict + return new_states_dict + +# parameterize_mechanism(mechanisms, beta) +@curried +def s2m1(param, a, b, c, d): + y = a + x = b, + c + d + param + return (y, x) + +# print(s2m1(1)(1)) +pp.pprint(parameterize_mechanism(mechanisms, 1)) +# pp.pprint(parameterize_states(raw_exogenous_states, 1)) +# pp.pprint(parameterize_states(env_processes, 1)) + + +configs.append( + Configuration( + sim_config=sim_config, + state_dict=genesis_states, + seed=seed, + exogenous_states=exogenous_states, + env_processes=env_processes, + mechanisms=parameterize_mechanism(mechanisms, 1) + ) ) -configs = configs + param_sweep(c, raw_exogenous_states) +# def sweep_config(config, params): +# new_config = deepcopy(config) +# configs = [] +# for param in params: +# new_config.mechanisms = parameterize_mechanism(config.mechanisms, param) +# # new_config.raw_exogenous_states = parameterize_states(config.exogenous_states, param) +# # new_config.env_processes = parameterize_states(config.env_processes, param) +# configs.append(new_config) +# del config +# return configs -print() -print(len(configs)) -print() -for g in configs: - print() - print('Configuration') - print() - pp.pprint(g.env_processes) - print() - pp.pprint(g.exogenous_states) - print() - pp.pprint(g.mechanisms) - print() \ No newline at end of file + +# print(sweep_config(c, beta)) +# +# for config in sweep_config(c, beta): +# configs.append(config) + +# for config in param_sweep(c, raw_exogenous_states): +# configs.append(config) + + + +# # configs = configs + +# # +# print() +# print(len(configs)) +# print() + + + +# for g in configs: +# print() +# print('Configuration') +# print() +# pp.pprint(g.env_processes) +# print() +# pp.pprint(g.exogenous_states) +# print() +# pp.pprint(g.mechanisms) +# print() \ No newline at end of file