diff --git a/distroduce/README.md b/distroduce/README.md index a057d01..5d5b3ab 100644 --- a/distroduce/README.md +++ b/distroduce/README.md @@ -9,20 +9,24 @@ by Joshua E. Jodesty ``` - -##Description: +## What?: *Description* ***Distributed Produce*** (**[distroduce](distroduce)**) is a message simulation and throughput benchmarking framework / cadCAD execution mode that leverages Apache Spark and Kafka Producer for optimizing Kafka cluster configurations and -debugging real-time data transformations (i.e. Kafka Streams, Spark (Structured) Streaming). cadCAD's user-defined event -simulation framework is used to simulate messages sent to Kafka clusters. +debugging real-time data transformations. *distroduce* leverages cadCAD's user-defined event simulation template and +framework to simulate messages sent to Kafka clusters. This enables rapid and iterative design, debugging, and message +publish benchmarking of Kafka clusters and real-time data processing using Kafka Streams and Spark (Structured) +Streaming. -##Use Cases: -* **Education:** I wanted to provide a tool for Insight Data Engineering Fellows implementing real-time data processing -that enables rapid and iterative design, debugging, and message publish benchmarking of Kafka clusters and real-time -data processing using Kafka Streams and Spark (Structured) Streaming. -* **IoT:** Competes with *AWS IoT Device Simulator* and *Azure IoT Solution Acceleration: Device Simulation*. Unlike -these products, *Distributed Produce* enables a user-defined state updates and agent actions, as well as message publish -benchmarking +## How?: *A Tale of Two Clusters* +***Distributed Produce*** is a Spark Application used as a cadCAD Execution Mode that distributes Kafka Producers, +message simulation, and message publishing to worker nodes of an EMR cluster. Messages published from these workers are +sent to Kafka topics on a Kafka cluster from a Spark-bootstrapped EMR cluster. 
+ +## Why?: *Use Cases* +* **IoT Event / Device Simulation:** Competes with *AWS IoT Device Simulator* and *Azure IoT Solution Acceleration: +Device Simulation*. Unlike these products, *Distributed Produce* enables user-defined state updates and agent actions, +as well as message publish benchmarking +* **Development Environment for Real-Time Data Processing / Routing** ##Get Started: @@ -40,29 +44,32 @@ sh distroduce/configuration/launch_local_kafka.sh python3 distroduce/local_messaging_sim.py ``` -### 1. Write Simulation: +### 1. Write cadCAD Simulation: * [Documentation](https://github.com/BlockScience/cadCAD/tree/master/documentation) * [Tutorials](https://github.com/BlockScience/cadCAD/tree/master/tutorials) +* **Terminology:** + * ***Initial Conditions*** - State Variables and their initial values (the starting state of a simulation) + * ***[Policy Functions](https://github.com/BlockScience/cadCAD/tree/master/documentation#Policy-Functions):*** - + compute one or more signals to be passed to State Update Functions + * ***[State Update Functions](https://github.com/BlockScience/cadCAD/tree/master/documentation#state-update-functions):*** - + update state variables over time + * ***[Partial State Update Block](https://github.com/BlockScience/cadCAD/tree/master/documentation#Partial-State-Update-Blocks) (PSUB):*** - + a set of State Update Functions and Policy Functions that update state records + ![](https://i.imgur.com/9rlX9TG.png) + + **Note:** State Update and Policy Functions now take an additional, currently undocumented parameter, `kafkaConfig` -**a.** **Define Policy Functions:** Two users interacting on separate chat clients and entering / exiting chat rooms -* [Example](distroduce/action_policies.py) -* [Documentation](https://github.com/BlockScience/cadCAD/tree/master/documentation#Policy-Functions) +**a.** **Define Policy Functions:** +* [Example](distroduce/action_policies.py): Two users interacting on separate chat clients and entering / exiting chat rooms -**b.** 
**Define State Update Functions:** Used for logging and maintaining state of user actions defined by policies -* [Example](distroduce/state_updates.py) -* [Documentation](https://github.com/BlockScience/cadCAD/tree/master/documentation#state-update-functions) +**b.** **Define State Update Functions:** +* [Example](distroduce/state_updates.py): Used for logging and maintaining state of user actions defined by policies -**c.** **Define Initial Conditions & Partial State Update Block** -* **Initial Conditions** - State Variables and their initial values - * [Example](distroduce/messaging_sim.py) - * [Documentation](https://github.com/BlockScience/cadCAD/tree/master/documentation#State-Variables) - -* **Partial State Update Block (PSUB)** - a set of State Update Functions and Policy Functions that update state records - * [Example](distroduce/simulation.py) - * [Documentation](https://github.com/BlockScience/cadCAD/tree/master/documentation#Partial-State-Update-Blocks) - ![](https://i.imgur.com/9rlX9TG.png) +**c.** **Define Initial Conditions & Partial State Update Block:** +* **Initial Conditions:** [Example](distroduce/messaging_sim.py) +* **Partial State Update Block (PSUB):** [Example](distroduce/simulation.py) **d.** **Create Simulation Executor:** Used for running a simulation * [Local](distroduce/local_messaging_sim.py) @@ -71,6 +78,11 @@ python3 distroduce/local_messaging_sim.py ### 2. [Configure EMR Cluster](distroduce/configuration/cluster.py) ### 3. 
Launch EMR Cluster: +**Option A:** Preconfigured Launch +```bash +python3 distroduce/emr/launch.py +``` +**Option B:** Custom Launch - [Example](distroduce/emr/launch.py) ```python from distroduce.emr.launch import launch_cluster from distroduce.configuration.cluster import ec2_attributes, bootstrap_actions, instance_groups, configurations diff --git a/distroduce/configuration/launch_cluster.sh b/distroduce/configuration/launch_cluster.txt similarity index 100% rename from distroduce/configuration/launch_cluster.sh rename to distroduce/configuration/launch_cluster.txt diff --git a/distroduce/dist/distroduce.zip b/distroduce/dist/distroduce.zip index d571a21..f652340 100644 Binary files a/distroduce/dist/distroduce.zip and b/distroduce/dist/distroduce.zip differ diff --git a/distroduce/emr/__init__.py b/distroduce/emr/__init__.py new file mode 100644 index 0000000..8257b54 --- /dev/null +++ b/distroduce/emr/__init__.py @@ -0,0 +1,39 @@ +import os, json, boto3 +from typing import List + + +def launch_cluster(name, region, ec2_attributes, bootstrap_actions, instance_groups, configurations): + def log_uri(name, region): + return f's3n://{name}-{region}/elasticmapreduce/' + + os.system(f""" + aws emr create-cluster \ + --applications Name=Hadoop Name=Hive Name=Spark \ + --ec2-attributes '{json.dumps(ec2_attributes)}' \ + --release-label emr-5.26.0 \ + --log-uri '{str(log_uri(name, region))}' \ + --instance-groups '{json.dumps(instance_groups)}' \ + --configurations '{json.dumps(configurations)}' \ + --auto-scaling-role EMR_AutoScaling_DefaultRole \ + --bootstrap-actions '{json.dumps(bootstrap_actions)}' \ + --ebs-root-volume-size 10 \ + --service-role EMR_DefaultRole \ + --enable-debugging \ + --name '{name}' \ + --scale-down-behavior TERMINATE_AT_TASK_COMPLETION \ + --region {region} + """) + + +# def benchmark(names: List[str], region, ec2_attributes, bootstrap_actions, instance_groups, configurations): +# current_dir = os.path.dirname(__file__) +# s3 = 
boto3.client('s3') +# bucket = 'insightde' +# +# file = 'distroduce.sh' +# abs_path = os.path.join(current_dir, file) +# key = f'emr/bootstraps/{file}' +# +# s3.upload_file(abs_path, bucket, key) +# for name in names: +# launch_cluster(name, region, ec2_attributes, bootstrap_actions, instance_groups, configurations) \ No newline at end of file diff --git a/distroduce/emr/launch.py b/distroduce/emr/launch.py index 8257b54..5c55fa9 100644 --- a/distroduce/emr/launch.py +++ b/distroduce/emr/launch.py @@ -1,39 +1,5 @@ -import os, json, boto3 -from typing import List - - -def launch_cluster(name, region, ec2_attributes, bootstrap_actions, instance_groups, configurations): - def log_uri(name, region): - return f's3n://{name}-{region}/elasticmapreduce/' - - os.system(f""" - aws emr create-cluster \ - --applications Name=Hadoop Name=Hive Name=Spark \ - --ec2-attributes '{json.dumps(ec2_attributes)}' \ - --release-label emr-5.26.0 \ - --log-uri '{str(log_uri(name, region))}' \ - --instance-groups '{json.dumps(instance_groups)}' \ - --configurations '{json.dumps(configurations)}' \ - --auto-scaling-role EMR_AutoScaling_DefaultRole \ - --bootstrap-actions '{json.dumps(bootstrap_actions)}' \ - --ebs-root-volume-size 10 \ - --service-role EMR_DefaultRole \ - --enable-debugging \ - --name '{name}' \ - --scale-down-behavior TERMINATE_AT_TASK_COMPLETION \ - --region {region} - """) - - -# def benchmark(names: List[str], region, ec2_attributes, bootstrap_actions, instance_groups, configurations): -# current_dir = os.path.dirname(__file__) -# s3 = boto3.client('s3') -# bucket = 'insightde' -# -# file = 'distroduce.sh' -# abs_path = os.path.join(current_dir, file) -# key = f'emr/bootstraps/{file}' -# -# s3.upload_file(abs_path, bucket, key) -# for name in names: -# launch_cluster(name, region, ec2_attributes, bootstrap_actions, instance_groups, configurations) \ No newline at end of file +from distroduce.emr import launch_cluster +from distroduce.configuration.cluster import 
ec2_attributes, bootstrap_actions, instance_groups, configurations +region = 'us-east-1' +cluster_name = 'distributed_produce' +launch_cluster(cluster_name, region, ec2_attributes, bootstrap_actions, instance_groups, configurations) \ No newline at end of file
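
The `launch_cluster` helper added in `distroduce/emr/__init__.py` essentially interpolates its configuration dicts (via `json.dumps`) into an `aws emr create-cluster` CLI invocation and shells it out with `os.system`. As a rough, self-contained sketch of that command assembly, the string construction can be factored into a pure function, which makes it easy to inspect or test without touching AWS. The config values below are hypothetical placeholders, not part of this patch:

```python
import json

def build_emr_command(name, region, ec2_attributes, bootstrap_actions,
                      instance_groups, configurations):
    # Mirrors the flags used by distroduce.emr.launch_cluster (emr-5.26.0).
    log_uri = f's3n://{name}-{region}/elasticmapreduce/'
    return (
        "aws emr create-cluster "
        "--applications Name=Hadoop Name=Hive Name=Spark "
        f"--ec2-attributes '{json.dumps(ec2_attributes)}' "
        "--release-label emr-5.26.0 "
        f"--log-uri '{log_uri}' "
        f"--instance-groups '{json.dumps(instance_groups)}' "
        f"--configurations '{json.dumps(configurations)}' "
        "--auto-scaling-role EMR_AutoScaling_DefaultRole "
        f"--bootstrap-actions '{json.dumps(bootstrap_actions)}' "
        "--ebs-root-volume-size 10 "
        "--service-role EMR_DefaultRole "
        "--enable-debugging "
        f"--name '{name}' "
        "--scale-down-behavior TERMINATE_AT_TASK_COMPLETION "
        f"--region {region}"
    )

# Hypothetical minimal configs, for illustration only:
cmd = build_emr_command(
    name='distributed_produce',
    region='us-east-1',
    ec2_attributes={'KeyName': 'my-key'},
    bootstrap_actions=[{'Name': 'install', 'Path': 's3://my-bucket/distroduce.sh'}],
    instance_groups=[{'InstanceRole': 'MASTER', 'InstanceType': 'm5.xlarge',
                      'InstanceCount': 1}],
    configurations=[],
)
print(cmd.startswith('aws emr create-cluster'))  # → True
```

Keeping command construction separate from execution also makes it straightforward to swap `os.system` for `subprocess.run`, or to dry-run the command by printing it before launching a cluster.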