This commit is contained in:
Joshua E. Jodesty 2019-03-29 09:10:31 -04:00
parent ac44a7bee8
commit d56d60d7a3
7 changed files with 573 additions and 83 deletions

View File

@ -1,22 +1,22 @@
from collections import namedtuple
from typing import Any, Callable, Dict, List, Tuple
from pathos.pools import ThreadPool as TPool
from copy import deepcopy
from fn.op import foldr, call
from cadCAD.engine.utils import engine_exception
from cadCAD.utils import flatten
import pprint as pp
from cadCAD.utils import flatten, UDC_Wrapper
id_exception: Callable = engine_exception(KeyError, KeyError, None)
class Executor:
def __init__(
self,
policy_ops: List[Callable],
policy_update_exception: Callable = id_exception,
state_update_exception: Callable = id_exception
) -> None:
self,
policy_ops: List[Callable],
policy_update_exception: Callable = id_exception,
state_update_exception: Callable = id_exception
) -> None:
# behavior_ops
self.policy_ops = policy_ops
@ -69,6 +69,7 @@ class Executor:
) -> List[Dict[str, Any]]:
last_in_obj: Dict[str, Any] = deepcopy(sL[-1])
udc = var_dict[0]['udc']
# last_in_obj: Dict[str, Any] = sL[-1]
_input: Dict[str, Any] = self.policy_update_exception(self.get_policy_input(var_dict, sub_step, sL, last_in_obj, policy_funcs))
@ -76,38 +77,37 @@ class Executor:
# ToDo: add env_proc generator to `last_in_copy` iterator as wrapper function
# ToDo: Can be multithreaded ??
# ToDo: Create Separate past state paradigm for which users specify the use of identity / past function
# ToDo: UDC / any class must be deepcopy before every update
# vs an assumed update
def generate_record(state_funcs, alt_udc_dict):
for k, v in last_in_obj.items():
if isinstance(v, dict) and hasattr(v, 'class_id'):
del last_in_obj[k]
# last_class = deepcopy(last_in_obj['classX'])
# def HydraObj(_g, step, sL, s, _input):
# y = 'hydra_obj'
# # x = s['hydra_obj']
# x = namedtuple("Hydra", s['hydra_members'].keys())(*s['hydra_members'].values())
# return (y, x)
# incoming
# past_attr_dict = {k: v for k, v in last_in_obj.items() if
# hasattr(v, 'past_attr') and k == v.past_attr}
# incoming_attr_dict = {k: deepcopy(v) for k, v in last_in_obj.items() if
# hasattr(v, 'past_attr') and k != v.past_attr}
# udcs = {k: deepcopy(v) for k, v in last_in_obj.items() if hasattr(v, 'class_id')}
# non_udcs = {k: deepcopy(v) for k, v in last_in_obj.items() if not hasattr(v, 'class_id')}
new_last_in_obj = dict(list(last_in_obj.items()) + list(alt_udc_dict.items()))
# for f in state_funcs + [HydraObj]:
for f in state_funcs:
# ToDo: Create Named Tuple Here
y, x = f(var_dict, sub_step, sL, new_last_in_obj, _input)
# if isinstance(x, dict) and x['hydra_type'] == Dict and 'class_id' in x.keys():
# x = namedtuple("Hydra", x.keys())(*x.values())
yield self.state_update_exception((y, x))
# past_attr_dict = {k: v for k, v in last_in_obj.items() if 'past' in v.keys()}
# incoming_attr_dict = {k: v for k, v in last_in_obj.items() if 'current' in v.keys()}
# ToDo: Previous Record Cache
# last_in_copy_staging = deepcopy(last_in_obj)
# past_udc = deepcopy(last_in_obj['classX']['current'])
last_in_copy: Dict[str, Any] = dict(
[
self.state_update_exception(f(var_dict, sub_step, sL, last_in_obj, _input)) for f in state_funcs
]
)
# a b c d e f g
udc_dict = {
k: UDC_Wrapper(
v['current'],
udc(**v['current'].__dict__),
current_functions=['update']
).get_hybrid_members()
for k, v in last_in_obj.items() if isinstance(v, dict) and 'current' in v.keys()
}
last_in_copy: Dict[str, Any] = dict(generate_record(state_funcs, udc_dict))
del udc_dict
for k in last_in_obj:
if k not in last_in_copy:
@ -120,16 +120,6 @@ class Executor:
# ToDo: make 'substep' & 'timestep' reserve fields
last_in_copy['substep'], last_in_copy['timestep'], last_in_copy['run'] = sub_step, time_step, run
# # ToDo: Handle conditions
# for k_past, _ in past_attr_dict.items():
# for _, v_current in incoming_attr_dict.items():
# last_in_copy[k_past] = v_current
# last_in_copy['pastX'] = last_class
# last_in_copy['classX']['past'] = past_udc
# last_in_copy['pastX_str'] = past_udc
sL.append(last_in_copy)
del last_in_copy
@ -149,14 +139,6 @@ class Executor:
sub_step = 0
states_list_copy: List[Dict[str, Any]] = deepcopy(states_list)
# for d1 in states_list:
# for d2 in states_list_copy:
# d2['classX'] = d1['classX']
# print()
# pp.pprint(states_list_copy)
# print()
genesis_states: Dict[str, Any] = states_list_copy[-1]
del states_list_copy
genesis_states['substep'], genesis_states['timestep'] = sub_step, time_step
@ -165,7 +147,6 @@ class Executor:
sub_step += 1
for config in configs:
s_conf, p_conf = config[0], config[1]
# states_list["classX"] = deepcopy(classX)
states_list: List[Dict[str, Any]] = self.partial_state_update(
var_dict, sub_step, states_list, s_conf, p_conf, env_processes, time_step, run
)
@ -212,10 +193,6 @@ class Executor:
run += 1
states_list_copy: List[Dict[str, Any]] = deepcopy(states_list)
# for d1 in states_list:
# for d2 in states_list_copy:
# d2['classX'] = d1['classX']
head, *tail = self.run_pipeline(var_dict, states_list_copy, configs, env_processes, time_seq, run)
del states_list_copy

