This commit is contained in:
Joshua E. Jodesty 2019-01-07 17:40:09 -05:00
parent 4f9e320109
commit 1b52a4bebf
3 changed files with 139 additions and 142 deletions

View File

@ -1,12 +1,7 @@
from pathos.threading import ThreadPool
from copy import deepcopy
from fn.op import foldr, call
import numpy as np
import pprint
pp = pprint.PrettyPrinter(indent=4)
from SimCAD.utils import groupByKey, flatten, drop_right
from SimCAD.utils import rename
from SimCAD.engine.utils import engine_exception
@ -28,42 +23,27 @@ class Executor:
return foldr(call, get_col_results(step, sL, s, funcs))(ops)
def xthreaded_env_proc(self, f, s_valx):
if isinstance(s_valx, list):
pool = ThreadPool(nodes=len(s_valx)) # ToDo: Optimize
return pool.map(lambda f: f(s_valx), s_valx)
else:
return f(s_valx)
def apply_env_proc(self, env_processes, state_dict, step):
for state in state_dict.keys():
if state in list(env_processes.keys()):
env_state = env_processes[state]
if (env_state.__name__ == '_curried') or (env_state.__name__ == 'proc_trigger'): # might want to change
state_dict[state] = self.xthreaded_env_proc(env_state(step), state_dict[state])
state_dict[state] = env_state(step)(state_dict[state])
else:
state_dict[state] = self.xthreaded_env_proc(env_state, state_dict[state])
state_dict[state] = env_state(state_dict[state])
def xthreaded_state_update(self, fs, m_step, sL, last_in_obj, _input):
if isinstance(fs, list):
pool = ThreadPool(nodes=len(fs)) # ToDo: Optimize
fx = pool.map(lambda f: f(m_step, sL, last_in_obj, _input), fs)
return groupByKey(fx)
else:
return fs(m_step, sL, last_in_obj, _input)
def mech_step(self, m_step, sL, state_funcs, behavior_funcs, env_processes, t_step, run):
last_in_obj = sL[-1]
_input = self.state_update_exception(self.get_behavior_input(m_step, sL, last_in_obj, behavior_funcs))
# ToDo: add env_proc generator to `last_in_copy` iterator as wrapper function
last_in_copy = dict([
self.behavior_update_exception(
self.xthreaded_state_update(f, m_step, sL, last_in_obj, _input)
) for f in state_funcs
])
# last_in_copy = dict([self.behavior_update_exception(f(m_step, sL, last_in_obj, _input)) for f in state_funcs])
for f in state_funcs:
print(f(1,2,3,4))
exit()
for k in last_in_obj:
if k not in last_in_copy:
@ -74,32 +54,12 @@ class Executor:
# make env proc trigger field agnostic
self.apply_env_proc(env_processes, last_in_copy, last_in_copy['timestamp']) # mutating last_in_copy
print()
print(last_in_copy)
print()
def set_sys_metrics(state_dict, m_step, t_step, run):
state_dict["mech_step"], state_dict["time_step"], state_dict['run'] = m_step, t_step, run
if any(isinstance(x, list) for x in last_in_copy.values()):
last_in_copies = flatten(last_in_copy)
for last_in_copy in last_in_copies:
set_sys_metrics(last_in_copy, m_step, t_step, run)
sL.append(last_in_copies)
else:
set_sys_metrics(last_in_copy, m_step, t_step, run)
sL.append(last_in_copy)
print()
pp.pprint(last_in_copies)
print()
last_in_copy["mech_step"], last_in_copy["time_step"], last_in_copy['run'] = m_step, t_step, run
sL.append(last_in_copy)
del last_in_copy
return sL
def mech_pipeline(self, states_list, configs, env_processes, t_step, run):
m_step = 0
states_list_copy = deepcopy(states_list)
@ -108,57 +68,30 @@ class Executor:
genesis_states = states_list_copy[-1]
genesis_states['mech_step'], genesis_states['time_step'] = m_step, t_step
states_list = [genesis_states]
# print(genesis_states)
m_step += 1
for config in configs:
s_conf, b_conf = config[0], config[1]
last_states = states_list[-1]
if isinstance(last_states, list):
pool = ThreadPool(nodes=len(last_states)) # ToDo: Optimize
dropped_right_sL = drop_right(states_list, 1)
def multithreaded_mech_step(mod_states_list):
return self.mech_step(m_step, mod_states_list, s_conf, b_conf, env_processes, t_step, run)
states_lists = pool.map(
lambda last_state_dict: dropped_right_sL + [last_state_dict],
last_states
)
print()
# pp.pprint(configs)
else:
states_lists = self.mech_step(m_step, states_list, s_conf, b_conf, env_processes, t_step, run)
states_list = self.mech_step(m_step, states_list, s_conf, b_conf, env_processes, t_step, run)
m_step += 1
t_step += 1
exit()
return states_list
# rename pipe
# ToDo: Rename Run Pipeline
def block_pipeline(self, states_list, configs, env_processes, time_seq, run):
time_seq = [x + 1 for x in time_seq]
simulation_list = [states_list]
# print(len(configs))
for time_step in time_seq:
# print(simulation_list)
if len(simulation_list) == 1:
pipe_run = self.mech_pipeline(simulation_list[-1], configs, env_processes, time_step, run)
exit()
# elif np.array(pipe_run[-1]) == 2:
# pipe_run = self.mech_pipeline(simulation_list[-1], configs, env_processes, time_step, run)
# print(pipe_run)
pipe_run = self.mech_pipeline(simulation_list[-1], configs, env_processes, time_step, run)
_, *pipe_run = pipe_run
# print(pipe_run)
simulation_list.append(pipe_run)
return simulation_list
# Del _ / head
# ToDo: Muiltithreaded Runs
def simulation(self, states_list, configs, env_processes, time_seq, runs):
pipe_run = []
for run in range(runs):

