From 0a9b980ad75d39eeaf0b4b5a8488e0b0c9d1ab96 Mon Sep 17 00:00:00 2001 From: "Joshua E. Jodesty" Date: Thu, 3 Oct 2019 09:57:41 -0400 Subject: [PATCH] readme pt. 1 --- distroduce/README.md | 0 .../{launch_cluster.py => cluster.py} | 0 .../configuration/launch_local_kafka.sh | 4 + distroduce/emr/launch.py | 0 distroduce/local_messaging_sim.py | 36 +++++++++ distroduce/messaging_sim.py | 79 +------------------ distroduce/simulation.py | 0 7 files changed, 44 insertions(+), 75 deletions(-) create mode 100644 distroduce/README.md rename distroduce/configuration/{launch_cluster.py => cluster.py} (100%) create mode 100644 distroduce/configuration/launch_local_kafka.sh create mode 100644 distroduce/emr/launch.py create mode 100644 distroduce/local_messaging_sim.py create mode 100644 distroduce/simulation.py diff --git a/distroduce/README.md b/distroduce/README.md new file mode 100644 index 0000000..e69de29 diff --git a/distroduce/configuration/launch_cluster.py b/distroduce/configuration/cluster.py similarity index 100% rename from distroduce/configuration/launch_cluster.py rename to distroduce/configuration/cluster.py diff --git a/distroduce/configuration/launch_local_kafka.sh b/distroduce/configuration/launch_local_kafka.sh new file mode 100644 index 0000000..497e88f --- /dev/null +++ b/distroduce/configuration/launch_local_kafka.sh @@ -0,0 +1,4 @@ +#!/bin/bash +bin/zookeeper-server-start.sh config/zookeeper.properties & +bin/kafka-server-start.sh config/server.properties & +bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test diff --git a/distroduce/emr/launch.py b/distroduce/emr/launch.py new file mode 100644 index 0000000..e69de29 diff --git a/distroduce/local_messaging_sim.py b/distroduce/local_messaging_sim.py new file mode 100644 index 0000000..de7bd59 --- /dev/null +++ b/distroduce/local_messaging_sim.py @@ -0,0 +1,36 @@ +from datetime import datetime + +from cadCAD import configs +from cadCAD.configuration.utils import config_sim +from cadCAD.engine import ExecutionMode, ExecutionContext, Executor + +from distroduce.simulation import main, sim_composition +from distroduce.spark.session import sc +from distroduce.executor.spark import distributed_produce + +if __name__ == "__main__": + intitial_conditions = { + 'record_creation': datetime.now(), + 'client_a': {'users': [], 'messages': [], 'msg_count': 0, 'send_time': 0.0}, + 'client_b': {'users': [], 'messages': [], 'msg_count': 0, 'send_time': 0.0}, + 'total_msg_count': 0, + 'total_send_time': 0.000000 + } + + # N = 5 = 10000 / (500 x 4) + # T = 500 + sim_config = config_sim( + { + "N": 1, + "T": range(10), + # "T": range(5000), + } + ) + + exec_mode = ExecutionMode() + kafkaConfig = {'send_topic': 'test', 'producer_config': {'bootstrap_servers': f'localhost:9092', 'acks': 'all'}} + # kafkaConfig = {'send_topic': 'test', 'producer_config': {'bootstrap_servers': f'{sys.argv[1]}:9092', 'acks': 'all'}} + dist_proc_ctx = ExecutionContext(context=exec_mode.dist_proc, method=distributed_produce, kafka_config=kafkaConfig) + run = Executor(exec_context=dist_proc_ctx, configs=configs, spark_context=sc) + + main(run, sim_config, intitial_conditions, sim_composition) diff --git a/distroduce/messaging_sim.py b/distroduce/messaging_sim.py index 1040b18..5d80ca7 100644 --- a/distroduce/messaging_sim.py +++ b/distroduce/messaging_sim.py @@ -1,54 +1,14 @@ -import sys -import pandas as pd from datetime import datetime -from tabulate import tabulate from cadCAD import configs -from cadCAD.utils import arrange_cols -from cadCAD.configuration import append_configs from cadCAD.configuration.utils import config_sim from cadCAD.engine import ExecutionMode, ExecutionContext, Executor +from distroduce.simulation import main, sim_composition from distroduce.spark.session import sc from distroduce.executor.spark import distributed_produce -from distroduce.action_policies import enter_action, message_actions, exit_action -from distroduce.state_updates import send_message, count_messages, add_send_time, current_time - -# State Updates -variables = { - 'client_a': send_message('client_a'), - 'client_b': send_message('client_b'), - 'total_msg_count': count_messages, - 'total_send_time': add_send_time, - 'record_creation': current_time('record_creation') -} - -# Action Policies -policy_group_1 = { - "action_1": enter_action('server', 'room_1', 'A'), - "action_2": enter_action('server', 'room_1', 'B') -} - -policy_group_2 = { - "action_1": message_actions('client_A', 'room_1', "Hi B", 'A', 'B'), - "action_2": message_actions('client_B', 'room_1', "Hi A", 'B', 'A') -} - -policy_group_3 = { - "action_1": message_actions('client_A', 'room_1', "Bye B", 'A', 'B'), - "action_2": message_actions('client_B', 'room_1', "Bye A", 'B', 'A') -} - -policy_group_4 = { - "action_1": exit_action('server', 'room_1', 'A'), - "action_2": exit_action('server', 'room_1', 'B') -} - -policy_groups = [policy_group_1, policy_group_2, policy_group_3, policy_group_4] -sim_composition = [{'policies': policy_group, 'variables': variables} for policy_group in policy_groups] if __name__ == "__main__": - print("Distributed Simulation: Chat Clients") intitial_conditions = { 'record_creation': datetime.now(), 'client_a': {'users': [], 'messages': [], 'msg_count': 0, 'send_time': 0.0}, @@ -56,48 +16,17 @@ if __name__ == "__main__": 'total_msg_count': 0, 'total_send_time': 0.000000 } - exec_mode = ExecutionMode() - - # N = 5 = 10000 / (500 x 4) - # T = 500 sim_config = config_sim( { "N": 1, - "T": range(10), - # "T": range(5000), + "T": range(5000), } ) - append_configs( - user_id='Joshua', - sim_configs=sim_config, - initial_state=intitial_conditions, - partial_state_update_blocks=sim_composition, - policy_ops=[lambda a, b: a + b] - ) - + exec_mode = ExecutionMode() kafkaConfig = {'send_topic': 'test', 'producer_config': {'bootstrap_servers': f'{sys.argv[1]}:9092', 'acks': 'all'}} dist_proc_ctx = ExecutionContext(context=exec_mode.dist_proc, method=distributed_produce, kafka_config=kafkaConfig) run = Executor(exec_context=dist_proc_ctx, configs=configs, spark_context=sc) - i = 0 - for raw_result, tensor_field in run.execute(): - result = arrange_cols(pd.DataFrame(raw_result), False)[ - [ - 'run_id', 'timestep', 'substep', - 'record_creation', 'total_msg_count', 'total_send_time' - ] - ] - print() - if i == 0: - print(tabulate(tensor_field, headers='keys', tablefmt='psql')) - last = result.tail(1) - last['msg_per_sec'] = last['total_msg_count']/last['total_send_time'] - print("Output: Head") - print(tabulate(result.head(5), headers='keys', tablefmt='psql')) - print("Output: Tail") - print(tabulate(result.tail(5), headers='keys', tablefmt='psql')) - print(tabulate(last, headers='keys', tablefmt='psql')) - print() - i += 1 + main(run, sim_config, intitial_conditions, sim_composition) diff --git a/distroduce/simulation.py b/distroduce/simulation.py new file mode 100644 index 0000000..e69de29