udc w/ policies

This commit is contained in:
Joshua E. Jodesty 2019-04-05 13:38:55 -04:00
parent 875f370c5e
commit c4863a838d
7 changed files with 148 additions and 128 deletions

View File

@ -28,7 +28,7 @@ class Configuration(object):
def append_configs(sim_configs={}, initial_state={}, seeds={}, raw_exogenous_states={}, env_processes={},
partial_state_update_blocks={}, _exo_update_per_ts: bool = True) -> None:
partial_state_update_blocks={}, policy_ops=[foldr(dict_elemwise_sum())], _exo_update_per_ts: bool = True) -> None:
if _exo_update_per_ts is True:
exogenous_states = exo_update_per_ts(raw_exogenous_states)
else:
@ -42,7 +42,8 @@ def append_configs(sim_configs={}, initial_state={}, seeds={}, raw_exogenous_sta
seeds=seeds,
exogenous_states=exogenous_states,
env_processes=env_processes,
partial_state_update_blocks=partial_state_update_blocks
partial_state_update_blocks=partial_state_update_blocks,
policy_ops=policy_ops
)
configs.append(config)
elif isinstance(sim_configs, dict):
@ -52,7 +53,8 @@ def append_configs(sim_configs={}, initial_state={}, seeds={}, raw_exogenous_sta
seeds=seeds,
exogenous_states=exogenous_states,
env_processes=env_processes,
partial_state_update_blocks=partial_state_update_blocks
partial_state_update_blocks=partial_state_update_blocks,
policy_ops=policy_ops
)
configs.append(config)

View File

@ -1,7 +1,6 @@
from fn.op import foldr
from fn.func import curried
def get_base_value(x):
if isinstance(x, str):
return ''

View File

@ -0,0 +1,47 @@
from collections import namedtuple
from inspect import getmembers, ismethod
class udcView(object):
def __init__(self, d):
self.__dict__ = d
# returns dict to dataframe
# def __repr__(self):
def __repr__(self):
members = {}
functionless = {k: v for k, v in self.__dict__.items() if str(type(v)) != "<class 'method'>" and k != 'obj'}
members['functions'] = [k for k, v in self.__dict__.items() if str(type(v)) == "<class 'method'>"]
members.update(functionless)
return f"{members}"
class udcBroker(object):
def __init__(self, obj, function_filter=['__init__']):
d = {}
funcs = dict(getmembers(obj, ismethod))
filtered_functions = {k: v for k, v in funcs.items() if k not in function_filter}
d['obj'] = obj
d.update(vars(obj)) # somehow is enough
d.update(filtered_functions)
self.members_dict = d
def get_members(self):
return self.members_dict
def get_view(self):
return udcView(self.members_dict)
def get_namedtuple(self):
return namedtuple("Hydra", self.members_dict.keys())(*self.members_dict.values())
def generate_udc_view(udc):
return udcBroker(udc).get_view()
def next_udc_view(obj_view):
return generate_udc_view(obj_view.obj)

View File

@ -1,7 +1,7 @@
from typing import Any, Callable, Dict, List, Tuple
from pathos.pools import ThreadPool as TPool
from copy import deepcopy
from fn.op import foldr, call
from functools import reduce
from cadCAD.engine.utils import engine_exception
from cadCAD.utils import flatten
@ -38,7 +38,10 @@ class Executor:
def get_col_results(var_dict, sub_step, sL, s, funcs):
return list(map(lambda f: f(var_dict, sub_step, sL, s), funcs))
return foldr(call, get_col_results(var_dict, sub_step, sL, s, funcs))(ops)
# return foldr(call, get_col_results(var_dict, sub_step, sL, s, funcs))(ops)
col_results = get_col_results(var_dict, sub_step, sL, s, funcs)
return reduce(lambda a, b: {**a, **b}, col_results)
def apply_env_proc(
self,
@ -68,6 +71,7 @@ class Executor:
) -> List[Dict[str, Any]]:
last_in_obj: Dict[str, Any] = deepcopy(sL[-1])
# last_in_obj: Dict[str, Any] = sL[-1]
_input: Dict[str, Any] = self.policy_update_exception(self.get_policy_input(var_dict, sub_step, sL, last_in_obj, policy_funcs))
@ -115,8 +119,7 @@ class Executor:
states_list: List[Dict[str, Any]] = [genesis_states]
sub_step += 1
for config in configs:
s_conf, p_conf = config[0], config[1]
for [s_conf, p_conf] in configs:
states_list: List[Dict[str, Any]] = self.partial_state_update(
var_dict, sub_step, states_list, s_conf, p_conf, env_processes, time_step, run
)