View File

@ -3,7 +3,7 @@ from tabulate import tabulate
# The following imports NEED to be in the exact same order
from SimCAD.engine import ExecutionMode, ExecutionContext, Executor
from simulations.validation import config1, config2
from simulations.validation import config1 #, config2
# from simulations.validation import base_config1, base_config2
# from simulations.barlin import config4
# from simulations.zx import config_zx
@ -15,7 +15,7 @@ from SimCAD import configs
exec_mode = ExecutionMode()
print("Simulation Execution 1")
print("Simulation Execution 1: Config 1")
print()
first_config = [configs[0]] # from config1
single_proc_ctx = ExecutionContext(context=exec_mode.single_proc)
@ -29,9 +29,9 @@ print(tabulate(tensor_field, headers='keys', tablefmt='psql'))
print("Output:")
print(tabulate(result, headers='keys', tablefmt='psql'))
print()
#
# print("Simulation Execution 2: Pairwise Execution")
# print()
# multi_proc_ctx = ExecutionContext(context=exec_mode.multi_proc)
# run2 = Executor(exec_context=multi_proc_ctx, configs=configs)
# for raw_result, tensor_field in run2.main():

View File

@ -3,8 +3,9 @@ import numpy as np
from datetime import timedelta
from fn.func import curried
import pprint
from copy import deepcopy
from SimCAD import configs
from SimCAD.utils import flatMap
from SimCAD.utils import flatMap, rename
from SimCAD.configuration import Configuration
from SimCAD.configuration.utils import exo_update_per_ts, proc_trigger, bound_norm_random, \
ep_time_step, param_sweep
@ -12,6 +13,7 @@ from SimCAD.engine.utils import sweep
pp = pprint.PrettyPrinter(indent=4)
# ToDo: handle single param sweep
beta =[Decimal(1), Decimal(2)]
seed = {
@ -28,12 +30,12 @@ def b1m1(step, sL, s):
def b2m1(step, sL, s):
return {'param2': 4}
# @curried
# def b1m2(param, step, sL, s):
# return {'param1': 'a', 'param2': param}
def b1m2(step, sL, s):
return {'param1': 'a', 'param2': 2}
@curried
def b1m2(param, step, sL, s):
return {'param1': 'a', 'param2': param}
#
# def b1m2(step, sL, s):
# return {'param1': 'a', 'param2': 2}
def b2m2(step, sL, s):
return {'param1': 'b', 'param2': 4}
@ -52,18 +54,18 @@ def s1m1(step, sL, s, _input):
return (y, x)
param = Decimal(11.0)
def s2m1(step, sL, s, _input):
y = 's2'
x = _input['param2'] + param
return (y, x)
# @curried
# def s2m1(param, step, sL, s, _input):
# param = Decimal(11.0)
# def s2m1(step, sL, s, _input):
# y = 's2'
# x = _input['param2'] + param
# return (y, x)
@curried
def s2m1(param, step, sL, s, _input):
y = 's2'
x = _input['param2'] + param
return (y, x)
def s1m2(step, sL, s, _input):
y = 's1'
x = _input['param1']
@ -85,18 +87,18 @@ def s2m3(step, sL, s, _input):
# Exogenous States
proc_one_coef_A = 0.7
proc_one_coef_B = 1.3
def es3p1(step, sL, s, _input):
y = 's3'
x = s['s3'] * bound_norm_random(seed['a'], proc_one_coef_A, proc_one_coef_B)
return (y, x)
# @curried
# def es3p1(param, step, sL, s, _input):
#
# def es3p1(step, sL, s, _input):
# y = 's3'
# x = s['s3'] + param
# x = s['s3'] * bound_norm_random(seed['a'], proc_one_coef_A, proc_one_coef_B)
# return (y, x)
@curried
def es3p1(param, step, sL, s, _input):
y = 's3'
x = s['s3'] + param
return (y, x)
def es4p2(step, sL, s, _input):
y = 's4'
x = s['s4'] * bound_norm_random(seed['b'], proc_one_coef_A, proc_one_coef_B)
@ -137,27 +139,25 @@ raw_exogenous_states = {
"timestamp": es5p2
}
exogenous_states = exo_update_per_ts(raw_exogenous_states)
exogenous_states['s3'] = rename('parameterized', es3p1)
# ToDo: make env proc trigger field agnostic
# ToDo: input json into function renaming __name__
triggered_env_b = proc_trigger('2018-10-01 15:16:25', env_b)
env_processes = {
"s3": env_a, #sweep(beta, env_a, 'env_a'),
"s4": sweep(beta, triggered_env_b, 'triggered_env_b')
"s4": rename('parameterized', triggered_env_b) #sweep(beta, triggered_env_b)
}
# lambdas
# genesis Sites should always be there
# [1, 2]
# behavior_ops = [ foldr(_ + _), lambda x: x + 0 ]
# [1, 2] = {'b1': ['a'], 'b2', [1]} =
# behavior_ops = [ behavior_to_dict, print_fwd, sum_dict_values ]
# behavior_ops = [foldr(dict_elemwise_sum())]
# behavior_ops = [foldr(lambda a, b: a + b)]
# ToDo: The number of values enteren in sweep should be the # of config objs created,
# not dependent on the # of times the sweep is applied
# sweep exo_state func and point to exo-state in every other funtion
# param sweep on genesis states
# need at least 1 behaviour and 1 state function for the 1st mech with behaviors
# mechanisms = {}
mechanisms = {
"m1": {
"behaviors": {
@ -166,12 +166,12 @@ mechanisms = {
},
"states": { # exclude only. TypeError: reduce() of empty sequence with no initial value
"s1": s1m1,
"s2": s2m1 #sweep(beta, s2m1)
"s2": rename('parameterized', s2m1) #s2m1(1) #sweep(beta, s2m1)
}
},
"m2": {
"behaviors": {
"b1": b1m2, #sweep(beta, b1m2),
"b1": rename('parameterized', b1m2), #b1m2(1) #sweep(beta, b1m2),
"b2": b2m2
},
"states": {
@ -196,27 +196,91 @@ sim_config = {
"T": range(5)
}
c = Configuration(
sim_config=sim_config,
state_dict=genesis_states,
seed=seed,
env_processes=env_processes,
exogenous_states=exogenous_states,
mechanisms=mechanisms
# # print(rename('new', b2m2).__name__)
#
def parameterize_mechanism(mechanisms, param):
new_mechanisms = deepcopy(mechanisms)
for mech, update_types in new_mechanisms.items():
for update_type, fkv in update_types.items():
for sk, vf in fkv.items():
if vf.__name__ == 'parameterized':
print(vf.__name__)
new_mechanisms[mech][update_type][sk] = vf(param)
del mechanisms
return new_mechanisms
def parameterize_states(states_dict, param):
new_states_dict = deepcopy(states_dict)
for sk, vf in new_states_dict.items():
if vf.__name__ == 'parameterized':
print(vf.__name__)
new_states_dict[sk] = vf(param)
del states_dict
return new_states_dict
# parameterize_mechanism(mechanisms, beta)
@curried
def s2m1(param, a, b, c, d):
y = a
x = b, + c + d + param
return (y, x)
# print(s2m1(1)(1))
pp.pprint(parameterize_mechanism(mechanisms, 1))
# pp.pprint(parameterize_states(raw_exogenous_states, 1))
# pp.pprint(parameterize_states(env_processes, 1))
configs.append(
Configuration(
sim_config=sim_config,
state_dict=genesis_states,
seed=seed,
exogenous_states=exogenous_states,
env_processes=env_processes,
mechanisms=parameterize_mechanism(mechanisms, 1)
)
)
configs = configs + param_sweep(c, raw_exogenous_states)
# def sweep_config(config, params):
# new_config = deepcopy(config)
# configs = []
# for param in params:
# new_config.mechanisms = parameterize_mechanism(config.mechanisms, param)
# # new_config.raw_exogenous_states = parameterize_states(config.exogenous_states, param)
# # new_config.env_processes = parameterize_states(config.env_processes, param)
# configs.append(new_config)
# del config
# return configs
print()
print(len(configs))
print()
for g in configs:
print()
print('Configuration')
print()
pp.pprint(g.env_processes)
print()
pp.pprint(g.exogenous_states)
print()
pp.pprint(g.mechanisms)
print()
# print(sweep_config(c, beta))
#
# for config in sweep_config(c, beta):
# configs.append(config)
# for config in param_sweep(c, raw_exogenous_states):
# configs.append(config)
# # configs = configs +
# #
# print()
# print(len(configs))
# print()
# for g in configs:
# print()
# print('Configuration')
# print()
# pp.pprint(g.env_processes)
# print()
# pp.pprint(g.exogenous_states)
# print()
# pp.pprint(g.mechanisms)
# print()