From ca2a9db8ff929998c68d742eb3278718538f6b94 Mon Sep 17 00:00:00 2001 From: "Joshua E. Jodesty" Date: Fri, 27 Sep 2019 12:43:30 -0400 Subject: [PATCH] missing chat config --- cadCAD/engine/__init__.py | 34 ++--- .../examples/event_bench/consumer.py | 1 - .../event_bench/spark/session/__init__.py | 3 +- simulations/distributed/config1.py | 138 ++++++++++++++++++ 4 files changed, 156 insertions(+), 20 deletions(-) create mode 100644 simulations/distributed/config1.py diff --git a/cadCAD/engine/__init__.py b/cadCAD/engine/__init__.py index 98dcb54..2f9f781 100644 --- a/cadCAD/engine/__init__.py +++ b/cadCAD/engine/__init__.py @@ -65,6 +65,7 @@ def parallelize_simulations( results = p.map(lambda t: t[0](t[1], t[2], t[3], t[4], t[5], t[6], t[7], t[8], t[9], t[10]), params) return results + def distributed_simulations( simulation_execs: List[Callable], var_dict_list: List[VarDictType], @@ -105,6 +106,7 @@ def distributed_simulations( return list(results_rdd.collect()) + class ExecutionContext: def __init__(self, context: str = ExecutionMode.multi_proc) -> None: self.name = context @@ -160,8 +162,6 @@ class Executor: config_idx += 1 - final_result = None - if self.exec_context == ExecutionMode.single_proc: tensor_field = create_tensor_field(partial_state_updates.pop(), eps.pop()) result = self.exec_method( @@ -169,22 +169,20 @@ class Executor: userIDs, sessionIDs, simulationIDs, runIDs ) final_result = result, tensor_field - elif self.exec_context == ExecutionMode.multi_proc: - # if len(self.configs) > 1: - simulations = self.exec_method( - simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns, - userIDs, sessionIDs, simulationIDs, runIDs - ) - results = [] - for result, partial_state_updates, ep in list(zip(simulations, partial_state_updates, eps)): - results.append((flatten(result), create_tensor_field(partial_state_updates, ep))) - final_result = results - elif self.exec_context == ExecutionMode.dist_proc: - # if len(self.configs) > 1: - simulations = self.exec_method( - simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns, - userIDs, sessionIDs, simulationIDs, runIDs, self.sc - ) + else: + if self.exec_context == ExecutionMode.multi_proc: + # if len(self.configs) > 1: + simulations = self.exec_method( + simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns, + userIDs, sessionIDs, simulationIDs, runIDs + ) + + elif self.exec_context == ExecutionMode.dist_proc: + simulations = self.exec_method( + simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns, + userIDs, sessionIDs, simulationIDs, runIDs, self.sc + ) + results = [] for result, partial_state_updates, ep in list(zip(simulations, partial_state_updates, eps)): results.append((flatten(result), create_tensor_field(partial_state_updates, ep))) diff --git a/distributed_produce/examples/event_bench/consumer.py b/distributed_produce/examples/event_bench/consumer.py index 9c65289..d6718b8 100644 --- a/distributed_produce/examples/event_bench/consumer.py +++ b/distributed_produce/examples/event_bench/consumer.py @@ -1,7 +1,6 @@ from kafka import KafkaConsumer from datetime import datetime - consumer = KafkaConsumer('test', bootstrap_servers=['localhost:9092']) i = 1 diff --git a/distributed_produce/examples/event_bench/spark/session/__init__.py b/distributed_produce/examples/event_bench/spark/session/__init__.py index 98359b1..3bf3498 100644 --- a/distributed_produce/examples/event_bench/spark/session/__init__.py +++ b/distributed_produce/examples/event_bench/spark/session/__init__.py @@ -1,9 +1,10 @@ from pyspark.sql import SparkSession from pyspark.context import SparkContext -import os +# import os # os.environ['PYSPARK_PYTHON'] = '/usr/local/bin/python3' # os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/local/bin/python3' +# os.environ['OBJC_DISABLE_INITIALIZE_FORK_SAFETY'] = 'YES' spark = SparkSession\ .builder\ diff --git a/simulations/distributed/config1.py b/simulations/distributed/config1.py new file mode 100644 index 0000000..1e32d9f --- /dev/null +++ b/simulations/distributed/config1.py @@ -0,0 +1,138 @@ +import numpy as np +from datetime import timedelta + +from cadCAD.configuration import append_configs +from cadCAD.configuration.utils import bound_norm_random, config_sim, time_step, env_trigger + +seeds = { + 'z': np.random.RandomState(1), + 'a': np.random.RandomState(2), + 'b': np.random.RandomState(3), + 'c': np.random.RandomState(4) +} + + +# Policies per Mechanism +def p1m1(_g, step, sL, s): + return {'param1': 1} +def p2m1(_g, step, sL, s): + return {'param1': 1, 'param2': 4} + +def p1m2(_g, step, sL, s): + return {'param1': 'a', 'param2': 2} +def p2m2(_g, step, sL, s): + return {'param1': 'b', 'param2': 4} + +def p1m3(_g, step, sL, s): + return {'param1': ['c'], 'param2': np.array([10, 100])} +def p2m3(_g, step, sL, s): + return {'param1': ['d'], 'param2': np.array([20, 200])} + + +def s1m1(_g, step, sL, s, _input): + y = 's1' + x = s['s1'] + 1 + return (y, x) +def s2m1(_g, step, sL, s, _input): + y = 's2' + x = _input['param2'] + return (y, x) + +def s1m2(_g, step, sL, s, _input): + y = 's1' + x = s['s1'] + 1 + return (y, x) +def s2m2(_g, step, sL, s, _input): + y = 's2' + x = _input['param2'] + return (y, x) + +def s1m3(_g, step, sL, s, _input): + y = 's1' + x = s['s1'] + 1 + return (y, x) +def s2m3(_g, step, sL, s, _input): + y = 's2' + x = _input['param2'] + return (y, x) + +def policies(_g, step, sL, s, _input): + y = 'policies' + x = _input + return (y, x) + + +def update_timestamp(_g, step, sL, s, _input): + y = 'timestamp' + return y, time_step(dt_str=s[y], dt_format='%Y-%m-%d %H:%M:%S', _timedelta=timedelta(days=0, minutes=0, seconds=1)) + + +# Genesis States +genesis_states = { + 's1': 0.0, + 's2': 0.0, + 's3': 1.0, + 's4': 1.0, + 'timestamp': '2018-10-01 15:16:24' +} + + +# Environment Process +# ToDo: Depreciation Waring for env_proc_trigger convention +trigger_timestamps = ['2018-10-01 15:16:25', '2018-10-01 15:16:27', '2018-10-01 15:16:29'] +env_processes = { + "s3": [lambda _g, x: 5], + "s4": env_trigger(3)(trigger_field='timestamp', trigger_vals=trigger_timestamps, funct_list=[lambda _g, x: 10]) +} + + +partial_state_update_block = [ + { + "policies": { + "b1": p1m1, + "b2": p2m1 + }, + "variables": { + "s1": s1m1, + "s2": s2m1, + "timestamp": update_timestamp + } + }, + { + "policies": { + "b1": p1m2, + "b2": p2m2 + }, + "variables": { + "s1": s1m2, + "s2": s2m2 + } + }, + { + "policies": { + "b1": p1m3, + "b2": p2m3 + }, + "variables": { + "s1": s1m3, + "s2": s2m3 + } + } +] + + +sim_config = config_sim( + { + "N": 1, + "T": range(5), + } +) + +append_configs( + user_id='user_a', + sim_configs=sim_config, + initial_state=genesis_states, + env_processes=env_processes, + partial_state_update_blocks=partial_state_update_block, + policy_ops=[lambda a, b: a + b] +) \ No newline at end of file