From db2bc8c8b8ede9ac5eda322bcf62b14112516d52 Mon Sep 17 00:00:00 2001
From: "Joshua E. Jodesty"
Date: Wed, 25 Sep 2019 21:56:39 -0400
Subject: [PATCH] distroduce integration pt 1 in prog

---
 cadCAD/engine/__init__.py                    | 108 ++++++++++++++++--
 cadCAD/engine/simulation.py                  |  43 +++++--
 .../dist_prod_exec.py                        |   2 +-
 simulations/distributed/multi_config_test.py |  36 ++++++
 .../spark/session/__init__.py                |   0
 5 files changed, 166 insertions(+), 23 deletions(-)
 rename simulations/{test_executions => distributed}/dist_prod_exec.py (91%)
 create mode 100644 simulations/distributed/multi_config_test.py
 rename simulations/{test_executions => distributed}/spark/session/__init__.py (100%)

diff --git a/cadCAD/engine/__init__.py b/cadCAD/engine/__init__.py
index 803a4eb..3f46a3f 100644
--- a/cadCAD/engine/__init__.py
+++ b/cadCAD/engine/__init__.py
@@ -1,8 +1,11 @@
 from pprint import pprint
 from typing import Callable, Dict, List, Any, Tuple
 from pathos.multiprocessing import ProcessingPool as PPool
+from pathos.multiprocessing import ThreadPool as TPool
 from pandas.core.frame import DataFrame
 from pyspark.context import SparkContext
+from pyspark import cloudpickle
+import pickle
 
 from cadCAD.utils import flatten
 from cadCAD.configuration import Configuration, Processor
@@ -63,7 +66,6 @@ def parallelize_simulations(
     return results
 
 def distributed_simulations(
-        sc: SparkContext,
         simulation_execs: List[Callable],
         var_dict_list: List[VarDictType],
         states_lists: List[StatesListsType],
@@ -74,13 +76,99 @@
         userIDs,
         sessionIDs,
         simulationIDs,
-        runIDs: List[int]
+        runIDs: List[int],
+        sc: SparkContext = None
 ):
-    params = list(zip(simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns,
-                      userIDs, sessionIDs, simulationIDs, runIDs))
-    pprint(runIDs)
-    with PPool(len(configs_structs)) as p:
-        results = p.map(lambda t: t[0](t[1], t[2], t[3], t[4], t[5], t[6], t[7], t[8], t[9], t[10]), params)
+    # params = list(zip(simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns,
+    #                   userIDs, sessionIDs, simulationIDs, runIDs))
+    # simulation_execs, configs_structs, env_processes_list
+
+    func_params = list(zip(userIDs, sessionIDs, simulationIDs, runIDs, simulation_execs, configs_structs, env_processes_list))
+    # val_params = list(zip(userIDs, sessionIDs, simulationIDs, runIDs, var_dict_list, states_lists, Ts, Ns))
+    # vals_rdd = sc.parallelize(val_params).map(
+    #     lambda t: {'user_id': t[0], 'session_id': t[1], 'simulation_id': t[2], 'run_id': t[3],
+    #                'var_dict': t[4], 'states_lists': t[5], 'Ts': t[6], 'Ns': t[7]}
+    # )
+    # vals_rdd.foreach(print)
+
+    # func_params_dicts = dict(map(
+    #     lambda t: {'user_id': t[0], 'session_id': t[1], 'simulation_id': t[2], 'run_id': t[3],
+    #                'executions': {'sim_exec': t[4], 'configs': t[5], 'env_proc': t[6]}},
+    #     func_params
+    # ))
+
+    # func_params_dicts = dict(list(map(
+    #     lambda t: {'key': (t[0], t[1], t[2], t[3]),
+    #                'executions': {'sim_exec': t[4], 'configs': t[5], 'env_proc': t[6]}},
+    #     func_params
+    # )))
+
+    # Key each execution by (user_id, session_id, simulation_id, run_id)
+    func_keys = [(t[0], t[1], t[2], t[3]) for t in func_params]
+    func_values = [(t[4], t[5], t[6]) for t in func_params]
+    func_params_dicts = dict(zip(func_keys, func_values))
+
+    # func_params_dicts = dict([{(t[0], t[1], t[2], t[3]): {'sim_exec': t[4], 'configs': t[5], 'env_proc': t[6]}} for t in func_params])
+
+    pprint(func_params_dicts)
+
+    # val_params = list(zip(userIDs, sessionIDs, simulationIDs, runIDs, var_dict_list, states_lists, Ts, Ns))
+    # vals_rdd = sc.parallelize(val_params).map(
+    #     lambda t: {'key': (t[0], t[1], t[2], t[3]),
+    #                'var_dict': t[4], 'states_lists': t[5], 'Ts': t[6], 'Ns': t[7]}
+    # )
+
+    # val_params_dicts = list(map(
+    #     lambda t: {'user_id': t[0], 'session_id': t[1], 'simulation_id': t[2], 'run_id': t[3],
+    #                'var_dict': t[4], 'states_lists': t[5], 'Ts': t[6], 'Ns': t[7]},
+    #     val_params
+    # ))
+    # pprint(val_params_dicts)
+    # print("Execution: simulation_execs")
+    # print("Configuration: configs_structs, env_processes_list")
+    # print()
+
+    # exit()
+
+    # Pickle and send to a worker
+    # atom = params[0]
+    # # pprint(atom)
+    # sim_exec = atom[0]
+    # configs_struct = atom[3]
+    # env_processes = atom[4]
+    # pickled_sim_exec = cloudpickle.dumps(sim_exec)
+    # pickled_configs = cloudpickle.dumps(configs_struct)
+    # pickled_env_procs = cloudpickle.dumps(env_processes)
+    #
+    # def cucumber(pickled):
+    #     unpickled = pickle.loads(pickled)
+    #     return unpickled
+    #
+    # sc.parallelize([pickled_sim_exec]).map(cucumber).collect()
+
+    # def pickle_magic(cucumber):
+    #     cucumber[]
+    #     cloudpickle.dump()
+    #     cucumber = pickle.loads(pickled)
+
+    # configs_structs, simulation_execs, env_processes_list
+    params = list(zip(simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns,
+                      userIDs, sessionIDs, simulationIDs, runIDs))
+
+    # Driver-local execution, passing the SparkContext through to each simulation:
+    # results = [t[0](t[1], t[2], t[3], t[4], t[5], t[6], t[7], t[8], t[9], t[10], sc) for t in params]
+
+    # Plain pickle cannot serialize the simulation callables (see the PicklingError note below),
+    # so each parameter tuple is cloudpickled in the driver and rebuilt on the workers.
+    def run_pickled(pickled):
+        t = pickle.loads(pickled)
+        return t[0](t[1], t[2], t[3], t[4], t[5], t[6], t[7], t[8], t[9], t[10])
+
+    pickled_params = [cloudpickle.dumps(t) for t in params]
+    results = sc.parallelize(pickled_params).map(run_pickled).collect()
+    # with PPool(len(configs_structs)) as p:
+    #     results = p.map(lambda t: t[0](t[1], t[2], t[3], t[4], t[5], t[6], t[7], t[8], t[9], t[10], sc), params)
     return results
 
 # _pickle.PicklingError: Can't pickle <function <lambda> at 0x1115149e0>: attribute lookup <lambda> on
@@ -168,18 +256,16 @@ class Executor:
             results = []
             for result, partial_state_updates, ep in list(zip(simulations, partial_state_updates, eps)):
                 results.append((flatten(result), create_tensor_field(partial_state_updates, ep)))
-            final_result = results
         elif self.exec_context == ExecutionMode.dist_proc:
             # if len(self.configs) > 1:
             simulations = self.exec_method(
-                self.sc,
                 simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns,
-                userIDs, sessionIDs, simulationIDs, runIDs
+                userIDs, sessionIDs, simulationIDs, runIDs, self.sc
             )
             results = []
             for result, partial_state_updates, ep in list(zip(simulations, partial_state_updates, eps)):
                 results.append((flatten(result), create_tensor_field(partial_state_updates, ep)))
-            final_result = None
+        final_result = results
 
         return final_result
diff --git a/cadCAD/engine/simulation.py b/cadCAD/engine/simulation.py
index 44e5b40..24949c0 100644
--- a/cadCAD/engine/simulation.py
+++ b/cadCAD/engine/simulation.py
@@ -1,8 +1,11 @@
+from pprint import pprint
 from typing import Any, Callable, Dict, List, Tuple
 from pathos.pools import ThreadPool as TPool
 from copy import deepcopy
 from functools import reduce
 
+from pyspark import SparkContext
+
 from cadCAD.engine.utils import engine_exception
 from cadCAD.utils import flatten
 
@@ -206,7 +209,8 @@ class Executor:
             user_id,
             session_id,
             simulation_id,
-            run_id
+            run_id,
+            sc: SparkContext = None
     ) -> List[List[Dict[str, Any]]]:
 
         def execute_run(sweep_dict, states_list, configs, env_processes, time_seq, run) -> List[Dict[str, Any]]:
@@ -226,16 +230,33 @@
                 del states_list_copy
 
             return first_timestep_per_run
 
+        #
+        # pprint()
+        #
+        # exit()
+
+        pipe_run = flatten(
+            [execute_run(sweep_dict, states_list, configs, env_processes, time_seq, run) for run in range(runs)]
+        )
+
+        return pipe_run
+
+        # ******** Only has one run
+        # pipe_run: List[List[Dict[str, Any]]] = flatten(
+        #     sc.parallelize(list(range(runs))).map(
+        #         lambda run: execute_run(sweep_dict, states_list, configs, env_processes, time_seq, run)
+        #     ).collect()
+        # )
         # print(type(run_id))
         # print(runs)
-        tp = TPool(runs)
-        pipe_run: List[List[Dict[str, Any]]] = flatten(
-            tp.map(
-                lambda run: execute_run(sweep_dict, states_list, configs, env_processes, time_seq, run),
-                list(range(runs))
-            )
-        )
-
-        tp.clear()
-        return pipe_run
+        # tp = TPool(runs)
+        # pipe_run: List[List[Dict[str, Any]]] = flatten(
+        #     tp.map(
+        #         lambda run: execute_run(sweep_dict, states_list, configs, env_processes, time_seq, run),
+        #         list(range(runs))
+        #     )
+        # )
+        #
+        # tp.clear()
diff --git a/simulations/test_executions/dist_prod_exec.py b/simulations/distributed/dist_prod_exec.py
similarity index 91%
rename from simulations/test_executions/dist_prod_exec.py
rename to simulations/distributed/dist_prod_exec.py
index 548b549..aaca497 100644
--- a/simulations/test_executions/dist_prod_exec.py
+++ b/simulations/distributed/dist_prod_exec.py
@@ -3,7 +3,7 @@ from pprint import pprint
 import pandas as pd
 from tabulate import tabulate
 
-from simulations.test_executions.spark.session import spark_context as sc
+from simulations.distributed.spark.session import spark_context as sc
 from simulations.regression_tests import config1, config2
 from cadCAD.engine import ExecutionMode, ExecutionContext, Executor
diff --git a/simulations/distributed/multi_config_test.py b/simulations/distributed/multi_config_test.py
new file mode 100644
index 0000000..8468102
--- /dev/null
+++ b/simulations/distributed/multi_config_test.py
@@ -0,0 +1,36 @@
+import pandas as pd
+from tabulate import tabulate
+# The following imports need to be in exactly this order
+from cadCAD.engine import ExecutionMode, ExecutionContext, Executor
+from cadCAD.utils import arrange_cols
+from simulations.regression_tests import config1, config2
+from cadCAD import configs
+
+from simulations.distributed.spark.session import spark
+
+exec_mode = ExecutionMode()
+
+print("Simulation Execution: Concurrent Execution")
+multi_proc_ctx = ExecutionContext(context=exec_mode.multi_proc)
+run = Executor(exec_context=multi_proc_ctx, configs=configs)
+
+# print(configs)
+tf = None
+i = 0
+config_names = ['config1', 'config2']
+for raw_result, tensor_field in run.execute():
+    result = arrange_cols(pd.DataFrame(raw_result), False)
+    print()
+    # print(f"Tensor Field: {config_names[i]}")
+    tf = tensor_field
+    # print(tabulate(tensor_field, headers='keys', tablefmt='psql'))
+    print("Output:")
+    # print(tabulate(result, headers='keys', tablefmt='psql'))
+    print()
+    i += 1
+
+spark.conf.set("spark.sql.execution.arrow.enabled", "true")
+
+df = spark.createDataFrame(tf)
+
+df.show()
\ No newline at end of file
diff --git a/simulations/test_executions/spark/session/__init__.py b/simulations/distributed/spark/session/__init__.py
similarity index 100%
rename from simulations/test_executions/spark/session/__init__.py
rename to simulations/distributed/spark/session/__init__.py
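
Context for the serialization changes above: the commented-out "cucumber" experiment and the _pickle.PicklingError note revolve around getting simulation callables onto Spark workers. Plain pickle refuses lambdas and locally defined functions, while cloudpickle serializes them by value. The standalone sketch below is illustrative only and is not part of the patch; run_simulation and the local[2] master are assumptions, not cadCAD or project APIs. It shows the round trip the patch appears to be working toward: cloudpickle.dumps in the driver, sc.parallelize over the pickled payloads, and pickle.loads inside the mapped function on the workers.

# Standalone sketch (assumption: a local Spark install; run_simulation is a
# hypothetical stand-in for a cadCAD simulation executor, not a real project function).
import pickle

from pyspark import SparkContext, cloudpickle

sc = SparkContext(master="local[2]", appName="distroduce-pickling-sketch")

def run_simulation(run_id):
    # Trivial placeholder result record for one simulation run
    return {"run_id": run_id, "result": run_id * run_id}

# Plain pickle would fail here if run_simulation were a lambda or closure;
# cloudpickle serializes it by value, so workers do not need the defining module.
pickled_exec = cloudpickle.dumps(run_simulation)

results = (
    sc.parallelize(range(4))
      .map(lambda run_id: pickle.loads(pickled_exec)(run_id))
      .collect()
)
print(results)
sc.stop()

Serializing the callable by value this way is what would let distributed_simulations ship user-defined state-update functions to executors without installing them there, which appears to be the direction of this work-in-progress patch.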