feature example refactoring pt. 1

This commit is contained in:
Joshua E. Jodesty 2019-04-25 11:02:27 -04:00
parent 9dbb866bd0
commit 30e1c336e6
12 changed files with 365 additions and 230 deletions

View File

@ -1,6 +1,5 @@
from typing import Dict, Callable, List, Tuple from typing import Dict, Callable, List, Tuple
from functools import reduce from functools import reduce
from fn.op import foldr
import pandas as pd import pandas as pd
from pandas.core.frame import DataFrame from pandas.core.frame import DataFrame
@ -13,9 +12,10 @@ from cadCAD.configuration.utils.depreciationHandler import sanitize_partial_stat
# policy_ops=[foldr(dict_elemwise_sum())] # policy_ops=[foldr(dict_elemwise_sum())]
# policy_ops=[reduce, lambda a, b: {**a, **b}] # policy_ops=[reduce, lambda a, b: {**a, **b}]
class Configuration(object): class Configuration(object):
def __init__(self, sim_config={}, initial_state={}, seeds={}, env_processes={}, def __init__(self, sim_config={}, initial_state={}, seeds={}, env_processes={},
exogenous_states={}, partial_state_update_blocks={}, policy_ops=[foldr(dict_elemwise_sum())], exogenous_states={}, partial_state_update_blocks={}, policy_ops=[lambda a, b: a + b],
**kwargs) -> None: **kwargs) -> None:
self.sim_config = sim_config self.sim_config = sim_config
self.initial_state = initial_state self.initial_state = initial_state
@ -30,7 +30,7 @@ class Configuration(object):
def append_configs(sim_configs={}, initial_state={}, seeds={}, raw_exogenous_states={}, env_processes={}, def append_configs(sim_configs={}, initial_state={}, seeds={}, raw_exogenous_states={}, env_processes={},
partial_state_update_blocks={}, policy_ops=[foldr(dict_elemwise_sum())], _exo_update_per_ts: bool = True) -> None: partial_state_update_blocks={}, policy_ops=[lambda a, b: a + b], _exo_update_per_ts: bool = True) -> None:
if _exo_update_per_ts is True: if _exo_update_per_ts is True:
exogenous_states = exo_update_per_ts(raw_exogenous_states) exogenous_states = exo_update_per_ts(raw_exogenous_states)
else: else:

View File

@ -13,8 +13,9 @@ def val_switch(v):
return v return v
class udcView(object): class udcView(object):
def __init__(self, d): def __init__(self, d, masked_members):
self.__dict__ = d self.__dict__ = d
self.masked_members = masked_members
# returns dict to dataframe # returns dict to dataframe
# def __repr__(self): # def __repr__(self):
@ -22,7 +23,7 @@ class udcView(object):
members = {} members = {}
variables = { variables = {
k: val_switch(v) for k, v in self.__dict__.items() k: val_switch(v) for k, v in self.__dict__.items()
if str(type(v)) != "<class 'method'>" and k != 'obj' # and isinstance(v, DataFrame) is not True if str(type(v)) != "<class 'method'>" and k not in self.masked_members # and isinstance(v, DataFrame) is not True
} }
members['methods'] = [k for k, v in self.__dict__.items() if str(type(v)) == "<class 'method'>"] members['methods'] = [k for k, v in self.__dict__.items() if str(type(v)) == "<class 'method'>"]
members.update(variables) members.update(variables)
@ -43,19 +44,17 @@ class udcBroker(object):
def get_members(self): def get_members(self):
return self.members_dict return self.members_dict
def get_view(self): def get_view(self, masked_members):
return udcView(self.members_dict) return udcView(self.members_dict, masked_members)
def get_namedtuple(self): def get_namedtuple(self):
return namedtuple("Hydra", self.members_dict.keys())(*self.members_dict.values()) return namedtuple("Hydra", self.members_dict.keys())(*self.members_dict.values())
def UDO(udo, masked_members=['obj']):
def UDO(udc): return udcBroker(udo).get_view(masked_members)
return udcBroker(udc).get_view()
def udoPipe(obj_view): def udoPipe(obj_view):
return UDO(obj_view.obj) return UDO(obj_view.obj, obj_view.masked_members)