View File

@ -2,6 +2,44 @@ from typing import Dict, List
from collections import defaultdict
from itertools import product
import warnings
from inspect import getmembers, ismethod
from copy import deepcopy
from collections import namedtuple
class objectview(object):
def __init__(self, d):
self.__dict__ = d
class UDC_Wrapper(object):
def __init__(self, current, past, current_functions, past_functions=[]):
current_funcs = dict(getmembers(current, ismethod))
# current_funcs['object'] = current
filtered_current_funcs = {k: v for k, v in current_funcs.items() if k in current_functions}
# current_members = filtered_current_funcs.update(vars(current))
# past_funcs = dict(getmembers(past, ismethod))
# past_funcs['object'] = past
# filtered_past_funcs = {k: v for k, v in past_funcs.items() if k in past_functions}
# past_members = filtered_past_funcs.update(vars(past))
filtered_current_funcs['hydra_type'] = Dict
filtered_current_funcs.update(vars(past))
# print(filtered_current_funcs)
filtered_current_funcs['current'] = current
filtered_current_funcs['past'] = past
self.hybrid_members = filtered_current_funcs
def get_hybrid_members(self):
return self.hybrid_members
def get_namedtuple(self):
return namedtuple("Hydra", self.hybrid_members.keys())(*self.hybrid_members.values())
# def hybrid_members_values(self):
# return [v for k, v in self.hybrid_members.keys()]
def pipe(x):

View File

@ -2,7 +2,7 @@ import pandas as pd
from tabulate import tabulate
# The following imports NEED to be in the exact order
from cadCAD.engine import ExecutionMode, ExecutionContext, Executor
from simulations.validation import config_udc
from simulations.validation import config_udc_json2
from cadCAD import configs

View File

