cadCAD/SimCAD/configuration/utils/__init__.py

200 lines
6.0 KiB
Python

from datetime import datetime, timedelta
from decimal import Decimal
from copy import deepcopy
from fn.func import curried
import pandas as pd
from SimCAD.utils import rename
from SimCAD.utils import groupByKey, dict_filter, contains_type
from SimCAD.utils import flatMap
from funcy import curry
import pprint
pp = pprint.PrettyPrinter(indent=4)
class TensorFieldReport:
def __init__(self, config_proc):
self.config_proc = config_proc
def create_tensor_field(self, mechanisms, exo_proc, keys=['behaviors', 'states']):
dfs = [self.config_proc.create_matrix_field(mechanisms, k) for k in keys]
df = pd.concat(dfs, axis=1)
for es, i in zip(exo_proc, range(len(exo_proc))):
df['es' + str(i + 1)] = es
df['m'] = df.index + 1
return df
# def s_update(y, x):
# return lambda step, sL, s, _input: (y, x)
#
#
def state_update(y, x):
return lambda step, sL, s, _input: (y, x)
def bound_norm_random(rng, low, high):
res = rng.normal((high+low)/2,(high-low)/6)
if (res<low or res>high):
res = bound_norm_random(rng, low, high)
return Decimal(res)
@curried
def proc_trigger(trigger_step, update_f, step):
if step == trigger_step:
return update_f
else:
return lambda x: x
t_delta = timedelta(days=0, minutes=0, seconds=30)
def time_step(dt_str, dt_format='%Y-%m-%d %H:%M:%S', _timedelta = t_delta):
dt = datetime.strptime(dt_str, dt_format)
t = dt + _timedelta
return t.strftime(dt_format)
t_delta = timedelta(days=0, minutes=0, seconds=1)
def ep_time_step(s, dt_str, fromat_str='%Y-%m-%d %H:%M:%S', _timedelta = t_delta):
if s['mech_step'] == 0:
return time_step(dt_str, fromat_str, _timedelta)
else:
return dt_str
def mech_sweep_filter(mech_field, mechanisms):
mech_dict = dict([(k, v[mech_field]) for k, v in mechanisms.items()])
return dict([
(k, dict_filter(v, lambda v: isinstance(v, list))) for k, v in mech_dict.items()
if contains_type(list(v.values()), list)
])
def state_sweep_filter(raw_exogenous_states):
return dict([(k, v) for k, v in raw_exogenous_states.items() if isinstance(v, list)])
@curried
def sweep_mechs(_type, in_config):
configs = []
filtered_mech_states = mech_sweep_filter(_type, in_config.mechanisms)
if len(filtered_mech_states) > 0:
for mech, state_dict in filtered_mech_states.items():
for state, state_funcs in state_dict.items():
for f in state_funcs:
config = deepcopy(in_config)
config.mechanisms[mech][_type][state] = f
configs.append(config)
del config
else:
configs = [in_config]
return configs
@curried
def sweep_states(state_type, states, in_config):
configs = []
filtered_states = state_sweep_filter(states)
if len(filtered_states) > 0:
for state, state_funcs in filtered_states.items():
for f in state_funcs:
config = deepcopy(in_config)
exploded_states = deepcopy(states)
exploded_states[state] = f
if state_type == 'exogenous':
config.exogenous_states = exploded_states
elif state_type == 'environmental':
config.env_processes = exploded_states
configs.append(config)
del config, exploded_states
else:
configs = [in_config]
return configs
def param_sweep(config, raw_exogenous_states):
return flatMap(
sweep_states('environmental', config.env_processes),
flatMap(
sweep_states('exogenous', raw_exogenous_states),
flatMap(
sweep_mechs('states'),
sweep_mechs('behaviors', config)
)
)
)
def exo_update_per_ts(ep):
@curried
def ep_decorator(f, y, step, sL, s, _input):
if s['mech_step'] + 1 == 1:
return f(step, sL, s, _input)
else:
return (y, s[y])
return {es: ep_decorator(f, es) for es, f in ep.items()}
def sweep(params, sweep_f):
return [rename("sweep_"+sweep_f.__name__+"_"+str(i), curry(sweep_f)(param)) for param, i in zip(params, range(len(params)))]
def parameterize_mechanism(mechanisms):
sweep_lists = []
new_mechanisms = deepcopy(mechanisms)
for mech, update_types in new_mechanisms.items():
for update_type, fkv in update_types.items():
for sk, vfs in fkv.items():
id_sweep_lists = []
if isinstance(vfs, list):
for vf in vfs:
id_sweep_lists.append({mech: {update_type: {sk: vf}}})
if len(id_sweep_lists) != 0:
sweep_lists.append(id_sweep_lists)
zipped_sweep_lists = []
it = iter(sweep_lists)
the_len = len(next(it))
if all(len(l) == the_len for l in it):
zipped_sweep_lists = list(map(lambda x: list(x), list(zip(*sweep_lists))))
else:
raise ValueError('lists have different lengths!')
if len(sweep_lists) == 0:
return [mechanisms]
mechanisms_configs = []
for f_list in zipped_sweep_lists:
mechanisms_copy = deepcopy(mechanisms)
for f_dict in f_list:
updates = list(f_dict.values()).pop()
functs = list(updates.values()).pop()
mech = list(f_dict.keys()).pop()
update_type = list(updates.keys()).pop()
sk = list(functs.keys()).pop()
vf = list(functs.values()).pop()
mechanisms_copy[mech][update_type][sk] = vf
mechanisms_configs.append(mechanisms_copy)
del mechanisms_copy
# pp.pprint(sweep_lists)
# print()
# pp.pprint(zipped_sweep_lists)
# print()
return mechanisms_configs
# def ep_decorator(f, y, step, sL, s, _input):
# if s['mech_step'] + 1 == 1:
# return f(step, sL, s, _input)
# else:
# return (y, s[y])
# return {es: ep_decorator(f, es) for es, f in ep.items()}