cleanup pt. 1

Joshua E. Jodesty 2019-10-01 08:16:07 -04:00
parent ff44b0bacf
commit 964c0b9123
8 changed files with 55 additions and 67 deletions

View File

@@ -114,22 +114,10 @@ def distributed_simulations(
class ExecutionContext:
    def __init__(self,
                 context=ExecutionMode.multi_proc,
                 # spark_context=None,
                 # kafka_config=None,
                 # spark_data_transformation=None,
                 method=None) -> None:
        self.name = context
        # self.method = method
        # def dist_proc_closure(simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns,
        #                       userIDs, sessionIDs, simulationIDs, runIDs,
        #                       sc=spark_context, kafkaConfig=kafka_config):
        #     return distributed_simulations(
        #         simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns,
        #         userIDs, sessionIDs, simulationIDs, runIDs,
        #         spark_context, spark_data_transformation, kafka_config
        #     )
        if context == 'single_proc':
            self.method = single_proc_exec
        elif context == 'multi_proc':
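
Note: the method keyword added to __init__ is what lets a caller inject a custom executor instead of the built-in single/multi-process ones; the wiring appears later in this commit in the messaging simulation. A minimal sketch of that usage, assuming an import path for ExecutionMode/ExecutionContext (this hunk does not show the module name):

# Sketch only: mirrors the dist_proc_ctx construction further down in this commit.
from cadCAD.engine import ExecutionMode, ExecutionContext  # import path is an assumption
from simulations.distributed.executor.spark.jobs import distributed_simulations

exec_mode = ExecutionMode()
dist_proc_ctx = ExecutionContext(
    context=exec_mode.dist_proc,      # distributed (Spark) execution mode
    method=distributed_simulations    # custom Spark executor injected via the method parameter
)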

simulations/__init__.py Normal file (0 lines)
View File

View File

View File

@@ -0,0 +1 @@
from simulations.distributed.executor.spark.jobs import distributed_simulations

View File

@@ -0,0 +1,46 @@
from simulations.distributed.kafkaConfig import config
from simulations.distributed.spark.session import sc


def distributed_simulations(
        simulation_execs,
        var_dict_list,
        states_lists,
        configs_structs,
        env_processes_list,
        Ts,
        Ns,
        userIDs,
        sessionIDs,
        simulationIDs,
        runIDs,
        spark_context=sc,
        kafkaConfig=config
):
    # Key each run's executable, config, and env processes by (user, session, simulation, run) IDs.
    func_params_zipped = list(
        zip(userIDs, sessionIDs, simulationIDs, runIDs, simulation_execs, configs_structs, env_processes_list)
    )
    func_params_kv = [((t[0], t[1], t[2], t[3]), (t[4], t[5], t[6])) for t in func_params_zipped]

    def simulate(k, v):
        # Runs on a Spark executor: build a Kafka producer from the config,
        # look up this key's executable, and run the simulation.
        from kafka import KafkaProducer
        prod_config = kafkaConfig['producer_config']
        kafkaConfig['producer'] = KafkaProducer(**prod_config)
        (sim_exec, config, env_procs) = [f[1] for f in func_params_kv if f[0] == k][0]
        results = sim_exec(
            v['var_dict'], v['states_lists'], config, env_procs, v['Ts'], v['Ns'],
            k[0], k[1], k[2], k[3], kafkaConfig
        )
        return results

    # Distribute the per-run value parameters under the same keys and collect the results.
    val_params = list(zip(userIDs, sessionIDs, simulationIDs, runIDs, var_dict_list, states_lists, Ts, Ns))
    val_params_kv = [
        (
            (t[0], t[1], t[2], t[3]),
            {'var_dict': t[4], 'states_lists': t[5], 'Ts': t[6], 'Ns': t[7]}
        ) for t in val_params
    ]
    results_rdd = spark_context.parallelize(val_params_kv).coalesce(35)
    return list(results_rdd.map(lambda x: simulate(*x)).collect())
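
Note: everything in this job is keyed by the (user_id, session_id, simulation_id, run_id) tuple so each Spark task can look up its own executable and config. A toy illustration of that keying scheme (all values below are invented):

# Illustration only; the IDs and var_dicts are made up.
userIDs       = ['user_a', 'user_a']
sessionIDs    = ['session_1', 'session_1']
simulationIDs = [0, 0]
runIDs        = [1, 2]
var_dict_list = [{'alpha': 1}, {'alpha': 2}]

keys = list(zip(userIDs, sessionIDs, simulationIDs, runIDs))
val_params_kv = [(k, {'var_dict': v}) for k, v in zip(keys, var_dict_list)]
# [(('user_a', 'session_1', 0, 1), {'var_dict': {'alpha': 1}}),
#  (('user_a', 'session_1', 0, 2), {'var_dict': {'alpha': 2}})]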

View File

@@ -0,0 +1 @@
config = {'send_topic': 'test', 'producer_config': {'bootstrap_servers': 'localhost:9092', 'acks': 'all'}}
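
Note: jobs.py above builds a KafkaProducer from producer_config on each executor; where send_topic is consumed inside the simulation executables is not shown in this diff. A minimal sketch of the intended use, assuming kafka-python is installed and a broker is listening on localhost:9092:

# Sketch only: constructs a producer the same way simulate() does in jobs.py.
from kafka import KafkaProducer

from simulations.distributed.kafkaConfig import config

producer = KafkaProducer(**config['producer_config'])
# Publishing to config['send_topic'] is an assumption; this diff does not show
# where the simulation code reads it.
producer.send(config['send_topic'], value=b'message from a simulation step')
producer.flush()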

View File

@@ -18,20 +18,15 @@ from pyspark.context import SparkContext
from kafka import KafkaProducer
from simulations.distributed.executor.spark.jobs import distributed_simulations
from simulations.distributed.spark.session import sc
def count(start, step):
    while True:
        yield start
        start += step

spark = SparkSession\
    .builder\
    .appName("distroduce")\
    .getOrCreate()
sc: SparkContext = spark.sparkContext
print(f"Spark UI: {sc.uiWebUrl}")
print()
# session = enters('room_1', ['A', 'B'])
intitial_conditions = {
    'record_creation': datetime.now(),
@@ -203,50 +198,7 @@ append_configs(
exec_mode = ExecutionMode()
print("Simulation Execution: Distributed Execution")
kafka_config = {'send_topic': 'test', 'producer_config': {'bootstrap_servers': 'localhost:9092', 'acks': 'all'}}
def distributed_simulations(
        simulation_execs,
        var_dict_list,
        states_lists,
        configs_structs,
        env_processes_list,
        Ts,
        Ns,
        userIDs,
        sessionIDs,
        simulationIDs,
        runIDs,
        sc=sc,
        kafkaConfig=kafka_config
):
    func_params_zipped = list(
        zip(userIDs, sessionIDs, simulationIDs, runIDs, simulation_execs, configs_structs, env_processes_list)
    )
    func_params_kv = [((t[0], t[1], t[2], t[3]), (t[4], t[5], t[6])) for t in func_params_zipped]

    def simulate(k, v):
        from kafka import KafkaProducer
        prod_config = kafkaConfig['producer_config']
        kafkaConfig['producer'] = KafkaProducer(**prod_config)
        (sim_exec, config, env_procs) = [f[1] for f in func_params_kv if f[0] == k][0]
        results = sim_exec(
            v['var_dict'], v['states_lists'], config, env_procs, v['Ts'], v['Ns'],
            k[0], k[1], k[2], k[3], kafkaConfig
        )
        return results

    val_params = list(zip(userIDs, sessionIDs, simulationIDs, runIDs, var_dict_list, states_lists, Ts, Ns))
    val_params_kv = [
        (
            (t[0], t[1], t[2], t[3]),
            {'var_dict': t[4], 'states_lists': t[5], 'Ts': t[6], 'Ns': t[7]}
        ) for t in val_params
    ]
    results_rdd = sc.parallelize(val_params_kv).coalesce(35)
    return list(results_rdd.map(lambda x: simulate(*x)).collect())
dist_proc_ctx = ExecutionContext(
    context=exec_mode.dist_proc, method=distributed_simulations
@@ -257,7 +209,7 @@ i = 0
for raw_result, tensor_field in run.execute():
    result = arrange_cols(pd.DataFrame(raw_result), False)[
        [
            'user_id', 'session_id', 'simulation_id', 'run_id', 'timestep', 'substep',
            'run_id', 'timestep', 'substep',
            'record_creation', 'total_msg_count', 'total_send_time'
        ]
    ]

View File

@@ -11,6 +11,6 @@ spark = SparkSession\
    .appName("distroduce")\
    .getOrCreate()
spark_context: SparkContext = spark.sparkContext
print(f"Spark UI: {spark_context.uiWebUrl}")
sc: SparkContext = spark.sparkContext
print(f"Spark UI: {sc.uiWebUrl}")
print()
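
Note: renaming spark_context to sc in session.py matches the new imports in jobs.py and the messaging simulation. A minimal smoke test of the shared session module, assuming pyspark is installed and a local Spark session can start:

# Sketch only: verifies the SparkContext exported by session.py.
from simulations.distributed.spark.session import sc

print(sc.master, sc.uiWebUrl)            # e.g. local[*] and the Spark UI URL
print(sc.parallelize(range(4)).sum())    # 6 -> the shared context can run a trivial job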