@ -0,0 +1,24 @@
import pandas as pd
from tabulate import tabulate
# The following imports NEED to be in the exact order
from cadCAD.engine import ExecutionMode, ExecutionContext, Executor
from simulations.validation import config_udc_json2
from cadCAD import configs
exec_mode = ExecutionMode()
print("Simulation Execution: Concurrent Execution")
multi_proc_ctx = ExecutionContext(context=exec_mode.multi_proc)
run = Executor(exec_context=multi_proc_ctx, configs=configs)
i = 0
config_names = ['sweep_config_A', 'sweep_config_B']
for raw_result, tensor_field in run.main():
result = pd.DataFrame(raw_result)
print()
print("Tensor Field: " + config_names[i])
print(tabulate(tensor_field, headers='keys', tablefmt='psql'))
print("Output:")
print(tabulate(result, headers='keys', tablefmt='psql'))
print()
i += 1

View File

@ -1,27 +1,26 @@
from datetime import timedelta
from cadCAD.configuration import append_configs
from cadCAD.configuration.utils import ep_time_step, config_sim
from copy import deepcopy
from copy import deepcopy, copy
# ToDo: Create member for past value
class MyClass:
def __init__(self, past_attr):
self.past_self = self
# self.past = self
self.past_attr = past_attr
self.class_id = None
self.x = 0
print(f"Instance of MyClass (mem_id {hex(id(self))}) created with value {self.x}")
def update(self):
# self.past_self = deepcopy(self)
# self.past = copy(self)
self.x += 1
print(f"Instance of MyClass (mem_id {hex(id(self))}) has been updated, has now value {self.x}")
return self.x #self #old_self #self.x
def past(self):
return self.past_self
def getMemID(self):
return str(hex(id(self)))
@ -37,8 +36,8 @@ class MyClass:
# Expected: a == classX's value
# b should be tracking classX's value and a:
# b should be the same value as the previous classX value and the previous a value
udc = MyClass('pastX')
# https://pymotw.com/2/multiprocessing/communication.html
udc = MyClass(0)
# z = MyClass()
# pointer(z)
@ -49,8 +48,9 @@ udc = MyClass('pastX')
# udc_json = {'udc': udc, 'udc-1': udc}
state_dict = {
'classX': udc,
'classX_MemID': udc.getMemID(),
# 'pastX': udc,
# 'classX_MemID': udc.getMemID(),
'pastX': udc,
'otherX': udc,
# 'pastX_MemID': udc.getMemID(),
'a': 0,
'b': udc.x,
@ -72,21 +72,26 @@ def trackClassX(_g, step, sL, s, _input):
x = s['classX']
return (y, x)
def trackClassX_str(_g, step, sL, s, _input):
y = 'classX_MemID'
x = s['classX'].getMemID()
return (y, x)
# def trackClassX_str(_g, step, sL, s, _input):
# y = 'classX_MemID'
# x = s['classX'].getMemID()
# return (y, x)
def updatePastX(_g, step, sL, s, _input):
y = 'pastX'
x = s['pastX']
return (y, x)
def updatePastX_str(_g, step, sL, s, _input):
y = 'pastX_MemID'
x = s['pastX'].getMemID()
def updateOtherX(_g, step, sL, s, _input):
y = 'otherX'
x = s['otherX']
return (y, x)
# def updatePastX_str(_g, step, sL, s, _input):
# y = 'pastX_MemID'
# x = s['pastX'].getMemID()
# return (y, x)
def updateA(_g, step, sL, s, _input):
y = 'a'
x = s['a'] + 1
@ -109,7 +114,7 @@ def updateZ(_g, step, sL, s, _input):
def updateC2(_g, step, sL, s, _input):
y = 'c2'
x = s['classX'].x
x = s['pastX'].x
return (y, x)
partial_state_update_blocks = {
@ -122,10 +127,9 @@ partial_state_update_blocks = {
'c': updateC,
'c2': updateC2,
'classX': trackClassX,
'otherX': updateOtherX,
'timestamp': time_model,
'classX_MemID': trackClassX_str,
# 'pastX': updatePastX,
# 'pastX_MemID': updatePastX_str,
'pastX': updatePastX,
'z': updateZ
}
},
@ -138,8 +142,9 @@ partial_state_update_blocks = {
'c': updateC,
'c2': updateC2,
'classX': trackClassX,
'classX_MemID': trackClassX_str,
# 'pastX': updatePastX,
'otherX': updateOtherX,
# 'classX_MemID': trackClassX_str,
'pastX': updatePastX,
# 'pastX_MemID': updatePastX_str,
'z': updateZ
}
@ -153,8 +158,9 @@ partial_state_update_blocks = {
'c': updateC,
'c2': updateC2,
'classX': trackClassX,
'classX_MemID': trackClassX_str,
# 'pastX': updatePastX,
'otherX': updateOtherX,
# 'classX_MemID': trackClassX_str,
'pastX': updatePastX,
# 'pastX_MemID': updatePastX_str,
'z': updateZ
}

View File

@ -0,0 +1,210 @@
from datetime import timedelta
from cadCAD.configuration import append_configs
from cadCAD.configuration.utils import ep_time_step, config_sim
# ToDo: Create member for past value
class MyClassA:
def __init__(self):
self.class_id = None
self.x = 0
print(f"Instance of MyClass (mem_id {hex(id(self))}) created with value {self.x}")
def update(self):
# self.past = copy(self)
self.x += 1
print(f"Instance of MyClass (mem_id {hex(id(self))}) has been updated, has now value {self.x}")
return self.x #self #old_self #self.x
def getMemID(self):
return str(hex(id(self)))
# can be accessed after an update within the same substep and timestep
# ToDo: id sensitive to lineage, rerepresent
def __str__(self):
# return str(self.x)
return f"{hex(id(self))} - {self.x}"
class MyClassB:
def __init__(self):
self.class_id = None
self.x = 5
print(f"Instance of MyClass (mem_id {hex(id(self))}) created with value {self.x}")
def update(self):
# self.past = copy(self)
self.x += 1
print(f"Instance of MyClass (mem_id {hex(id(self))}) has been updated, has now value {self.x}")
return self.x #self #old_self #self.x
def getMemID(self):
return str(hex(id(self)))
# can be accessed after an update within the same substep and timestep
# ToDo: id sensitive to lineage, rerepresent
def __str__(self):
# return str(self.x)
return f"{hex(id(self))} - {self.x}"
# a is Correct, and classX's value is Incorrect
# Expected: a == classX's value
# b should be tracking classX's value and a:
# b should be the same value as the previous classX value and the previous a value
# https://pymotw.com/2/multiprocessing/communication.html
udcA = MyClassA()
udcB = MyClassB()
# z = MyClass()
# pointer(z)
# separate thread/process for UCD with async calls to this thread/process
# genesis state
# udc_json = {'udc': udc, 'udc-1': udc}
state_dict = {
'ca': 0,
'cb': udcA.x,
'cblassX': udcA,
'cc': udcA.x,
'cz': udcA.x,
'da': 5,
'db': udcB.x,
'dblassX': udcB,
'dc': udcB.x,
'dz': udcB.x,
'timestamp': '2019-01-01 00:00:00'
}
timestep_duration = timedelta(minutes=1) # In this example, a timestep has a duration of 1 minute.
ts_format = '%Y-%m-%d %H:%M:%S'
def time_model(_g, step, sL, s, _input):
y = 'timestamp'
x = ep_time_step(s, dt_str=s['timestamp'], fromat_str=ts_format, _timedelta=timestep_duration)
return (y, x)
def CBlassX(_g, step, sL, s, _input):
y = 'cblassX'
# x = s['cblassX']
x = _g['cblassX']
return (y, x)
def DBlassX(_g, step, sL, s, _input):
y = 'dblassX'
# x = s['dblassX']
x = _g['dblassX']
return (y, x)
def CA(_g, step, sL, s, _input):
y = 'ca'
x = s['ca'] + 1
return (y, x)
def DA(_g, step, sL, s, _input):
y = 'da'
x = s['da'] + 1
return (y, x)
def CB(_g, step, sL, s, _input):
y = 'cb'
x = _g['cblassX'].x
# x = s['cblassX'].x
return (y, x)
def DB(_g, step, sL, s, _input):
y = 'db'
x = _g['dblassX'].x
# x = s['dblassX'].x
return (y, x)
def CC(_g, step, sL, s, _input):
y = 'cc'
# x = s['cblassX'].update()
x = _g['cblassX'].update()
return (y, x)
def DC(_g, step, sL, s, _input):
# s['dblassX'] = _g['dblassX'].update()
y = 'dc'
# x = s['dblassX'].update()
x = _g['dblassX'].update()
return (y, x)
def CZ(_g, step, sL, s, _input):
y = 'cz'
x = _g['cblassX'].x
# x = s['cblassX'].x
return (y, x)
def DZ(_g, step, sL, s, _input):
y = 'dz'
x = _g['dblassX'].x
# x = s['dblassX'].x
return (y, x)
partial_state_update_blocks = {
'PSUB1': {
'behaviors': {
},
'states': {
'ca': CA,
'cb': CB,
'cblassX': CBlassX,
'cc': CC,
'cz': CZ,
'da': DA,
'db': DB,
'dblassX': DBlassX,
'dc': DC,
'dz': DZ,
'timestamp': time_model,
}
},
'PSUB2': {
'behaviors': {
},
'states': {
'ca': CA,
'cb': CB,
'cblassX': CBlassX,
'cc': CC,
'cz': CZ,
'da': DA,
'db': DB,
'dblassX': DBlassX,
'dc': DC,
'dz': DZ,
}
},
'PSUB3': {
'behaviors': {
},
'states': {
'ca': CA,
'cb': CB,
'cblassX': CBlassX,
'cc': CC,
'cz': CZ,
'da': DA,
'db': DB,
'dblassX': DBlassX,
'dc': DC,
'dz': DZ,
}
}
}
sim_config = config_sim({
"N": 2,
"T": range(4)
})
append_configs(sim_config, state_dict, {}, {}, {}, partial_state_update_blocks)

View File

@ -0,0 +1,235 @@
from copy import deepcopy, copy
from datetime import timedelta
from cadCAD.utils import UDC_Wrapper, objectview
from cadCAD.configuration import append_configs
from cadCAD.configuration.utils import ep_time_step, config_sim
from typing import Dict, List, Any
from collections import namedtuple
# ToDo: Create member for past value
class MyClassA(object):
def __init__(self, x, class_id=None):
self.class_id = class_id
self.x = x
print(f"Instance of MyClass (mem_id {hex(id(self))}) created with value {self.x}")
def update(self):
# self.past = copy(self)
self.x += 1
print(f"Instance of MyClass (mem_id {hex(id(self))}) has been updated, has now value {self.x}")
return self.x #self #old_self #self.x
def getMemID(self):
return str(hex(id(self)))
# can be accessed after an update within the same substep and timestep
# ToDo: id sensitive to lineage, rerepresent
def __str__(self):
# return str(self.x)
return f"{hex(id(self))} - {self.x}"
class MyClassB:
def __init__(self, x):
self.class_id = None
self.x = x
print(f"Instance of MyClass (mem_id {hex(id(self))}) created with value {self.x}")
def update(self):
# self.past = copy(self)
self.x += 1
print(f"Instance of MyClass (mem_id {hex(id(self))}) has been updated, has now value {self.x}")
return self.x #self #old_self #self.x
def getMemID(self):
return str(hex(id(self)))
# can be accessed after an update within the same substep and timestep
# ToDo: id sensitive to lineage, rerepresent
def __str__(self):
# return str(self.x)
return f"{hex(id(self))} - {self.x}"
# a is Correct, and classX's value is Incorrect
# Expected: a == classX's value
# b should be tracking classX's value and a:
# b should be the same value as the previous classX value and the previous a value
# https://pymotw.com/2/multiprocessing/communication.html
# ccc = MyClassA
# udc = ccc(0)
# print(MyClassA(**udc.__dict__).__dict__)
g: Dict[str, List[MyClassA]] = {'udc': [MyClassA]}
# udcB = MyClassB()
# z = MyClass()
# pointer(z)
# separate thread/process for UCD with async calls to this thread/process
# genesis state
udc = MyClassA(0)
# namedtuple("Hydra", self.hybrid_members.keys())(*self.hybrid_members.values())
udc_json = {'current': udc, 'past': udc}
hydra = UDC_Wrapper(udc, udc, current_functions=['update'])
hydra_members = hydra.get_hybrid_members()
hydra_obj = namedtuple("Hydra", hydra_members.keys())(*hydra_members.values())
state_dict = {
'a': 0,
'b': 0,
'i': 0,
'j': 0,
'k': 0,
# "hydra": hydra,
"hydra_members": hydra_members,
"hydra_obj": hydra_obj,
'hydra_view': objectview(hydra_members),
'timestamp': '2019-01-01 00:00:00'
}
timestep_duration = timedelta(minutes=1) # In this example, a timestep has a duration of 1 minute.
ts_format = '%Y-%m-%d %H:%M:%S'
def time_model(_g, step, sL, s, _input):
y = 'timestamp'
x = ep_time_step(s, dt_str=s['timestamp'], fromat_str=ts_format, _timedelta=timestep_duration)
return (y, x)
def Hydra(_g, step, sL, s, _input):
y = 'hydra'
x = s['hydra']
return (y, x)
def HydraMembers(_g, step, sL, s, _input):
y = 'hydra_members'
x = s['hydra_members'] #.get_hybrid_members()
return (y, x)
def HydraObj(_g, step, sL, s, _input):
y = 'hydra_obj'
# x = s['hydra_obj']
# v = list(map(lambda x: copy(x), list(s['hydra_members'].values())))
# hydra_members = s['hydra_members']
# def generate_var_deepcopy(hydra_members):
# for k, v in hydra_members.items():
# if k == 'x':
# yield k, deepcopy(v)
# else:
# yield k, v
#
# hydra_nt = namedtuple("Hydra", s['hydra_members'].keys())(*s['hydra_members'].values())
# new_hydra = dict(generate_var_deepcopy(hydra_nt))
# new_hydra_members = dict(generate_var_deepcopy(hydra_members))
x = namedtuple("Hydra", s['hydra_members'].keys())(*s['hydra_members'].values())
# x = namedtuple("Hydra", new_hydra.keys())(*new_hydra.values())
# print(x.x)
return (y, x)
def HydraView(_g, step, sL, s, _input):
y = 'hydra_view'
x = objectview(s['hydra_members'])
return (y, x)
def A(_g, step, sL, s, _input):
y = 'a'
x = s['a'] + 1
return (y, x)
def B(_g, step, sL, s, _input):
y = 'b'
x = s['hydra_members']['x']
# x = s['hydra_members'].x
# x = s['hydra_obj'].x
return (y, x)
def I(_g, step, sL, s, _input):
y = 'i'
# x = s['hydra_members']['update']()
# Either works
# x = s['hydra_members'].update()
x = s['hydra_obj'].update()
return (y, x)
def J(_g, step, sL, s, _input):
y = 'j'
x = s['hydra_members']['x']
# x = s['hydra_members'].x
# x = s['hydra_obj'].x
return (y, x)
def K(_g, step, sL, s, _input):
y = 'k'
# x = s['hydra_view'].x
x = s['hydra_obj'].x
return (y, x)
partial_state_update_blocks = {
'PSUB1': {
'behaviors': {
},
'states': {
# 'ca': CA,
'a': A,
'b': B,
# 'hydra': Hydra,
'hydra_members': HydraMembers,
'hydra_obj': HydraObj,
'hydra_view': HydraView,
'i': I,
'j': J,
'k': K,
'timestamp': time_model,
}
},
'PSUB2': {
'behaviors': {
},
'states': {
# 'ca': CA,
'a': A,
'b': B,
# 'hydra': Hydra,
'hydra_members': HydraMembers,
'hydra_obj': HydraObj,
'hydra_view': HydraView,
'i': I,
'j': J,
'k': K,
}
},
'PSUB3': {
'behaviors': {
},
'states': {
'a': A,
'b': B,
# 'hydra': Hydra,
'hydra_members': HydraMembers,
'hydra_obj': HydraObj,
'hydra_view': HydraView,
'i': I,
'j': J,
'k': K,
}
}
}
sim_config = config_sim({
"N": 2,
"T": range(4),
"M": g
})
append_configs(sim_config, state_dict, {}, {}, {}, partial_state_update_blocks)