diff --git a/cadCAD/configuration/__init__.py b/cadCAD/configuration/__init__.py index fd8d551..0bae909 100644 --- a/cadCAD/configuration/__init__.py +++ b/cadCAD/configuration/__init__.py @@ -1,6 +1,5 @@ from typing import Dict, Callable, List, Tuple from functools import reduce -from fn.op import foldr import pandas as pd from pandas.core.frame import DataFrame @@ -13,9 +12,10 @@ from cadCAD.configuration.utils.depreciationHandler import sanitize_partial_stat # policy_ops=[foldr(dict_elemwise_sum())] # policy_ops=[reduce, lambda a, b: {**a, **b}] + class Configuration(object): def __init__(self, sim_config={}, initial_state={}, seeds={}, env_processes={}, - exogenous_states={}, partial_state_update_blocks={}, policy_ops=[foldr(dict_elemwise_sum())], + exogenous_states={}, partial_state_update_blocks={}, policy_ops=[lambda a, b: a + b], **kwargs) -> None: self.sim_config = sim_config self.initial_state = initial_state @@ -30,7 +30,7 @@ class Configuration(object): def append_configs(sim_configs={}, initial_state={}, seeds={}, raw_exogenous_states={}, env_processes={}, - partial_state_update_blocks={}, policy_ops=[foldr(dict_elemwise_sum())], _exo_update_per_ts: bool = True) -> None: + partial_state_update_blocks={}, policy_ops=[lambda a, b: a + b], _exo_update_per_ts: bool = True) -> None: if _exo_update_per_ts is True: exogenous_states = exo_update_per_ts(raw_exogenous_states) else: diff --git a/cadCAD/configuration/utils/udo.py b/cadCAD/configuration/utils/userDefinedObject.py similarity index 74% rename from cadCAD/configuration/utils/udo.py rename to cadCAD/configuration/utils/userDefinedObject.py index 96a9a86..c013952 100644 --- a/cadCAD/configuration/utils/udo.py +++ b/cadCAD/configuration/utils/userDefinedObject.py @@ -13,8 +13,9 @@ def val_switch(v): return v class udcView(object): - def __init__(self, d): + def __init__(self, d, masked_members): self.__dict__ = d + self.masked_members = masked_members # returns dict to dataframe # def __repr__(self): @@ -22,7 +23,7 @@ class udcView(object): members = {} variables = { k: val_switch(v) for k, v in self.__dict__.items() - if str(type(v)) != "" and k != 'obj' # and isinstance(v, DataFrame) is not True + if str(type(v)) != "" and k not in self.masked_members # and isinstance(v, DataFrame) is not True } members['methods'] = [k for k, v in self.__dict__.items() if str(type(v)) == ""] members.update(variables) @@ -43,19 +44,17 @@ class udcBroker(object): def get_members(self): return self.members_dict - def get_view(self): - return udcView(self.members_dict) + def get_view(self, masked_members): + return udcView(self.members_dict, masked_members) def get_namedtuple(self): return namedtuple("Hydra", self.members_dict.keys())(*self.members_dict.values()) - -def UDO(udc): - return udcBroker(udc).get_view() +def UDO(udo, masked_members=['obj']): + return udcBroker(udo).get_view(masked_members) def udoPipe(obj_view): - return UDO(obj_view.obj) - + return UDO(obj_view.obj, obj_view.masked_members) diff --git a/cadCAD/engine/simulation.py b/cadCAD/engine/simulation.py index 426b1f3..0eb8605 100644 --- a/cadCAD/engine/simulation.py +++ b/cadCAD/engine/simulation.py @@ -94,6 +94,7 @@ class Executor: var_dict: Dict[str, List[Any]], sub_step: int, sL: Any, + sH: Any, state_funcs: List[Callable], policy_funcs: List[Callable], env_processes: Dict[str, Callable], @@ -101,16 +102,19 @@ class Executor: run: int ) -> List[Dict[str, Any]]: - last_in_obj: Dict[str, Any] = deepcopy(sL[-1]) - # last_in_obj: Dict[str, Any] = sL[-1] + # last_in_obj: Dict[str, Any] = deepcopy(sL[-1]) + last_in_obj: Dict[str, Any] = sL[-1] + # last_in_obj: Dict[str, Any] = sH[-1] + # print(last_in_obj) + # print(sH[-1]) - _input: Dict[str, Any] = self.policy_update_exception(self.get_policy_input(var_dict, sub_step, sL, last_in_obj, policy_funcs)) + _input: Dict[str, Any] = self.policy_update_exception(self.get_policy_input(var_dict, sub_step, sH, last_in_obj, policy_funcs)) # ToDo: add env_proc generator to `last_in_copy` iterator as wrapper function # ToDo: Can be multithreaded ?? def generate_record(state_funcs): for f in state_funcs: - yield self.state_update_exception(f(var_dict, sub_step, sL, last_in_obj, _input)) + yield self.state_update_exception(f(var_dict, sub_step, sH, last_in_obj, _input)) def transfer_missing_fields(source, destination): for k in source: @@ -127,13 +131,16 @@ class Executor: sL.append(last_in_copy) del last_in_copy + # print(sL) + # print() + return sL # mech_pipeline - state_update_block def state_update_pipeline( self, var_dict: Dict[str, List[Any]], - states_list: List[Dict[str, Any]], + simulation_list, #states_list: List[Dict[str, Any]], configs: List[Tuple[List[Callable], List[Callable]]], env_processes: Dict[str, Callable], time_step: int, @@ -141,19 +148,33 @@ class Executor: ) -> List[Dict[str, Any]]: sub_step = 0 - states_list_copy: List[Dict[str, Any]] = deepcopy(states_list) + # states_list_copy: List[Dict[str, Any]] = deepcopy(states_list) + # states_list_copy: List[Dict[str, Any]] = states_list + # ToDo: flatten first + states_list_copy: List[Dict[str, Any]] = simulation_list[-1] + # print(states_list_copy) + # ToDo: Causes Substep repeats in sL: genesis_states: Dict[str, Any] = states_list_copy[-1] + + if len(states_list_copy) == 1: + genesis_states['substep'] = sub_step + # genesis_states['timestep'] = 0 + # else: + # genesis_states['timestep'] = time_step + del states_list_copy - genesis_states['substep'], genesis_states['timestep'] = sub_step, time_step states_list: List[Dict[str, Any]] = [genesis_states] + # ToDo: Causes Substep repeats in sL, use for yield sub_step += 1 for [s_conf, p_conf] in configs: states_list: List[Dict[str, Any]] = self.partial_state_update( - var_dict, sub_step, states_list, s_conf, p_conf, env_processes, time_step, run + var_dict, sub_step, states_list, simulation_list, s_conf, p_conf, env_processes, time_step, run ) - + # print(sub_step) + # print(simulation_list) + # print(flatten(simulation_list)) sub_step += 1 time_step += 1 @@ -173,12 +194,20 @@ class Executor: time_seq: List[int] = [x + 1 for x in time_seq] simulation_list: List[List[Dict[str, Any]]] = [states_list] + + # print(simulation_list[-1]) + # print() + # pipe_run = simulation_list[-1] + # print(simulation_list) for time_step in time_seq: pipe_run: List[Dict[str, Any]] = self.state_update_pipeline( - var_dict, simulation_list[-1], configs, env_processes, time_step, run + var_dict, simulation_list, configs, env_processes, time_step, run ) + _, *pipe_run = pipe_run simulation_list.append(pipe_run) + # print(simulation_list) + # print() return simulation_list @@ -197,7 +226,7 @@ class Executor: def generate_init_sys_metrics(genesis_states_list): for d in genesis_states_list: - d['run'], d['substep'], d['timestep'] = run, int(0), int(0) + d['run'], d['substep'], d['timestep'] = run, 0, 0 yield d states_list_copy: List[Dict[str, Any]] = list(generate_init_sys_metrics(deepcopy(states_list))) diff --git a/cadCAD/utils/__init__.py b/cadCAD/utils/__init__.py index 802bb9d..44717b5 100644 --- a/cadCAD/utils/__init__.py +++ b/cadCAD/utils/__init__.py @@ -9,14 +9,18 @@ from pandas import DataFrame class SilentDF(DataFrame): def __repr__(self): - return f"{hex(id(DataFrame))})" #"pandas.core.frame.DataFrame" + return str(hex(id(DataFrame))) #"pandas.core.frame.DataFrame" + +def append_dict(dict, new_dict): + dict.update(new_dict) + return dict -def val_switch(v): - if isinstance(v, DataFrame) is True or isinstance(v, SilentDF) is True: - return SilentDF(v) - else: - return v.x +# def val_switch(v): +# if isinstance(v, DataFrame) is True or isinstance(v, SilentDF) is True: +# return SilentDF(v) +# else: +# return v.x class IndexCounter: diff --git a/cadCAD/utils/sys_config.py b/cadCAD/utils/sys_config.py new file mode 100644 index 0000000..90d3250 --- /dev/null +++ b/cadCAD/utils/sys_config.py @@ -0,0 +1,26 @@ +from cadCAD.configuration.utils import ep_time_step + + +def increment(y, incr_by): + return lambda _g, step, sL, s, _input: (y, s[y] + incr_by) + +def track(y): + return lambda _g, step, sL, s, _input: (y, s[y].x) + +def simple_state_update(y, x): + return lambda _g, step, sH, s, _input: (y, x) + +def simple_policy_update(y): + return lambda _g, step, sH, s: y + +def update_timestamp(y, timedelta, format): + return lambda _g, step, sL, s, _input: ( + y, + ep_time_step(s, dt_str=s[y], fromat_str=format, _timedelta=timedelta) + ) + + +# def repr(_g, step, sL, s, _input): +# y = 'z' +# x = s['state_udo'].__repr__() +# return (y, x) \ No newline at end of file diff --git a/simulations/single_config_run.py b/simulations/single_config_run.py index 4d2e9f1..081d4e1 100644 --- a/simulations/single_config_run.py +++ b/simulations/single_config_run.py @@ -4,7 +4,8 @@ from tabulate import tabulate from cadCAD.engine import ExecutionMode, ExecutionContext, Executor # from simulations.validation import config1_test_pipe # from simulations.validation import config1 -from simulations.validation import externalds +# from simulations.validation import externalds +from simulations.validation import incr_external_dataset from cadCAD import configs exec_mode = ExecutionMode() diff --git a/simulations/az_run_udc.py b/simulations/udo_run.py similarity index 66% rename from simulations/az_run_udc.py rename to simulations/udo_run.py index 5cefaed..bbb07ba 100644 --- a/simulations/az_run_udc.py +++ b/simulations/udo_run.py @@ -2,7 +2,7 @@ import pandas as pd from tabulate import tabulate # The following imports NEED to be in the exact order from cadCAD.engine import ExecutionMode, ExecutionContext, Executor -from simulations.validation import config_udc_json3 +from simulations.validation import udo from cadCAD import configs @@ -10,13 +10,21 @@ exec_mode = ExecutionMode() print("Simulation Execution: Single Configuration") print() + + first_config = configs # only contains config1 single_proc_ctx = ExecutionContext(context=exec_mode.single_proc) run = Executor(exec_context=single_proc_ctx, configs=first_config) - +# cols = configs[0].initial_state.keys() +cols = [ + 'increment', + 'state_udo_tracker_a', 'state_udo', 'state_udo_perception_tracker', 'state_udo_tracker_b', + 'udo_policy_tracker_a', 'udo_policies', 'udo_policy_tracker_b', + 'timestamp' +] raw_result, tensor_field = run.main() -result = pd.DataFrame(raw_result) -result = pd.concat([result.drop(['c'], axis=1), result['c'].apply(pd.Series)], axis=1) +result = pd.DataFrame(raw_result)[['run', 'substep', 'timestep'] + cols] +# result = pd.concat([result.drop(['c'], axis=1), result['c'].apply(pd.Series)], axis=1) # print(list(result['c'])) diff --git a/simulations/validation/config1_test_pipe.py b/simulations/validation/config1_test_pipe.py index 2295f0d..5d9e3dd 100644 --- a/simulations/validation/config1_test_pipe.py +++ b/simulations/validation/config1_test_pipe.py @@ -40,23 +40,31 @@ def test_pipeline(_g, step, sL, s): # Internal States per Mechanism -def policies(_g, step, sL, s, _input): +def s(y): + return lambda _g, step, sH, s, _input: (y, s[y] + 1) + +def sH(_g, step, sH, s, _input): + y = 'sh' + x = sH + return (y, x) + +def policies(_g, step, sH, s, _input): y = 'policies' x = _input return (y, x) # Genesis States genesis_states = { - 'policies': {} + 'policies': {}, + 's1': 0, + 's2': 0, + # 'sh': [] } - raw_exogenous_states = {} - env_processes = {} - partial_state_update_block = { "m1": { "policies": { @@ -64,6 +72,9 @@ partial_state_update_block = { "b2": p2m1 }, "variables": { + 's1': s('s1'), + 's2': s('s2'), + # 'sh': sH, "policies": policies } }, @@ -73,6 +84,9 @@ partial_state_update_block = { "b2": p2m2 }, "variables": { + 's1': s('s1'), + 's2': s('s2'), + # 'sh': sH, "policies": policies } }, @@ -82,6 +96,9 @@ partial_state_update_block = { "b2": p2m3 }, "variables": { + 's1': s('s1'), + 's2': s('s2'), + # 'sh': sH, "policies": policies } } @@ -90,11 +107,12 @@ partial_state_update_block = { sim_config = config_sim( { - "N": 2, - "T": range(5), + "N": 1, + "T": range(3), } ) + append_configs( sim_configs=sim_config, initial_state=genesis_states, @@ -102,5 +120,13 @@ append_configs( raw_exogenous_states=raw_exogenous_states, env_processes=env_processes, partial_state_update_blocks=partial_state_update_block, - policy_ops=[lambda a, b: a + b] # , lambda y: y + 100, lambda y: y + 300 -) \ No newline at end of file + # policy_ops=[lambda a, b: a + b, lambda y: y + 100, lambda y: y + 300] +) + + +# def p1m3(_g, step, sL, s): +# return {'param1': 1, 'param2': 2, 'param3': 3} +# def p2m3(_g, step, sL, s): +# return {'param1': 1, 'param2': 2, 'param3': 3} +# +# xx = {'param1': [1,1], 'param2': [2,2], 'param3': [3,3]} \ No newline at end of file diff --git a/simulations/validation/config_udc_json3.py b/simulations/validation/config_udc_json3.py deleted file mode 100644 index 95e5bd9..0000000 --- a/simulations/validation/config_udc_json3.py +++ /dev/null @@ -1,186 +0,0 @@ -from datetime import timedelta - -from cadCAD.configuration import append_configs -from cadCAD.configuration.utils import ep_time_step, config_sim -# from cadCAD.configuration.utils.policyAggregation import dict_op, dict_elemwise_sum -from cadCAD.configuration.utils.udo import udcBroker, udoPipe, UDO -import pandas as pd -from cadCAD.utils import SilentDF, val_switch - -ds = SilentDF(pd.read_csv('/Users/jjodesty/Projects/DiffyQ-SimCAD/simulations/output.csv')) - - -class MyClass(object): - def __init__(self, x, ds=None): - self.x = x - self.ds = ds # for setting ds initially or querying - - def update(self): - self.x += 1 - return self - - def read(self, ds_uri): - self.ds = SilentDF(pd.read_csv(ds_uri)) - return self - - def write(self, ds_uri): - pd.to_csv(ds_uri) - - def getMemID(self): - return str(hex(id(self))) - - pass - - -# can be accessed after an update within the same substep and timestep - -hydra_state_view = UDO(MyClass(0, ds)) -udc_view_A = UDO(MyClass(0, ds)) -udc_view_B = UDO(MyClass(0, ds)) - -print(udc_view_A) - -# g: Dict[str, List[int]] = {'MyClassB'} - -state_dict = { - 'a': 0, 'b': 0, 'j': 0, - 'k': (0, 0), 'q': (0, 0), - 'hydra_state': hydra_state_view, - 'policies': {'hydra_A': udc_view_A, 'hydra_B': udc_view_B}, - 'timestamp': '2019-01-01 00:00:00', - 'c': {"ds1": None, "ds2": None, "ds3": None, "timestep": None} -} - -def p1(_g, step, sL, s): - s['policies']['hydra_A'].update() - return {'hydra_A': udoPipe(s['policies']['hydra_A'])} - -def p2(_g, step, sL, s): - s['policies']['hydra_B'].update() - # df = s['policies']['hydra_B'].ds - return {'hydra_B': udoPipe(s['policies']['hydra_B'])} - -# ToDo: SilentDF(df) wont work -def C(_g, step, sL, s, _input): - y = 'c' - ds = _input['hydra_B'].ds - df = ds[(ds['run'] == s['run']) & (ds['substep'] == s['substep']) & (ds['timestep'] == s['timestep'])].drop(columns=['run', 'substep']) - def pop_if_not_empty(l): - if len(l) == 0: - return None - else: - return l.pop() - - x = {k: pop_if_not_empty(list(v.values())) for k, v in df.to_dict().items()} # reomve idx - return (y, x) - -def policies(_g, step, sL, s, _input): - y = 'policies' - x = _input - return (y, x) - -timestep_duration = timedelta(minutes=1) # In this example, a timestep has a duration of 1 minute. -ts_format = '%Y-%m-%d %H:%M:%S' -def time_model(_g, step, sL, s, _input): - y = 'timestamp' - x = ep_time_step(s, dt_str=s['timestamp'], fromat_str=ts_format, _timedelta=timestep_duration) - return (y, x) - - -def HydraMembers(_g, step, sL, s, _input): - y = 'hydra_state' - s['hydra_state'].update() - x = udoPipe(s['hydra_state']) - return (y, x) - -def repr(_g, step, sL, s, _input): - y = 'z' - x = s['hydra_members'].__repr__() - return (y, x) - -def incriment(y, incr_val): - return lambda _g, step, sL, s, _input: (y, s[y] + incr_val) - -def A(_g, step, sL, s, _input): - y = 'a' - x = s['a'] + 1 - return (y, x) - -def hydra_state_tracker(y): - return lambda _g, step, sL, s, _input: (y, s['hydra_state'].x) - - -def hydra_policy_tracker(y): - return lambda _g, step, sL, s, _input: (y, tuple(val_switch(v) for k, v in s['policies'].items())) - - -# needs M1&2 need behaviors -partial_state_update_blocks = { - 'PSUB1': { - 'policies': { - "b1": p1, - "b2": p2 - }, - 'states': { - 'a': A, - 'b': hydra_state_tracker('b'), - 'c': C, - 'j': hydra_state_tracker('j'), - 'k': hydra_policy_tracker('k'), - 'q': hydra_policy_tracker('q'), - 'hydra_state': HydraMembers, - 'timestamp': time_model, - 'policies': policies - } - }, - 'PSUB2': { - 'policies': { - "b1": p1, - "b2": p2 - }, - 'states': { - 'a': A, - 'b': hydra_state_tracker('b'), - 'c': C, - 'j': hydra_state_tracker('j'), - 'k': hydra_policy_tracker('k'), - 'q': hydra_policy_tracker('q'), - 'hydra_state': HydraMembers, - 'policies': policies - } - }, - 'PSUB3': { - 'policies': { - "b1": p1, - "b2": p2 - }, - 'states': { - 'a': A, - 'b': hydra_state_tracker('b'), - 'c': C, - 'j': hydra_state_tracker('j'), - 'k': hydra_policy_tracker('k'), - 'q': hydra_policy_tracker('q'), - 'hydra_state': HydraMembers, - 'policies': policies - } - } -} - -sim_config = config_sim({ - "N": 2, - "T": range(4) -}) -z = {'z': 1} - -def addZ(d, z): - d.update(z) - return d - -append_configs( - sim_config, - state_dict, - {}, {}, {}, - partial_state_update_blocks, - policy_ops=[lambda a, b: {**a, **b}] -) diff --git a/simulations/validation/config_udc_json4.py b/simulations/validation/config_udc_json4.py index 7107dd7..29dd353 100644 --- a/simulations/validation/config_udc_json4.py +++ b/simulations/validation/config_udc_json4.py @@ -3,7 +3,7 @@ from datetime import timedelta from cadCAD.configuration import append_configs from cadCAD.configuration.utils import ep_time_step, config_sim from cadCAD.configuration.utils.policyAggregation import dict_op, dict_elemwise_sum -from cadCAD.configuration.utils.udo import udcBroker, udoPipe, UDO +from cadCAD.configuration.utils.userDefinedObject import udcBroker, udoPipe, UDO # ToDo: Create member for past value diff --git a/simulations/validation/incr_external_dataset.py b/simulations/validation/incr_external_dataset.py new file mode 100644 index 0000000..d503201 --- /dev/null +++ b/simulations/validation/incr_external_dataset.py @@ -0,0 +1,69 @@ +from datetime import timedelta + +from cadCAD.configuration import append_configs +from cadCAD.configuration.utils import ep_time_step, config_sim +# from cadCAD.configuration.utils.policyAggregation import dict_op, dict_elemwise_sum +from cadCAD.configuration.utils.userDefinedObject import udcBroker, udoPipe, UDO +import pandas as pd +from cadCAD.utils import SilentDF + +df = SilentDF(pd.read_csv('/Users/jjodesty/Projects/DiffyQ-SimCAD/simulations/output.csv')) + +state_dict = { + 'increment': 0, + 'external_data': {"ds1": None, "ds2": None, "ds3": None}, + 'policies': {} +} + +def query(s, df): + return df[ + (df['run'] == s['run']) & (df['substep'] == s['substep']) & (df['timestep'] == s['timestep']) + ].drop(columns=['run', 'substep', "timestep"]) + +def p1(_g, substep, sL, s): + result_dict = query(s, df).to_dict() + del result_dict["ds3"] + return {k: list(v.values()).pop() for k, v in result_dict.items()} + +def p2(_g, step, sL, s): + result_dict = query(s, df).to_dict() + del result_dict["ds1"], result_dict["ds2"] + return {k: list(v.values()).pop() for k, v in result_dict.items()} + +# ToDo: SilentDF(df) wont work +#integrate_ext_dataset +def integrate_ext_dataset(_g, step, sL, s, _input): + result_dict = query(s, df).to_dict() + return 'external_data', {k: list(v.values()).pop() for k, v in result_dict.items()} + +def increment(y, incr_by): + return lambda _g, step, sL, s, _input: (y, s[y] + incr_by) +increment = increment('increment', 1) + +def view_policies(_g, step, sL, s, _input): + return 'policies', _input + + +policies = {"p1": p1, "p2": p2} +states = {'increment': increment, 'external_data': integrate_ext_dataset, 'policies': view_policies} +PSUB = {'policies': policies, 'states': states} + +# needs M1&2 need behaviors +partial_state_update_blocks = { + 'PSUB1': PSUB, + 'PSUB2': PSUB, + 'PSUB3': PSUB +} + +sim_config = config_sim({ + "N": 2, + "T": range(4) +}) + +append_configs( + sim_config, + state_dict, + {}, {}, {}, + partial_state_update_blocks, + policy_ops=[lambda a, b: {**a, **b}] +) diff --git a/simulations/validation/udo.py b/simulations/validation/udo.py new file mode 100644 index 0000000..1f06107 --- /dev/null +++ b/simulations/validation/udo.py @@ -0,0 +1,159 @@ +from datetime import timedelta +from cadCAD.utils import SilentDF #, val_switch +from cadCAD.configuration import append_configs +from cadCAD.configuration.utils import ep_time_step, config_sim +from cadCAD.configuration.utils.userDefinedObject import udoPipe, UDO +import pandas as pd + +from fn.func import curried + +DF = SilentDF(pd.read_csv('/Users/jjodesty/Projects/DiffyQ-SimCAD/simulations/output.csv')) + +class udoExample(object): + def __init__(self, x, dataset=None): + self.x = x + self.mem_id = str(hex(id(self))) + self.ds = dataset # for setting ds initially or querying + self.perception = {} + + def anon(self, f): + return f(self) + + def updateX(self): + self.x += 1 + return self + + def perceive(self, s): + self.perception = self.ds[ + (self.ds['run'] == s['run']) & (self.ds['substep'] == s['substep']) & (self.ds['timestep'] == s['timestep']) + ].drop(columns=['run', 'substep']).to_dict() + return self + + def read(self, ds_uri): + self.ds = SilentDF(pd.read_csv(ds_uri)) + return self + + def write(self, ds_uri): + pd.to_csv(ds_uri) + + # ToDo: Generic update function + + pass + +# can be accessed after an update within the same substep and timestep + +state_udo = UDO(udo=udoExample(0, DF), masked_members=['obj', 'perception']) +policy_udoA = UDO(udo=udoExample(0, DF), masked_members=['obj', 'perception']) +policy_udoB = UDO(udo=udoExample(0, DF), masked_members=['obj', 'perception']) + +# ToDo: DataFrame Column order +state_dict = { + 'increment': 0, + 'state_udo': state_udo, 'state_udo_tracker_a': 0, 'state_udo_tracker_b': 0, + 'state_udo_perception_tracker': {"ds1": None, "ds2": None, "ds3": None, "timestep": None}, + 'udo_policies': {'udo_A': policy_udoA, 'udo_B': policy_udoB}, + 'udo_policy_tracker_a': (None, None), 'udo_policy_tracker_b': (None, None), + 'timestamp': '2019-01-01 00:00:00', +} + +@curried +def perceive(s, self): + self.perception = self.ds[ + (self.ds['run'] == s['run']) & (self.ds['substep'] == s['substep']) & (self.ds['timestep'] == s['timestep']) + ].drop(columns=['run', 'substep']).to_dict() + return self + +def view_udo_policy(_g, step, sL, s, _input): + y = 'udo_policies' + x = _input + return (y, x) + +def update_timestamp(y, timedelta, format): + return lambda _g, step, sL, s, _input: ( + y, + ep_time_step(s, dt_str=s[y], fromat_str=format, _timedelta=timedelta) + ) +time_model = update_timestamp('timestamp', timedelta(minutes=1), '%Y-%m-%d %H:%M:%S') + + +def state_udo_update(_g, step, sL, s, _input): + y = 'state_udo' + # s['hydra_state'].updateX().anon(perceive(s)) + s['state_udo'].updateX().perceive(s) + x = udoPipe(s['state_udo']) + return (y, x) + +def increment(y, incr_by): + return lambda _g, step, sL, s, _input: (y, s[y] + incr_by) + +def track(destination, source): + return lambda _g, step, sL, s, _input: (destination, s[source].x) + +def track_udo_policy(destination, source): + def val_switch(v): + if isinstance(v, pd.DataFrame) is True or isinstance(v, SilentDF) is True: + return SilentDF(v) + else: + return v.x + return lambda _g, step, sL, s, _input: (destination, tuple(val_switch(v) for _, v in s[source].items())) + +def track_state_udo_perception(destination, source): + def id(past_perception): + if len(past_perception) == 0: + return state_dict['state_udo_perception_tracker'] + else: + return past_perception + return lambda _g, step, sL, s, _input: (destination, id(s[source].perception)) + +def udo_policyA(_g, step, sL, s): + s['udo_policies']['udo_A'].updateX() + return {'udo_A': udoPipe(s['udo_policies']['udo_A'])} + +def udo_policyB(_g, step, sL, s): + s['udo_policies']['udo_B'].updateX() + return {'udo_B': udoPipe(s['udo_policies']['udo_B'])} + +policies = {"p1": udo_policyA, "p2": udo_policyB} + +states_with_ts = { + 'increment': increment('increment', 1), + 'state_udo_tracker_a': track('state_udo_tracker_a', 'state_udo'), + 'state_udo': state_udo_update, + 'state_udo_perception_tracker': track_state_udo_perception('state_udo_perception_tracker', 'state_udo'), + 'state_udo_tracker_b': track('state_udo_tracker_b', 'state_udo'), + 'udo_policy_tracker_a': track_udo_policy('udo_policy_tracker_a', 'udo_policies'), + 'udo_policies': view_udo_policy, + 'udo_policy_tracker_b': track_udo_policy('udo_policy_tracker_b', 'udo_policies'), + 'timestamp': time_model, +} +del states_with_ts['timestamp'] +states_without_ts = states_with_ts + +# needs M1&2 need behaviors +partial_state_update_blocks = { + 'PSUB1': { + 'policies': policies, + 'states': states_with_ts + }, + 'PSUB2': { + 'policies': policies, + 'states': states_without_ts + }, + 'PSUB3': { + 'policies': policies, + 'states': states_without_ts + } +} + +sim_config = config_sim({ + "N": 2, + "T": range(4) +}) + +append_configs( + sim_config, + state_dict, + {}, {}, {}, + partial_state_update_blocks, + policy_ops=[lambda a, b: {**a, **b}] +)