diff --git a/engine/configProcessor.py b/engine/configProcessor.py index 4aaa690..e2da966 100644 --- a/engine/configProcessor.py +++ b/engine/configProcessor.py @@ -69,12 +69,4 @@ def generate_config(state_dict, mechanisms, exo_proc): sdf_values, bdf_values = only_ep_handler(state_dict) zipped_list = list(zip(sdf_values, bdf_values)) - return list(map(lambda x: (x[0] + exo_proc, x[1]), zipped_list)) - -def create_tensor_field(mechanisms, exo_proc, keys=['behaviors', 'states']): - dfs = [ create_matrix_field(mechanisms, k) for k in keys ] - df = pd.concat(dfs, axis=1) - for es, i in zip(exo_proc, range(len(exo_proc))): - df['es'+str(i+1)] = es - df['m'] = df.index + 1 - return df \ No newline at end of file + return list(map(lambda x: (x[0] + exo_proc, x[1]), zipped_list)) \ No newline at end of file diff --git a/engine/mechanismExecutor.py b/engine/mechanismExecutor.py index 2b8948c..18bf827 100644 --- a/engine/mechanismExecutor.py +++ b/engine/mechanismExecutor.py @@ -1,107 +1,109 @@ from copy import deepcopy from fn import _ from fn.op import foldr, call -from ui.config2 import behavior_ops +class Executor(object): + def __init__(self, behavior_ops): + self.behavior_ops = behavior_ops -def getColResults(step, sL, s, funcs): - return list(map(lambda f: f(step, sL, s), funcs)) + # Data Type reduction + def getBehaviorInput(self, step, sL, s, funcs): -# Data Type reduction -def getBehaviorInput(step, sL, s, funcs, ops = behavior_ops): - - if len(ops) == 0: - ops = [foldr(_ + _)] - else: - ops = ops[::-1] - - return foldr(call, getColResults(step, sL, s, funcs))(ops) - -def apply_env_proc(env_processes, state_dict, step): - for state in state_dict.keys(): - if state in list(env_processes.keys()): - state_dict[state] = env_processes[state](step)(state_dict[state]) - -# remove / modify -def exception_handler(f, m_step, sL, last_mut_obj, _input): - try: - return f(m_step, sL, last_mut_obj, _input) - except KeyError: - print("Exception") - return f(m_step, sL, sL[-2], _input) - - -def mech_step(m_step, sL, state_funcs, behavior_funcs, env_processes, t_step, run): - last_in_obj = sL[-1] - - _input = exception_handler(getBehaviorInput, m_step, sL, last_in_obj, behavior_funcs) - - # print(sL) - - # *** add env_proc value here as wrapper function *** - last_in_copy = dict([ exception_handler(f, m_step, sL, last_in_obj, _input) for f in state_funcs ]) - - for k in last_in_obj: - if k not in last_in_copy: - last_in_copy[k] = last_in_obj[k] - - del last_in_obj - - # make env proc trigger field agnostic - apply_env_proc(env_processes, last_in_copy, last_in_copy['timestamp']) # mutating last_in_copy - - last_in_copy["mech_step"], last_in_copy["time_step"], last_in_copy['run'] = m_step, t_step, run - # print(last_in_copy) - sL.append(last_in_copy) - del last_in_copy - - return sL - - -def block_gen(states_list, configs, env_processes, t_step, run): - m_step = 0 - states_list_copy = deepcopy(states_list) - # remove copy - genesis_states = states_list_copy[-1] - genesis_states['mech_step'], genesis_states['time_step'] = m_step, t_step - states_list = [genesis_states] - - m_step += 1 - for config in configs: - s_conf, b_conf = config[0], config[1] - states_list = mech_step(m_step, states_list, s_conf, b_conf, env_processes, t_step, run) - m_step += 1 - - t_step += 1 - - return states_list - - -# rename pipe -def pipe(states_list, configs, env_processes, time_seq, run): - time_seq = [x + 1 for x in time_seq] - simulation_list = [states_list] - for time_step in time_seq: - pipe_run = block_gen(simulation_list[-1], configs, env_processes, time_step, run) - _, *pipe_run = pipe_run - simulation_list.append(pipe_run) - - return simulation_list - - -# Del _ / head -def simulation(states_list, configs, env_processes, time_seq, runs): - pipe_run = [] - for run in range(runs): - run += 1 - if run == 1: - head, *tail = pipe(states_list, configs, env_processes, time_seq, run) - head[-1]['mech_step'], head[-1]['time_step'], head[-1]['run'] = 0, 0, 0 - simulation_list = [head] + tail - pipe_run += simulation_list + if len(self.behavior_ops) == 0: + ops = [foldr(_ + _)] else: - transient_states_list = [pipe_run[-1][-1]] - _, *tail = pipe(transient_states_list, configs, env_processes, time_seq, run) - pipe_run += tail + ops = self.behavior_ops[::-1] - return pipe_run \ No newline at end of file + def getColResults(step, sL, s, funcs): + return list(map(lambda f: f(step, sL, s), funcs)) + + return foldr(call, getColResults(step, sL, s, funcs))(ops) + + def apply_env_proc(env_processes, state_dict, step): + for state in state_dict.keys(): + if state in list(env_processes.keys()): + state_dict[state] = env_processes[state](step)(state_dict[state]) + + # remove / modify + def exception_handler(f, m_step, sL, last_mut_obj, _input): + try: + return f(m_step, sL, last_mut_obj, _input) + except KeyError: + print("Exception") + return f(m_step, sL, sL[-2], _input) + + + def mech_step(self, m_step, sL, state_funcs, behavior_funcs, env_processes, t_step, run): + last_in_obj = sL[-1] + + _input = Executor.getBehaviorInput(self, m_step, sL, last_in_obj, behavior_funcs) + + # print(sL) + + # *** add env_proc value here as wrapper function *** + last_in_copy = dict([ Executor.exception_handler(f, m_step, sL, last_in_obj, _input) for f in state_funcs ]) + + for k in last_in_obj: + if k not in last_in_copy: + last_in_copy[k] = last_in_obj[k] + + del last_in_obj + + # make env proc trigger field agnostic + Executor.apply_env_proc(env_processes, last_in_copy, last_in_copy['timestamp']) # mutating last_in_copy + + last_in_copy["mech_step"], last_in_copy["time_step"], last_in_copy['run'] = m_step, t_step, run + # print(last_in_copy) + sL.append(last_in_copy) + del last_in_copy + + return sL + + + def block_gen(self, states_list, configs, env_processes, t_step, run): + m_step = 0 + states_list_copy = deepcopy(states_list) + # remove copy + genesis_states = states_list_copy[-1] + genesis_states['mech_step'], genesis_states['time_step'] = m_step, t_step + states_list = [genesis_states] + + m_step += 1 + for config in configs: + s_conf, b_conf = config[0], config[1] + states_list = Executor.mech_step(self, m_step, states_list, s_conf, b_conf, env_processes, t_step, run) + m_step += 1 + + t_step += 1 + + return states_list + + + # rename pipe + def pipe(self, states_list, configs, env_processes, time_seq, run): + time_seq = [x + 1 for x in time_seq] + simulation_list = [states_list] + for time_step in time_seq: + pipe_run = Executor.block_gen(self, simulation_list[-1], configs, env_processes, time_step, run) + _, *pipe_run = pipe_run + simulation_list.append(pipe_run) + + return simulation_list + + + # Del _ / head + def simulation(self, states_list, configs, env_processes, time_seq, runs): + pipe_run = [] + for run in range(runs): + run += 1 + if run == 1: + head, *tail = Executor.pipe(self, states_list, configs, env_processes, time_seq, run) + head[-1]['mech_step'], head[-1]['time_step'], head[-1]['run'] = 0, 0, 0 + simulation_list = [head] + tail + pipe_run += simulation_list + else: + transient_states_list = [pipe_run[-1][-1]] + _, *tail = Executor.pipe(self, transient_states_list, configs, env_processes, time_seq, run) + pipe_run += tail + + return pipe_run \ No newline at end of file diff --git a/engine/multiproc.py b/engine/multiproc.py index 82c4f74..ab9e82c 100644 --- a/engine/multiproc.py +++ b/engine/multiproc.py @@ -1,8 +1,8 @@ from pathos.multiprocessing import ProcessingPool as Pool -def parallelize_simulations(f, states_list, configs, env_processes, T, N): +def parallelize_simulations(fs, states_list, configs, env_processes, T, N): + l = list(zip(fs, states_list, configs, env_processes)) with Pool(len(configs)) as p: - results = p.map(lambda x: f(states_list, x[0], x[1], T, N), list(zip(configs, env_processes))) - + results = p.map(lambda x: x[0](x[1], x[2], x[3], T, N), l) return results \ No newline at end of file diff --git a/engine/run.py b/engine/run.py index 452abec..ac0139b 100644 --- a/engine/run.py +++ b/engine/run.py @@ -1,56 +1,52 @@ import pandas as pd from tabulate import tabulate -from engine.configProcessor import generate_config, create_tensor_field -from engine.mechanismExecutor import simulation -from engine.utils import flatten +from engine.configProcessor import generate_config +from engine.mechanismExecutor import Executor +from utils.engine import flatten +from utils.ui import create_tensor_field from engine.multiproc import parallelize_simulations -from decimal import Decimal - # from ui.config import state_dict, mechanisms, exogenous_states, env_processes, sim_config import ui.config1 as conf1 import ui.config2 as conf2 def main(): - state_dict = { - 's1': Decimal(0.0), - 's2': Decimal(0.0), - 's3': Decimal(1.0), - 's4': Decimal(1.0), - 'timestamp': '2018-10-01 15:16:24' - } - sim_config = { - "N": 2, - "T": range(5) - } + states_list1 = [conf1.state_dict] + states_list2 = [conf2.state_dict] + states_lists = [states_list1,states_list2] - T = sim_config['T'] - N = sim_config['N'] - states_list = [state_dict] + T = conf1.sim_config['T'] + N = conf2.sim_config['N'] ep1 = list(conf1.exogenous_states.values()) ep2 = list(conf2.exogenous_states.values()) - eps = [ep1,ep2] + eps = [ep1, ep2] config1 = generate_config(conf1.state_dict, conf1.mechanisms, ep1) config2 = generate_config(conf2.state_dict, conf2.mechanisms, ep2) - - mechanisms = [conf1.mechanisms, conf2.mechanisms] - configs = [config1, config2] + env_processes = [conf1.env_processes, conf2.env_processes] # Dimensions: N x r x mechs + simulation1 = Executor(conf1.behavior_ops).simulation + simulation2 = Executor(conf2.behavior_ops).simulation + simulation_execs = [simulation1,simulation2] + + if len(configs) > 1: - simulations = parallelize_simulations(simulation, states_list, configs, env_processes, T, N) + simulations = parallelize_simulations(simulation_execs, states_lists, configs, env_processes, T, N) # else: - # simulations = [simulation(states_list, configs[0], env_processes, T, N)] + # simulations = [simulation(states_list1, configs[0], env_processes, T, N)] - # simulations = [simulation(states_list, config1, conf1.env_processes, T, N)] + # behavior_ops, states_list, configs, env_processes, time_seq, runs + # result = simulation(states_list1, config1, conf1.env_processes, T, N) + # return pd.DataFrame(flatten(result)) + mechanisms = [conf1.mechanisms, conf2.mechanisms] for result, mechanism, ep in list(zip(simulations, mechanisms, eps)): print(tabulate(create_tensor_field(mechanism, ep), headers='keys', tablefmt='psql')) print(tabulate(pd.DataFrame(flatten(result)), headers='keys', tablefmt='psql')) \ No newline at end of file diff --git a/engine/utils.py b/engine/utils.py deleted file mode 100644 index bde5835..0000000 --- a/engine/utils.py +++ /dev/null @@ -1,95 +0,0 @@ -from datetime import datetime, timedelta -from decimal import Decimal -from fn.func import curried - -flatten = lambda l: [item for sublist in l for item in sublist] - -def flatmap(f, items): - return list(map(f, items)) - - -def datetime_range(start, end, delta, dt_format='%Y-%m-%d %H:%M:%S'): - reverse_head = end - [start, end] = [datetime.strptime(x, dt_format) for x in [start, end]] - - def _datetime_range(start, end, delta): - current = start - while current < end: - yield current - current += delta - - reverse_tail = [dt.strftime(dt_format) for dt in _datetime_range(start, end, delta)] - return reverse_tail + [reverse_head] - -def last_index(l): - return len(l)-1 - -def retrieve_state(l, offset): - return l[last_index(l) + offset + 1] - -# shouldn't -def bound_norm_random(rng, low, high): - # Add RNG Seed - res = rng.normal((high+low)/2,(high-low)/6) - if (reshigh): - res = bound_norm_random(rng, low, high) - return Decimal(res) - -@curried -def proc_trigger(trigger_step, update_f, step): - if step == trigger_step: - return update_f - else: - return lambda x: x - -# accept timedelta instead of timedelta params -def time_step(dt_str, dt_format='%Y-%m-%d %H:%M:%S', days=0, minutes=0, seconds=30): - dt = datetime.strptime(dt_str, dt_format) - t = dt + timedelta(days=days, minutes=minutes, seconds=seconds) - return t.strftime(dt_format) - -# accept timedelta instead of timedelta params -def ep_time_step(s, dt_str, fromat_str='%Y-%m-%d %H:%M:%S', days=0, minutes=0, seconds=1): - if s['mech_step'] == 0: - return time_step(dt_str, fromat_str, days, minutes, seconds) - else: - return dt_str - -def exo_update_per_ts(ep): - @curried - def ep_decorator(f, y, step, sL, s, _input): - if s['mech_step'] + 1 == 1: # inside f body to reduce performance costs - return f(step, sL, s, _input) - else: - return (y, s[y]) - return {es: ep_decorator(f, es) for es, f in ep.items()} - -# def create_tensor_field(mechanisms, env_poc, keys=['behaviors', 'states']): -# dfs = [ create_matrix_field(mechanisms, k) for k in keys ] -# df = pd.concat(dfs, axis=1) -# for es, i in zip(env_poc, range(len(env_poc))): -# df['es'+str(i)] = es -# df['m'] = df.index + 1 -# return df -################# - -# def exo_proc_trigger(mech_step, update_f, y): -# if mech_step == 1: -# return update_f -# else: -# return lambda step, sL, s, _input: (y, s[y]) - - - -# def apply_exo_proc(s, x, y): -# if s['mech_step'] == 1: -# return x -# else: -# return s[y] - -# def es5p2(step, sL, s, _input): # accept timedelta instead of timedelta params -# y = 'timestamp' -# x = ep_time_step(s, s['timestamp'], seconds=1) -# return (y, x) - - diff --git a/ui/config1.py b/ui/config1.py index 4c6333c..d1d2400 100644 --- a/ui/config1.py +++ b/ui/config1.py @@ -1,4 +1,4 @@ -from engine.utils import bound_norm_random, ep_time_step, proc_trigger, exo_update_per_ts +from utils.configuration import * from fn.op import foldr from fn import _ from fn.func import curried @@ -13,17 +13,6 @@ seed = { 'c': np.random.RandomState(3) } -# # Behaviors per Mechanism -# def b1m1(step, sL, s): -# return np.array([1, 2]) -# def b2m1(step, sL, s): -# return np.array([3, 4]) -# # Internal States per Mechanism -# def s1m1(step, sL, s, _input): -# y = 's1' -# x = _input['b1'] * s['s1'] + _input['b2'] -# return (y, x) - # Behaviors per Mechanism # Different return types per mechanism ?? *** No *** def b1m1(step, sL, s): @@ -126,45 +115,6 @@ env_processes = { # genesis Sites should always be there # [1, 2] # behavior_ops = [ foldr(_ + _), lambda x: x + 0 ] -def print_fwd(x): - print(x) - return x - -def behavior_to_dict(v): - return dict(list(zip(map(lambda n: 'b' + str(n+1), list(range(len(v)))), v))) - -@curried -def foldr_dict_vals(f, d): - return foldr(f)(list(d.values())) - -def sum_dict_values(): - return foldr_dict_vals(_ + _) - -def get_base_value(datatype): - if datatype is str: - return '' - elif datatype is int: - return 0 - elif datatype is list: - return [] - return 0 - - -@curried -def dict_op(f, d1, d2): - - def set_base_value(target_dict, source_dict, key): - if key not in target_dict: - return get_base_value(type(source_dict[key])) - else: - return target_dict[key] - - key_set = set(list(d1.keys())+list(d2.keys())) - - return {k: f(set_base_value(d1, d2, k), set_base_value(d2, d1, k)) for k in key_set} - -def dict_elemwise_sum(): - return dict_op(_ + _) # [1, 2] = {'b1': ['a'], 'b2', [1]} = # behavior_ops = [ behavior_to_dict, print_fwd, sum_dict_values ] diff --git a/ui/config2.py b/ui/config2.py index ca8b120..6bc52de 100644 --- a/ui/config2.py +++ b/ui/config2.py @@ -1,11 +1,11 @@ -from engine.utils import bound_norm_random, ep_time_step, proc_trigger, exo_update_per_ts +from utils.configuration import * from fn.op import foldr -from fn import _ -from fn.func import curried + import numpy as np from decimal import Decimal + seed = { 'z': np.random.RandomState(1), 'a': np.random.RandomState(2), @@ -13,17 +13,6 @@ seed = { 'c': np.random.RandomState(3) } -# # Behaviors per Mechanism -# def b1m1(step, sL, s): -# return np.array([1, 2]) -# def b2m1(step, sL, s): -# return np.array([3, 4]) -# # Internal States per Mechanism -# def s1m1(step, sL, s, _input): -# y = 's1' -# x = _input['b1'] * s['s1'] + _input['b2'] -# return (y, x) - # Behaviors per Mechanism # Different return types per mechanism ?? *** No *** def b1m1(step, sL, s): @@ -126,49 +115,11 @@ env_processes = { # genesis Sites should always be there # [1, 2] # behavior_ops = [ foldr(_ + _), lambda x: x + 0 ] -def print_fwd(x): - print(x) - return x -def behavior_to_dict(v): - return dict(list(zip(map(lambda n: 'b' + str(n+1), list(range(len(v)))), v))) - -@curried -def foldr_dict_vals(f, d): - return foldr(f)(list(d.values())) - -def sum_dict_values(): - return foldr_dict_vals(_ + _) - -def get_base_value(datatype): - if datatype is str: - return '' - elif datatype is int: - return 0 - elif datatype is list: - return [] - return 0 - - -@curried -def dict_op(f, d1, d2): - - def set_base_value(target_dict, source_dict, key): - if key not in target_dict: - return get_base_value(type(source_dict[key])) - else: - return target_dict[key] - - key_set = set(list(d1.keys())+list(d2.keys())) - - return {k: f(set_base_value(d1, d2, k), set_base_value(d2, d1, k)) for k in key_set} - -def dict_elemwise_sum(): - return dict_op(_ + _) # [1, 2] = {'b1': ['a'], 'b2', [1]} = -# behavior_ops = [ behavior_to_dict, print_fwd, sum_dict_values ] -behavior_ops = [ foldr(dict_elemwise_sum()) ] +# behavior_ops = [behavior_to_dict, print_fwd, sum_dict_values] +behavior_ops = [foldr(dict_elemwise_sum())] # behavior_ops = [] # need at least 1 behaviour and 1 state function for the 1st mech with behaviors @@ -209,4 +160,4 @@ mechanisms = { sim_config = { "N": 2, "T": range(5) -} +} \ No newline at end of file diff --git a/utils/configuration.py b/utils/configuration.py new file mode 100644 index 0000000..fba3af6 --- /dev/null +++ b/utils/configuration.py @@ -0,0 +1,82 @@ +from datetime import datetime, timedelta +from decimal import Decimal +from fn import _ +from fn.func import curried +from fn.op import foldr + +def bound_norm_random(rng, low, high): + # Add RNG Seed + res = rng.normal((high+low)/2,(high-low)/6) + if (reshigh): + res = bound_norm_random(rng, low, high) + return Decimal(res) + +@curried +def proc_trigger(trigger_step, update_f, step): + if step == trigger_step: + return update_f + else: + return lambda x: x + +# accept timedelta instead of timedelta params +def time_step(dt_str, dt_format='%Y-%m-%d %H:%M:%S', days=0, minutes=0, seconds=30): + dt = datetime.strptime(dt_str, dt_format) + t = dt + timedelta(days=days, minutes=minutes, seconds=seconds) + return t.strftime(dt_format) + +# accept timedelta instead of timedelta params +def ep_time_step(s, dt_str, fromat_str='%Y-%m-%d %H:%M:%S', days=0, minutes=0, seconds=1): + if s['mech_step'] == 0: + return time_step(dt_str, fromat_str, days, minutes, seconds) + else: + return dt_str + +def exo_update_per_ts(ep): + @curried + def ep_decorator(f, y, step, sL, s, _input): + if s['mech_step'] + 1 == 1: # inside f body to reduce performance costs + return f(step, sL, s, _input) + else: + return (y, s[y]) + return {es: ep_decorator(f, es) for es, f in ep.items()} + +def print_fwd(x): + print(x) + return x + +def get_base_value(datatype): + if datatype is str: + return '' + elif datatype is int: + return 0 + elif datatype is list: + return [] + return 0 + +def behavior_to_dict(v): + return dict(list(zip(map(lambda n: 'b' + str(n + 1), list(range(len(v)))), v))) + +add = lambda a, b: a + b + +@curried +def foldr_dict_vals(f, d): + return foldr(f)(list(d.values())) + +def sum_dict_values(): + return foldr_dict_vals(add) + +@curried +def dict_op(f, d1, d2): + + def set_base_value(target_dict, source_dict, key): + if key not in target_dict: + return get_base_value(type(source_dict[key])) + else: + return target_dict[key] + + key_set = set(list(d1.keys()) + list(d2.keys())) + + return {k: f(set_base_value(d1, d2, k), set_base_value(d2, d1, k)) for k in key_set} + +def dict_elemwise_sum(): + return dict_op(add) \ No newline at end of file diff --git a/utils/engine.py b/utils/engine.py new file mode 100644 index 0000000..b5f3f7a --- /dev/null +++ b/utils/engine.py @@ -0,0 +1,45 @@ +from datetime import datetime + +flatten = lambda l: [item for sublist in l for item in sublist] + +def flatmap(f, items): + return list(map(f, items)) + + +def datetime_range(start, end, delta, dt_format='%Y-%m-%d %H:%M:%S'): + reverse_head = end + [start, end] = [datetime.strptime(x, dt_format) for x in [start, end]] + + def _datetime_range(start, end, delta): + current = start + while current < end: + yield current + current += delta + + reverse_tail = [dt.strftime(dt_format) for dt in _datetime_range(start, end, delta)] + return reverse_tail + [reverse_head] + +def last_index(l): + return len(l)-1 + +def retrieve_state(l, offset): + return l[last_index(l) + offset + 1] + +# def exo_proc_trigger(mech_step, update_f, y): +# if mech_step == 1: +# return update_f +# else: +# return lambda step, sL, s, _input: (y, s[y]) + + + +# def apply_exo_proc(s, x, y): +# if s['mech_step'] == 1: +# return x +# else: +# return s[y] + +# def es5p2(step, sL, s, _input): # accept timedelta instead of timedelta params +# y = 'timestamp' +# x = ep_time_step(s, s['timestamp'], seconds=1) +# return (y, x) \ No newline at end of file diff --git a/utils/ui.py b/utils/ui.py new file mode 100644 index 0000000..467d4f2 --- /dev/null +++ b/utils/ui.py @@ -0,0 +1,10 @@ +import pandas as pd +from engine.configProcessor import create_matrix_field + +def create_tensor_field(mechanisms, exo_proc, keys=['behaviors', 'states']): + dfs = [ create_matrix_field(mechanisms, k) for k in keys ] + df = pd.concat(dfs, axis=1) + for es, i in zip(exo_proc, range(len(exo_proc))): + df['es'+str(i+1)] = es + df['m'] = df.index + 1 + return df \ No newline at end of file