View File

@ -1,43 +1,16 @@
from typing import Dict, List
from collections import defaultdict
from collections import defaultdict, Counter
from itertools import product
import warnings
from collections import namedtuple
class objectview(object):
def __init__(self, d):
self.__dict__ = d
class IndexCounter:
def __init__(self):
self.i = 0
def __str__(self):
filtered_members = {k: v for k, v in self.__dict__.items() if k != 'obj'}
return f"{filtered_members}"
class UDC(object):
def __init__(self, obj):
d = vars(obj) # somehow is enough
d['obj'] = obj
self.members_dict = d
def get_members(self):
return self.members_dict
def get_object(self):
return objectview(self.members_dict)
def get_namedtuple(self):
return namedtuple("Hydra", self.members_dict.keys())(*self.members_dict.values())
# class UDC_Wrapper2(object):
# def __init__(self, obj, functions):
#
# self.obj = obj
#
# def get_object(self):
# return objectview(self.obj.__dict__)
def __call__(self):
self.i += 1
return self.i
def pipe(x):

View File

@ -5,6 +5,7 @@ from cadCAD.engine import ExecutionMode, ExecutionContext, Executor
from simulations.validation import config_udc_json3
from cadCAD import configs
import pprint as pp
exec_mode = ExecutionMode()
@ -21,4 +22,5 @@ print("Tensor Field: config1")
print(tabulate(tensor_field, headers='keys', tablefmt='psql'))
print("Output:")
print(tabulate(result, headers='keys', tablefmt='psql'))
print()
print()
print(result.info(verbose=True))

View File

