readme pt. 3: new build

This commit is contained in:
Joshua E. Jodesty 2019-10-03 12:29:34 -04:00
parent 8cf3b09582
commit ea5df50659
5 changed files with 83 additions and 66 deletions

View File

@ -9,20 +9,24 @@
by Joshua E. Jodesty
```
##Description:
## What?: *Description*
***Distributed Produce*** (**[distroduce](distroduce)**) is a message simulation and throughput benchmarking framework /
cadCAD execution mode that leverages Apache Spark and Kafka Producer for optimizing Kafka cluster configurations and
debugging real-time data transformations (i.e. Kafka Streams, Spark (Structured) Streaming). cadCAD's user-defined event
simulation framework is used to simulate messages sent to Kafka clusters.
debugging real-time data transformations. *distroduce* leverages cadCAD's user-defined event simulation template and
framework to simulate messages sent to Kafka clusters. This enables rapid and iterative design, debugging, and message
publish benchmarking of Kafka clusters and real-time data processing using Kafka Streams and Spark (Structured)
Streaming.
##Use Cases:
* **Education:** I wanted to provide a tool for Insight Data Engineering Fellows implementing real-time data processing
that enables rapid and iterative design, debugging, and message publish benchmarking of Kafka clusters and real-time
data processing using Kafka Streams and Spark (Structured) Streaming.
* **IoT:** Competes with *AWS IoT Device Simulator* and *Azure IoT Solution Acceleration: Device Simulation*. Unlike
these products, *Distributed Produce* enables a user-defined state updates and agent actions, as well as message publish
benchmarking
##How?: *A Tail of Two Clusters*
***Distributed Produce*** is a Spark Application used as a cadCAD Execution Mode that distributes Kafka Producers,
message simulation, and message publishing to worker nodes of an EMR cluster. Messages published from these workers are
sent to Kafka topics on a Kafka cluster from a Spark bootstrapped EMR cluster.
##Why?: *Use Case*
* **IoT Event / Device Simulation:** Competes with *AWS IoT Device Simulator* and *Azure IoT Solution Acceleration:
Device Simulation*. Unlike these products, *Distributed Produce* enables a user-defined state updates and agent actions,
as well as message publish benchmarking
* **Development Environment for Real-Time Data Processing / Routing:**
##Get Started:
@ -40,29 +44,32 @@ sh distroduce/configuration/launch_local_kafka.sh
python3 distroduce/local_messaging_sim.py
```
### 1. Write Simulation:
### 1. Write cadCAD Simulation:
* [Documentation](https://github.com/BlockScience/cadCAD/tree/master/documentation)
* [Tutorials](https://github.com/BlockScience/cadCAD/tree/master/tutorials)
* **Terminology:**
* ***Initial Conditions*** - State Variables and their initial values (Start event of Simulation)
* ***[Policy Functions:](https://github.com/BlockScience/cadCAD/tree/master/documentation#Policy-Functions)*** -
computes one or more signals to be passed to State Update Functions
* ***[State Update Functions]((https://github.com/BlockScience/cadCAD/tree/master/documentation#state-update-functions)):*** -
updates state variables change over time
* ***[Partial State Update Block](https://github.com/BlockScience/cadCAD/tree/master/documentation#State-Variables) (PSUB):*** -
a set of State Update Functions and Policy Functions that update state records
![](https://i.imgur.com/9rlX9TG.png)
**Note:** State Update and Policy Functions now have the additional / undocumented parameter `kafkaConfig`
**a.** **Define Policy Functions:** Two users interacting on separate chat clients and entering / exiting chat rooms
* [Example](distroduce/action_policies.py)
* [Documentation](https://github.com/BlockScience/cadCAD/tree/master/documentation#Policy-Functions)
**a.** **Define Policy Functions:**
* [Example:](distroduce/action_policies.py) Two users interacting on separate chat clients and entering / exiting chat
**b.** **Define State Update Functions:** Used for logging and maintaining state of user actions defined by policies
* [Example](distroduce/state_updates.py)
* [Documentation](https://github.com/BlockScience/cadCAD/tree/master/documentation#state-update-functions)
**b.** **Define State Update Functions:**
* [Example:](distroduce/state_updates.py) Used for logging and maintaining state of user actions defined by policies
**c.** **Define Initial Conditions & Partial State Update Block**
* **Initial Conditions** - State Variables and their initial values
* [Example](distroduce/messaging_sim.py)
* [Documentation](https://github.com/BlockScience/cadCAD/tree/master/documentation#State-Variables)
* **Partial State Update Block (PSUB)** - a set of State Update Functions and Policy Functions that update state records
* [Example](distroduce/simulation.py)
* [Documentation](https://github.com/BlockScience/cadCAD/tree/master/documentation#Partial-State-Update-Blocks)
![](https://i.imgur.com/9rlX9TG.png)
**c.** **Define Initial Conditions & Partial State Update Block:**
* **Initial Conditions:** [Example](distroduce/messaging_sim.py)
* **Partial State Update Block (PSUB):** [Example](distroduce/simulation.py)
**d.** **Create Simulation Executor:** Used for running a simulation
* [Local](distroduce/local_messaging_sim.py)
@ -71,6 +78,11 @@ python3 distroduce/local_messaging_sim.py
### 2. [Configure EMR Cluster](distroduce/configuration/cluster.py)
### 3. Launch EMR Cluster:
**Option A:** Preconfigured Launch
```bash
python3 distroduce/emr/launch.py
```
**Option B:** Custom Launch - [Example](distroduce/emr/launch.py)
```python
from distroduce.emr.launch import launch_cluster
from distroduce.configuration.cluster import ec2_attributes, bootstrap_actions, instance_groups, configurations

Binary file not shown.

View File

@ -0,0 +1,39 @@
import os, json, boto3
from typing import List
def launch_cluster(name, region, ec2_attributes, bootstrap_actions, instance_groups, configurations):
def log_uri(name, region):
return f's3n://{name}-{region}/elasticmapreduce/'
os.system(f"""
aws emr create-cluster \
--applications Name=Hadoop Name=Hive Name=Spark \
--ec2-attributes '{json.dumps(ec2_attributes)}' \
--release-label emr-5.26.0 \
--log-uri '{str(log_uri(name, region))}' \
--instance-groups '{json.dumps(instance_groups)}' \
--configurations '{json.dumps(configurations)}' \
--auto-scaling-role EMR_AutoScaling_DefaultRole \
--bootstrap-actions '{json.dumps(bootstrap_actions)}' \
--ebs-root-volume-size 10 \
--service-role EMR_DefaultRole \
--enable-debugging \
--name '{name}' \
--scale-down-behavior TERMINATE_AT_TASK_COMPLETION \
--region {region}
""")
# def benchmark(names: List[str], region, ec2_attributes, bootstrap_actions, instance_groups, configurations):
# current_dir = os.path.dirname(__file__)
# s3 = boto3.client('s3')
# bucket = 'insightde'
#
# file = 'distroduce.sh'
# abs_path = os.path.join(current_dir, file)
# key = f'emr/bootstraps/{file}'
#
# s3.upload_file(abs_path, bucket, key)
# for name in names:
# launch_cluster(name, region, ec2_attributes, bootstrap_actions, instance_groups, configurations)

View File

@ -1,39 +1,5 @@
import os, json, boto3
from typing import List
def launch_cluster(name, region, ec2_attributes, bootstrap_actions, instance_groups, configurations):
def log_uri(name, region):
return f's3n://{name}-{region}/elasticmapreduce/'
os.system(f"""
aws emr create-cluster \
--applications Name=Hadoop Name=Hive Name=Spark \
--ec2-attributes '{json.dumps(ec2_attributes)}' \
--release-label emr-5.26.0 \
--log-uri '{str(log_uri(name, region))}' \
--instance-groups '{json.dumps(instance_groups)}' \
--configurations '{json.dumps(configurations)}' \
--auto-scaling-role EMR_AutoScaling_DefaultRole \
--bootstrap-actions '{json.dumps(bootstrap_actions)}' \
--ebs-root-volume-size 10 \
--service-role EMR_DefaultRole \
--enable-debugging \
--name '{name}' \
--scale-down-behavior TERMINATE_AT_TASK_COMPLETION \
--region {region}
""")
# def benchmark(names: List[str], region, ec2_attributes, bootstrap_actions, instance_groups, configurations):
# current_dir = os.path.dirname(__file__)
# s3 = boto3.client('s3')
# bucket = 'insightde'
#
# file = 'distroduce.sh'
# abs_path = os.path.join(current_dir, file)
# key = f'emr/bootstraps/{file}'
#
# s3.upload_file(abs_path, bucket, key)
# for name in names:
# launch_cluster(name, region, ec2_attributes, bootstrap_actions, instance_groups, configurations)
from distroduce.emr import launch_cluster
from distroduce.configuration.cluster import ec2_attributes, bootstrap_actions, instance_groups, configurations
region = 'us-east-1'
cluster_name = 'distibuted_produce'
launch_cluster(cluster_name, region, ec2_attributes, bootstrap_actions, instance_groups, configurations)