diff --git a/cadCAD/engine/__init__.py b/cadCAD/engine/__init__.py
index e06b716..1510000 100644
--- a/cadCAD/engine/__init__.py
+++ b/cadCAD/engine/__init__.py
@@ -114,22 +114,10 @@ def distributed_simulations(
 
 class ExecutionContext:
     def __init__(self, context=ExecutionMode.multi_proc,
-                 # spark_context=None,
-                 # kafka_config=None,
-                 # spark_data_transformation=None,
                  method=None) -> None:
         self.name = context
         # self.method = method
 
-        # def dist_proc_closure(simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns,
-        #                       userIDs, sessionIDs, simulationIDs, runIDs,
-        #                       sc=spark_context, kafkaConfig=kafka_config):
-        #     return distributed_simulations(
-        #         simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns,
-        #         userIDs, sessionIDs, simulationIDs, runIDs,
-        #         spark_context, spark_data_transformation, kafka_config
-        #     )
-
         if context == 'single_proc':
             self.method = single_proc_exec
         elif context == 'multi_proc':
diff --git a/simulations/__init__.py b/simulations/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/simulations/distributed/__init__.py b/simulations/distributed/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/simulations/distributed/executor/__init__.py b/simulations/distributed/executor/__init__.py
new file mode 100644
index 0000000..557a929
--- /dev/null
+++ b/simulations/distributed/executor/__init__.py
@@ -0,0 +1 @@
+from simulations.distributed.executor.spark.jobs import distributed_simulations
\ No newline at end of file
diff --git a/simulations/distributed/executor/spark/jobs/__init__.py b/simulations/distributed/executor/spark/jobs/__init__.py
new file mode 100644
index 0000000..a3ccdf4
--- /dev/null
+++ b/simulations/distributed/executor/spark/jobs/__init__.py
@@ -0,0 +1,46 @@
+from simulations.distributed.kafkaConfig import config
+from simulations.distributed.spark.session import sc
+
+
+def distributed_simulations(
+        simulation_execs,
+        var_dict_list,
+        states_lists,
+        configs_structs,
+        env_processes_list,
+        Ts,
+        Ns,
+        userIDs,
+        sessionIDs,
+        simulationIDs,
+        runIDs,
+        spark_context=sc,
+        kafkaConfig=config
+    ):
+
+    func_params_zipped = list(
+        zip(userIDs, sessionIDs, simulationIDs, runIDs, simulation_execs, configs_structs, env_processes_list)
+    )
+    func_params_kv = [((t[0], t[1], t[2], t[3]), (t[4], t[5], t[6])) for t in func_params_zipped]
+    def simulate(k, v):
+        from kafka import KafkaProducer
+        prod_config = kafkaConfig['producer_config']
+        kafkaConfig['producer'] = KafkaProducer(**prod_config)
+        (sim_exec, config, env_procs) = [f[1] for f in func_params_kv if f[0] == k][0]
+        results = sim_exec(
+            v['var_dict'], v['states_lists'], config, env_procs, v['Ts'], v['Ns'],
+            k[0], k[1], k[2], k[3], kafkaConfig
+        )
+
+        return results
+
+    val_params = list(zip(userIDs, sessionIDs, simulationIDs, runIDs, var_dict_list, states_lists, Ts, Ns))
+    val_params_kv = [
+        (
+            (t[0], t[1], t[2], t[3]),
+            {'var_dict': t[4], 'states_lists': t[5], 'Ts': t[6], 'Ns': t[7]}
+        ) for t in val_params
+    ]
+    results_rdd = spark_context.parallelize(val_params_kv).coalesce(35)
+
+    return list(results_rdd.map(lambda x: simulate(*x)).collect())
diff --git a/simulations/distributed/kafkaConfig/__init__.py b/simulations/distributed/kafkaConfig/__init__.py
new file mode 100644
index 0000000..84c76cd
--- /dev/null
+++ b/simulations/distributed/kafkaConfig/__init__.py
@@ -0,0 +1 @@
+config = {'send_topic': 'test', 'producer_config': {'bootstrap_servers': 'localhost:9092', 'acks': 'all'}}
\ No newline at end of file
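Note on the new jobs module: distributed_simulations splits its inputs into two keyed collections. The executor functions, config structures, and env processes are zipped under composite (user, session, simulation, run) keys and reach the workers inside the closure of simulate(), while only the plain per-run values (var_dict, states_lists, Ts, Ns) travel through the RDD. Each Spark task then resolves its executor by key and builds its own KafkaProducer, since a producer holds live broker connections and cannot be pickled on the driver. A minimal runnable sketch of the same pattern — run_fn, the example key, and the value dict are illustrative stand-ins, not part of the patch:

    from simulations.distributed.spark.session import sc  # shared SparkContext from this patch

    def run_fn(v):
        # stand-in for a cadCAD simulation executor
        return v['x'] * 2

    # driver side: executors keyed by (user, session, simulation, run) ...
    func_by_key = {('user_a', 'session_1', 'sim_1', 0): run_fn}
    # ... and the picklable per-run values under the same keys
    val_kv = [(('user_a', 'session_1', 'sim_1', 0), {'x': 21})]

    def simulate(k, v):
        # worker side: look up the executor for this key, then run it
        return func_by_key[k](v)

    print(sc.parallelize(val_kv).map(lambda kv: simulate(*kv)).collect())  # [42]

One caveat: coalesce(35) caps the job at 35 partitions, and because coalesce only merges partitions it cannot raise a smaller default parallelism.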
diff --git a/simulations/distributed/messaging_app.py b/simulations/distributed/messaging_app.py
index fe3618c..5e7cb66 100644
--- a/simulations/distributed/messaging_app.py
+++ b/simulations/distributed/messaging_app.py
@@ -18,20 +18,15 @@
 from pyspark.context import SparkContext
 from kafka import KafkaProducer
 
+from simulations.distributed.executor.spark.jobs import distributed_simulations
+from simulations.distributed.spark.session import sc
+
+
 def count(start, step):
     while True:
         yield start
         start += step
 
-spark = SparkSession\
-    .builder\
-    .appName("distroduce")\
-    .getOrCreate()
-
-sc: SparkContext = spark.sparkContext
-print(f"Spark UI: {sc.uiWebUrl}")
-print()
-
 # session = enters('room_1', ['A', 'B'])
 intitial_conditions = {
     'record_creation': datetime.now(),
@@ -203,50 +198,7 @@ append_configs(
 exec_mode = ExecutionMode()
 print("Simulation Execution: Distributed Execution")
 
-kafka_config = {'send_topic': 'test', 'producer_config': {'bootstrap_servers': 'localhost:9092', 'acks': 'all'}}
-def distributed_simulations(
-        simulation_execs,
-        var_dict_list,
-        states_lists,
-        configs_structs,
-        env_processes_list,
-        Ts,
-        Ns,
-        userIDs,
-        sessionIDs,
-        simulationIDs,
-        runIDs,
-        sc=sc,
-        kafkaConfig=kafka_config
-    ):
-
-    func_params_zipped = list(
-        zip(userIDs, sessionIDs, simulationIDs, runIDs, simulation_execs, configs_structs, env_processes_list)
-    )
-    func_params_kv = [((t[0], t[1], t[2], t[3]), (t[4], t[5], t[6])) for t in func_params_zipped]
-    def simulate(k, v):
-        from kafka import KafkaProducer
-        prod_config = kafkaConfig['producer_config']
-        kafkaConfig['producer'] = KafkaProducer(**prod_config)
-        (sim_exec, config, env_procs) = [f[1] for f in func_params_kv if f[0] == k][0]
-        results = sim_exec(
-            v['var_dict'], v['states_lists'], config, env_procs, v['Ts'], v['Ns'],
-            k[0], k[1], k[2], k[3], kafkaConfig
-        )
-
-        return results
-
-    val_params = list(zip(userIDs, sessionIDs, simulationIDs, runIDs, var_dict_list, states_lists, Ts, Ns))
-    val_params_kv = [
-        (
-            (t[0], t[1], t[2], t[3]),
-            {'var_dict': t[4], 'states_lists': t[5], 'Ts': t[6], 'Ns': t[7]}
-        ) for t in val_params
-    ]
-    results_rdd = sc.parallelize(val_params_kv).coalesce(35)
-
-    return list(results_rdd.map(lambda x: simulate(*x)).collect())
 
 dist_proc_ctx = ExecutionContext(
     context=exec_mode.dist_proc, method=distributed_simulations
@@ -257,7 +209,7 @@ i = 0
 for raw_result, tensor_field in run.execute():
     result = arrange_cols(pd.DataFrame(raw_result), False)[
         [
-            'user_id', 'session_id', 'simulation_id', 'run_id', 'timestep', 'substep',
+            'run_id', 'timestep', 'substep',
             'record_creation', 'total_msg_count', 'total_send_time'
         ]
     ]
diff --git a/simulations/distributed/spark/session/__init__.py b/simulations/distributed/spark/session/__init__.py
index 7afaf0c..be63d66 100644
--- a/simulations/distributed/spark/session/__init__.py
+++ b/simulations/distributed/spark/session/__init__.py
@@ -11,6 +11,6 @@ spark = SparkSession\
     .appName("distroduce")\
     .getOrCreate()
 
-spark_context: SparkContext = spark.sparkContext
-print(f"Spark UI: {spark_context.uiWebUrl}")
+sc: SparkContext = spark.sparkContext
+print(f"Spark UI: {sc.uiWebUrl}")
 print()
\ No newline at end of file
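Note on the wiring: after this refactor, a driver script such as messaging_app.py only imports the shared session and job, as the hunks above show. A condensed sketch of the resulting wiring — the Executor construction and the configs import follow cadCAD's usual API and are assumed here, since this diff does not show them:

    import pandas as pd
    from cadCAD import configs                        # populated earlier by append_configs(...)
    from cadCAD.engine import ExecutionMode, ExecutionContext, Executor
    from simulations.distributed.executor.spark.jobs import distributed_simulations

    exec_mode = ExecutionMode()
    dist_proc_ctx = ExecutionContext(
        context=exec_mode.dist_proc,       # distributed execution mode
        method=distributed_simulations     # the Spark job introduced by this patch
    )
    run = Executor(exec_context=dist_proc_ctx, configs=configs)

    for raw_result, tensor_field in run.execute():
        result = pd.DataFrame(raw_result)  # one row per (run, timestep, substep)

The result selection in messaging_app.py now keeps only 'run_id', 'timestep', and 'substep' as identifiers; the 'user_id', 'session_id', and 'simulation_id' columns are dropped even though those IDs still key the distributed dispatch.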