@ -1,64 +1,58 @@
from datetime import timedelta
from cadCAD.utils import UDC
from cadCAD.configuration import append_configs
from cadCAD.configuration.utils import ep_time_step, config_sim
from typing import Dict, List
from cadCAD.configuration.utils.policyAggregation import dict_op, dict_elemwise_sum
from cadCAD.configuration.utils.udc import udcBroker, next_udc_view, generate_udc_view
# ToDo: Create member for past value
class MyClassA(object):
class MyClass(object):
def __init__(self, x):
self.x = x
print(f"Instance of MyClass (mem_id {hex(id(self))}) created with value {self.x}")
def update(self):
self.x += 1
print(f"Instance of MyClass (mem_id {hex(id(self))}) has been updated, has now value {self.x}")
# return self.x
return self
def getMemID(self):
return str(hex(id(self)))
# can be accessed after an update within the same substep and timestep
# ToDo: id sensitive to lineage, rerepresent
def __str__(self):
# return f"{self.__class__.__name__} - {hex(id(self))} - {self.__dict__}"
return f"{self.__dict__}"
pass
# a is Correct, and classX's value is Incorrect
# Expected: a == classX's value
# b should be tracking classX's value and a:
# b should be the same value as the previous classX value and the previous a value
# https://pymotw.com/2/multiprocessing/communication.html
# ccc = MyClassA
# udc = ccc(0)
# print(MyClassA(**udc.__dict__).__dict__)
# can be accessed after an update within the same substep and timestep
g: Dict[str, List[MyClassA]] = {'udc': [MyClassA]}
# udc = MyClassA(0)
# wrapped_udc = UDC(udc)
# hydra_members = wrapped_udc.get_object()
hydra_state_view = generate_udc_view(MyClass(0))
udc_view_B = generate_udc_view(MyClass(0))
udc_view_C = generate_udc_view(MyClass(0))
# udcB = MyClassB()
# z = MyClass()
# pointer(z)
# separate thread/process for UCD with async calls to this thread/process
# genesis state
# udc_obj = MyClassA(0)
# hydra = UDC_Wrapper(udc, udc, current_functions=['update'])
# hydra = UDC_Wrapper(udc_obj, functions=['update'])
hydra = UDC(MyClassA(0))
hydra_members = hydra.get_object()
# g: Dict[str, List[int]] = {'MyClassB'}
state_dict = {
'a': 0,
'b': 0,
'j': 0,
"hydra_members": hydra_members,
'a': 0, 'b': 0, 'j': 0,
'k': (0, 0), 'q': (0, 0),
'hydra_state': hydra_state_view,
'policies': {'hydra_B': udc_view_B, 'hydra_C': udc_view_C},
'timestamp': '2019-01-01 00:00:00'
}
def p1(_g, step, sL, s):
s['policies']['hydra_B'].update()
return {'hydra_B': next_udc_view(s['policies']['hydra_B'])}
def p2(_g, step, sL, s):
s['policies']['hydra_C'].update()
return {'hydra_C': next_udc_view(s['policies']['hydra_C'])}
def policies(_g, step, sL, s, _input):
y = 'policies'
x = _input
return (y, x)
timestep_duration = timedelta(minutes=1) # In this example, a timestep has a duration of 1 minute.
ts_format = '%Y-%m-%d %H:%M:%S'
def time_model(_g, step, sL, s, _input):
@ -68,90 +62,90 @@ def time_model(_g, step, sL, s, _input):
def HydraMembers(_g, step, sL, s, _input):
y = 'hydra_members'
obj = s['hydra_members'].obj
obj.update()
x = UDC(obj).get_object()
y = 'hydra_state'
# PROBLEM:
# s['hydra_members'].update()
# x = s['hydra_members']
# SOLUTION:
s['hydra_state'].update()
x = next_udc_view(s['hydra_state'])
return (y, x)
def repr(_g, step, sL, s, _input):
y = 'z'
x = s['hydra_members'].__repr__()
return (y, x)
def A(_g, step, sL, s, _input):
y = 'a'
x = s['a'] + 1
return (y, x)
def B(_g, step, sL, s, _input):
y = 'b'
# x = s['hydra_members']['x']
x = s['hydra_members'].x
# x = s['hydra_obj'].x
return (y, x)
def hydra_state_tracker(y):
return lambda _g, step, sL, s, _input: (y, s['hydra_state'].x)
def J(_g, step, sL, s, _input):
y = 'j'
# x = s['hydra_members']['x']
x = s['hydra_members'].x
# x = s['hydra_obj'].x
# x = s['hydra_view'].x
return (y, x)
def hydra_policy_tracker(y):
return lambda _g, step, sL, s, _input: (y, tuple(v.x for k, v in s['policies'].items()))
# needs M1&2 need behaviors
partial_state_update_blocks = {
'PSUB1': {
'behaviors': {
'policies': {
"b1": p1,
"b2": p2
},
'states': {
# 'ca': CA,
'a': A,
'b': B,
# 'hydra': Hydra,
'hydra_members': HydraMembers,
# 'hydra_obj': HydraObj,
# 'hydra_view': HydraView,
# 'i': I,
'j': J,
# 'k': K,
'b': hydra_state_tracker('b'),
'j': hydra_state_tracker('j'),
'k': hydra_policy_tracker('k'),
'q': hydra_policy_tracker('q'),
'hydra_state': HydraMembers,
'timestamp': time_model,
'policies': policies
}
},
'PSUB2': {
'behaviors': {
'policies': {
"b1": p1,
"b2": p2
},
'states': {
# 'ca': CA,
'a': A,
'b': B,
# 'hydra': Hydra,
'hydra_members': HydraMembers,
# 'hydra_obj': HydraObj,
# 'hydra_view': HydraView,
# 'i': I,
'j': J,
# 'k': K,
'b': hydra_state_tracker('b'),
'j': hydra_state_tracker('j'),
'k': hydra_policy_tracker('k'),
'q': hydra_policy_tracker('q'),
'hydra_state': HydraMembers,
'policies': policies
}
},
'PSUB3': {
'behaviors': {
'policies': {
"b1": p1,
"b2": p2
},
'states': {
'a': A,
'b': B,
# 'hydra': Hydra,
'hydra_members': HydraMembers,
# 'hydra_obj': HydraObj,
# 'hydra_view': HydraView,
# 'i': I,
'j': J,
# 'k': K,
'b': hydra_state_tracker('b'),
'j': hydra_state_tracker('j'),
'k': hydra_policy_tracker('k'),
'q': hydra_policy_tracker('q'),
'hydra_state': HydraMembers,
'policies': policies
}
}
}
sim_config = config_sim({
"N": 2,
"T": range(4),
"M": g
"T": range(4)
})
append_configs(sim_config, state_dict, {}, {}, {}, partial_state_update_blocks)
append = lambda a, b: [a, b]
update_dict = lambda a, b: a.update(b)
take_first = lambda a, b: [a, b]
append_configs(sim_config, state_dict, {}, {}, {}, partial_state_update_blocks)#, policy_ops=[foldr(dict_op(take_first))])