View File

@ -94,6 +94,7 @@ class Executor:
var_dict: Dict[str, List[Any]], var_dict: Dict[str, List[Any]],
sub_step: int, sub_step: int,
sL: Any, sL: Any,
sH: Any,
state_funcs: List[Callable], state_funcs: List[Callable],
policy_funcs: List[Callable], policy_funcs: List[Callable],
env_processes: Dict[str, Callable], env_processes: Dict[str, Callable],
@ -101,16 +102,19 @@ class Executor:
run: int run: int
) -> List[Dict[str, Any]]: ) -> List[Dict[str, Any]]:
last_in_obj: Dict[str, Any] = deepcopy(sL[-1]) # last_in_obj: Dict[str, Any] = deepcopy(sL[-1])
# last_in_obj: Dict[str, Any] = sL[-1] last_in_obj: Dict[str, Any] = sL[-1]
# last_in_obj: Dict[str, Any] = sH[-1]
# print(last_in_obj)
# print(sH[-1])
_input: Dict[str, Any] = self.policy_update_exception(self.get_policy_input(var_dict, sub_step, sL, last_in_obj, policy_funcs)) _input: Dict[str, Any] = self.policy_update_exception(self.get_policy_input(var_dict, sub_step, sH, last_in_obj, policy_funcs))
# ToDo: add env_proc generator to `last_in_copy` iterator as wrapper function # ToDo: add env_proc generator to `last_in_copy` iterator as wrapper function
# ToDo: Can be multithreaded ?? # ToDo: Can be multithreaded ??
def generate_record(state_funcs): def generate_record(state_funcs):
for f in state_funcs: for f in state_funcs:
yield self.state_update_exception(f(var_dict, sub_step, sL, last_in_obj, _input)) yield self.state_update_exception(f(var_dict, sub_step, sH, last_in_obj, _input))
def transfer_missing_fields(source, destination): def transfer_missing_fields(source, destination):
for k in source: for k in source:
@ -127,13 +131,16 @@ class Executor:
sL.append(last_in_copy) sL.append(last_in_copy)
del last_in_copy del last_in_copy
# print(sL)
# print()
return sL return sL
# mech_pipeline - state_update_block # mech_pipeline - state_update_block
def state_update_pipeline( def state_update_pipeline(
self, self,
var_dict: Dict[str, List[Any]], var_dict: Dict[str, List[Any]],
states_list: List[Dict[str, Any]], simulation_list, #states_list: List[Dict[str, Any]],
configs: List[Tuple[List[Callable], List[Callable]]], configs: List[Tuple[List[Callable], List[Callable]]],
env_processes: Dict[str, Callable], env_processes: Dict[str, Callable],
time_step: int, time_step: int,
@ -141,19 +148,33 @@ class Executor:
) -> List[Dict[str, Any]]: ) -> List[Dict[str, Any]]:
sub_step = 0 sub_step = 0
states_list_copy: List[Dict[str, Any]] = deepcopy(states_list) # states_list_copy: List[Dict[str, Any]] = deepcopy(states_list)
# states_list_copy: List[Dict[str, Any]] = states_list
# ToDo: flatten first
states_list_copy: List[Dict[str, Any]] = simulation_list[-1]
# print(states_list_copy)
# ToDo: Causes Substep repeats in sL:
genesis_states: Dict[str, Any] = states_list_copy[-1] genesis_states: Dict[str, Any] = states_list_copy[-1]
if len(states_list_copy) == 1:
genesis_states['substep'] = sub_step
# genesis_states['timestep'] = 0
# else:
# genesis_states['timestep'] = time_step
del states_list_copy del states_list_copy
genesis_states['substep'], genesis_states['timestep'] = sub_step, time_step
states_list: List[Dict[str, Any]] = [genesis_states] states_list: List[Dict[str, Any]] = [genesis_states]
# ToDo: Causes Substep repeats in sL, use for yield
sub_step += 1 sub_step += 1
for [s_conf, p_conf] in configs: for [s_conf, p_conf] in configs:
states_list: List[Dict[str, Any]] = self.partial_state_update( states_list: List[Dict[str, Any]] = self.partial_state_update(
var_dict, sub_step, states_list, s_conf, p_conf, env_processes, time_step, run var_dict, sub_step, states_list, simulation_list, s_conf, p_conf, env_processes, time_step, run
) )
# print(sub_step)
# print(simulation_list)
# print(flatten(simulation_list))
sub_step += 1 sub_step += 1
time_step += 1 time_step += 1
@ -173,12 +194,20 @@ class Executor:
time_seq: List[int] = [x + 1 for x in time_seq] time_seq: List[int] = [x + 1 for x in time_seq]
simulation_list: List[List[Dict[str, Any]]] = [states_list] simulation_list: List[List[Dict[str, Any]]] = [states_list]
# print(simulation_list[-1])
# print()
# pipe_run = simulation_list[-1]
# print(simulation_list)
for time_step in time_seq: for time_step in time_seq:
pipe_run: List[Dict[str, Any]] = self.state_update_pipeline( pipe_run: List[Dict[str, Any]] = self.state_update_pipeline(
var_dict, simulation_list[-1], configs, env_processes, time_step, run var_dict, simulation_list, configs, env_processes, time_step, run
) )
_, *pipe_run = pipe_run _, *pipe_run = pipe_run
simulation_list.append(pipe_run) simulation_list.append(pipe_run)
# print(simulation_list)
# print()
return simulation_list return simulation_list
@ -197,7 +226,7 @@ class Executor:
def generate_init_sys_metrics(genesis_states_list): def generate_init_sys_metrics(genesis_states_list):
for d in genesis_states_list: for d in genesis_states_list:
d['run'], d['substep'], d['timestep'] = run, int(0), int(0) d['run'], d['substep'], d['timestep'] = run, 0, 0
yield d yield d
states_list_copy: List[Dict[str, Any]] = list(generate_init_sys_metrics(deepcopy(states_list))) states_list_copy: List[Dict[str, Any]] = list(generate_init_sys_metrics(deepcopy(states_list)))

