Merge pull request #48 from BlockScience/staging

Staging
This commit is contained in:
Joshua E. Jodesty 2019-03-29 09:30:45 -04:00 committed by GitHub
commit b2b466493b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 226 additions and 113 deletions

2
.gitignore vendored
View File

@ -19,3 +19,5 @@ cadCAD.egg-info
build
cadCAD.egg-info
SimCAD.egg-info
monkeytype.sqlite3

View File

@ -129,4 +129,4 @@ for raw_result, tensor_field in run2.main():
The above can be run in Jupyter.
```bash
jupyter notebook
```
```

View File

@ -1,9 +1,10 @@
from typing import Dict, Callable, List, Tuple
from functools import reduce
from fn.op import foldr
import pandas as pd
from pandas.core.frame import DataFrame
from cadCAD import configs
from cadCAD.utils import key_filter
from cadCAD.configuration.utils import exo_update_per_ts
from cadCAD.configuration.utils.policyAggregation import dict_elemwise_sum
@ -12,7 +13,8 @@ from cadCAD.configuration.utils.depreciationHandler import sanitize_partial_stat
class Configuration(object):
def __init__(self, sim_config={}, initial_state={}, seeds={}, env_processes={},
exogenous_states={}, partial_state_update_blocks={}, policy_ops=[foldr(dict_elemwise_sum())], **kwargs):
exogenous_states={}, partial_state_update_blocks={}, policy_ops=[foldr(dict_elemwise_sum())],
**kwargs) -> None:
self.sim_config = sim_config
self.initial_state = initial_state
self.seeds = seeds
@ -25,7 +27,8 @@ class Configuration(object):
sanitize_config(self)
def append_configs(sim_configs={}, initial_state={}, seeds={}, raw_exogenous_states={}, env_processes={}, partial_state_update_blocks={}, _exo_update_per_ts=True):
def append_configs(sim_configs={}, initial_state={}, seeds={}, raw_exogenous_states={}, env_processes={},
partial_state_update_blocks={}, _exo_update_per_ts: bool = True) -> None:
if _exo_update_per_ts is True:
exogenous_states = exo_update_per_ts(raw_exogenous_states)
else:
@ -55,22 +58,22 @@ def append_configs(sim_configs={}, initial_state={}, seeds={}, raw_exogenous_sta
class Identity:
def __init__(self, policy_id={'identity': 0}):
def __init__(self, policy_id: Dict[str, int] = {'identity': 0}) -> None:
self.beh_id_return_val = policy_id
def p_identity(self, var_dict, sub_step, sL, s):
    """Return the value stored in ``self.beh_id_return_val``.

    All four arguments are accepted but ignored; the result is always the
    object that was stored on the instance.
    """
    identity_value = self.beh_id_return_val
    return identity_value
def policy_identity(self, k):
def policy_identity(self, k: str) -> Callable:
return self.p_identity
def no_state_identity(self, var_dict, sub_step, sL, s, _input):
    """Accept the five call arguments, ignore them, and return ``None``."""
    return
def state_identity(self, k):
def state_identity(self, k: str) -> Callable:
return lambda var_dict, sub_step, sL, s, _input: (k, s[k])
def apply_identity_funcs(self, identity, df, cols):
def apply_identity_funcs(self, identity: Callable, df: DataFrame, cols: List[str]) -> List[DataFrame]:
def fillna_with_id_func(identity, df, col):
return df[[col]].fillna(value=identity(col))
@ -78,7 +81,7 @@ class Identity:
class Processor:
def __init__(self, id=Identity()):
def __init__(self, id: Identity = Identity()) -> None:
self.id = id
self.p_identity = id.p_identity
self.policy_identity = id.policy_identity
@ -86,7 +89,7 @@ class Processor:
self.state_identity = id.state_identity
self.apply_identity_funcs = id.apply_identity_funcs
def create_matrix_field(self, partial_state_updates, key):
def create_matrix_field(self, partial_state_updates, key: str) -> DataFrame:
if key == 'variables':
identity = self.state_identity
elif key == 'policies':
@ -99,7 +102,8 @@ class Processor:
else:
return pd.DataFrame({'empty': []})
def generate_config(self, initial_state, partial_state_updates, exo_proc):
def generate_config(self, initial_state, partial_state_updates, exo_proc
) -> List[Tuple[List[Callable], List[Callable]]]:
def no_update_handler(bdf, sdf):
if (bdf.empty == False) and (sdf.empty == True):
@ -135,4 +139,4 @@ class Processor:
sdf_values, bdf_values = only_ep_handler(initial_state)
zipped_list = list(zip(sdf_values, bdf_values))
return list(map(lambda x: (x[0] + exo_proc, x[1]), zipped_list))
return list(map(lambda x: (x[0] + exo_proc, x[1]), zipped_list))

View File

@ -6,7 +6,7 @@ import pandas as pd
# Temporary
from cadCAD.configuration.utils.depreciationHandler import sanitize_partial_state_updates
from cadCAD.utils import dict_filter, contains_type
from cadCAD.utils import dict_filter, contains_type, flatten_tabulated_dict, tabulate_dict
# ToDo: Fix - Returns empty when partial_state_update is missing in Configuration
@ -122,4 +122,23 @@ def exo_update_per_ts(ep):
else:
return y, s[y]
return {es: ep_decorator(f, es) for es, f in ep.items()}
return {es: ep_decorator(f, es) for es, f in ep.items()}
# Param Sweep enabling middleware
def config_sim(d):
    """Build the simulation configuration(s) described by ``d``.

    * If ``d`` has no ``"M"`` key, ``d`` is mutated in place (``d["M"]`` is
      set to ``[{}]``) and the same dict object is returned.
    * Otherwise a list of new dicts is returned, one per element of
      ``flatten_tabulated_dict(tabulate_dict(d["M"]))``. Each dict carries
      the original ``d["N"]`` and ``d["T"]`` plus that element under ``"M"``.

    Note that the return type differs between the two cases (dict vs. list).
    """
    # No parameters to sweep: tag the config with a single empty parameter set.
    if "M" not in d:
        d["M"] = [{}]
        return d

    swept_params = flatten_tabulated_dict(tabulate_dict(d["M"]))
    sim_configs = []
    for params in swept_params:
        sim_configs.append({"N": d["N"], "T": d["T"], "M": params})
    return sim_configs

View File

@ -30,10 +30,10 @@ def sanitize_partial_state_updates(partial_state_updates):
# Also for backwards compatibility, we accept partial state update blocks either as a list or as a dict
# No need for a deprecation warning as it's already raised by cadCAD.utils.key_filter
if (type(new_partial_state_updates)==list):
if isinstance(new_partial_state_updates, list):
for v in new_partial_state_updates:
rename_keys(v)
elif (type(new_partial_state_updates)==dict):
elif isinstance(new_partial_state_updates, dict):
for k, v in new_partial_state_updates.items():
rename_keys(v)

View File

@ -1,20 +0,0 @@
from cadCAD.utils import flatten_tabulated_dict, tabulate_dict
def process_variables(d):
    """Pass ``d`` through ``tabulate_dict`` and then ``flatten_tabulated_dict``."""
    tabulated = tabulate_dict(d)
    return flatten_tabulated_dict(tabulated)
def config_sim(d):
    """Build the simulation configuration(s) described by ``d``.

    Without an ``"M"`` key, ``d`` itself is updated with ``d["M"] = [{}]``
    and returned. With an ``"M"`` key, a list is returned holding one new
    ``{"N", "T", "M"}`` dict per element of ``process_variables(d["M"])``,
    each reusing ``d["N"]`` and ``d["T"]``.
    """
    if "M" not in d:
        # Nothing to sweep over: attach a single empty parameter set.
        d["M"] = [{}]
        return d

    return [
        {"N": d["N"], "T": d["T"], "M": param_set}
        for param_set in process_variables(d["M"])
    ]

View File

@ -14,7 +14,7 @@ def get_base_value(x):
def policy_to_dict(v):
return dict(list(zip(map(lambda n: 'b' + str(n + 1), list(range(len(v)))), v)))
return dict(list(zip(map(lambda n: 'p' + str(n + 1), list(range(len(v)))), v)))
add = lambda a, b: a + b

View File

@ -1,33 +1,59 @@
from pathos.multiprocessing import ProcessingPool as Pool
from typing import Callable, Dict, List, Any, Tuple
from pathos.multiprocessing import ProcessingPool as PPool
from pandas.core.frame import DataFrame
from cadCAD.utils import flatten
from cadCAD.configuration import Processor
from cadCAD.configuration import Configuration, Processor
from cadCAD.configuration.utils import TensorFieldReport
from cadCAD.engine.simulation import Executor as SimExecutor
# Type aliases used to annotate the arguments of the execution functions below.
# Element type of `var_dict_list`: string keys mapped to lists of values.
VarDictType = Dict[str, List[Any]]
# Element type of `states_lists`: a list of string-keyed dicts.
StatesListsType = List[Dict[str, Any]]
# Element type of `configs_structs`: a list of (list of callables, list of callables) pairs.
ConfigsType = List[Tuple[List[Callable], List[Callable]]]
# Element type of `env_processes_list`: string keys mapped to callables.
EnvProcessesType = Dict[str, Callable]
class ExecutionMode:
    """String constants naming the available execution modes.

    The values are compared against the ``context`` name given to
    ``ExecutionContext`` and against ``Executor.exec_context``.
    """

    # Run a single configuration in the current process.
    single_proc = 'single_proc'
    # Run configurations through the process-pool execution method.
    multi_proc = 'multi_proc'
def single_proc_exec(
    simulation_execs: List[Callable],
    var_dict_list: List[VarDictType],
    states_lists: List[StatesListsType],
    configs_structs: List[ConfigsType],
    env_processes_list: List[EnvProcessesType],
    Ts: List[range],
    Ns: List[int]
):
    """Run one simulation in the current process and return its flattened result.

    The LAST element is popped from each of ``simulation_execs``,
    ``states_lists``, ``configs_structs``, ``env_processes_list``, ``Ts`` and
    ``Ns`` (so those caller-owned lists are shortened by one).
    ``var_dict_list`` is not popped; it is handed to the simulation callable
    as-is.

    Returns:
        ``flatten(...)`` applied to whatever the popped simulation callable
        returns.
    """
    # Pop in the same order as the argument groups are consumed by the call.
    simulation_exec = simulation_execs.pop()
    states_list = states_lists.pop()
    config = configs_structs.pop()
    env_processes = env_processes_list.pop()
    T = Ts.pop()
    N = Ns.pop()

    raw_result = simulation_exec(var_dict_list, states_list, config, env_processes, T, N)
    return flatten(raw_result)
def parallelize_simulations(
    simulation_execs: List[Callable],
    var_dict_list: List[VarDictType],
    states_lists: List[StatesListsType],
    configs_structs: List[ConfigsType],
    env_processes_list: List[EnvProcessesType],
    Ts: List[range],
    Ns: List[int]
):
    """Run every simulation through a ``PPool`` and return the mapped results.

    The seven argument lists are zipped position-wise into jobs; each job
    calls its simulation callable with the remaining six values of the same
    position. The pool is created with ``len(configs_structs)`` as its size
    argument and is used as a context manager.
    """
    jobs = list(
        zip(simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns)
    )
    with PPool(len(configs_structs)) as pool:
        # job[0] is the simulation callable, job[1:] are its six arguments.
        outcomes = pool.map(lambda job: job[0](job[1], job[2], job[3], job[4], job[5], job[6]), jobs)
    return outcomes
class ExecutionContext:
def __init__(self, context=ExecutionMode.multi_proc):
def __init__(self, context: str = ExecutionMode.multi_proc) -> None:
self.name = context
self.method = None
def single_proc_exec(simulation_execs, var_dict, states_lists, configs_structs, env_processes_list, Ts, Ns):
l = [simulation_execs, states_lists, configs_structs, env_processes_list, Ts, Ns]
simulation, states_list, config, env_processes, T, N = list(map(lambda x: x.pop(), l))
result = simulation(var_dict, states_list, config, env_processes, T, N)
return flatten(result)
def parallelize_simulations(fs, var_dict_list, states_list, configs, env_processes, Ts, Ns):
l = list(zip(fs, var_dict_list, states_list, configs, env_processes, Ts, Ns))
with Pool(len(configs)) as p:
results = p.map(lambda t: t[0](t[1], t[2], t[3], t[4], t[5], t[6]), l)
return results
if context == 'single_proc':
self.method = single_proc_exec
elif context == 'multi_proc':
@ -35,14 +61,14 @@ class ExecutionContext:
class Executor:
def __init__(self, exec_context, configs):
def __init__(self, exec_context: ExecutionContext, configs: List[Configuration]) -> None:
self.SimExecutor = SimExecutor
self.exec_method = exec_context.method
self.exec_context = exec_context.name
self.configs = configs
self.main = self.execute
def execute(self):
def execute(self) -> Tuple[List[Dict[str, Any]], DataFrame]:
config_proc = Processor()
create_tensor_field = TensorFieldReport(config_proc).create_tensor_field
@ -64,11 +90,13 @@ class Executor:
config_idx += 1
final_result = None
if self.exec_context == ExecutionMode.single_proc:
# ToDo: Deprecation Handler - "sanitize" in appropriate place
tensor_field = create_tensor_field(partial_state_updates.pop(), eps.pop())
result = self.exec_method(simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns)
return result, tensor_field
final_result = result, tensor_field
elif self.exec_context == ExecutionMode.multi_proc:
if len(self.configs) > 1:
simulations = self.exec_method(simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns)
@ -76,4 +104,6 @@ class Executor:
for result, partial_state_updates, ep in list(zip(simulations, partial_state_updates, eps)):
results.append((flatten(result), create_tensor_field(partial_state_updates, ep)))
return results
final_result = results
return final_result

View File

@ -1,20 +1,38 @@
from typing import Any, Callable, Dict, List, Tuple
from pathos.pools import ThreadPool as TPool
from copy import deepcopy
from fn.op import foldr, call
from cadCAD.engine.utils import engine_exception
from cadCAD.utils import flatten
id_exception = engine_exception(KeyError, KeyError, None)
id_exception: Callable = engine_exception(KeyError, KeyError, None)
class Executor:
def __init__(
self,
policy_ops: List[Callable],
policy_update_exception: Callable = id_exception,
state_update_exception: Callable = id_exception
) -> None:
def __init__(self, policy_ops, policy_update_exception=id_exception, state_update_exception=id_exception):
self.policy_ops = policy_ops # behavior_ops
# behavior_ops
self.policy_ops = policy_ops
self.state_update_exception = state_update_exception
self.policy_update_exception = policy_update_exception # behavior_update_exception
self.policy_update_exception = policy_update_exception
# behavior_update_exception
# get_behavior_input # sL: State Window
def get_policy_input(
self,
var_dict: Dict[str, List[Any]],
sub_step: int,
sL: List[Dict[str, Any]],
s: Dict[str, Any],
funcs: List[Callable]
) -> Dict[str, Any]:
# get_behavior_input
def get_policy_input(self, var_dict, sub_step, sL, s, funcs):
ops = self.policy_ops[::-1]
def get_col_results(var_dict, sub_step, sL, s, funcs):
@ -22,23 +40,39 @@ class Executor:
return foldr(call, get_col_results(var_dict, sub_step, sL, s, funcs))(ops)
def apply_env_proc(self, env_processes, state_dict, sub_step):
def apply_env_proc(
self,
env_processes: Dict[str, Callable],
state_dict: Dict[str, Any],
sub_step: int
) -> None:
for state in state_dict.keys():
if state in list(env_processes.keys()):
env_state = env_processes[state]
env_state: Callable = env_processes[state]
if (env_state.__name__ == '_curried') or (env_state.__name__ == 'proc_trigger'):
state_dict[state] = env_state(sub_step)(state_dict[state])
else:
state_dict[state] = env_state(state_dict[state])
# mech_step
def partial_state_update(self, var_dict, sub_step, sL, state_funcs, policy_funcs, env_processes, time_step, run):
last_in_obj = sL[-1]
def partial_state_update(
self,
var_dict: Dict[str, List[Any]],
sub_step: int,
sL: Any,
state_funcs: List[Callable],
policy_funcs: List[Callable],
env_processes: Dict[str, Callable],
time_step: int,
run: int
) -> List[Dict[str, Any]]:
_input = self.policy_update_exception(self.get_policy_input(var_dict, sub_step, sL, last_in_obj, policy_funcs))
last_in_obj: Dict[str, Any] = sL[-1]
_input: Dict[str, Any] = self.policy_update_exception(self.get_policy_input(var_dict, sub_step, sL, last_in_obj, policy_funcs))
# ToDo: add env_proc generator to `last_in_copy` iterator as wrapper function
last_in_copy = dict(
last_in_copy: Dict[str, Any] = dict(
[
self.state_update_exception(f(var_dict, sub_step, sL, last_in_obj, _input)) for f in state_funcs
]
@ -52,52 +86,92 @@ class Executor:
self.apply_env_proc(env_processes, last_in_copy, last_in_copy['timestep'])
# ToDo: make 'substep' & 'timestep' reserve fields
last_in_copy['substep'], last_in_copy['timestep'], last_in_copy['run'] = sub_step, time_step, run
sL.append(last_in_copy)
del last_in_copy
return sL
# mech_pipeline - state_update_block
def state_update_pipeline(
self,
var_dict: Dict[str, List[Any]],
states_list: List[Dict[str, Any]],
configs: List[Tuple[List[Callable], List[Callable]]],
env_processes: Dict[str, Callable],
time_step: int,
run: int
) -> List[Dict[str, Any]]:
# mech_pipeline
def state_update_pipeline(self, var_dict, states_list, configs, env_processes, time_step, run):
sub_step = 0
states_list_copy = deepcopy(states_list)
genesis_states = states_list_copy[-1]
states_list_copy: List[Dict[str, Any]] = deepcopy(states_list)
genesis_states: Dict[str, Any] = states_list_copy[-1]
genesis_states['substep'], genesis_states['timestep'] = sub_step, time_step
states_list = [genesis_states]
states_list: List[Dict[str, Any]] = [genesis_states]
sub_step += 1
for config in configs:
s_conf, p_conf = config[0], config[1]
states_list = self.partial_state_update(var_dict, sub_step, states_list, s_conf, p_conf, env_processes, time_step, run)
states_list: List[Dict[str, Any]] = self.partial_state_update(
var_dict, sub_step, states_list, s_conf, p_conf, env_processes, time_step, run
)
sub_step += 1
time_step += 1
return states_list
def run_pipeline(self, var_dict, states_list, configs, env_processes, time_seq, run):
time_seq = [x + 1 for x in time_seq]
simulation_list = [states_list]
# state_update_pipeline
def run_pipeline(
self,
var_dict: Dict[str, List[Any]],
states_list: List[Dict[str, Any]],
configs: List[Tuple[List[Callable], List[Callable]]],
env_processes: Dict[str, Callable],
time_seq: range,
run: int
) -> List[List[Dict[str, Any]]]:
time_seq: List[int] = [x + 1 for x in time_seq]
simulation_list: List[List[Dict[str, Any]]] = [states_list]
for time_step in time_seq:
pipe_run = self.state_update_pipeline(var_dict, simulation_list[-1], configs, env_processes, time_step, run)
pipe_run: List[Dict[str, Any]] = self.state_update_pipeline(
var_dict, simulation_list[-1], configs, env_processes, time_step, run
)
_, *pipe_run = pipe_run
simulation_list.append(pipe_run)
return simulation_list
# ToDo: Multithreaded Runs
def simulation(self, var_dict, states_list, configs, env_processes, time_seq, runs):
pipe_run = []
for run in range(runs):
def simulation(
self,
var_dict: Dict[str, List[Any]],
states_list: List[Dict[str, Any]],
configs: List[Tuple[List[Callable], List[Callable]]],
env_processes: Dict[str, Callable],
time_seq: range,
runs: int
) -> List[List[Dict[str, Any]]]:
def execute_run(var_dict, states_list, configs, env_processes, time_seq, run) -> List[Dict[str, Any]]:
run += 1
states_list_copy = deepcopy(states_list)
states_list_copy: List[Dict[str, Any]] = deepcopy(states_list)
head, *tail = self.run_pipeline(var_dict, states_list_copy, configs, env_processes, time_seq, run)
genesis = head.pop()
genesis['substep'], genesis['timestep'], genesis['run'] = 0, 0, run
first_timestep_per_run = [genesis] + tail.pop(0)
pipe_run += [first_timestep_per_run] + tail
del states_list_copy
return pipe_run
genesis: Dict[str, Any] = head.pop()
genesis['substep'], genesis['timestep'], genesis['run'] = 0, 0, run
first_timestep_per_run: List[Dict[str, Any]] = [genesis] + tail.pop(0)
return [first_timestep_per_run] + tail
pipe_run: List[List[Dict[str, Any]]] = flatten(
TPool().map(
lambda run: execute_run(var_dict, states_list, configs, env_processes, time_seq, run),
list(range(runs))
)
)
return pipe_run

View File

@ -39,4 +39,4 @@ def engine_exception(ErrorType, error_message, exception_function, try_function)
def fit_param(param, x):
    """Return ``x + param`` (``x`` is the left operand of the addition)."""
    shifted = x + param
    return shifted
# fit_param = lambda param: lambda x: x + param
# fit_param = lambda param: lambda x: x + param

View File

@ -1,7 +1,9 @@
from typing import Dict, List
from collections import defaultdict
from itertools import product
import warnings
def pipe(x):
    """Identity function: hand back the very object that was passed in."""
    passthrough = x
    return passthrough
@ -41,11 +43,11 @@ def dict_filter(dictionary, condition):
return dict([(k, v) for k, v in dictionary.items() if condition(v)])
def get_max_dict_val_len(g):
def get_max_dict_val_len(g: Dict[str, List[int]]) -> int:
return len(max(g.values(), key=len))
def tabulate_dict(d):
def tabulate_dict(d: Dict[str, List[int]]) -> Dict[str, List[int]]:
max_len = get_max_dict_val_len(d)
_d = {}
for k, vl in d.items():
@ -57,7 +59,7 @@ def tabulate_dict(d):
return _d
def flatten_tabulated_dict(d):
def flatten_tabulated_dict(d: Dict[str, List[int]]) -> List[Dict[str, int]]:
max_len = get_max_dict_val_len(d)
dl = [{} for i in range(max_len)]
@ -133,4 +135,4 @@ def curry_pot(f, *argv):
# def decorator(f):
# f.__name__ = newname
# return f
# return decorator
# return decorator

Binary file not shown.

BIN
dist/cadCAD-0.2.1-py3-none-any.whl vendored Normal file

Binary file not shown.

View File

@ -11,7 +11,7 @@ long_description = "cadCAD is a differential games based simulation software pac
monte carlo analysis and other common numerical methods is provided."
setup(name='cadCAD',
version='0.2',
version='0.2.1',
description="cadCAD: a differential games based simulation software package for research, validation, and \
Computer Aided Design of economic systems",
long_description=long_description,

View File

@ -9,11 +9,11 @@ exec_mode = ExecutionMode()
print("Simulation Execution: Concurrent Execution")
multi_proc_ctx = ExecutionContext(context=exec_mode.multi_proc)
run2 = Executor(exec_context=multi_proc_ctx, configs=configs)
run = Executor(exec_context=multi_proc_ctx, configs=configs)
i = 0
config_names = ['config1', 'config2']
for raw_result, tensor_field in run2.main():
for raw_result, tensor_field in run.main():
result = pd.DataFrame(raw_result)
print()
print("Tensor Field: " + config_names[i])
@ -21,4 +21,4 @@ for raw_result, tensor_field in run2.main():
print("Output:")
print(tabulate(result, headers='keys', tablefmt='psql'))
print()
i += 1
i += 1

View File

@ -9,11 +9,11 @@ exec_mode = ExecutionMode()
print("Simulation Execution: Concurrent Execution")
multi_proc_ctx = ExecutionContext(context=exec_mode.multi_proc)
run2 = Executor(exec_context=multi_proc_ctx, configs=configs)
run = Executor(exec_context=multi_proc_ctx, configs=configs)
i = 0
config_names = ['sweep_config_A', 'sweep_config_B']
for raw_result, tensor_field in run2.main():
for raw_result, tensor_field in run.main():
result = pd.DataFrame(raw_result)
print()
print("Tensor Field: " + config_names[i])
@ -21,4 +21,4 @@ for raw_result, tensor_field in run2.main():
print("Output:")
print(tabulate(result, headers='keys', tablefmt='psql'))
print()
i += 1
i += 1

View File

@ -11,12 +11,13 @@ print("Simulation Execution: Single Configuration")
print()
first_config = configs # only contains config1
single_proc_ctx = ExecutionContext(context=exec_mode.single_proc)
run1 = Executor(exec_context=single_proc_ctx, configs=first_config)
run1_raw_result, tensor_field = run1.main()
result = pd.DataFrame(run1_raw_result)
run = Executor(exec_context=single_proc_ctx, configs=first_config)
raw_result, tensor_field = run.main()
result = pd.DataFrame(raw_result)
print()
print("Tensor Field: config1")
print(tabulate(tensor_field, headers='keys', tablefmt='psql'))
print("Output:")
print(tabulate(result, headers='keys', tablefmt='psql'))
print()
print()

View File

@ -3,8 +3,7 @@ import numpy as np
from datetime import timedelta
from cadCAD.configuration import append_configs
from cadCAD.configuration.utils import proc_trigger, bound_norm_random, ep_time_step
from cadCAD.configuration.utils.parameterSweep import config_sim
from cadCAD.configuration.utils import proc_trigger, bound_norm_random, ep_time_step, config_sim
seeds = {
@ -21,6 +20,8 @@ def p1m1(_g, step, sL, s):
def p2m1(_g, step, sL, s):
    """Return the fixed dict ``{'param2': 4}``; all arguments are ignored."""
    signal = {'param2': 4}
    return signal
# []
def p1m2(_g, step, sL, s):
    """Return the fixed dict ``{'param1': 'a', 'param2': 2}``; all arguments are ignored."""
    signal = {'param1': 'a', 'param2': 2}
    return signal
def p2m2(_g, step, sL, s):

View File

@ -3,8 +3,8 @@ import numpy as np
from datetime import timedelta
from cadCAD.configuration import append_configs
from cadCAD.configuration.utils import proc_trigger, bound_norm_random, ep_time_step
from cadCAD.configuration.utils.parameterSweep import config_sim
from cadCAD.configuration.utils import proc_trigger, bound_norm_random, ep_time_step, config_sim
seeds = {
'z': np.random.RandomState(1),

View File

@ -3,8 +3,7 @@ import numpy as np
from datetime import timedelta
from cadCAD.configuration import append_configs
from cadCAD.configuration.utils import proc_trigger, bound_norm_random, ep_time_step
from cadCAD.configuration.utils.parameterSweep import config_sim
from cadCAD.configuration.utils import proc_trigger, bound_norm_random, ep_time_step, config_sim
seeds = {

View File

@ -4,8 +4,9 @@ from datetime import timedelta
import pprint
from cadCAD.configuration import append_configs
from cadCAD.configuration.utils import proc_trigger, ep_time_step
from cadCAD.configuration.utils.parameterSweep import config_sim
from cadCAD.configuration.utils import proc_trigger, ep_time_step, config_sim
from typing import Dict, List
pp = pprint.PrettyPrinter(indent=4)
@ -17,7 +18,7 @@ seeds = {
}
g = {
g: Dict[str, List[int]] = {
'alpha': [1],
'beta': [2, 5],
'gamma': [3, 4],