distroduce integration, pt. 1 (in progress)

This commit is contained in:
Joshua E. Jodesty 2019-09-25 21:56:39 -04:00
parent 71c334d5f6
commit db2bc8c8b8
5 changed files with 166 additions and 23 deletions

View File

@@ -1,8 +1,11 @@
 from pprint import pprint
 from typing import Callable, Dict, List, Any, Tuple
 from pathos.multiprocessing import ProcessingPool as PPool
+from pathos.multiprocessing import ThreadPool as TPool
 from pandas.core.frame import DataFrame
 from pyspark.context import SparkContext
+from pyspark import cloudpickle
+import pickle

 from cadCAD.utils import flatten
 from cadCAD.configuration import Configuration, Processor
@@ -63,7 +66,6 @@ def parallelize_simulations(
     return results

 def distributed_simulations(
-    sc: SparkContext,
     simulation_execs: List[Callable],
     var_dict_list: List[VarDictType],
     states_lists: List[StatesListsType],
@@ -74,13 +76,99 @@ def distributed_simulations(
     userIDs,
     sessionIDs,
     simulationIDs,
-    runIDs: List[int]
+    runIDs: List[int],
+    sc: SparkContext = None
 ):
-    params = list(zip(simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns,
-                      userIDs, sessionIDs, simulationIDs, runIDs))
-    pprint(runIDs)
-    with PPool(len(configs_structs)) as p:
-        results = p.map(lambda t: t[0](t[1], t[2], t[3], t[4], t[5], t[6], t[7], t[8], t[9], t[10]), params)
+    # params = list(zip(simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns,
+    #                   userIDs, sessionIDs, simulationIDs, runIDs))
+    # simulation_execs, configs_structs, env_processes_list
+    func_params = list(zip(userIDs, sessionIDs, simulationIDs, runIDs, simulation_execs, configs_structs, env_processes_list))
+    # val_params = list(zip(userIDs, sessionIDs, simulationIDs, runIDs, var_dict_list, states_lists, Ts, Ns))
+    # vals_rdd = sc.parallelize(val_params).map(
+    #     lambda t: {'user_id': t[0], 'session_id': t[1], 'simulation_id': t[2], 'run_id': t[3],
+    #                'var_dict': t[4], 'states_lists': t[5], 'Ts': t[6], 'Ns': t[7]}
+    # )
+    # vals_rdd.foreach(print)
+    # func_params_dicts = dict(map(
+    #     lambda t: {'user_id': t[0], 'session_id': t[1], 'simulation_id': t[2], 'run_id': t[3],
+    #                'executions': {'sim_exec': t[4], 'configs': t[5], 'env_proc': t[6]}},
+    #     func_params
+    # ))
+    # func_params_dicts = dict(list(map(
+    #     lambda t: {'key': (t[0], t[1], t[2], t[3]),
+    #                'executions': {'sim_exec': t[4], 'configs': t[5], 'env_proc': t[6]}},
+    #     func_params
+    # )))
+    func_keys = [(t[0], t[1], t[2], t[3]) for t in func_params]
+    func_values = [(t[4], t[5], t[6]) for t in func_params]
+    func_params_dicts = {
+        key: {'sim_exec': vals[0], 'configs': vals[1], 'env_proc': vals[2]}
+        for key, vals in zip(func_keys, func_values)
+    }
+    # func_params_dicts = dict([{(t[0], t[1], t[2], t[3]): {'sim_exec': t[4], 'configs': t[5], 'env_proc': t[6]}} for t in func_params])
+    pprint(func_params_dicts)
+    # val_params = list(zip(userIDs, sessionIDs, simulationIDs, runIDs, var_dict_list, states_lists, Ts, Ns))
+    # vals_rdd = sc.parallelize(val_params).map(
+    #     lambda t: {'key': (t[0], t[1], t[2], t[3]),
+    #                'var_dict': t[4], 'states_lists': t[5], 'Ts': t[6], 'Ns': t[7]}
+    # )
+    # val_params_dicts = list(map(
+    #     lambda t: {'user_id': t[0], 'session_id': t[1], 'simulation_id': t[2], 'run_id': t[3],
+    #                'var_dict': t[4], 'states_lists': t[5], 'Ts': t[6], 'Ns': t[7]},
+    #     val_params
+    # ))
+    # pprint(val_params_dicts)
+    # print("Execution: simulation_execs")
+    # print("Configuration: configs_structs, env_processes_list")
+    # print()
+    exit()
+    # Pickle send to worker
+    # atom = params[0]
+    # # pprint(atom)
+    # sim_exec = atom[0]
+    # configs_struct = atom[3]
+    # env_processes = atom[4]
+    # pickled_sim_exec = cloudpickle.dumps(sim_exec)
+    # pickled_configs = cloudpickle.dumps(configs_struct)
+    # pickled_env_procs = cloudpickle.dumps(env_processes)
+    #
+    # def cucumber(pickled):
+    #     unpickled = pickle.loads(pickled)
+    #     return unpickled
+    #
+    # sc.parallelize([pickled_sim_exec]).map(cucumber).collect()
+    # configs_structs, simulation_execs, env_processes_list
+    results = [t[0](t[1], t[2], t[3], t[4], t[5], t[6], t[7], t[8], t[9], t[10], sc) for t in params]
+    # def pickle_magic(cucumber):
+    #     cucumber[]
+    #     cloudpickle.dump()
+    #     cucumber = pickle.loads(pickled)
+    pickled_params = cloudpickle.dump(params)
+    results = sc.parallelize(pickled_params).map(
+        lambda t: t[0](t[1], t[2], t[3], t[4], t[5], t[6], t[7], t[8], t[9], t[10])
+    ).collect()
+    # with PPool(len(configs_structs)) as p:
+    #     results = p.map(lambda t: t[0](t[1], t[2], t[3], t[4], t[5], t[6], t[7], t[8], t[9], t[10], sc), params)
     return results

 # _pickle.PicklingError: Can't pickle <function <lambda> at 0x1115149e0>: attribute lookup <lambda> on
@@ -168,18 +256,16 @@ class Executor:
             results = []
             for result, partial_state_updates, ep in list(zip(simulations, partial_state_updates, eps)):
                 results.append((flatten(result), create_tensor_field(partial_state_updates, ep)))
             final_result = results
         elif self.exec_context == ExecutionMode.dist_proc:
             # if len(self.configs) > 1:
             simulations = self.exec_method(
-                self.sc,
                 simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns,
-                userIDs, sessionIDs, simulationIDs, runIDs
+                userIDs, sessionIDs, simulationIDs, runIDs, self.sc
             )
             results = []
             for result, partial_state_updates, ep in list(zip(simulations, partial_state_updates, eps)):
                 results.append((flatten(result), create_tensor_field(partial_state_updates, ep)))
-            final_result = None
+            final_result = results
         return final_result
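
For context on the commented-out pickling experiments above (the `cucumber` helper and the trailing PicklingError note): below is a minimal, self-contained sketch of the cloudpickle round trip this file is reaching for, assuming PySpark's bundled cloudpickle as imported in the diff. The names `simulate` and `call_pickled` are illustrative stand-ins, not cadCAD's API.

from pyspark import SparkContext, cloudpickle
import pickle

sc = SparkContext.getOrCreate()

def simulate(run_id):
    # Stand-in for a cadCAD run; returns a trivial result record.
    return {'run_id': run_id, 'state': run_id * 2}

# Serialize the callable on the driver with cloudpickle, which handles the
# lambdas/closures that the default pickler rejects with PicklingError.
pickled_fn = cloudpickle.dumps(simulate)

def call_pickled(run_id):
    # Deserialize on the worker and invoke the function there.
    fn = pickle.loads(pickled_fn)
    return fn(run_id)

results = sc.parallelize(range(4)).map(call_pickled).collect()
print(results)  # [{'run_id': 0, 'state': 0}, ..., {'run_id': 3, 'state': 6}]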

View File

@@ -1,8 +1,11 @@
+from pprint import pprint
 from typing import Any, Callable, Dict, List, Tuple
 from pathos.pools import ThreadPool as TPool
 from copy import deepcopy
 from functools import reduce
+from pyspark import SparkContext

 from cadCAD.engine.utils import engine_exception
 from cadCAD.utils import flatten
@@ -206,7 +209,8 @@ class Executor:
             user_id,
             session_id,
             simulation_id,
-            run_id
+            run_id,
+            sc: SparkContext = None
     ) -> List[List[Dict[str, Any]]]:

         def execute_run(sweep_dict, states_list, configs, env_processes, time_seq, run) -> List[Dict[str, Any]]:
@@ -226,16 +230,33 @@ class Executor:
             del states_list_copy
             return first_timestep_per_run
+        #
+        # pprint()
+        #
+        # exit()
+        pipe_run = flatten(
+            [execute_run(sweep_dict, states_list, configs, env_processes, time_seq, run) for run in range(runs)]
+        )
+        return pipe_run
+        # ******** Only has one run
+        # pipe_run: List[List[Dict[str, Any]]] = flatten(
+        #     sc.parallelize(list(range(runs))).map(
+        #         lambda run: execute_run(sweep_dict, states_list, configs, env_processes, time_seq, run)
+        #     ).collect()
+        # )
         # print(type(run_id))
         # print(runs)
-        tp = TPool(runs)
-        pipe_run: List[List[Dict[str, Any]]] = flatten(
-            tp.map(
-                lambda run: execute_run(sweep_dict, states_list, configs, env_processes, time_seq, run),
-                list(range(runs))
-            )
-        )
-        tp.clear()
-        return pipe_run
+        # tp = TPool(runs)
+        # pipe_run: List[List[Dict[str, Any]]] = flatten(
+        #     tp.map(
+        #         lambda run: execute_run(sweep_dict, states_list, configs, env_processes, time_seq, run),
+        #         list(range(runs))
+        #     )
+        # )
+        #
+        # tp.clear()
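
The commented-out block above ("Only has one run") sketches swapping the ThreadPool fan-out over runs for an RDD map. Below is a minimal sketch of that shape only, with a toy `execute_run` standing in for cadCAD's inner run function; names and record layout are illustrative, not cadCAD's code.

from pyspark import SparkContext

sc = SparkContext.getOrCreate()

def execute_run(run):
    # Toy stand-in for cadCAD's execute_run: one record per timestep.
    return [{'run': run, 'timestep': t} for t in range(3)]

runs = 4

# ThreadPool version (current behaviour): every run executes in the driver process.
# from pathos.pools import ThreadPool as TPool
# per_run = TPool(runs).map(execute_run, list(range(runs)))

# RDD version: run indices are shipped to Spark workers instead.
per_run = sc.parallelize(list(range(runs))).map(execute_run).collect()

pipe_run = [record for run_result in per_run for record in run_result]
print(len(pipe_run))  # 12 records: 4 runs x 3 timesteps each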

View File

@@ -3,7 +3,7 @@ from pprint import pprint
 import pandas as pd
 from tabulate import tabulate
-from simulations.test_executions.spark.session import spark_context as sc
+from simulations.distributed.spark.session import spark_context as sc
 from simulations.regression_tests import config1, config2
 from cadCAD.engine import ExecutionMode, ExecutionContext, Executor

View File

@@ -0,0 +1,36 @@
+import pandas as pd
+from tabulate import tabulate
+# The following imports NEED to be in the exact order
+from cadCAD.engine import ExecutionMode, ExecutionContext, Executor
+from cadCAD.utils import arrange_cols
+from simulations.regression_tests import config1, config2
+from cadCAD import configs
+from simulations.distributed.spark.session import spark
+
+exec_mode = ExecutionMode()
+
+print("Simulation Execution: Concurrent Execution")
+multi_proc_ctx = ExecutionContext(context=exec_mode.multi_proc)
+run = Executor(exec_context=multi_proc_ctx, configs=configs)
+
+# print(configs)
+
+tf = None
+i = 0
+config_names = ['config1', 'config2']
+for raw_result, tensor_field in run.execute():
+    result = arrange_cols(pd.DataFrame(raw_result), False)
+    print()
+    # print(f"Tensor Field: {config_names[i]}")
+    tf = tensor_field
+    # print(tabulate(tensor_field, headers='keys', tablefmt='psql'))
+    print("Output:")
+    # print(tabulate(result, headers='keys', tablefmt='psql'))
+    print()
+    i += 1
+
+spark.conf.set("spark.sql.execution.arrow.enabled", "true")
+df = spark.createDataFrame(tf)
+df.show()
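
The last three lines of the new script hand a pandas tensor field to Spark. Below is a minimal sketch of that hand-off in isolation; the SparkSession construction and column names are illustrative assumptions (the script itself obtains `spark` from simulations.distributed.spark.session).

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("arrow-demo").getOrCreate()

# Arrow-backed conversion speeds up createDataFrame()/toPandas() for pandas data
# (Spark 2.3/2.4 config key, matching the one set in the script above).
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

pdf = pd.DataFrame({'run': [1, 2], 'timestep': [0, 0], 'x': [0.0, 1.5]})
sdf = spark.createDataFrame(pdf)
sdf.show()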