semi-final to pr

This commit is contained in:
Joshua E. Jodesty 2019-10-03 15:31:31 -04:00
parent 8d5657cb6f
commit c7c68a1abe
6 changed files with 218 additions and 60 deletions

README.md

@ -1,50 +1,141 @@
*(ASCII-art banner: the cadCAD logo — "Complex Adaptive Dynamics / computer aided design", by BlockScience — is replaced by a "Distributed Produce" banner, by Joshua E. Jodesty)*
## What?: *Description*
***Distributed Produce*** (**[distroduce](distroduce)**) is a message-simulation and throughput-benchmarking framework /
[cadCAD](https://cadcad.org) execution mode that leverages [Apache Spark](https://spark.apache.org/) and the
[Apache Kafka Producer](https://kafka.apache.org/documentation/#producerapi) to optimize Kafka cluster configurations
and debug real-time data transformations. *distroduce* uses cadCAD's user-defined event-simulation template and
framework to simulate messages sent to Kafka clusters. This enables rapid, iterative design, debugging, and
message-publish benchmarking of Kafka clusters and of real-time data processing built on Kafka Streams and Spark
(Structured) Streaming.
## How?: *A Tail of Two Clusters*
***Distributed Produce*** is a Spark application, used as a cadCAD execution mode, that distributes Kafka producers,
message simulation, and message publishing across the worker nodes of an EMR cluster. Messages published from these
workers are sent to Kafka topics on a Kafka cluster separate from the Spark-bootstrapped EMR cluster.

## Why?: *Use Cases*
* **IoT Event / Device Simulation:** Competes with *AWS IoT Device Simulator* and *Azure IoT Solution Acceleration:
Device Simulation*. Unlike these products, *Distributed Produce* enables user-defined state updates and agent actions,
as well as message-publish benchmarking.
* **Development Environment for Real-Time Data Processing / Routing**
## Get Started:
### 0. Set Up a Local Development Environment (see the [Kafka Quickstart](https://kafka.apache.org/quickstart)):
**a.** Install `pyspark`
```bash
pip3 install pyspark
```
**b.** Install and unzip Kafka, create the Kafka `test` topic, and start a consumer
```bash
sh distroduce/configuration/launch_local_kafka.sh
```
**c.** Run the simulation locally
```bash
zip -rq distroduce/dist/distroduce.zip distroduce/
spark-submit --py-files distroduce/dist/distroduce.zip distroduce/local_messaging_sim.py `hostname | xargs`
```
### 1. Write a cadCAD Simulation:
* **Simulation Description:**
To demonstrate *Distributed Produce*, I implemented a simulation of two users interacting over a messaging service.
* **Resources:**
    * [cadCAD Documentation](https://github.com/BlockScience/cadCAD/tree/master/documentation)
    * [cadCAD Tutorials](https://github.com/BlockScience/cadCAD/tree/master/tutorials)
* **Terminology:**
    * ***[Initial Conditions](https://github.com/BlockScience/cadCAD/tree/master/documentation#state-variables)*** -
    State Variables and their initial values (the start event of the simulation)
    ```python
    initial_conditions = {
        'state_variable_1': 0,
        'state_variable_2': 0,
        'state_variable_3': 1.5,
        'timestamp': '2019-01-01 00:00:00'
    }
    ```
    * ***[Policy Functions](https://github.com/BlockScience/cadCAD/tree/master/documentation#Policy-Functions)*** -
    compute one or more signals to be passed to State Update Functions
    ```python
    def policy_function_1(_params, substep, sH, s, kafkaConfig):
        ...
        return {'signal_name': signal_value}
    ```
    Parameters:
    * **_params** : `dict` - [System parameters](https://github.com/BlockScience/cadCAD/blob/master/documentation/System_Model_Parameter_Sweep.md)
    * **substep** : `int` - the current [substep](https://github.com/BlockScience/cadCAD/tree/master/documentation#Substep)
    * **sH** : `list[list[dict]]` - historical values of all state variables for the simulation; see
    [Historical State Access](https://github.com/BlockScience/cadCAD/blob/master/documentation/Historically_State_Access.md) for details
    * **s** : `dict` - the current state of the system, whose keys are the state-variable names and whose values are
    their current values
    * **kafkaConfig** : `kafka.KafkaProducer` - configuration for the `kafka-python`
    [Producer](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html)
    * ***[State Update Functions](https://github.com/BlockScience/cadCAD/tree/master/documentation#state-update-functions)*** -
    update State Variables over time
    ```python
    def state_update_function_A(_params, substep, sH, s, actions, kafkaConfig):
        ...
        return 'state_variable_name', new_value
    ```
    Parameters:
    * **_params** : `dict` - [System parameters](https://github.com/BlockScience/cadCAD/blob/master/documentation/System_Model_Parameter_Sweep.md)
    * **substep** : `int` - the current [substep](https://github.com/BlockScience/cadCAD/tree/master/documentation#Substep)
    * **sH** : `list[list[dict]]` - historical values of all state variables for the simulation; see
    [Historical State Access](https://github.com/BlockScience/cadCAD/blob/master/documentation/Historically_State_Access.md) for details
    * **s** : `dict` - the current state of the system, whose keys are the state-variable names and whose values are
    their current values
    * **actions** : `dict` - aggregation of the signals of all Policy Functions in the current substep
    * **kafkaConfig** : `kafka.KafkaProducer` - configuration for the `kafka-python`
    [Producer](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html)
    * ***[Partial State Update Block](https://github.com/BlockScience/cadCAD/tree/master/documentation#State-Variables) (PSUB)*** -
    a set of State Update Functions and Policy Functions that update state records
    ![](https://i.imgur.com/9rlX9TG.png)

    **Note:** State Update and Policy Functions now take an additional, currently undocumented parameter: `kafkaConfig`.
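The PSUB execution order described above — run the policies, aggregate their signals, then apply each state update — can be sketched in plain Python. This is a simplified illustration of cadCAD's documented semantics, not the real engine; the `apply_psub` helper and the signal-summing aggregation are assumptions for the sketch, and the `kafkaConfig` plumbing is omitted.

```python
# Simplified sketch of applying one Partial State Update Block:
# 1) run every policy function, 2) merge their signal dicts into `actions`
# (summing duplicate keys), 3) run each state update against state + actions.
def send_policy(_params, substep, sH, s):
    # A toy policy: signal that one message was sent this substep.
    return {'msg_count': 1}

def update_total(_params, substep, sH, s, actions):
    # A toy state update: accumulate the signalled message count.
    return 'total_msg_count', s['total_msg_count'] + actions['msg_count']

psub = {'policies': [send_policy], 'variables': [update_total]}

def apply_psub(psub, state):
    _params, substep, sH = {}, 0, []   # fixed for the sketch
    actions = {}
    for policy in psub['policies']:
        for key, value in policy(_params, substep, sH, state).items():
            actions[key] = actions.get(key, 0) + value  # sum-aggregate signals
    new_state = dict(state)
    for update in psub['variables']:
        key, value = update(_params, substep, sH, state, actions)
        new_state[key] = value
    return new_state

print(apply_psub(psub, {'total_msg_count': 0}))  # → {'total_msg_count': 1}
```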
**a.** **Define Policy Functions:**
* [Example:](distroduce/action_policies.py) two users interacting on separate chat clients, entering and exiting chat
**b.** **Define State Update Functions:**
* [Example:](distroduce/state_updates.py) used for logging and maintaining the state of user actions defined by the policies
**c.** **Define Initial Conditions & the Partial State Update Block:**
* **Initial Conditions:** [Example](distroduce/messaging_sim.py)
* **Partial State Update Block (PSUB):** [Example](distroduce/simulation.py)
**d.** **Create a Simulation Executor**, used to run a simulation:
* [Local](distroduce/local_messaging_sim.py)
* [EMR](distroduce/messaging_sim.py)
### 2. [Configure the EMR Cluster](distroduce/configuration/cluster.py)
### 3. Launch the EMR Cluster:
**Option A:** Preconfigured launch
```bash
python3 distroduce/emr/launch.py
```
**Option B:** Custom launch - [Example](distroduce/emr/launch.py)
```python
from distroduce.emr.launch import launch_cluster
from distroduce.configuration.cluster import ec2_attributes, bootstrap_actions, instance_groups, configurations

region = 'us-east-1'
cluster_name = 'distributed_produce'
launch_cluster(cluster_name, region, ec2_attributes, bootstrap_actions, instance_groups, configurations)
```
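A launcher like `launch_cluster` plausibly wraps EMR's `RunJobFlow` API. The sketch below shows how its arguments might map onto a `run_job_flow` request; the helper, the release label, and the role names are illustrative assumptions, not distroduce's actual implementation.

```python
# Illustrative sketch: assembling a boto3 EMR run_job_flow request from the
# arguments launch_cluster receives. The field mapping is an assumption.
def build_run_job_flow_request(cluster_name, region, ec2_attributes,
                               bootstrap_actions, instance_groups, configurations):
    # `region` is used when creating the boto3 client, not in the request body.
    return {
        'Name': cluster_name,
        'ReleaseLabel': 'emr-5.27.0',          # assumed EMR release
        'Applications': [{'Name': 'Spark'}],
        'Instances': {
            'InstanceGroups': instance_groups,
            'Ec2KeyName': ec2_attributes.get('KeyName'),
            'KeepJobFlowAliveWhenNoSteps': True,
        },
        'BootstrapActions': bootstrap_actions,
        'Configurations': configurations,
        'JobFlowRole': 'EMR_EC2_DefaultRole',  # assumed default roles
        'ServiceRole': 'EMR_DefaultRole',
    }

request = build_run_job_flow_request('distributed_produce', 'us-east-1',
                                     {'KeyName': 'my-key'}, [], [], [])
# With AWS credentials configured, this request could then be submitted via:
# boto3.client('emr', region_name='us-east-1').run_job_flow(**request)
```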
### 4. Execute Benchmark(s):
* **Step 1:** `ssh` into the master node
* **Step 2:** Submit the Spark job
```bash
spark-submit --master yarn --py-files distroduce.zip messaging_sim.py `hostname | xargs`
```

(second changed file — name not shown in this view)

**@ -11,8 +11,9** — the *What?: Description* paragraph gains links: [cadCAD](https://cadcad.org), [Apache Spark](https://spark.apache.org/), and the [Apache Kafka Producer](https://kafka.apache.org/documentation/#producerapi); the wording is otherwise unchanged.

**@ -41,24 +42,65** — "Run Simulation locally" changes from `python3 distroduce/local_messaging_sim.py` to:

```bash
zip -rq distroduce/dist/distroduce.zip distroduce/
spark-submit --py-files distroduce/dist/distroduce.zip distroduce/local_messaging_sim.py `hostname | xargs`
```

and the *Write cadCAD Simulation* terminology section gains the same example code blocks and parameter documentation as README.md (Initial Conditions, Policy Functions, State Update Functions, PSUB, and the `kafkaConfig` note).

**@ -95,6 +137,5** — the benchmark submit step inlines the master node's hostname in place of a `PRIVATE_IP` variable:

```diff
-PRIVATE_IP=`hostname -I | xargs`
-spark-submit --master yarn --py-files distroduce.zip messaging_sim.py $PRIVATE_IP
+spark-submit --master yarn --py-files distroduce.zip messaging_sim.py `hostname | xargs`
```

Binary file not shown.

(changed file — name not shown in this view)

```diff
@@ -1,3 +1,4 @@
+import sys
 from datetime import datetime
 from cadCAD import configs
@@ -30,8 +31,15 @@ if __name__ == "__main__":
         }
     )

+    # Configuration for the Kafka Producer
+    kafkaConfig = {
+        'send_topic': 'test',
+        'producer_config': {
+            'bootstrap_servers': f'{sys.argv[1]}:9092',
+            'acks': 'all'
+        }
+    }
     exec_mode = ExecutionMode()
-    kafkaConfig = {'send_topic': 'test', 'producer_config': {'bootstrap_servers': f'localhost:9092', 'acks': 'all'}}
     dist_proc_ctx = ExecutionContext(context=exec_mode.dist_proc, method=distributed_produce, kafka_config=kafkaConfig)
     run = Executor(exec_context=dist_proc_ctx, configs=configs, spark_context=sc)
```
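The `kafkaConfig` dict above is handed to `ExecutionContext(kafka_config=...)`. A small sketch of how such a config might be assembled from the CLI argument — the `build_kafka_config` helper and its `localhost` fallback are illustrative, not part of distroduce:

```python
# Sketch: build the kafkaConfig dict used above from argv, defaulting the
# broker host when no CLI argument is given (helper is hypothetical).
def build_kafka_config(argv, topic='test', port=9092):
    host = argv[1] if len(argv) > 1 else 'localhost'
    return {
        'send_topic': topic,
        'producer_config': {
            'bootstrap_servers': f'{host}:{port}',
            'acks': 'all'  # wait for all in-sync replicas to acknowledge
        }
    }

# The producer_config sub-dict is shaped so it can be splatted straight into
# kafka-python's producer: KafkaProducer(**config['producer_config'])
config = build_kafka_config(['messaging_sim.py', 'ip-10-0-0-1'])
print(config['producer_config']['bootstrap_servers'])  # → ip-10-0-0-1:9092
```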

(changed file — name not shown in this view)

```diff
@@ -1,3 +1,4 @@
+import sys
 from datetime import datetime
 from cadCAD import configs
@@ -30,8 +31,15 @@ if __name__ == "__main__":
         }
     )

+    # Configuration for the Kafka Producer
+    kafkaConfig = {
+        'send_topic': 'test',
+        'producer_config': {
+            'bootstrap_servers': f'{sys.argv[1]}:9092',
+            'acks': 'all'
+        }
+    }
     exec_mode = ExecutionMode()
-    kafkaConfig = {'send_topic': 'test', 'producer_config': {'bootstrap_servers': f'{sys.argv[1]}:9092', 'acks': 'all'}}
     dist_proc_ctx = ExecutionContext(context=exec_mode.dist_proc, method=distributed_produce, kafka_config=kafkaConfig)
     run = Executor(exec_context=dist_proc_ctx, configs=configs, spark_context=sc)
```

(changed file — name not shown in this view)

```diff
@@ -57,21 +57,31 @@ def main(executor, sim_config, intitial_conditions, sim_composition):
     i = 0
     for raw_result, tensor_field in executor.execute():
-        result = arrange_cols(pd.DataFrame(raw_result), False)[
+        result = arrange_cols(pd.DataFrame(raw_result), False)
+        metrics_result = result[
             [
                 'run_id', 'timestep', 'substep',
                 'record_creation', 'total_msg_count', 'total_send_time'
             ]
         ]
+        msgs_result = result[
+            [
+                'run_id', 'timestep', 'substep',
+                'record_creation',
+                'client_a', 'client_b'
+            ]
+        ]
         print()
         if i == 0:
             print(tabulate(tensor_field, headers='keys', tablefmt='psql'))
-        last = result.tail(1)
+        last = metrics_result.tail(1)
         last['msg_per_sec'] = last['total_msg_count'] / last['total_send_time']
-        print("Output: Head")
-        print(tabulate(result.head(5), headers='keys', tablefmt='psql'))
-        print("Output: Tail")
-        print(tabulate(result.tail(5), headers='keys', tablefmt='psql'))
+        print("Messages Output: Head")
+        print(tabulate(msgs_result.head(5), headers='keys', tablefmt='psql'))
+        print("Metrics Output: Head")
+        print(tabulate(metrics_result.head(5), headers='keys', tablefmt='psql'))
+        print("Metrics Output: Tail")
+        print(tabulate(metrics_result.tail(5), headers='keys', tablefmt='psql'))
         print(tabulate(last, headers='keys', tablefmt='psql'))
         print()
         i += 1
```
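The `msg_per_sec` figure derived in the tail row above reduces to simple arithmetic over the final cumulative counters. A pandas-free sketch — field names follow the metrics columns above, values are illustrative:

```python
# Recompute msg_per_sec the way main() derives it from the last metrics row:
# total messages published divided by total time spent in producer sends.
def throughput(metrics_records):
    last = metrics_records[-1]  # counters are cumulative; the tail row is the total
    return last['total_msg_count'] / last['total_send_time']

# Illustrative record shaped like one row of metrics_result:
records = [{'run_id': 1, 'timestep': 10, 'substep': 1,
            'total_msg_count': 120000, 'total_send_time': 40.0}]
print(throughput(records))  # → 3000.0
```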