View File

@ -9,14 +9,18 @@ from pandas import DataFrame
class SilentDF(DataFrame): class SilentDF(DataFrame):
def __repr__(self): def __repr__(self):
return f"{hex(id(DataFrame))})" #"pandas.core.frame.DataFrame" return str(hex(id(DataFrame))) #"pandas.core.frame.DataFrame"
def append_dict(dict, new_dict):
dict.update(new_dict)
return dict
def val_switch(v): # def val_switch(v):
if isinstance(v, DataFrame) is True or isinstance(v, SilentDF) is True: # if isinstance(v, DataFrame) is True or isinstance(v, SilentDF) is True:
return SilentDF(v) # return SilentDF(v)
else: # else:
return v.x # return v.x
class IndexCounter: class IndexCounter:

View File

@ -0,0 +1,26 @@
from cadCAD.configuration.utils import ep_time_step
def increment(y, incr_by):
return lambda _g, step, sL, s, _input: (y, s[y] + incr_by)
def track(y):
return lambda _g, step, sL, s, _input: (y, s[y].x)
def simple_state_update(y, x):
return lambda _g, step, sH, s, _input: (y, x)
def simple_policy_update(y):
return lambda _g, step, sH, s: y
def update_timestamp(y, timedelta, format):
return lambda _g, step, sL, s, _input: (
y,
ep_time_step(s, dt_str=s[y], fromat_str=format, _timedelta=timedelta)
)
# def repr(_g, step, sL, s, _input):
# y = 'z'
# x = s['state_udo'].__repr__()
# return (y, x)

