readme pt. 2: new build

This commit is contained in:
Joshua E. Jodesty 2019-10-03 11:28:58 -04:00
parent 0a9b980ad7
commit 8cf3b09582
11 changed files with 276 additions and 63 deletions

3
.gitignore vendored
View File

@ -30,4 +30,5 @@ Simulation.md
monkeytype.sqlite3
distributed_produce/bash/
distributed_produce/notes.txt
notes.txt

View File

@ -0,0 +1,88 @@
```
___ _ __ _ __ __ __
/ _ \ (_)___ / /_ ____ (_)/ / __ __ / /_ ___ ___/ /
/ // // /(_-</ __// __// // _ \/ // // __// -_)/ _ /
/____//_//___/\__//_/ _/_//_.__/\_,_/ \__/ \__/ \_,_/
/ _ \ ____ ___ ___/ /__ __ ____ ___
/ ___// __// _ \/ _ // // // __// -_)
/_/ /_/ \___/\_,_/ \_,_/ \__/ \__/
by Joshua E. Jodesty
```
## Description:
***Distributed Produce*** (**[distroduce](distroduce)**) is a message simulation and throughput benchmarking framework and
cadCAD execution mode that leverages Apache Spark and the Kafka Producer to optimize Kafka cluster configurations and
debug real-time data transformations (e.g. Kafka Streams, Spark (Structured) Streaming). cadCAD's user-defined event
simulation framework is used to simulate the messages sent to Kafka clusters.
## Use Cases:
* **Education:** I wanted to give Insight Data Engineering Fellows implementing real-time data processing a tool that
enables rapid, iterative design, debugging, and message-publish benchmarking of Kafka clusters and of stream processors
built with Kafka Streams and Spark (Structured) Streaming.
* **IoT:** Competes with *AWS IoT Device Simulator* and *Azure IoT Solution Acceleration: Device Simulation*. Unlike
these products, *Distributed Produce* supports user-defined state updates and agent actions, as well as message-publish
benchmarking.
## Get Started:
### 0. Set Up Local Development Environment: see [Kafka Quickstart](https://kafka.apache.org/quickstart)
**a.** Install `pyspark`
```bash
pip3 install pyspark
```
**b.** Install & Unzip Kafka, Create Kafka `test` topic, and Start Consumer
```bash
sh distroduce/configuration/launch_local_kafka.sh
```
**c.** Run Simulation locally
```bash
python3 distroduce/local_messaging_sim.py
```
### 1. Write Simulation:
* [Documentation](https://github.com/BlockScience/cadCAD/tree/master/documentation)
* [Tutorials](https://github.com/BlockScience/cadCAD/tree/master/tutorials)
**Note:** State Update and Policy Functions now take an additional, currently undocumented parameter, `kafkaConfig` (see the sketch at the end of this step).
**a.** **Define Policy Functions:** Two users interacting on separate chat clients and entering / exiting chat rooms
* [Example](distroduce/action_policies.py)
* [Documentation](https://github.com/BlockScience/cadCAD/tree/master/documentation#Policy-Functions)
**b.** **Define State Update Functions:** Used for logging and maintaining state of user actions defined by policies
* [Example](distroduce/state_updates.py)
* [Documentation](https://github.com/BlockScience/cadCAD/tree/master/documentation#state-update-functions)
**c.** **Define Initial Conditions & Partial State Update Block**
* **Initial Conditions** - State Variables and their initial values
* [Example](distroduce/messaging_sim.py)
* [Documentation](https://github.com/BlockScience/cadCAD/tree/master/documentation#State-Variables)
* **Partial State Update Block (PSUB)** - a set of State Update Functions and Policy Functions that update state records
* [Example](distroduce/simulation.py)
* [Documentation](https://github.com/BlockScience/cadCAD/tree/master/documentation#Partial-State-Update-Blocks)
![](https://i.imgur.com/9rlX9TG.png)
**d.** **Create Simulation Executor:** Used for running a simulation
* [Local](distroduce/local_messaging_sim.py)
* [EMR](distroduce/messaging_sim.py)
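The overall shape of these functions and of a PSUB, including the extra `kafkaConfig` parameter, is sketched below. This is a simplified illustration distilled from [action_policies.py](distroduce/action_policies.py), [state_updates.py](distroduce/state_updates.py), and [simulation.py](distroduce/simulation.py), not a drop-in replacement for them.
```python
# Minimal sketch of the function shapes distroduce expects (illustrative only).

# Policy Function: a closure whose inner function receives the extra `kafkaConfig`
# parameter and returns a dict of actions; the real example also publishes to Kafka here.
def enter_action(state, room, user):
    def policy(_g, step, sL, s, kafkaConfig):
        return {'types': ['enter'], 'msg_counts': [1],
                'messages': [f"{user} enters {room}"]}
    return policy

# State Update Function: returns a (state_variable, new_value) tuple.
def count_messages(_g, step, sL, s, actions, kafkaConfig):
    return 'total_msg_count', s['total_msg_count'] + sum(actions['msg_counts'])

# Partial State Update Block: pairs Policy Functions with the State Updates they drive.
sim_composition = [
    {
        'policies': {'action_1': enter_action('server', 'room_1', 'A')},
        'variables': {'total_msg_count': count_messages},
    }
]
```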
### 2. [Configure EMR Cluster](distroduce/configuration/cluster.py)
### 3. Launch EMR Cluster:
```python
from distroduce.emr.launch import launch_cluster
from distroduce.configuration.cluster import ec2_attributes, bootstrap_actions, instance_groups, configurations
region = 'us-east-1'
cluster_name = 'distibuted_produce'
launch_cluster(cluster_name, region, ec2_attributes, bootstrap_actions, instance_groups, configurations)
```
### 4. Execute Benchmark(s):
* **Step 1:** SSH onto the master node
* **Step 2:** Spark Submit
```bash
PRIVATE_IP=`hostname -I | xargs`
spark-submit --master yarn --py-files distroduce.zip messaging_sim.py $PRIVATE_IP
```
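`messaging_sim.py` uses the private IP passed as its first argument to point the Kafka Producer at the broker on the master node. Roughly, it builds a configuration like the following (a sketch based on the `kafkaConfig` shown in this commit):
```python
import sys

# The master node's private IP (passed by spark-submit above) becomes the bootstrap server
kafkaConfig = {
    'send_topic': 'test',
    'producer_config': {'bootstrap_servers': f'{sys.argv[1]}:9092', 'acks': 'all'}
}
```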

View File

@ -1,7 +1,14 @@
from datetime import datetime
from kafka import KafkaProducer
# Actions
'''
Function that produces a single action taken by a user and its accompanying message.
The results will be combined with a user-defined aggregation function (UDAF)
given to the `policy_ops` argument of `cadCAD.configuration.append_configs`.
Current UDAF: `policy_ops=[lambda a, b: a + b]`
'''
def messages(client_id, room, action, _input, sender, receiver=None):
return {
'types': [action],
@ -10,12 +17,15 @@ def messages(client_id, room, action, _input, sender, receiver=None):
'client': client_id, 'room': room, 'action': action,
'sender': sender, 'receiver': receiver,
'input': _input,
'creatred': datetime.now()
'created': datetime.now()
}
]
}
'''
Policy representing a user entering a chat room and its accompanying message
'''
def enter_action(state, room, user):
def f(_g, step, sL, s, kafkaConfig):
msgs = messages(state, room, 'enter', f"{user} enters {room}", user)
@ -24,8 +34,16 @@ def enter_action(state, room, user):
return msgs
return f
def message_actions(state, room, _input, sender, receiver):
msgs = messages(state, room, 'send', _input, sender, receiver)
'''
Policy representing a user sending a message to a receiver within a chat room
and its accompanying message
A Kafka Producer is used to send messages to a Kafka cluster.
The Kafka Producer is configured via `cadCAD.engine.ExecutionContext`.
'''
def message_actions(state, room, message_input, sender, receiver):
msgs = messages(state, room, 'send', message_input, sender, receiver)
msgs_list = msgs['messages']
def send_action(_g, step, sL, s, kafkaConfig):
start_time = datetime.now()
@ -40,6 +58,10 @@ def message_actions(state, room, _input, sender, receiver):
return send_action
'''
Policy representing a user exiting a chat room and its accompanying message
'''
def exit_action(state, room, user):
def f(_g, step, sL, s, kafkaConfig):
msgs = messages(state, room, 'exit', f"{user} exited {room}", user)

View File

@ -1,6 +1,3 @@
import os, json, boto3
from typing import List
ec2_attributes = {
"KeyName":"joshua-IAM-keypair",
"InstanceProfile":"EMR_EC2_DefaultRole",
@ -80,46 +77,3 @@ configurations = [
}
}
]
def create_distroduce_cluster(name, region, ec2_attributes, bootstrap_actions, instance_groups, configurations):
def log_uri(name, region):
return f's3n://{name}-{region}/elasticmapreduce/'
os.system(f"""
aws emr create-cluster \
--applications Name=Hadoop Name=Hive Name=Spark \
--ec2-attributes '{json.dumps(ec2_attributes)}' \
--release-label emr-5.26.0 \
--log-uri '{str(log_uri(name, region))}' \
--instance-groups '{json.dumps(instance_groups)}' \
--configurations '{json.dumps(configurations)}' \
--auto-scaling-role EMR_AutoScaling_DefaultRole \
--bootstrap-actions '{json.dumps(bootstrap_actions)}' \
--ebs-root-volume-size 10 \
--service-role EMR_DefaultRole \
--enable-debugging \
--name '{name}' \
--scale-down-behavior TERMINATE_AT_TASK_COMPLETION \
--region {region}
""")
def benchmark(names: List[str], region, ec2_attributes, bootstrap_actions, instance_groups, configurations):
current_dir = os.path.dirname(__file__)
s3 = boto3.client('s3')
bucket = 'insightde'
file = 'distroduce.sh'
abs_path = os.path.join(current_dir, file)
key = f'emr/bootstraps/{file}'
s3.upload_file(abs_path, bucket, key)
for name in names:
create_distroduce_cluster(name, region, ec2_attributes, bootstrap_actions, instance_groups, configurations)
name = 'distibuted_produce'
region = 'us-east-1'
benchmark([name], region, ec2_attributes, bootstrap_actions, instance_groups, configurations)

View File

@ -1,4 +1,8 @@
#!/bin/bash
wget http://apache.spinellicreations.com/kafka/2.3.0/kafka_2.12-2.3.0.tgz
tar -xzf kafka_2.12-2.3.0.tgz
cd kafka_2.12-2.3.0
bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties &
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

Binary file not shown.

View File

@ -0,0 +1,39 @@
import os, json, boto3
from typing import List
def launch_cluster(name, region, ec2_attributes, bootstrap_actions, instance_groups, configurations):
def log_uri(name, region):
return f's3n://{name}-{region}/elasticmapreduce/'
os.system(f"""
aws emr create-cluster \
--applications Name=Hadoop Name=Hive Name=Spark \
--ec2-attributes '{json.dumps(ec2_attributes)}' \
--release-label emr-5.26.0 \
--log-uri '{str(log_uri(name, region))}' \
--instance-groups '{json.dumps(instance_groups)}' \
--configurations '{json.dumps(configurations)}' \
--auto-scaling-role EMR_AutoScaling_DefaultRole \
--bootstrap-actions '{json.dumps(bootstrap_actions)}' \
--ebs-root-volume-size 10 \
--service-role EMR_DefaultRole \
--enable-debugging \
--name '{name}' \
--scale-down-behavior TERMINATE_AT_TASK_COMPLETION \
--region {region}
""")
# def benchmark(names: List[str], region, ec2_attributes, bootstrap_actions, instance_groups, configurations):
# current_dir = os.path.dirname(__file__)
# s3 = boto3.client('s3')
# bucket = 'insightde'
#
# file = 'distroduce.sh'
# abs_path = os.path.join(current_dir, file)
# key = f'emr/bootstraps/{file}'
#
# s3.upload_file(abs_path, bucket, key)
# for name in names:
# launch_cluster(name, region, ec2_attributes, bootstrap_actions, instance_groups, configurations)

View File

@ -9,7 +9,8 @@ from distroduce.spark.session import sc
from distroduce.executor.spark import distributed_produce
if __name__ == "__main__":
intitial_conditions = {
# Initial States
initial_conditions = {
'record_creation': datetime.now(),
'client_a': {'users': [], 'messages': [], 'msg_count': 0, 'send_time': 0.0},
'client_b': {'users': [], 'messages': [], 'msg_count': 0, 'send_time': 0.0},
@ -17,20 +18,21 @@ if __name__ == "__main__":
'total_send_time': 0.000000
}
# N = 5 = 10000 / (500 x 4)
# T = 500
'''
Simulation Configuration:
N = Simulation Runs
T = Timesteps for each Partial State Update Block
'''
sim_config = config_sim(
{
"N": 1,
"T": range(10),
# "T": range(5000),
}
)
exec_mode = ExecutionMode()
kafkaConfig = {'send_topic': 'test', 'producer_config': {'bootstrap_servers': f'localhost:9092', 'acks': 'all'}}
# kafkaConfig = {'send_topic': 'test', 'producer_config': {'bootstrap_servers': f'{sys.argv[1]}:9092', 'acks': 'all'}}
dist_proc_ctx = ExecutionContext(context=exec_mode.dist_proc, method=distributed_produce, kafka_config=kafkaConfig)
run = Executor(exec_context=dist_proc_ctx, configs=configs, spark_context=sc)
main(run, sim_config, intitial_conditions, sim_composition)
main(run, sim_config, initial_conditions, sim_composition)

View File

@ -9,7 +9,8 @@ from distroduce.spark.session import sc
from distroduce.executor.spark import distributed_produce
if __name__ == "__main__":
intitial_conditions = {
# Initial States
initial_conditions = {
'record_creation': datetime.now(),
'client_a': {'users': [], 'messages': [], 'msg_count': 0, 'send_time': 0.0},
'client_b': {'users': [], 'messages': [], 'msg_count': 0, 'send_time': 0.0},
@ -17,6 +18,11 @@ if __name__ == "__main__":
'total_send_time': 0.000000
}
'''
Simulation Configuration:
N = Simulation Runs
T = Timesteps for each Partial State Update Block
'''
sim_config = config_sim(
{
"N": 1,
@ -29,4 +35,4 @@ if __name__ == "__main__":
dist_proc_ctx = ExecutionContext(context=exec_mode.dist_proc, method=distributed_produce, kafka_config=kafkaConfig)
run = Executor(exec_context=dist_proc_ctx, configs=configs, spark_context=sc)
main(run, sim_config, intitial_conditions, sim_composition)
main(run, sim_config, initial_conditions, sim_composition)

View File

@ -0,0 +1,77 @@
import sys
from pprint import pprint
import pandas as pd
from tabulate import tabulate
from cadCAD.utils import arrange_cols
from cadCAD.configuration import append_configs
from distroduce.action_policies import enter_action, message_actions, exit_action
from distroduce.state_updates import send_message, count_messages, add_send_time, current_time
# State Updates
variables = {
'client_a': send_message('client_a'),
'client_b': send_message('client_b'),
'total_msg_count': count_messages,
'total_send_time': add_send_time,
'record_creation': current_time('record_creation')
}
# Action Policies
policy_group_1 = {
"action_1": enter_action('server', 'room_1', 'A'),
"action_2": enter_action('server', 'room_1', 'B')
}
policy_group_2 = {
"action_1": message_actions('client_A', 'room_1', "Hi B", 'A', 'B'),
"action_2": message_actions('client_B', 'room_1', "Hi A", 'B', 'A')
}
policy_group_3 = {
"action_1": message_actions('client_A', 'room_1', "Bye B", 'A', 'B'),
"action_2": message_actions('client_B', 'room_1', "Bye A", 'B', 'A')
}
policy_group_4 = {
"action_1": exit_action('server', 'room_1', 'A'),
"action_2": exit_action('server', 'room_1', 'B')
}
policy_groups = [policy_group_1, policy_group_2, policy_group_3, policy_group_4]
# Partial State update Block
sim_composition = [{'policies': policy_group, 'variables': variables} for policy_group in policy_groups]
def main(executor, sim_config, intitial_conditions, sim_composition):
append_configs(
user_id='Joshua',
sim_configs=sim_config,
initial_state=intitial_conditions,
partial_state_update_blocks=sim_composition,
policy_ops=[lambda a, b: a + b]
)
pprint(sim_composition)
i = 0
for raw_result, tensor_field in executor.execute():
result = arrange_cols(pd.DataFrame(raw_result), False)[
[
'run_id', 'timestep', 'substep',
'record_creation', 'total_msg_count', 'total_send_time'
]
]
print()
if i == 0:
print(tabulate(tensor_field, headers='keys', tablefmt='psql'))
last = result.tail(1)
last['msg_per_sec'] = last['total_msg_count'] / last['total_send_time']
print("Output: Head")
print(tabulate(result.head(5), headers='keys', tablefmt='psql'))
print("Output: Tail")
print(tabulate(result.tail(5), headers='keys', tablefmt='psql'))
print(tabulate(last, headers='keys', tablefmt='psql'))
print()
i += 1

View File

@ -2,7 +2,12 @@ from copy import deepcopy
from datetime import datetime
from functools import reduce
# State Updates
add = lambda a, b: a + b
'''
Function that maintains the state of users in a chat room
'''
def update_users(users, actions, action_types=['send','enter','exit']):
users = deepcopy(users)
for action_type in action_types:
@ -18,13 +23,17 @@ def update_users(users, actions, action_types=['send','enter','exit']):
users.remove(user) # remove_exited
return users
add = lambda a, b: a + b
'''
State Update used to count sent messages
'''
def count_messages(_g, step, sL, s, actions, kafkaConfig):
return 'total_msg_count', s['total_msg_count'] + reduce(add, actions['msg_counts'])
def add_send_time(_g, step, sL, s, actions, kafkaConfig):
return 'total_send_time', s['total_send_time'] + reduce(add, actions['send_times'])
'''
State Update used to log message events
'''
def send_message(state):
return lambda _g, step, sL, s, actions, kafkaConfig: (
state,
@ -36,5 +45,16 @@ def send_message(state):
}
)
'''
State Update used to sum the time taken for the Kafka Producer to send messages between users
'''
def add_send_time(_g, step, sL, s, actions, kafkaConfig):
return 'total_send_time', s['total_send_time'] + reduce(add, actions['send_times'])
'''
State Update used to record the event record creation time
'''
def current_time(state):
return lambda _g, step, sL, s, actions, kafkaConfig: (state, datetime.now())