From cef6e652b3991287e94bb94e65217c09b26b715c Mon Sep 17 00:00:00 2001 From: "Joshua E. Jodesty" Date: Thu, 26 Sep 2019 14:16:13 -0400 Subject: [PATCH] integration works knock on wood --- cadCAD/engine/__init__.py | 76 +++---------------- .../event_bench/spark/session/__init__.py | 4 +- .../distributed/spark/session/__init__.py | 5 +- 3 files changed, 16 insertions(+), 69 deletions(-) diff --git a/cadCAD/engine/__init__.py b/cadCAD/engine/__init__.py index 6093e6f..a41109f 100644 --- a/cadCAD/engine/__init__.py +++ b/cadCAD/engine/__init__.py @@ -86,11 +86,10 @@ def distributed_simulations( zip(userIDs, sessionIDs, simulationIDs, runIDs, simulation_execs, configs_structs, env_processes_list) ) - func_params_dict = dict( - [((t[0], t[1], t[2], t[3]), {'sim_exec': t[4], 'config': t[5], 'env_procs': t[6]}) for t in func_params] - ) - - pprint(func_params_dict) + func_params_dict = [ + # ((t[0], t[1], t[2], t[3]), {'sim_exec': t[4], 'config': t[5], 'env_procs': t[6]}) for t in func_params + ((t[0], t[1], t[2], t[3]), (t[4], t[5], t[6])) for t in func_params + ] val_params = list(zip(userIDs, sessionIDs, simulationIDs, runIDs, var_dict_list, states_lists, Ts, Ns)) val_params_list = [ @@ -100,73 +99,20 @@ def distributed_simulations( ) for t in val_params ] - # pprint(val_params_dict) def simulate(k, v): - (sim_exec, config, env_procs) = func_params_dict(k) + (sim_exec, config, env_procs) = [f[1] for f in func_params_dict if f[0] == k][0] + print(env_procs) results = sim_exec( v['var_dict'], v['states_lists'], config, env_procs, v['Ts'], v['Ns'], - k['user_id'], k['session_id'], k['sim_id'], k['run_id'] + k[0], k[1], k[2], k[3] ) return results - - vals_rdd = sc.parallelize(val_params_list).map(lambda x: simulate(x[0], x[1])) #.map(lambda x: tuple(x[0].values())) - pprint(vals_rdd.take(1)) - # vals_rdd.foreach(print) - # .map( - # lambda t: {(t[0], t[1], t[2], t[3]): {'var_dict': t[4], 'states_lists': t[5], 'Ts': t[6], 'Ns': t[7]}} - # ) - - # params = list(zip(simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns, - # userIDs, sessionIDs, simulationIDs, runIDs)) - # simulation_execs, configs_structs, env_processes_list - - # val_params = list(zip(userIDs, sessionIDs, simulationIDs, runIDs, var_dict_list, states_lists, Ts, Ns)) - # vals_rdd = sc.parallelize(val_params).map( - # lambda t: {'user_id': t[0], 'session_id': t[1], 'simulation_id': t[2], 'run_id': t[3], - # 'var_dict': t[4], 'states_lists': t[5], 'Ts': t[6], 'Ns': t[7]} - # ) - # vals_rdd.foreach(print) - - # func_params_dicts = dict(map( - # lambda t: {'user_id': t[0], 'session_id': t[1], 'simulation_id': t[2], 'run_id': t[3], - # 'executions': {'sim_exec': t[4], 'configs': t[5], 'env_proc': t[6]}}, - # func_params - # )) - - - - # func_params_dicts = dict(list(map( - # lambda t: {'key': (t[0], t[1], t[2], t[3]), - # 'executions': {'sim_exec': t[4], 'configs': t[5], 'env_proc': t[6]}}, - # func_params - # ))) - - # func_keys = [(t[0], t[1], t[2], t[3]) for t in func_params] - # func_values = [(t[4], t[5], t[6], t[7]) for t in func_params] - - - # func_params_dicts = dict([{(t[0], t[1], t[2], t[3]): {'sim_exec': t[4], 'configs': t[5], 'env_proc': t[6]}} for t in func_params]) - - # pprint(func_params_dicts) - - # val_params = list(zip(userIDs, sessionIDs, simulationIDs, runIDs, var_dict_list, states_lists, Ts, Ns)) - # vals_rdd = sc.parallelize(val_params).map( - # lambda t: {'key': (t[0], t[1], t[2], t[3]), - # 'var_dict': t[4], 'states_lists': t[5], 'Ts': t[6], 'Ns': t[7]} - # ) - - - # val_params_dicts = list(map( - # lambda t: {'user_id': t[0], 'session_id': t[1], 'simulation_id': t[2], 'run_id': t[3], - # 'var_dict': t[4], 'states_lists': t[5], 'Ts': t[6], 'Ns': t[7]}, - # val_params - # )) - # pprint(val_params_dicts) - # print("Execution: simulation_execs") - # print("Configuation: configs_structs, env_processes_list") - # print() + # tuple(x[0].values()) + # x[1] + vals_rdd = sc.parallelize(val_params_list).map(lambda x: simulate(tuple(x[0].values()), x[1])) + pprint(vals_rdd.take(3)) exit() diff --git a/distributed_produce/examples/event_bench/spark/session/__init__.py b/distributed_produce/examples/event_bench/spark/session/__init__.py index 9562c2a..98359b1 100644 --- a/distributed_produce/examples/event_bench/spark/session/__init__.py +++ b/distributed_produce/examples/event_bench/spark/session/__init__.py @@ -2,8 +2,8 @@ from pyspark.sql import SparkSession from pyspark.context import SparkContext import os -os.environ['PYSPARK_PYTHON'] = '/usr/local/bin/python3' -os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/local/bin/python3' +# os.environ['PYSPARK_PYTHON'] = '/usr/local/bin/python3' +# os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/local/bin/python3' spark = SparkSession\ .builder\ diff --git a/simulations/distributed/spark/session/__init__.py b/simulations/distributed/spark/session/__init__.py index 0ad2588..7afaf0c 100644 --- a/simulations/distributed/spark/session/__init__.py +++ b/simulations/distributed/spark/session/__init__.py @@ -2,8 +2,9 @@ from pyspark.sql import SparkSession from pyspark.context import SparkContext import os -os.environ['PYSPARK_PYTHON'] = '/usr/local/bin/python3' -os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/local/bin/python3' +# os.environ['PYSPARK_PYTHON'] = '/usr/local/bin/python3' +# os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/local/bin/python3' +# os.environ['OBJC_DISABLE_INITIALIZE_FORK_SAFETY'] = 'YES' spark = SparkSession\ .builder\