View File

@ -4,7 +4,8 @@ from tabulate import tabulate
from cadCAD.engine import ExecutionMode, ExecutionContext, Executor from cadCAD.engine import ExecutionMode, ExecutionContext, Executor
# from simulations.validation import config1_test_pipe # from simulations.validation import config1_test_pipe
# from simulations.validation import config1 # from simulations.validation import config1
from simulations.validation import externalds # from simulations.validation import externalds
from simulations.validation import incr_external_dataset
from cadCAD import configs from cadCAD import configs
exec_mode = ExecutionMode() exec_mode = ExecutionMode()

View File

@ -2,7 +2,7 @@ import pandas as pd
from tabulate import tabulate from tabulate import tabulate
# The following imports NEED to be in the exact order # The following imports NEED to be in the exact order
from cadCAD.engine import ExecutionMode, ExecutionContext, Executor from cadCAD.engine import ExecutionMode, ExecutionContext, Executor
from simulations.validation import config_udc_json3 from simulations.validation import udo
from cadCAD import configs from cadCAD import configs
@ -10,13 +10,21 @@ exec_mode = ExecutionMode()
print("Simulation Execution: Single Configuration") print("Simulation Execution: Single Configuration")
print() print()
first_config = configs # only contains config1 first_config = configs # only contains config1
single_proc_ctx = ExecutionContext(context=exec_mode.single_proc) single_proc_ctx = ExecutionContext(context=exec_mode.single_proc)
run = Executor(exec_context=single_proc_ctx, configs=first_config) run = Executor(exec_context=single_proc_ctx, configs=first_config)
# cols = configs[0].initial_state.keys()
cols = [
'increment',
'state_udo_tracker_a', 'state_udo', 'state_udo_perception_tracker', 'state_udo_tracker_b',
'udo_policy_tracker_a', 'udo_policies', 'udo_policy_tracker_b',
'timestamp'
]
raw_result, tensor_field = run.main() raw_result, tensor_field = run.main()
result = pd.DataFrame(raw_result) result = pd.DataFrame(raw_result)[['run', 'substep', 'timestep'] + cols]
result = pd.concat([result.drop(['c'], axis=1), result['c'].apply(pd.Series)], axis=1) # result = pd.concat([result.drop(['c'], axis=1), result['c'].apply(pd.Series)], axis=1)
# print(list(result['c'])) # print(list(result['c']))

View File

