# Distributed Produce
by Joshua E. Jodesty
## What?: Description
Distributed Produce (distroduce) is a message simulation and throughput benchmarking framework, implemented as a cadCAD execution mode, that leverages Apache Spark and the Apache Kafka Producer to optimize Kafka cluster configurations and debug real-time data transformations. distroduce uses cadCAD's user-defined event simulation template and framework to simulate messages sent to Kafka clusters. This enables rapid, iterative design, debugging, and message-publish benchmarking of Kafka clusters and of real-time data processing built on Kafka Streams and Spark (Structured) Streaming.
## How?: A Tale of Two Clusters
Distributed Produce is a Spark application used as a cadCAD execution mode that distributes Kafka Producers, message simulation, and message publishing across the worker nodes of an EMR cluster. Messages published from these workers are sent from the Spark-bootstrapped EMR cluster to Kafka topics on a separate Kafka cluster.
## Why?: Use Cases
- IoT Event / Device Simulation: competes with AWS IoT Device Simulator and Azure IoT Solution Accelerator: Device Simulation. Unlike these products, Distributed Produce enables user-defined state updates and agent actions, as well as message-publish benchmarking
- Development Environment for Real-Time Data Processing / Routing: provides a sandbox for designing and debugging real-time data processing with Kafka Streams and Spark (Structured) Streaming
## Get Started:
0. Set Up Local Development Environment: see Kafka Quickstart
a. Install pyspark
```bash
pip3 install pyspark
```
b. Install & unzip Kafka, create a Kafka test topic, and start a consumer
```bash
sh distroduce/configuration/launch_local_kafka.sh
```
c. Run the simulation locally
```bash
zip -rq distroduce/dist/distroduce.zip distroduce/
spark-submit --py-files distroduce/dist/distroduce.zip distroduce/local_messaging_sim.py `hostname | xargs`
```
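Before submitting the Spark job, it can help to confirm the local broker is reachable. Below is a minimal smoke test using kafka-python; the broker address (`localhost:9092`) and the `test` topic name are assumptions based on the Kafka quickstart defaults, not part of distroduce itself.
```python
# Smoke test for the local Kafka broker (assumes quickstart defaults:
# broker at localhost:9092 and a topic named 'test').
from kafka import KafkaProducer, KafkaConsumer

producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('test', b'distroduce smoke test')  # publish one message
producer.flush()                                 # block until delivered

consumer = KafkaConsumer(
    'test',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',  # read from the beginning of the topic
    consumer_timeout_ms=5000       # stop iterating if no message arrives
)
for record in consumer:
    print(record.value)
    break
```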
1. Write cadCAD Simulation:
- Simulation Description: To demonstrate Distributed Produce, I implemented a simulation of two users interacting over a messaging service.
- Resources
- Terminology:
  - Initial Conditions: State Variables and their initial values (the start event of the simulation)
    ```python
    initial_conditions = {
        'state_variable_1': 0,
        'state_variable_2': 0,
        'state_variable_3': 1.5,
        'timestamp': '2019-01-01 00:00:00'
    }
    ```
  - Policy Functions: compute one or more signals to be passed to State Update Functions
    ```python
    def policy_function_A(_params, substep, sH, s, kafkaConfig):
        ...
        return {'signal_name': signal_value}
    ```
    Parameters:
    - `_params` : `dict` - System parameters
    - `substep` : `int` - Current substep
    - `sH` : `list[list[dict]]` - Historical values of all state variables for the simulation. See Historical State Access for details
    - `s` : `dict` - Current state of the system, where the `dict_keys` are the names of the state variables and the `dict_values` are their current values
    - `kafkaConfig` : `kafka.KafkaProducer` - Configuration for the `kafka-python` Producer
  - State Update Functions: update state variables over time
    ```python
    def state_update_function_A(_params, substep, sH, s, actions, kafkaConfig):
        ...
        return 'state_variable_name', new_value
    ```
    Parameters:
    - `_params` : `dict` - System parameters
    - `substep` : `int` - Current substep
    - `sH` : `list[list[dict]]` - Historical values of all state variables for the simulation. See Historical State Access for details
    - `s` : `dict` - Current state of the system, where the `dict_keys` are the names of the state variables and the `dict_values` are their current values
    - `actions` : `dict` - Aggregation of the signals of all policy functions in the current substep
    - `kafkaConfig` : `kafka.KafkaProducer` - Configuration for the `kafka-python` Producer
  - Partial State Update Block (PSUB): a set of State Update Functions and Policy Functions that update state records (a minimal wiring sketch follows the note below)
  Note: State Update and Policy Functions now have an additional, undocumented parameter: `kafkaConfig`
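For orientation, here is a minimal sketch of how a PSUB could wire the functions above together. The list-of-dicts structure with `policies` and `variables` keys follows cadCAD's convention; the block and signal names are hypothetical.
```python
# Hypothetical PSUB wiring: each block pairs policy functions with the
# state update functions that consume their aggregated signals.
partial_state_update_blocks = [
    {
        'policies': {
            'action_1': policy_function_A            # policy defined above
        },
        'variables': {
            'state_variable_1': state_update_function_A  # updater defined above
        }
    }
]
```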
a. Define Policy Functions:
- Example: Two users interacting on separate chat clients and entering / exiting chat
b. Define State Update Functions:
- Example: Used for logging and maintaining the state of user actions defined by policies
c. Define Initial Conditions & Partial State Update Block:
d. Create Simulation Executor: used for running a simulation (a combined sketch of steps a through d follows)
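To make steps a through d concrete, below is an illustrative sketch for the two-user messaging example. All function, signal, and topic names are hypothetical; the signatures follow the terminology section above (including the extra `kafkaConfig` parameter), and the executor wiring is left to distroduce's own entry point (see `distroduce/local_messaging_sim.py`), since its execution mode is what distributes this work across Spark workers.
```python
from datetime import datetime, timedelta

# a. Policy Functions: two users acting on separate chat clients.
#    (Hypothetical names; signatures per the terminology section.)
def send_message_policy(_params, substep, sH, s, kafkaConfig):
    # Publish a simulated chat message through the injected producer.
    kafkaConfig.send('chat_events', b'user_1: hello')  # hypothetical topic
    return {'messages_sent': 1}

def exit_chat_policy(_params, substep, sH, s, kafkaConfig):
    # Signal that user_2 left the chat during this substep.
    return {'exits': 1}

# b. State Update Functions: log and maintain state of the actions above.
def update_message_count(_params, substep, sH, s, actions, kafkaConfig):
    # 'actions' aggregates the signals returned by all policy functions.
    return 'state_variable_1', s['state_variable_1'] + actions['messages_sent']

def update_timestamp(_params, substep, sH, s, actions, kafkaConfig):
    # Advance simulated time by one second per substep.
    ts = datetime.strptime(s['timestamp'], '%Y-%m-%d %H:%M:%S')
    return 'timestamp', (ts + timedelta(seconds=1)).strftime('%Y-%m-%d %H:%M:%S')

# c. Initial Conditions & Partial State Update Block.
initial_conditions = {
    'state_variable_1': 0,
    'state_variable_2': 0,
    'state_variable_3': 1.5,
    'timestamp': '2019-01-01 00:00:00'
}
partial_state_update_blocks = [{
    'policies': {'user_1': send_message_policy, 'user_2': exit_chat_policy},
    'variables': {'state_variable_1': update_message_count,
                  'timestamp': update_timestamp}
}]

# d. Simulation Executor: distroduce supplies its own execution mode that
#    runs these blocks on Spark workers; see distroduce/local_messaging_sim.py.
```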
2. Configure EMR Cluster: the EC2 attributes, bootstrap actions, instance groups, and configurations used at launch are defined in `distroduce.configuration.cluster`
3. Launch EMR Cluster:
Option A: Preconfigured Launch
```bash
python3 distroduce/emr/launch.py
```
Option B: Custom Launch - Example
```python
from distroduce.emr.launch import launch_cluster
from distroduce.configuration.cluster import ec2_attributes, bootstrap_actions, instance_groups, configurations

region = 'us-east-1'
cluster_name = 'distributed_produce'
launch_cluster(cluster_name, region, ec2_attributes, bootstrap_actions, instance_groups, configurations)
```
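After launching, you can confirm the cluster reached a running state with boto3. This check is independent of distroduce; it assumes default AWS credentials and the `us-east-1` region used above.
```python
import boto3

emr = boto3.client('emr', region_name='us-east-1')
# List clusters that are starting, bootstrapping, running, or waiting.
response = emr.list_clusters(
    ClusterStates=['STARTING', 'BOOTSTRAPPING', 'RUNNING', 'WAITING'])
for summary in response['Clusters']:
    print(summary['Id'], summary['Name'], summary['Status']['State'])
```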
4. Execute Benchmark(s):
- Step 1: SSH onto the master node
- Step 2: Submit the Spark job
```bash
spark-submit --master yarn --py-files distroduce.zip messaging_sim.py `hostname | xargs`
```