From f55124fbb0f5da8cbca2c81bdf6828301254ebe1 Mon Sep 17 00:00:00 2001 From: "Joshua E. Jodesty" Date: Mon, 10 Dec 2018 22:54:29 -0500 Subject: [PATCH] multithreaded mech_step, mech_pipeline in progress --- .gitignore | 1 + SimCAD/configuration/utils/__init__.py | 22 ++++++- .../utils/behaviorAggregation.py | 13 ++-- SimCAD/engine/simulation.py | 65 +++++++++++++++++-- SimCAD/engine/utils.py | 18 +++-- SimCAD/utils/__init__.py | 37 ++++++++++- simulations/sim_test.py | 26 ++++---- simulations/validation/config1.py | 31 +++++++-- 8 files changed, 170 insertions(+), 43 deletions(-) diff --git a/.gitignore b/.gitignore index 9e0a0b6..2728be8 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ .DS_Store .idea notebooks/.ipynb_checkpoints +notebooks/multithreading.ipynb SimCAD.egg-info __pycache__ Pipfile diff --git a/SimCAD/configuration/utils/__init__.py b/SimCAD/configuration/utils/__init__.py index 5364566..9fb3e22 100644 --- a/SimCAD/configuration/utils/__init__.py +++ b/SimCAD/configuration/utils/__init__.py @@ -2,7 +2,9 @@ from datetime import datetime, timedelta from decimal import Decimal from fn.func import curried import pandas as pd +from pathos.threading import ThreadPool +from SimCAD.utils import groupByKey class TensorFieldReport: def __init__(self, config_proc): @@ -18,6 +20,14 @@ class TensorFieldReport: return df +# def s_update(y, x): +# return lambda step, sL, s, _input: (y, x) +# +# +def state_update(y, x): + return lambda step, sL, s, _input: (y, x) + + def bound_norm_random(rng, low, high): # Add RNG Seed res = rng.normal((high+low)/2,(high-low)/6) @@ -53,9 +63,15 @@ def ep_time_step(s, dt_str, fromat_str='%Y-%m-%d %H:%M:%S', _timedelta = t_delta def exo_update_per_ts(ep): @curried - def ep_decorator(f, y, step, sL, s, _input): + def ep_decorator(fs, y, step, sL, s, _input): + # print(s) if s['mech_step'] + 1 == 1: # inside f body to reduce performance costs - return f(step, sL, s, _input) + if isinstance(fs, list): + pool = ThreadPool(nodes=len(fs)) + fx = pool.map(lambda f: f(step, sL, s, _input), fs) + return groupByKey(fx) + else: + return fs(step, sL, s, _input) else: return (y, s[y]) - return {es: ep_decorator(f, es) for es, f in ep.items()} \ No newline at end of file + return {es: ep_decorator(f, es) for es, f in ep.items()} diff --git a/SimCAD/configuration/utils/behaviorAggregation.py b/SimCAD/configuration/utils/behaviorAggregation.py index abde5d6..7b51f8a 100644 --- a/SimCAD/configuration/utils/behaviorAggregation.py +++ b/SimCAD/configuration/utils/behaviorAggregation.py @@ -2,14 +2,15 @@ from fn.op import foldr from fn.func import curried -def get_base_value(datatype): - if datatype is str: +def get_base_value(x): + if isinstance(x, str): return '' - elif datatype is int: + elif isinstance(x, int): return 0 - elif datatype is list: + elif isinstance(x, list): return [] - return 0 + else: + return 0 def behavior_to_dict(v): @@ -33,7 +34,7 @@ def sum_dict_values(): def dict_op(f, d1, d2): def set_base_value(target_dict, source_dict, key): if key not in target_dict: - return get_base_value(type(source_dict[key])) + return get_base_value(source_dict[key]) else: return target_dict[key] diff --git a/SimCAD/engine/simulation.py b/SimCAD/engine/simulation.py index a0c5d61..7bcfa43 100644 --- a/SimCAD/engine/simulation.py +++ b/SimCAD/engine/simulation.py @@ -1,7 +1,11 @@ +from pathos.threading import ThreadPool from copy import deepcopy from fn.op import foldr, call +import pprint -from SimCAD.utils import rename +pp = pprint.PrettyPrinter(indent=4) + +from SimCAD.utils import groupByKey, flatten, drop_right from SimCAD.engine.utils import engine_exception @@ -23,23 +27,42 @@ class Executor: return foldr(call, get_col_results(step, sL, s, funcs))(ops) + def xthreaded_env_proc(self, f, s_valx): + if isinstance(s_valx, list): + pool = ThreadPool(nodes=len(s_valx)) # ToDo: Optimize + return pool.map(lambda f: f(s_valx), s_valx) + else: + return f(s_valx) + def apply_env_proc(self, env_processes, state_dict, step): for state in state_dict.keys(): if state in list(env_processes.keys()): env_state = env_processes[state] if (env_state.__name__ == '_curried') or (env_state.__name__ == 'proc_trigger'): # might want to change - state_dict[state] = env_state(step)(state_dict[state]) + state_dict[state] = self.xthreaded_env_proc(env_state(step), state_dict[state]) else: - state_dict[state] = env_state(state_dict[state]) + state_dict[state] = self.xthreaded_env_proc(env_state, state_dict[state]) + def xthreaded_state_update(self, fs, m_step, sL, last_in_obj, _input): + if isinstance(fs, list): + pool = ThreadPool(nodes=len(fs)) # ToDo: Optimize + fx = pool.map(lambda f: f(m_step, sL, last_in_obj, _input), fs) + return groupByKey(fx) + else: + return fs(m_step, sL, last_in_obj, _input) + def mech_step(self, m_step, sL, state_funcs, behavior_funcs, env_processes, t_step, run): last_in_obj = sL[-1] _input = self.state_update_exception(self.get_behavior_input(m_step, sL, last_in_obj, behavior_funcs)) # ToDo: add env_proc generator to `last_in_copy` iterator as wrapper function - last_in_copy = dict([self.behavior_update_exception(f(m_step, sL, last_in_obj, _input)) for f in state_funcs]) + last_in_copy = dict([ + self.behavior_update_exception( + self.xthreaded_state_update(f, m_step, sL, last_in_obj, _input) + ) for f in state_funcs + ]) for k in last_in_obj: if k not in last_in_copy: @@ -49,11 +72,28 @@ class Executor: # make env proc trigger field agnostic self.apply_env_proc(env_processes, last_in_copy, last_in_copy['timestamp']) # mutating last_in_copy + # print() + # pp.pprint(last_in_copy) + # exit() + + def set_sys_metrics(m_step, t_step, run): + last_in_copy["mech_step"], last_in_copy["time_step"], last_in_copy['run'] = m_step, t_step, run + + if any(isinstance(x, list) for x in last_in_copy.values()): + last_in_copies = flatten(last_in_copy) + for last_in_copy in last_in_copies: + set_sys_metrics(m_step, t_step, run) + sL.append(last_in_copies) + else: + set_sys_metrics(m_step, t_step, run) + sL.append(last_in_copy) - last_in_copy["mech_step"], last_in_copy["time_step"], last_in_copy['run'] = m_step, t_step, run - sL.append(last_in_copy) del last_in_copy + # print() + # pp.pprint(sL) + # exit() + return sL def mech_pipeline(self, states_list, configs, env_processes, t_step, run): @@ -64,15 +104,28 @@ class Executor: genesis_states = states_list_copy[-1] genesis_states['mech_step'], genesis_states['time_step'] = m_step, t_step states_list = [genesis_states] + # print(genesis_states) m_step += 1 for config in configs: s_conf, b_conf = config[0], config[1] + last_states = states_list[-1] + dropped_right_sL = drop_right(states_list, 1) + print() + # print(states_list) + # if isinstance(last_states, list): + # x = list(map(lambda last_state_dict: states_list.pop().append(last_state_dict), last_states)) + # pp.pprint(states_list) + states_list = self.mech_step(m_step, states_list, s_conf, b_conf, env_processes, t_step, run) m_step += 1 t_step += 1 + print() + # print(states_list) + exit() + return states_list # rename pipe diff --git a/SimCAD/engine/utils.py b/SimCAD/engine/utils.py index 947a167..90c8fd9 100644 --- a/SimCAD/engine/utils.py +++ b/SimCAD/engine/utils.py @@ -1,5 +1,7 @@ from datetime import datetime from fn.func import curried +from SimCAD.utils import rename +# from SimCAD.configuration.utils import s_update def datetime_range(start, end, delta, dt_format='%Y-%m-%d %H:%M:%S'): @@ -24,6 +26,8 @@ def retrieve_state(l, offset): return l[last_index(l) + offset + 1] +# exception_function = f(m_step, sL, sL[-2], _input) +# try_function = f(m_step, sL, last_mut_obj, _input) @curried def engine_exception(ErrorType, error_message, exception_function, try_function): try: @@ -33,9 +37,11 @@ def engine_exception(ErrorType, error_message, exception_function, try_function) return exception_function -# def exception_handler(f, m_step, sL, last_mut_obj, _input): -# try: -# return f(m_step, sL, last_mut_obj, _input) -# except KeyError: -# print("Exception") -# return f(m_step, sL, sL[-2], _input) \ No newline at end of file +@curried +def fit_param(param, x): + return x + param + +# fit_param = lambda param: lambda x: x + param + +def sweep(params, sweep_f): + return [rename('sweep', sweep_f(param)) for param in params] diff --git a/SimCAD/utils/__init__.py b/SimCAD/utils/__init__.py index 3ee6e50..36b6d84 100644 --- a/SimCAD/utils/__init__.py +++ b/SimCAD/utils/__init__.py @@ -1,3 +1,5 @@ +from collections import defaultdict +from itertools import product # from fn.func import curried def pipe(x): @@ -9,17 +11,46 @@ def print_pipe(x): return x +def flattenDict(l): + def tupalize(k, vs): + l = [] + if isinstance(vs, list): + for v in vs: + l.append((k, v)) + else: + l.append((k, vs)) + return l + + flat_list = [tupalize(k, vs) for k, vs in l.items()] + flat_dict = [dict(items) for items in product(*flat_list)] + return flat_dict + + def flatten(l): - return [item for sublist in l for item in sublist] + if isinstance(l, list): + return [item for sublist in l for item in sublist] + elif isinstance(l, dict): + return flattenDict(l) -def flatmap(f, items): - return list(map(f, items)) +def drop_right(l, n=1): + return l[:len(l)-n] + +# def flatmap(f, items): +# return list(map(f, items)) def key_filter(l, keyname): return [v[keyname] for k, v in l.items()] + +def groupByKey(l): + d = defaultdict(list) + for key, value in l: + d[key].append(value) + return list(dict(d).items()).pop() + + # @curried def rename(new_name, f): f.__name__ = new_name diff --git a/simulations/sim_test.py b/simulations/sim_test.py index 1bbfcdc..161db56 100644 --- a/simulations/sim_test.py +++ b/simulations/sim_test.py @@ -29,16 +29,16 @@ print(tabulate(tensor_field, headers='keys', tablefmt='psql')) print("Output:") print(tabulate(result, headers='keys', tablefmt='psql')) print() - -print("Simulation Execution 2: Pairwise Execution") -print() -multi_proc_ctx = ExecutionContext(context=exec_mode.multi_proc) -run2 = Executor(exec_context=multi_proc_ctx, configs=configs) -for raw_result, tensor_field in run2.main(): - result = pd.DataFrame(raw_result) - print() - print("Tensor Field:") - print(tabulate(tensor_field, headers='keys', tablefmt='psql')) - print("Output:") - print(tabulate(result, headers='keys', tablefmt='psql')) - print() \ No newline at end of file +# +# print("Simulation Execution 2: Pairwise Execution") +# print() +# multi_proc_ctx = ExecutionContext(context=exec_mode.multi_proc) +# run2 = Executor(exec_context=multi_proc_ctx, configs=configs) +# for raw_result, tensor_field in run2.main(): +# result = pd.DataFrame(raw_result) +# print() +# print("Tensor Field:") +# print(tabulate(tensor_field, headers='keys', tablefmt='psql')) +# print("Output:") +# print(tabulate(result, headers='keys', tablefmt='psql')) +# print() \ No newline at end of file diff --git a/simulations/validation/config1.py b/simulations/validation/config1.py index 7bdd5ac..e9a01c5 100644 --- a/simulations/validation/config1.py +++ b/simulations/validation/config1.py @@ -4,8 +4,9 @@ from datetime import timedelta from SimCAD import configs from SimCAD.configuration import Configuration -from SimCAD.configuration.utils import exo_update_per_ts, proc_trigger, bound_norm_random, \ +from SimCAD.configuration.utils import state_update, exo_update_per_ts, proc_trigger, bound_norm_random, \ ep_time_step +from SimCAD.engine.utils import sweep seed = { 'z': np.random.RandomState(1), @@ -42,6 +43,14 @@ def s2m1(step, sL, s, _input): x = _input['param2'] #+ [Coef2 x 5] return (y, x) +s2m1 = sweep( + params = [Decimal(11.0), Decimal(22.0)], + sweep_f = lambda param: lambda step, sL, s, _input: ( + 's2', + s['s2'] + param + ) +) + def s1m2(step, sL, s, _input): y = 's1' x = _input['param1'] @@ -64,10 +73,20 @@ def s2m3(step, sL, s, _input): proc_one_coef_A = 0.7 proc_one_coef_B = 1.3 -def es3p1(step, sL, s, _input): - y = 's3' - x = s['s3'] * bound_norm_random(seed['a'], proc_one_coef_A, proc_one_coef_B) - return (y, x) +# def es3p1(step, sL, s, _input): +# y = 's3' +# x = s['s3'] * bound_norm_random(seed['a'], proc_one_coef_A, proc_one_coef_B) +# return (y, x) + + +es3p1 = sweep( + params = [Decimal(11.0), Decimal(22.0)], + sweep_f = lambda param: lambda step, sL, s, _input: ( + 's3', + s['s3'] + param + ) +) + def es4p2(step, sL, s, _input): y = 's4' @@ -111,7 +130,7 @@ exogenous_states = exo_update_per_ts( # ToDo: make env proc trigger field agnostic # ToDo: input json into function renaming __name__ env_processes = { - "s3": env_a, + # "s3": env_a, "s4": proc_trigger('2018-10-01 15:16:25', env_b) }