multithreaded mech_step, mech_pipeline in progress

This commit is contained in:
Joshua E. Jodesty 2018-12-10 22:54:29 -05:00
parent 980bba081a
commit f55124fbb0
8 changed files with 170 additions and 43 deletions

1
.gitignore vendored
View File

@ -2,6 +2,7 @@
.DS_Store .DS_Store
.idea .idea
notebooks/.ipynb_checkpoints notebooks/.ipynb_checkpoints
notebooks/multithreading.ipynb
SimCAD.egg-info SimCAD.egg-info
__pycache__ __pycache__
Pipfile Pipfile

View File

@ -2,7 +2,9 @@ from datetime import datetime, timedelta
from decimal import Decimal from decimal import Decimal
from fn.func import curried from fn.func import curried
import pandas as pd import pandas as pd
from pathos.threading import ThreadPool
from SimCAD.utils import groupByKey
class TensorFieldReport: class TensorFieldReport:
def __init__(self, config_proc): def __init__(self, config_proc):
@ -18,6 +20,14 @@ class TensorFieldReport:
return df return df
# def s_update(y, x):
# return lambda step, sL, s, _input: (y, x)
#
#
def state_update(y, x):
return lambda step, sL, s, _input: (y, x)
def bound_norm_random(rng, low, high): def bound_norm_random(rng, low, high):
# Add RNG Seed # Add RNG Seed
res = rng.normal((high+low)/2,(high-low)/6) res = rng.normal((high+low)/2,(high-low)/6)
@ -53,9 +63,15 @@ def ep_time_step(s, dt_str, fromat_str='%Y-%m-%d %H:%M:%S', _timedelta = t_delta
def exo_update_per_ts(ep): def exo_update_per_ts(ep):
@curried @curried
def ep_decorator(f, y, step, sL, s, _input): def ep_decorator(fs, y, step, sL, s, _input):
# print(s)
if s['mech_step'] + 1 == 1: # inside f body to reduce performance costs if s['mech_step'] + 1 == 1: # inside f body to reduce performance costs
return f(step, sL, s, _input) if isinstance(fs, list):
pool = ThreadPool(nodes=len(fs))
fx = pool.map(lambda f: f(step, sL, s, _input), fs)
return groupByKey(fx)
else:
return fs(step, sL, s, _input)
else: else:
return (y, s[y]) return (y, s[y])
return {es: ep_decorator(f, es) for es, f in ep.items()} return {es: ep_decorator(f, es) for es, f in ep.items()}

View File

@ -2,14 +2,15 @@ from fn.op import foldr
from fn.func import curried from fn.func import curried
def get_base_value(datatype): def get_base_value(x):
if datatype is str: if isinstance(x, str):
return '' return ''
elif datatype is int: elif isinstance(x, int):
return 0 return 0
elif datatype is list: elif isinstance(x, list):
return [] return []
return 0 else:
return 0
def behavior_to_dict(v): def behavior_to_dict(v):
@ -33,7 +34,7 @@ def sum_dict_values():
def dict_op(f, d1, d2): def dict_op(f, d1, d2):
def set_base_value(target_dict, source_dict, key): def set_base_value(target_dict, source_dict, key):
if key not in target_dict: if key not in target_dict:
return get_base_value(type(source_dict[key])) return get_base_value(source_dict[key])
else: else:
return target_dict[key] return target_dict[key]

View File

@ -1,7 +1,11 @@
from pathos.threading import ThreadPool
from copy import deepcopy from copy import deepcopy
from fn.op import foldr, call from fn.op import foldr, call
import pprint
from SimCAD.utils import rename pp = pprint.PrettyPrinter(indent=4)
from SimCAD.utils import groupByKey, flatten, drop_right
from SimCAD.engine.utils import engine_exception from SimCAD.engine.utils import engine_exception
@ -23,23 +27,42 @@ class Executor:
return foldr(call, get_col_results(step, sL, s, funcs))(ops) return foldr(call, get_col_results(step, sL, s, funcs))(ops)
def xthreaded_env_proc(self, f, s_valx):
if isinstance(s_valx, list):
pool = ThreadPool(nodes=len(s_valx)) # ToDo: Optimize
return pool.map(lambda f: f(s_valx), s_valx)
else:
return f(s_valx)
def apply_env_proc(self, env_processes, state_dict, step): def apply_env_proc(self, env_processes, state_dict, step):
for state in state_dict.keys(): for state in state_dict.keys():
if state in list(env_processes.keys()): if state in list(env_processes.keys()):
env_state = env_processes[state] env_state = env_processes[state]
if (env_state.__name__ == '_curried') or (env_state.__name__ == 'proc_trigger'): # might want to change if (env_state.__name__ == '_curried') or (env_state.__name__ == 'proc_trigger'): # might want to change
state_dict[state] = env_state(step)(state_dict[state]) state_dict[state] = self.xthreaded_env_proc(env_state(step), state_dict[state])
else: else:
state_dict[state] = env_state(state_dict[state]) state_dict[state] = self.xthreaded_env_proc(env_state, state_dict[state])
def xthreaded_state_update(self, fs, m_step, sL, last_in_obj, _input):
if isinstance(fs, list):
pool = ThreadPool(nodes=len(fs)) # ToDo: Optimize
fx = pool.map(lambda f: f(m_step, sL, last_in_obj, _input), fs)
return groupByKey(fx)
else:
return fs(m_step, sL, last_in_obj, _input)
def mech_step(self, m_step, sL, state_funcs, behavior_funcs, env_processes, t_step, run): def mech_step(self, m_step, sL, state_funcs, behavior_funcs, env_processes, t_step, run):
last_in_obj = sL[-1] last_in_obj = sL[-1]
_input = self.state_update_exception(self.get_behavior_input(m_step, sL, last_in_obj, behavior_funcs)) _input = self.state_update_exception(self.get_behavior_input(m_step, sL, last_in_obj, behavior_funcs))
# ToDo: add env_proc generator to `last_in_copy` iterator as wrapper function # ToDo: add env_proc generator to `last_in_copy` iterator as wrapper function
last_in_copy = dict([self.behavior_update_exception(f(m_step, sL, last_in_obj, _input)) for f in state_funcs]) last_in_copy = dict([
self.behavior_update_exception(
self.xthreaded_state_update(f, m_step, sL, last_in_obj, _input)
) for f in state_funcs
])
for k in last_in_obj: for k in last_in_obj:
if k not in last_in_copy: if k not in last_in_copy:
@ -49,11 +72,28 @@ class Executor:
# make env proc trigger field agnostic # make env proc trigger field agnostic
self.apply_env_proc(env_processes, last_in_copy, last_in_copy['timestamp']) # mutating last_in_copy self.apply_env_proc(env_processes, last_in_copy, last_in_copy['timestamp']) # mutating last_in_copy
# print()
# pp.pprint(last_in_copy)
# exit()
def set_sys_metrics(m_step, t_step, run):
last_in_copy["mech_step"], last_in_copy["time_step"], last_in_copy['run'] = m_step, t_step, run
if any(isinstance(x, list) for x in last_in_copy.values()):
last_in_copies = flatten(last_in_copy)
for last_in_copy in last_in_copies:
set_sys_metrics(m_step, t_step, run)
sL.append(last_in_copies)
else:
set_sys_metrics(m_step, t_step, run)
sL.append(last_in_copy)
last_in_copy["mech_step"], last_in_copy["time_step"], last_in_copy['run'] = m_step, t_step, run
sL.append(last_in_copy)
del last_in_copy del last_in_copy
# print()
# pp.pprint(sL)
# exit()
return sL return sL
def mech_pipeline(self, states_list, configs, env_processes, t_step, run): def mech_pipeline(self, states_list, configs, env_processes, t_step, run):
@ -64,15 +104,28 @@ class Executor:
genesis_states = states_list_copy[-1] genesis_states = states_list_copy[-1]
genesis_states['mech_step'], genesis_states['time_step'] = m_step, t_step genesis_states['mech_step'], genesis_states['time_step'] = m_step, t_step
states_list = [genesis_states] states_list = [genesis_states]
# print(genesis_states)
m_step += 1 m_step += 1
for config in configs: for config in configs:
s_conf, b_conf = config[0], config[1] s_conf, b_conf = config[0], config[1]
last_states = states_list[-1]
dropped_right_sL = drop_right(states_list, 1)
print()
# print(states_list)
# if isinstance(last_states, list):
# x = list(map(lambda last_state_dict: states_list.pop().append(last_state_dict), last_states))
# pp.pprint(states_list)
states_list = self.mech_step(m_step, states_list, s_conf, b_conf, env_processes, t_step, run) states_list = self.mech_step(m_step, states_list, s_conf, b_conf, env_processes, t_step, run)
m_step += 1 m_step += 1
t_step += 1 t_step += 1
print()
# print(states_list)
exit()
return states_list return states_list
# rename pipe # rename pipe

View File

@ -1,5 +1,7 @@
from datetime import datetime from datetime import datetime
from fn.func import curried from fn.func import curried
from SimCAD.utils import rename
# from SimCAD.configuration.utils import s_update
def datetime_range(start, end, delta, dt_format='%Y-%m-%d %H:%M:%S'): def datetime_range(start, end, delta, dt_format='%Y-%m-%d %H:%M:%S'):
@ -24,6 +26,8 @@ def retrieve_state(l, offset):
return l[last_index(l) + offset + 1] return l[last_index(l) + offset + 1]
# exception_function = f(m_step, sL, sL[-2], _input)
# try_function = f(m_step, sL, last_mut_obj, _input)
@curried @curried
def engine_exception(ErrorType, error_message, exception_function, try_function): def engine_exception(ErrorType, error_message, exception_function, try_function):
try: try:
@ -33,9 +37,11 @@ def engine_exception(ErrorType, error_message, exception_function, try_function)
return exception_function return exception_function
# def exception_handler(f, m_step, sL, last_mut_obj, _input): @curried
# try: def fit_param(param, x):
# return f(m_step, sL, last_mut_obj, _input) return x + param
# except KeyError:
# print("Exception") # fit_param = lambda param: lambda x: x + param
# return f(m_step, sL, sL[-2], _input)
def sweep(params, sweep_f):
return [rename('sweep', sweep_f(param)) for param in params]

View File

@ -1,3 +1,5 @@
from collections import defaultdict
from itertools import product
# from fn.func import curried # from fn.func import curried
def pipe(x): def pipe(x):
@ -9,17 +11,46 @@ def print_pipe(x):
return x return x
def flattenDict(l):
def tupalize(k, vs):
l = []
if isinstance(vs, list):
for v in vs:
l.append((k, v))
else:
l.append((k, vs))
return l
flat_list = [tupalize(k, vs) for k, vs in l.items()]
flat_dict = [dict(items) for items in product(*flat_list)]
return flat_dict
def flatten(l): def flatten(l):
return [item for sublist in l for item in sublist] if isinstance(l, list):
return [item for sublist in l for item in sublist]
elif isinstance(l, dict):
return flattenDict(l)
def flatmap(f, items): def drop_right(l, n=1):
return list(map(f, items)) return l[:len(l)-n]
# def flatmap(f, items):
# return list(map(f, items))
def key_filter(l, keyname): def key_filter(l, keyname):
return [v[keyname] for k, v in l.items()] return [v[keyname] for k, v in l.items()]
def groupByKey(l):
d = defaultdict(list)
for key, value in l:
d[key].append(value)
return list(dict(d).items()).pop()
# @curried # @curried
def rename(new_name, f): def rename(new_name, f):
f.__name__ = new_name f.__name__ = new_name

View File

@ -29,16 +29,16 @@ print(tabulate(tensor_field, headers='keys', tablefmt='psql'))
print("Output:") print("Output:")
print(tabulate(result, headers='keys', tablefmt='psql')) print(tabulate(result, headers='keys', tablefmt='psql'))
print() print()
#
print("Simulation Execution 2: Pairwise Execution") # print("Simulation Execution 2: Pairwise Execution")
print() # print()
multi_proc_ctx = ExecutionContext(context=exec_mode.multi_proc) # multi_proc_ctx = ExecutionContext(context=exec_mode.multi_proc)
run2 = Executor(exec_context=multi_proc_ctx, configs=configs) # run2 = Executor(exec_context=multi_proc_ctx, configs=configs)
for raw_result, tensor_field in run2.main(): # for raw_result, tensor_field in run2.main():
result = pd.DataFrame(raw_result) # result = pd.DataFrame(raw_result)
print() # print()
print("Tensor Field:") # print("Tensor Field:")
print(tabulate(tensor_field, headers='keys', tablefmt='psql')) # print(tabulate(tensor_field, headers='keys', tablefmt='psql'))
print("Output:") # print("Output:")
print(tabulate(result, headers='keys', tablefmt='psql')) # print(tabulate(result, headers='keys', tablefmt='psql'))
print() # print()

View File

@ -4,8 +4,9 @@ from datetime import timedelta
from SimCAD import configs from SimCAD import configs
from SimCAD.configuration import Configuration from SimCAD.configuration import Configuration
from SimCAD.configuration.utils import exo_update_per_ts, proc_trigger, bound_norm_random, \ from SimCAD.configuration.utils import state_update, exo_update_per_ts, proc_trigger, bound_norm_random, \
ep_time_step ep_time_step
from SimCAD.engine.utils import sweep
seed = { seed = {
'z': np.random.RandomState(1), 'z': np.random.RandomState(1),
@ -42,6 +43,14 @@ def s2m1(step, sL, s, _input):
x = _input['param2'] #+ [Coef2 x 5] x = _input['param2'] #+ [Coef2 x 5]
return (y, x) return (y, x)
s2m1 = sweep(
params = [Decimal(11.0), Decimal(22.0)],
sweep_f = lambda param: lambda step, sL, s, _input: (
's2',
s['s2'] + param
)
)
def s1m2(step, sL, s, _input): def s1m2(step, sL, s, _input):
y = 's1' y = 's1'
x = _input['param1'] x = _input['param1']
@ -64,10 +73,20 @@ def s2m3(step, sL, s, _input):
proc_one_coef_A = 0.7 proc_one_coef_A = 0.7
proc_one_coef_B = 1.3 proc_one_coef_B = 1.3
def es3p1(step, sL, s, _input): # def es3p1(step, sL, s, _input):
y = 's3' # y = 's3'
x = s['s3'] * bound_norm_random(seed['a'], proc_one_coef_A, proc_one_coef_B) # x = s['s3'] * bound_norm_random(seed['a'], proc_one_coef_A, proc_one_coef_B)
return (y, x) # return (y, x)
es3p1 = sweep(
params = [Decimal(11.0), Decimal(22.0)],
sweep_f = lambda param: lambda step, sL, s, _input: (
's3',
s['s3'] + param
)
)
def es4p2(step, sL, s, _input): def es4p2(step, sL, s, _input):
y = 's4' y = 's4'
@ -111,7 +130,7 @@ exogenous_states = exo_update_per_ts(
# ToDo: make env proc trigger field agnostic # ToDo: make env proc trigger field agnostic
# ToDo: input json into function renaming __name__ # ToDo: input json into function renaming __name__
env_processes = { env_processes = {
"s3": env_a, # "s3": env_a,
"s4": proc_trigger('2018-10-01 15:16:25', env_b) "s4": proc_trigger('2018-10-01 15:16:25', env_b)
} }