@ -40,23 +40,31 @@ def test_pipeline(_g, step, sL, s):
# Internal States per Mechanism # Internal States per Mechanism
def policies(_g, step, sL, s, _input): def s(y):
return lambda _g, step, sH, s, _input: (y, s[y] + 1)
def sH(_g, step, sH, s, _input):
y = 'sh'
x = sH
return (y, x)
def policies(_g, step, sH, s, _input):
y = 'policies' y = 'policies'
x = _input x = _input
return (y, x) return (y, x)
# Genesis States # Genesis States
genesis_states = { genesis_states = {
'policies': {} 'policies': {},
's1': 0,
's2': 0,
# 'sh': []
} }
raw_exogenous_states = {} raw_exogenous_states = {}
env_processes = {} env_processes = {}
partial_state_update_block = { partial_state_update_block = {
"m1": { "m1": {
"policies": { "policies": {
@ -64,6 +72,9 @@ partial_state_update_block = {
"b2": p2m1 "b2": p2m1
}, },
"variables": { "variables": {
's1': s('s1'),
's2': s('s2'),
# 'sh': sH,
"policies": policies "policies": policies
} }
}, },
@ -73,6 +84,9 @@ partial_state_update_block = {
"b2": p2m2 "b2": p2m2
}, },
"variables": { "variables": {
's1': s('s1'),
's2': s('s2'),
# 'sh': sH,
"policies": policies "policies": policies
} }
}, },
@ -82,6 +96,9 @@ partial_state_update_block = {
"b2": p2m3 "b2": p2m3
}, },
"variables": { "variables": {
's1': s('s1'),
's2': s('s2'),
# 'sh': sH,
"policies": policies "policies": policies
} }
} }
@ -90,11 +107,12 @@ partial_state_update_block = {
sim_config = config_sim( sim_config = config_sim(
{ {
"N": 2, "N": 1,
"T": range(5), "T": range(3),
} }
) )
append_configs( append_configs(
sim_configs=sim_config, sim_configs=sim_config,
initial_state=genesis_states, initial_state=genesis_states,
@ -102,5 +120,13 @@ append_configs(
raw_exogenous_states=raw_exogenous_states, raw_exogenous_states=raw_exogenous_states,
env_processes=env_processes, env_processes=env_processes,
partial_state_update_blocks=partial_state_update_block, partial_state_update_blocks=partial_state_update_block,
policy_ops=[lambda a, b: a + b] # , lambda y: y + 100, lambda y: y + 300 # policy_ops=[lambda a, b: a + b, lambda y: y + 100, lambda y: y + 300]
) )
# def p1m3(_g, step, sL, s):
# return {'param1': 1, 'param2': 2, 'param3': 3}
# def p2m3(_g, step, sL, s):
# return {'param1': 1, 'param2': 2, 'param3': 3}
#
# xx = {'param1': [1,1], 'param2': [2,2], 'param3': [3,3]}

View File

@ -1,186 +0,0 @@
from datetime import timedelta
from cadCAD.configuration import append_configs
from cadCAD.configuration.utils import ep_time_step, config_sim
# from cadCAD.configuration.utils.policyAggregation import dict_op, dict_elemwise_sum
from cadCAD.configuration.utils.udo import udcBroker, udoPipe, UDO
import pandas as pd
from cadCAD.utils import SilentDF, val_switch
ds = SilentDF(pd.read_csv('/Users/jjodesty/Projects/DiffyQ-SimCAD/simulations/output.csv'))
class MyClass(object):
def __init__(self, x, ds=None):
self.x = x
self.ds = ds # for setting ds initially or querying
def update(self):
self.x += 1
return self
def read(self, ds_uri):
self.ds = SilentDF(pd.read_csv(ds_uri))
return self
def write(self, ds_uri):
pd.to_csv(ds_uri)
def getMemID(self):
return str(hex(id(self)))
pass
# can be accessed after an update within the same substep and timestep
hydra_state_view = UDO(MyClass(0, ds))
udc_view_A = UDO(MyClass(0, ds))
udc_view_B = UDO(MyClass(0, ds))
print(udc_view_A)
# g: Dict[str, List[int]] = {'MyClassB'}
state_dict = {
'a': 0, 'b': 0, 'j': 0,
'k': (0, 0), 'q': (0, 0),
'hydra_state': hydra_state_view,
'policies': {'hydra_A': udc_view_A, 'hydra_B': udc_view_B},
'timestamp': '2019-01-01 00:00:00',
'c': {"ds1": None, "ds2": None, "ds3": None, "timestep": None}
}
def p1(_g, step, sL, s):
s['policies']['hydra_A'].update()
return {'hydra_A': udoPipe(s['policies']['hydra_A'])}
def p2(_g, step, sL, s):
s['policies']['hydra_B'].update()
# df = s['policies']['hydra_B'].ds
return {'hydra_B': udoPipe(s['policies']['hydra_B'])}
# ToDo: SilentDF(df) wont work
def C(_g, step, sL, s, _input):
y = 'c'
ds = _input['hydra_B'].ds
df = ds[(ds['run'] == s['run']) & (ds['substep'] == s['substep']) & (ds['timestep'] == s['timestep'])].drop(columns=['run', 'substep'])
def pop_if_not_empty(l):
if len(l) == 0:
return None
else:
return l.pop()
x = {k: pop_if_not_empty(list(v.values())) for k, v in df.to_dict().items()} # reomve idx
return (y, x)
def policies(_g, step, sL, s, _input):
y = 'policies'
x = _input
return (y, x)
timestep_duration = timedelta(minutes=1) # In this example, a timestep has a duration of 1 minute.
ts_format = '%Y-%m-%d %H:%M:%S'
def time_model(_g, step, sL, s, _input):
y = 'timestamp'
x = ep_time_step(s, dt_str=s['timestamp'], fromat_str=ts_format, _timedelta=timestep_duration)
return (y, x)
def HydraMembers(_g, step, sL, s, _input):
y = 'hydra_state'
s['hydra_state'].update()
x = udoPipe(s['hydra_state'])
return (y, x)
def repr(_g, step, sL, s, _input):
y = 'z'
x = s['hydra_members'].__repr__()
return (y, x)
def incriment(y, incr_val):
return lambda _g, step, sL, s, _input: (y, s[y] + incr_val)
def A(_g, step, sL, s, _input):
y = 'a'
x = s['a'] + 1
return (y, x)
def hydra_state_tracker(y):
return lambda _g, step, sL, s, _input: (y, s['hydra_state'].x)
def hydra_policy_tracker(y):
return lambda _g, step, sL, s, _input: (y, tuple(val_switch(v) for k, v in s['policies'].items()))
# needs M1&2 need behaviors
partial_state_update_blocks = {
'PSUB1': {
'policies': {
"b1": p1,
"b2": p2
},
'states': {
'a': A,
'b': hydra_state_tracker('b'),
'c': C,
'j': hydra_state_tracker('j'),
'k': hydra_policy_tracker('k'),
'q': hydra_policy_tracker('q'),
'hydra_state': HydraMembers,
'timestamp': time_model,
'policies': policies
}
},
'PSUB2': {
'policies': {
"b1": p1,
"b2": p2
},
'states': {
'a': A,
'b': hydra_state_tracker('b'),
'c': C,
'j': hydra_state_tracker('j'),
'k': hydra_policy_tracker('k'),
'q': hydra_policy_tracker('q'),
'hydra_state': HydraMembers,
'policies': policies
}
},
'PSUB3': {
'policies': {
"b1": p1,
"b2": p2
},
'states': {
'a': A,
'b': hydra_state_tracker('b'),
'c': C,
'j': hydra_state_tracker('j'),
'k': hydra_policy_tracker('k'),
'q': hydra_policy_tracker('q'),
'hydra_state': HydraMembers,
'policies': policies
}
}
}
sim_config = config_sim({
"N": 2,
"T": range(4)
})
z = {'z': 1}
def addZ(d, z):
d.update(z)
return d
append_configs(
sim_config,
state_dict,
{}, {}, {},
partial_state_update_blocks,
policy_ops=[lambda a, b: {**a, **b}]
)

View File

@ -3,7 +3,7 @@ from datetime import timedelta
from cadCAD.configuration import append_configs from cadCAD.configuration import append_configs
from cadCAD.configuration.utils import ep_time_step, config_sim from cadCAD.configuration.utils import ep_time_step, config_sim
from cadCAD.configuration.utils.policyAggregation import dict_op, dict_elemwise_sum from cadCAD.configuration.utils.policyAggregation import dict_op, dict_elemwise_sum
from cadCAD.configuration.utils.udo import udcBroker, udoPipe, UDO from cadCAD.configuration.utils.userDefinedObject import udcBroker, udoPipe, UDO
# ToDo: Create member for past value # ToDo: Create member for past value

View File

@ -0,0 +1,69 @@
from datetime import timedelta
from cadCAD.configuration import append_configs
from cadCAD.configuration.utils import ep_time_step, config_sim
# from cadCAD.configuration.utils.policyAggregation import dict_op, dict_elemwise_sum
from cadCAD.configuration.utils.userDefinedObject import udcBroker, udoPipe, UDO
import pandas as pd
from cadCAD.utils import SilentDF
df = SilentDF(pd.read_csv('/Users/jjodesty/Projects/DiffyQ-SimCAD/simulations/output.csv'))
state_dict = {
'increment': 0,
'external_data': {"ds1": None, "ds2": None, "ds3": None},
'policies': {}
}
def query(s, df):
return df[
(df['run'] == s['run']) & (df['substep'] == s['substep']) & (df['timestep'] == s['timestep'])
].drop(columns=['run', 'substep', "timestep"])
def p1(_g, substep, sL, s):
result_dict = query(s, df).to_dict()
del result_dict["ds3"]
return {k: list(v.values()).pop() for k, v in result_dict.items()}
def p2(_g, step, sL, s):
result_dict = query(s, df).to_dict()
del result_dict["ds1"], result_dict["ds2"]
return {k: list(v.values()).pop() for k, v in result_dict.items()}
# ToDo: SilentDF(df) wont work
#integrate_ext_dataset
def integrate_ext_dataset(_g, step, sL, s, _input):
result_dict = query(s, df).to_dict()
return 'external_data', {k: list(v.values()).pop() for k, v in result_dict.items()}
def increment(y, incr_by):
return lambda _g, step, sL, s, _input: (y, s[y] + incr_by)
increment = increment('increment', 1)
def view_policies(_g, step, sL, s, _input):
return 'policies', _input
policies = {"p1": p1, "p2": p2}
states = {'increment': increment, 'external_data': integrate_ext_dataset, 'policies': view_policies}
PSUB = {'policies': policies, 'states': states}
# needs M1&2 need behaviors
partial_state_update_blocks = {
'PSUB1': PSUB,
'PSUB2': PSUB,
'PSUB3': PSUB
}
sim_config = config_sim({
"N": 2,
"T": range(4)
})
append_configs(
sim_config,
state_dict,
{}, {}, {},
partial_state_update_blocks,
policy_ops=[lambda a, b: {**a, **b}]
)

View File

@ -0,0 +1,159 @@
from datetime import timedelta
from cadCAD.utils import SilentDF #, val_switch
from cadCAD.configuration import append_configs
from cadCAD.configuration.utils import ep_time_step, config_sim
from cadCAD.configuration.utils.userDefinedObject import udoPipe, UDO
import pandas as pd
from fn.func import curried
DF = SilentDF(pd.read_csv('/Users/jjodesty/Projects/DiffyQ-SimCAD/simulations/output.csv'))
class udoExample(object):
def __init__(self, x, dataset=None):
self.x = x
self.mem_id = str(hex(id(self)))
self.ds = dataset # for setting ds initially or querying
self.perception = {}
def anon(self, f):
return f(self)
def updateX(self):
self.x += 1
return self
def perceive(self, s):
self.perception = self.ds[
(self.ds['run'] == s['run']) & (self.ds['substep'] == s['substep']) & (self.ds['timestep'] == s['timestep'])
].drop(columns=['run', 'substep']).to_dict()
return self
def read(self, ds_uri):
self.ds = SilentDF(pd.read_csv(ds_uri))
return self
def write(self, ds_uri):
pd.to_csv(ds_uri)
# ToDo: Generic update function
pass
# can be accessed after an update within the same substep and timestep
state_udo = UDO(udo=udoExample(0, DF), masked_members=['obj', 'perception'])
policy_udoA = UDO(udo=udoExample(0, DF), masked_members=['obj', 'perception'])
policy_udoB = UDO(udo=udoExample(0, DF), masked_members=['obj', 'perception'])
# ToDo: DataFrame Column order
state_dict = {
'increment': 0,
'state_udo': state_udo, 'state_udo_tracker_a': 0, 'state_udo_tracker_b': 0,
'state_udo_perception_tracker': {"ds1": None, "ds2": None, "ds3": None, "timestep": None},
'udo_policies': {'udo_A': policy_udoA, 'udo_B': policy_udoB},
'udo_policy_tracker_a': (None, None), 'udo_policy_tracker_b': (None, None),
'timestamp': '2019-01-01 00:00:00',
}
@curried
def perceive(s, self):
self.perception = self.ds[
(self.ds['run'] == s['run']) & (self.ds['substep'] == s['substep']) & (self.ds['timestep'] == s['timestep'])
].drop(columns=['run', 'substep']).to_dict()
return self
def view_udo_policy(_g, step, sL, s, _input):
y = 'udo_policies'
x = _input
return (y, x)
def update_timestamp(y, timedelta, format):
return lambda _g, step, sL, s, _input: (
y,
ep_time_step(s, dt_str=s[y], fromat_str=format, _timedelta=timedelta)
)
time_model = update_timestamp('timestamp', timedelta(minutes=1), '%Y-%m-%d %H:%M:%S')
def state_udo_update(_g, step, sL, s, _input):
y = 'state_udo'
# s['hydra_state'].updateX().anon(perceive(s))
s['state_udo'].updateX().perceive(s)
x = udoPipe(s['state_udo'])
return (y, x)
def increment(y, incr_by):
return lambda _g, step, sL, s, _input: (y, s[y] + incr_by)
def track(destination, source):
return lambda _g, step, sL, s, _input: (destination, s[source].x)
def track_udo_policy(destination, source):
def val_switch(v):
if isinstance(v, pd.DataFrame) is True or isinstance(v, SilentDF) is True:
return SilentDF(v)
else:
return v.x
return lambda _g, step, sL, s, _input: (destination, tuple(val_switch(v) for _, v in s[source].items()))
def track_state_udo_perception(destination, source):
def id(past_perception):
if len(past_perception) == 0:
return state_dict['state_udo_perception_tracker']
else:
return past_perception
return lambda _g, step, sL, s, _input: (destination, id(s[source].perception))
def udo_policyA(_g, step, sL, s):
s['udo_policies']['udo_A'].updateX()
return {'udo_A': udoPipe(s['udo_policies']['udo_A'])}
def udo_policyB(_g, step, sL, s):
s['udo_policies']['udo_B'].updateX()
return {'udo_B': udoPipe(s['udo_policies']['udo_B'])}
policies = {"p1": udo_policyA, "p2": udo_policyB}
states_with_ts = {
'increment': increment('increment', 1),
'state_udo_tracker_a': track('state_udo_tracker_a', 'state_udo'),
'state_udo': state_udo_update,
'state_udo_perception_tracker': track_state_udo_perception('state_udo_perception_tracker', 'state_udo'),
'state_udo_tracker_b': track('state_udo_tracker_b', 'state_udo'),
'udo_policy_tracker_a': track_udo_policy('udo_policy_tracker_a', 'udo_policies'),
'udo_policies': view_udo_policy,
'udo_policy_tracker_b': track_udo_policy('udo_policy_tracker_b', 'udo_policies'),
'timestamp': time_model,
}
del states_with_ts['timestamp']
states_without_ts = states_with_ts
# needs M1&2 need behaviors
partial_state_update_blocks = {
'PSUB1': {
'policies': policies,
'states': states_with_ts
},
'PSUB2': {
'policies': policies,
'states': states_without_ts
},
'PSUB3': {
'policies': policies,
'states': states_without_ts
}
}
sim_config = config_sim({
"N": 2,
"T": range(4)
})
append_configs(
sim_config,
state_dict,
{}, {}, {},
partial_state_update_blocks,
policy_ops=[lambda a, b: {**a, **b}]
)