integration works knock on wood

commit cef6e652b3
parent 50f9dd9c40
Author: Joshua E. Jodesty
Date:   2019-09-26 14:16:13 -04:00

3 changed files with 16 additions and 69 deletions

@@ -86,11 +86,10 @@ def distributed_simulations(
zip(userIDs, sessionIDs, simulationIDs, runIDs, simulation_execs, configs_structs, env_processes_list)
)
func_params_dict = dict(
[((t[0], t[1], t[2], t[3]), {'sim_exec': t[4], 'config': t[5], 'env_procs': t[6]}) for t in func_params]
)
pprint(func_params_dict)
func_params_dict = [
# ((t[0], t[1], t[2], t[3]), {'sim_exec': t[4], 'config': t[5], 'env_procs': t[6]}) for t in func_params
((t[0], t[1], t[2], t[3]), (t[4], t[5], t[6])) for t in func_params
]
val_params = list(zip(userIDs, sessionIDs, simulationIDs, runIDs, var_dict_list, states_lists, Ts, Ns))
val_params_list = [
@@ -100,73 +99,20 @@ def distributed_simulations(
) for t in val_params
]
# pprint(val_params_dict)
def simulate(k, v):
(sim_exec, config, env_procs) = func_params_dict(k)
(sim_exec, config, env_procs) = [f[1] for f in func_params_dict if f[0] == k][0]
print(env_procs)
results = sim_exec(
v['var_dict'], v['states_lists'], config, env_procs, v['Ts'], v['Ns'],
k['user_id'], k['session_id'], k['sim_id'], k['run_id']
k[0], k[1], k[2], k[3]
)
return results
vals_rdd = sc.parallelize(val_params_list).map(lambda x: simulate(x[0], x[1])) #.map(lambda x: tuple(x[0].values()))
pprint(vals_rdd.take(1))
# vals_rdd.foreach(print)
# .map(
# lambda t: {(t[0], t[1], t[2], t[3]): {'var_dict': t[4], 'states_lists': t[5], 'Ts': t[6], 'Ns': t[7]}}
# )
# params = list(zip(simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns,
# userIDs, sessionIDs, simulationIDs, runIDs))
# simulation_execs, configs_structs, env_processes_list
# val_params = list(zip(userIDs, sessionIDs, simulationIDs, runIDs, var_dict_list, states_lists, Ts, Ns))
# vals_rdd = sc.parallelize(val_params).map(
# lambda t: {'user_id': t[0], 'session_id': t[1], 'simulation_id': t[2], 'run_id': t[3],
# 'var_dict': t[4], 'states_lists': t[5], 'Ts': t[6], 'Ns': t[7]}
# )
# vals_rdd.foreach(print)
# func_params_dicts = dict(map(
# lambda t: {'user_id': t[0], 'session_id': t[1], 'simulation_id': t[2], 'run_id': t[3],
# 'executions': {'sim_exec': t[4], 'configs': t[5], 'env_proc': t[6]}},
# func_params
# ))
# func_params_dicts = dict(list(map(
# lambda t: {'key': (t[0], t[1], t[2], t[3]),
# 'executions': {'sim_exec': t[4], 'configs': t[5], 'env_proc': t[6]}},
# func_params
# )))
# func_keys = [(t[0], t[1], t[2], t[3]) for t in func_params]
# func_values = [(t[4], t[5], t[6], t[7]) for t in func_params]
# func_params_dicts = dict([{(t[0], t[1], t[2], t[3]): {'sim_exec': t[4], 'configs': t[5], 'env_proc': t[6]}} for t in func_params])
# pprint(func_params_dicts)
# val_params = list(zip(userIDs, sessionIDs, simulationIDs, runIDs, var_dict_list, states_lists, Ts, Ns))
# vals_rdd = sc.parallelize(val_params).map(
# lambda t: {'key': (t[0], t[1], t[2], t[3]),
# 'var_dict': t[4], 'states_lists': t[5], 'Ts': t[6], 'Ns': t[7]}
# )
# val_params_dicts = list(map(
# lambda t: {'user_id': t[0], 'session_id': t[1], 'simulation_id': t[2], 'run_id': t[3],
# 'var_dict': t[4], 'states_lists': t[5], 'Ts': t[6], 'Ns': t[7]},
# val_params
# ))
# pprint(val_params_dicts)
# print("Execution: simulation_execs")
# print("Configuation: configs_structs, env_processes_list")
# print()
# tuple(x[0].values())
# x[1]
vals_rdd = sc.parallelize(val_params_list).map(lambda x: simulate(tuple(x[0].values()), x[1]))
pprint(vals_rdd.take(3))
exit()

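The hunk above amounts to two fixes. First, func_params_dict was a dict but was being called as func_params_dict(k), which raises TypeError: 'dict' object is not callable; it is rebuilt as a list of ((user_id, session_id, sim_id, run_id), (sim_exec, config, env_procs)) tuples that simulate scans linearly. Second, since the key is now a bare tuple, k['user_id'] through k['run_id'] become k[0] through k[3]. A minimal sketch of the resulting pattern, runnable without Spark; the stub executor and the single hard-coded key below are illustrative stand-ins, not the real sim_exec or parameter lists:

def stub_sim_exec(var_dict, states_lists, config, env_procs, Ts, Ns,
                  user_id, session_id, sim_id, run_id):
    # stand-in for a real simulation executor
    return {'key': (user_id, session_id, sim_id, run_id), 'Ns': Ns}

# key -> values kept as a list of tuples rather than a dict
func_params_dict = [
    ((0, 0, 0, 0), (stub_sim_exec, {'cfg': 1}, ['env_proc_a'])),
]
val_params_list = [
    ((0, 0, 0, 0), {'var_dict': {}, 'states_lists': [[]], 'Ts': 10, 'Ns': 1}),
]

def simulate(k, v):
    # linear scan replaces the old, broken call func_params_dict(k)
    (sim_exec, config, env_procs) = [f[1] for f in func_params_dict if f[0] == k][0]
    return sim_exec(v['var_dict'], v['states_lists'], config, env_procs,
                    v['Ts'], v['Ns'], k[0], k[1], k[2], k[3])

print([simulate(k, v) for (k, v) in val_params_list])

Under Spark the same function maps over the pairs, as in vals_rdd = sc.parallelize(val_params_list).map(lambda x: simulate(x[0], x[1])) above; a dict keyed by the tuple would work just as well, provided it is indexed with func_params_dict[k] rather than called.
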
@@ -2,8 +2,8 @@ from pyspark.sql import SparkSession
from pyspark.context import SparkContext
import os
os.environ['PYSPARK_PYTHON'] = '/usr/local/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/local/bin/python3'
# os.environ['PYSPARK_PYTHON'] = '/usr/local/bin/python3'
# os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/local/bin/python3'
spark = SparkSession\
.builder\

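This file's change just comments out the hard-coded interpreter paths. If the driver and workers still need pinning to one Python, an alternative is to derive the path from the running interpreter rather than baking in /usr/local/bin/python3. A sketch, assuming sys.executable is the interpreter Spark should use:

import os
import sys

# pin worker and driver Pythons to the interpreter running this script;
# setdefault keeps any value already exported in the environment
os.environ.setdefault('PYSPARK_PYTHON', sys.executable)
os.environ.setdefault('PYSPARK_DRIVER_PYTHON', sys.executable)

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
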
@@ -2,8 +2,9 @@ from pyspark.sql import SparkSession
from pyspark.context import SparkContext
import os
os.environ['PYSPARK_PYTHON'] = '/usr/local/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/local/bin/python3'
# os.environ['PYSPARK_PYTHON'] = '/usr/local/bin/python3'
# os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/local/bin/python3'
# os.environ['OBJC_DISABLE_INITIALIZE_FORK_SAFETY'] = 'YES'
spark = SparkSession\
.builder\
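
The extra commented-out line here, OBJC_DISABLE_INITIALIZE_FORK_SAFETY, is a known macOS workaround: since High Sierra the Objective-C runtime aborts forked processes that touch it before exec, which can crash forked PySpark worker processes. If it does turn out to be needed, it has to be exported before Spark and its JVM start. A sketch, where the app name is an illustrative placeholder:

import os
# macOS-only fork-safety workaround; must be set before Spark starts up
os.environ['OBJC_DISABLE_INITIALIZE_FORK_SAFETY'] = 'YES'

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('distributed-simulations').getOrCreate()  # placeholder app name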