diff --git a/cadCAD/engine/__init__.py b/cadCAD/engine/__init__.py
index 1510000..925db94 100644
--- a/cadCAD/engine/__init__.py
+++ b/cadCAD/engine/__init__.py
@@ -112,18 +112,25 @@ def distributed_simulations(
 class ExecutionContext:
-    def __init__(self,
-                 context=ExecutionMode.multi_proc,
-                 method=None) -> None:
+    def __init__(self, context=ExecutionMode.multi_proc, method=None, kafka_config=None) -> None:
         self.name = context
-        # self.method = method
-
         if context == 'single_proc':
             self.method = single_proc_exec
         elif context == 'multi_proc':
             self.method = parallelize_simulations
         elif context == 'dist_proc':
-            self.method = method
+            def distroduce_proc(
+                simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns,
+                userIDs, sessionIDs, simulationIDs, runIDs,
+                sc, kafkaConfig=kafka_config
+            ):
+                return method(
+                    simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns,
+                    userIDs, sessionIDs, simulationIDs, runIDs,
+                    sc, kafkaConfig
+                )
+
+            self.method = distroduce_proc
 
 
 class Executor:
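Note: the dist_proc branch now closes over kafka_config, so the Kafka settings travel with the ExecutionContext rather than being imported from the hard-coded kafkaConfig module deleted later in this diff. For orientation, a minimal wiring sketch using the names this diff introduces (the broker address is the example value from messaging_sim.py below):

```python
# Sketch: wiring a distributed execution context together with its Kafka
# settings. ExecutionContext captures kafka_config in the distroduce_proc
# closure above, so the Spark job receives it without a config-module import.
from cadCAD.engine import ExecutionMode, ExecutionContext
from distroduce.executor.spark import distributed_produce
from distroduce.spark.session import sc

kafka_config = {
    'send_topic': 'test',
    'producer_config': {'bootstrap_servers': 'localhost:9092', 'acks': 'all'}
}

dist_proc_ctx = ExecutionContext(
    context=ExecutionMode().dist_proc,
    method=distributed_produce,
    kafka_config=kafka_config
)
```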
diff --git a/dist/cadCAD-0.0.2-py3-none-any.whl b/dist/cadCAD-0.0.2-py3-none-any.whl
index bb931c5..8a48f7c 100644
Binary files a/dist/cadCAD-0.0.2-py3-none-any.whl and b/dist/cadCAD-0.0.2-py3-none-any.whl differ
diff --git a/dist/cadCAD-0.0.2.tar.gz b/dist/cadCAD-0.0.2.tar.gz
index ee8ebc9..83d3518 100644
Binary files a/dist/cadCAD-0.0.2.tar.gz and b/dist/cadCAD-0.0.2.tar.gz differ
diff --git a/simulations/distributed/__init__.py b/distroduce/__init__.py
similarity index 100%
rename from simulations/distributed/__init__.py
rename to distroduce/__init__.py
diff --git a/simulations/distributed/policies.py b/distroduce/action_policies.py
similarity index 100%
rename from simulations/distributed/policies.py
rename to distroduce/action_policies.py
diff --git a/distroduce/bash/bootstrap.sh b/distroduce/bash/bootstrap.sh
new file mode 100644
index 0000000..a6b78a8
--- /dev/null
+++ b/distroduce/bash/bootstrap.sh
@@ -0,0 +1,81 @@
+#!/bin/bash
+# SSH into all machines
+ssh -i ~/.ssh/joshua-IAM-keypair.pem hadoop@
+sudo python3 -m pip install --upgrade pip
+sudo python3 -m pip install pathos kafka-python
+
+# SetUp Window: head node
+cd ~
+#sudo python3 -m pip install --upgrade pip
+#sudo python3 -m pip install pathos kafka-python
+sudo sed -i -e '$a\export PYSPARK_PYTHON=/usr/bin/python3' /etc/spark/conf/spark-env.sh
+wget http://apache.spinellicreations.com/kafka/2.3.0/kafka_2.12-2.3.0.tgz
+tar -xzf kafka_2.12-2.3.0.tgz
+cd kafka_2.12-2.3.0
+bin/zookeeper-server-start.sh config/zookeeper.properties &
+bin/kafka-server-start.sh config/server.properties &
+# use the head node's private IP for --bootstrap-server
+bin/kafka-topics.sh --create --bootstrap-server 10.0.0.9:9092 --replication-factor 1 --partitions 1 --topic test
+# bin/kafka-topics.sh --list --bootstrap-server 10.0.0.9:9092
+
+# Consume (Window): head node
+kafka_2.12-2.3.0/bin/kafka-console-consumer.sh --bootstrap-server 10.0.0.9:9092 --topic test --from-beginning
+# DELETE
+# bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic test
+
+
+# local
+python3 setup.py sdist bdist_wheel
+pip3 install dist/*.whl
+# SCP: all nodes
+scp -i ~/.ssh/joshua-IAM-keypair.pem dist/*.whl hadoop@ec2-18-206-12-181.compute-1.amazonaws.com:/home/hadoop/
+scp -i ~/.ssh/joshua-IAM-keypair.pem dist/*.whl hadoop@ec2-34-239-171-181.compute-1.amazonaws.com:/home/hadoop/
+scp -i ~/.ssh/joshua-IAM-keypair.pem dist/*.whl hadoop@ec2-3-230-154-170.compute-1.amazonaws.com:/home/hadoop/
+scp -i ~/.ssh/joshua-IAM-keypair.pem dist/*.whl hadoop@ec2-18-232-52-219.compute-1.amazonaws.com:/home/hadoop/
+scp -i ~/.ssh/joshua-IAM-keypair.pem dist/*.whl hadoop@ec2-34-231-70-210.compute-1.amazonaws.com:/home/hadoop/
+scp -i ~/.ssh/joshua-IAM-keypair.pem dist/*.whl hadoop@ec2-34-231-243-101.compute-1.amazonaws.com:/home/hadoop/
+
+
+sudo python3 -m pip install *.whl
+
+
+# SCP head node
+# ToDo: zip build for cadCAD OR give py library after whl install
+scp -i ~/.ssh/joshua-IAM-keypair.pem distroduce/dist/distroduce.zip hadoop@ec2-18-206-12-181.compute-1.amazonaws.com:/home/hadoop/
+scp -i ~/.ssh/joshua-IAM-keypair.pem distroduce/messaging_sim.py hadoop@ec2-18-206-12-181.compute-1.amazonaws.com:/home/hadoop/
+#scp -i ~/.ssh/joshua-IAM-keypair.pem dist/distroduce.zip hadoop@ec2-18-232-54-233.compute-1.amazonaws.com:/home/hadoop/
+#scp -i ~/.ssh/joshua-IAM-keypair.pem examples/event_bench/main.py hadoop@ec2-18-232-54-233.compute-1.amazonaws.com:/home/hadoop/
+
+
+# Run Window: Head Node
+spark-submit --master yarn messaging_sim.py
+# spark-submit --master yarn --py-files distroduce.zip main.py
+
+# Cluster Config
+#[
+#    {
+#        "Classification": "spark-env",
+#        "Configurations": [
+#            {
+#                "Classification": "export",
+#                "ConfigurationProperties": {
+#                    "PYSPARK_PYTHON": "/usr/bin/python3",
+#                    "PYSPARK_DRIVER_PYTHON": "/usr/bin/python3"
+#                }
+#            }
+#        ]
+#    },
+#    {
+#        "Classification": "spark-defaults",
+#        "ConfigurationProperties": {
+#            "spark.sql.execution.arrow.enabled": "true"
+#        }
+#    },
+#    {
+#        "Classification": "spark",
+#        "Properties": {
+#            "maximizeResourceAllocation": "true"
+#        }
+#    }
+#]
+
diff --git a/distroduce/bash/build_source.sh b/distroduce/bash/build_source.sh
new file mode 100644
index 0000000..522bbca
--- /dev/null
+++ b/distroduce/bash/build_source.sh
@@ -0,0 +1,4 @@
+#!/bin/bash
+pip3 install -r requirements.txt
+#zip -rq dist/distroduce.zip distroduce/
+zip -rq distroduce/dist/distroduce.zip distroduce/
\ No newline at end of file
diff --git a/distroduce/bash/run.sh b/distroduce/bash/run.sh
new file mode 100644
index 0000000..fee2dd0
--- /dev/null
+++ b/distroduce/bash/run.sh
@@ -0,0 +1,3 @@
+#!/bin/bash
+#spark-submit --master yarn event_bench.py
+spark-submit --py-files dist/distroduce.zip examples/event_bench/main.py
\ No newline at end of file
diff --git a/distroduce/bash/toEC2.sh b/distroduce/bash/toEC2.sh
new file mode 100644
index 0000000..da3ca27
--- /dev/null
+++ b/distroduce/bash/toEC2.sh
@@ -0,0 +1,5 @@
+#!/bin/bash
+
+scp -i ~/.ssh/joshua-IAM-keypair.pem ~/Projects/event-bench/event_bench.py \
+hadoop@ec2-3-230-158-62.compute-1.amazonaws.com:/home/hadoop/
+
diff --git a/distroduce/dist/distroduce.zip b/distroduce/dist/distroduce.zip
new file mode 100644
index 0000000..0e42bec
Binary files /dev/null and b/distroduce/dist/distroduce.zip differ
diff --git a/distroduce/executor/__init__.py b/distroduce/executor/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/distroduce/executor/spark/__init__.py b/distroduce/executor/spark/__init__.py
new file mode 100644
index 0000000..208b668
--- /dev/null
+++ b/distroduce/executor/spark/__init__.py
@@ -0,0 +1 @@
+from distroduce.executor.spark.jobs import distributed_produce
\ No newline at end of file
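Note: bootstrap.sh brings up ZooKeeper and a single Kafka broker on the head node, creates the test topic against 10.0.0.9:9092, and ships the wheel plus the driver script to the nodes. Before a full spark-submit run it is worth confirming that the broker is reachable and accepting writes; a minimal kafka-python smoke test (a sketch, assuming the same 10.0.0.9:9092 address used above):

```python
# Smoke test: verify the broker started by bootstrap.sh accepts writes to
# the 'test' topic. The address matches the one used in bootstrap.sh;
# adjust it for your cluster.
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='10.0.0.9:9092', acks='all')
future = producer.send('test', b'distroduce smoke test')
metadata = future.get(timeout=10)  # raises if the broker is unreachable
print(metadata.topic, metadata.partition, metadata.offset)
producer.flush()
```

If everything is wired correctly, the console consumer started in the "Consume" window above should print the test message.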
diff --git a/simulations/distributed/executor/spark/jobs/__init__.py b/distroduce/executor/spark/jobs/__init__.py
similarity index 78%
rename from simulations/distributed/executor/spark/jobs/__init__.py
rename to distroduce/executor/spark/jobs/__init__.py
index a3ccdf4..5b442f9 100644
--- a/simulations/distributed/executor/spark/jobs/__init__.py
+++ b/distroduce/executor/spark/jobs/__init__.py
@@ -1,8 +1,7 @@
-from simulations.distributed.kafkaConfig import config
-from simulations.distributed.spark.session import sc
+from distroduce.spark.session import sc
 
 
-def distributed_simulations(
+def distributed_produce(
     simulation_execs,
     var_dict_list,
     states_lists,
@@ -15,21 +14,20 @@
     simulationIDs,
     runIDs,
     spark_context=sc,
-    kafkaConfig=config
+    kafka_config=None
 ):
-
     func_params_zipped = list(
         zip(userIDs, sessionIDs, simulationIDs, runIDs, simulation_execs, configs_structs, env_processes_list)
    )
     func_params_kv = [((t[0], t[1], t[2], t[3]), (t[4], t[5], t[6])) for t in func_params_zipped]
 
     def simulate(k, v):
         from kafka import KafkaProducer
-        prod_config = kafkaConfig['producer_config']
-        kafkaConfig['producer'] = KafkaProducer(**prod_config)
+        prod_config = kafka_config['producer_config']
+        kafka_config['producer'] = KafkaProducer(**prod_config)
         (sim_exec, config, env_procs) = [f[1] for f in func_params_kv if f[0] == k][0]
         results = sim_exec(
             v['var_dict'], v['states_lists'], config, env_procs, v['Ts'], v['Ns'],
-            k[0], k[1], k[2], k[3], kafkaConfig
+            k[0], k[1], k[2], k[3], kafka_config
         )
         return results
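Note: distributed_produce keys each simulation by a (user, session, simulation, run) tuple and builds a simulate(k, v) closure that instantiates its own KafkaProducer on the worker, since a producer holds live connections and cannot be pickled to the executors. The hunk above ends before the Spark dispatch itself, so the fan-out is not visible here; roughly, it presumably looks like the sketch below (an illustration only, not code from this PR; the names are the function's own parameters):

```python
# Sketch (assumption): the fan-out that presumably follows simulate() inside
# distributed_produce. One element per (user, session, simulation, run) key;
# simulate() runs on the workers, where it builds its own KafkaProducer.
keys = list(zip(userIDs, sessionIDs, simulationIDs, runIDs))
vals = [
    {'var_dict': var_dict, 'states_lists': states, 'Ts': T, 'Ns': N}
    for var_dict, states, T, N in zip(var_dict_list, states_lists, Ts, Ns)
]
results = (
    spark_context.parallelize(list(zip(keys, vals)))
                 .map(lambda kv: simulate(kv[0], kv[1]))
                 .collect()
)
```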
diff --git a/distroduce/messaging_sim.py b/distroduce/messaging_sim.py
new file mode 100644
index 0000000..b982000
--- /dev/null
+++ b/distroduce/messaging_sim.py
@@ -0,0 +1,102 @@
+import pandas as pd
+from datetime import datetime
+from tabulate import tabulate
+
+from cadCAD import configs
+from cadCAD.utils import arrange_cols
+from cadCAD.configuration import append_configs
+from cadCAD.configuration.utils import config_sim
+from cadCAD.engine import ExecutionMode, ExecutionContext, Executor
+
+from distroduce.spark.session import sc
+from distroduce.executor.spark import distributed_produce
+from distroduce.action_policies import enter_action, message_actions, exit_action
+from distroduce.state_updates import send_message, count_messages, add_send_time, current_time
+
+# State Updates
+variables = {
+    'client_a': send_message('client_a'),
+    'client_b': send_message('client_b'),
+    'total_msg_count': count_messages,
+    'total_send_time': add_send_time,
+    'record_creation': current_time('record_creation')
+}
+
+# Action Policies
+policy_group_1 = {
+    "action_1": enter_action('server', 'room_1', 'A'),
+    "action_2": enter_action('server', 'room_1', 'B')
+}
+
+policy_group_2 = {
+    "action_1": message_actions('client_A', 'room_1', "Hi B", 'A', 'B'),
+    "action_2": message_actions('client_B', 'room_1', "Hi A", 'B', 'A')
+}
+
+policy_group_3 = {
+    "action_1": message_actions('client_A', 'room_1', "Bye B", 'A', 'B'),
+    "action_2": message_actions('client_B', 'room_1', "Bye A", 'B', 'A')
+}
+
+policy_group_4 = {
+    "action_1": exit_action('server', 'room_1', 'A'),
+    "action_2": exit_action('server', 'room_1', 'B')
+}
+
+policy_groups = [policy_group_1, policy_group_2, policy_group_3, policy_group_4]
+sim_composition = [{'policies': policy_group, 'variables': variables} for policy_group in policy_groups]
+
+if __name__ == "__main__":
+    print("Distributed Simulation: Chat Clients")
+    initial_conditions = {
+        'record_creation': datetime.now(),
+        'client_a': {'users': [], 'messages': [], 'msg_count': 0, 'send_time': 0.0},
+        'client_b': {'users': [], 'messages': [], 'msg_count': 0, 'send_time': 0.0},
+        'total_msg_count': 0,
+        'total_send_time': 0.000000
+    }
+    exec_mode = ExecutionMode()
+
+
+    # N = 5 runs = 10000 messages / (500 timesteps x 4 substeps)
+    # T = 500
+    sim_config = config_sim(
+        {
+            "N": 1,
+            "T": range(10),
+            # "T": range(5000),
+        }
+    )
+
+    append_configs(
+        user_id='Joshua',
+        sim_configs=sim_config,
+        initial_state=initial_conditions,
+        partial_state_update_blocks=sim_composition,
+        policy_ops=[lambda a, b: a + b]
+    )
+
+    kafkaConfig = {'send_topic': 'test', 'producer_config': {'bootstrap_servers': 'localhost:9092', 'acks': 'all'}}
+    dist_proc_ctx = ExecutionContext(context=exec_mode.dist_proc, method=distributed_produce, kafka_config=kafkaConfig)
+    run = Executor(exec_context=dist_proc_ctx, configs=configs, spark_context=sc)
+
+    i = 0
+    for raw_result, tensor_field in run.execute():
+        result = arrange_cols(pd.DataFrame(raw_result), False)[
+            [
+                'run_id', 'timestep', 'substep',
+                'record_creation', 'total_msg_count', 'total_send_time'
+            ]
+        ]
+        print()
+        if i == 0:
+            print(tabulate(tensor_field, headers='keys', tablefmt='psql'))
+        last = result.tail(1).copy()
+        last['msg_per_sec'] = last['total_msg_count']/last['total_send_time']
+        print("Output: Head")
+        print(tabulate(result.head(5), headers='keys', tablefmt='psql'))
+        print("Output: Tail")
+        print(tabulate(result.tail(5), headers='keys', tablefmt='psql'))
+        print(tabulate(last, headers='keys', tablefmt='psql'))
+        print()
+        i += 1
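Note: one thing to watch when running this beyond a single machine: messaging_sim.py points bootstrap_servers at localhost:9092, while bootstrap.sh creates the test topic against 10.0.0.9:9092. Because simulate() constructs the KafkaProducer on the Spark workers, each executor would resolve localhost to itself, so on the EMR cluster the config presumably needs the head node's private address (an assumption based on the bootstrap script, not something this diff changes):

```python
# Assumption: on a multi-node cluster the producer is created on the workers,
# so 'localhost' points each executor at itself. 10.0.0.9:9092 is the broker
# address used when the topic was created in bootstrap.sh.
kafkaConfig = {
    'send_topic': 'test',
    'producer_config': {'bootstrap_servers': '10.0.0.9:9092', 'acks': 'all'}
}
```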
diff --git a/distroduce/spark/__init__.py b/distroduce/spark/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/simulations/distributed/spark/session/__init__.py b/distroduce/spark/session/__init__.py
similarity index 100%
rename from simulations/distributed/spark/session/__init__.py
rename to distroduce/spark/session/__init__.py
diff --git a/simulations/distributed/state_updates.py b/distroduce/state_updates.py
similarity index 100%
rename from simulations/distributed/state_updates.py
rename to distroduce/state_updates.py
diff --git a/simulations/distributed/executor/__init__.py b/simulations/distributed/executor/__init__.py
deleted file mode 100644
index 557a929..0000000
--- a/simulations/distributed/executor/__init__.py
+++ /dev/null
@@ -1 +0,0 @@
-from simulations.distributed.executor.spark.jobs import distributed_simulations
\ No newline at end of file
diff --git a/simulations/distributed/kafkaConfig/__init__.py b/simulations/distributed/kafkaConfig/__init__.py
deleted file mode 100644
index 84c76cd..0000000
--- a/simulations/distributed/kafkaConfig/__init__.py
+++ /dev/null
@@ -1 +0,0 @@
-config = {'send_topic': 'test', 'producer_config': {'bootstrap_servers': 'localhost:9092', 'acks': 'all'}}
\ No newline at end of file
diff --git a/simulations/distributed/messaging_app.py b/simulations/distributed/messaging_app.py
deleted file mode 100644
index db54a3f..0000000
--- a/simulations/distributed/messaging_app.py
+++ /dev/null
@@ -1,144 +0,0 @@
-from functools import reduce
-
-import pandas as pd
-from copy import deepcopy
-from datetime import datetime
-
-from tabulate import tabulate
-
-from cadCAD.configuration import append_configs
-from cadCAD.configuration.utils import config_sim
-
-from cadCAD.engine import ExecutionMode, ExecutionContext, Executor
-from cadCAD.utils import arrange_cols
-from cadCAD import configs
-
-from pyspark.sql import SparkSession
-from pyspark.context import SparkContext
-
-from kafka import KafkaProducer
-
-from simulations.distributed.executor.spark.jobs import distributed_simulations
-from simulations.distributed.policies import enter_action, message_actions, exit_action
-from simulations.distributed.spark.session import sc
-from simulations.distributed.state_updates import send_message, count_messages, add_send_time, current_time
-
-
-def count(start, step):
-    while True:
-        yield start
-        start += step
-
-# session = enters('room_1', ['A', 'B'])
-intitial_conditions = {
-    'record_creation': datetime.now(),
-    'client_a': {'users': [], 'messages': [], 'msg_count': 0, 'send_time': 0.0},
-    'client_b': {'users': [], 'messages': [], 'msg_count': 0, 'send_time': 0.0},
-    'total_msg_count': 0,
-    'total_send_time': 0.000000
-}
-
-# State Updates
-
-sim_composition = [
-    {
-        "policies": {
-            "b1": enter_action('server', 'room_1', 'A'),
-            "b2": enter_action('server', 'room_1', 'B')
-        },
-        "variables": {
-            'client_a': send_message('client_a'),
-            'client_b': send_message('client_b'),
-            'total_msg_count': count_messages,
-            'total_send_time': add_send_time,
-            'record_creation': current_time('record_creation')
-        }
-    },
-    {
-        "policies": {
-            "b1": message_actions('client_A', 'room_1', "Hi B", 'A', 'B'),
-            "b2": message_actions('client_B', 'room_1', "Hi A", 'B', 'A')
-        },
-        "variables": {
-            'client_a': send_message('client_a'),
-            'client_b': send_message('client_b'),
-            'total_msg_count': count_messages,
-            'total_send_time': add_send_time,
-            'record_creation': current_time('record_creation')
-        }
-    },
-    {
-        "policies": {
-            "b1": message_actions('client_A', 'room_1', "Bye B", 'A', 'B'),
-            "b2": message_actions('client_B', 'room_1', "Bye A", 'B', 'A')
-        },
-        "variables": {
-            'client_a': send_message('client_a'),
-            'client_b': send_message('client_b'),
-            'total_msg_count': count_messages,
-            'total_send_time': add_send_time,
-            'record_creation': current_time('record_creation')
-        }
-    },
-    {
-        "policies": {
-            "b1": exit_action('server', 'room_1', 'A'),
-            "b2": exit_action('server', 'room_1', 'B')
-        },
-        "variables": {
-            'client_a': send_message('client_a'),
-            'client_b': send_message('client_b'),
-            'total_msg_count': count_messages,
-            'total_send_time': add_send_time,
-            'record_creation': current_time('record_creation')
-        }
-    }
-]
-
-# N = 5 = 10000 / (500 x 4)
-# T = 500
-sim_config = config_sim(
-    {
-        "N": 1,
-        "T": range(10),
-        # "T": range(5000),
-    }
-)
-
-append_configs(
-    user_id='user_a',
-    sim_configs=sim_config,
-    initial_state=intitial_conditions,
-    partial_state_update_blocks=sim_composition,
-    policy_ops=[lambda a, b: a + b]
-)
-
-exec_mode = ExecutionMode()
-
-print("Simulation Execution: Distributed Execution")
-
-
-dist_proc_ctx = ExecutionContext(
-    context=exec_mode.dist_proc, method=distributed_simulations
-)
-run = Executor(exec_context=dist_proc_ctx, configs=configs, spark_context=sc)
-
-i = 0
-for raw_result, tensor_field in run.execute():
-    result = arrange_cols(pd.DataFrame(raw_result), False)[
-        [
-            'run_id', 'timestep', 'substep',
-            'record_creation', 'total_msg_count', 'total_send_time'
-        ]
-    ]
-    print()
-    if i == 0:
-        print(tabulate(tensor_field, headers='keys', tablefmt='psql'))
-    last = result.tail(1)
-    last['msg_per_sec'] = last['total_msg_count']/last['total_send_time']
-    print("Output:")
-    print(tabulate(result.head(5), headers='keys', tablefmt='psql'))
-    print(tabulate(result.tail(5), headers='keys', tablefmt='psql'))
-    print(tabulate(last, headers='keys', tablefmt='psql'))
-    print()
-    i += 1
\ No newline at end of file