Compare commits
29 Commits
master ... distroduce

| Author | SHA1 | Date |
|---|---|---|
| | 8503ec1d6b | |
| | c7c68a1abe | |
| | 8d5657cb6f | |
| | ea5df50659 | |
| | 8cf3b09582 | |
| | 0a9b980ad7 | |
| | 2a8c1d5e8f | |
| | 2f0316e5f0 | |
| | 0c90e2048f | |
| | 8897c1f973 | |
| | 5f25cb95fb | |
| | 6c235f9914 | |
| | cdfc4b474f | |
| | 9fc12e7cce | |
| | 239cf94cae | |
| | 964c0b9123 | |
| | ff44b0bacf | |
| | ca2a9db8ff | |
| | 9b8b1ba1a0 | |
| | a7b01c1d07 | |
| | cef6e652b3 | |
| | 50f9dd9c40 | |
| | db2bc8c8b8 | |
| | 71c334d5f6 | |
| | 662046f34f | |
| | b9c7775d07 | |
| | fd0de2d1c0 | |
| | cae3ccb119 | |
| | 903069f23b | |
**.gitignore**

```
@@ -3,7 +3,7 @@ jupyter notebook
.ipynb_checkpoints
.DS_Store
.idea
.pytest_cache/
.pytest_cache
notebooks
*.egg-info
__pycache__

@@ -28,3 +28,7 @@ testing/udo_test.py
Simulation.md
monkeytype.sqlite3
distributed_produce/bash/
distributed_produce/notes.txt
notes.txt
```
**README.md** (277 changes)
@@ -1,173 +1,142 @@
```
 __________ ____
________ __ _____/ ____/ | / __ \
/ ___/ __` / __ / / / /| | / / / /
/ /__/ /_/ / /_/ / /___/ ___ |/ /_/ /
\___/\__,_/\__,_/\____/_/ |_/_____/
by BlockScience
___ _ __ _ __ __ __
/ _ \ (_)___ / /_ ____ (_)/ / __ __ / /_ ___ ___/ /
/ // // /(_-</ __// __// // _ \/ // // __// -_)/ _ /
/____//_//___/\__//_/ _/_//_.__/\_,_/ \__/ \__/ \_,_/
/ _ \ ____ ___ ___/ /__ __ ____ ___
/ ___// __// _ \/ _ // // // __// -_)
/_/ /_/ \___/\_,_/ \_,_/ \__/ \__/
by Joshua E. Jodesty

```
## What?: *Description*
***Distributed Produce*** (**[distroduce](distroduce)**) is a message simulation and throughput benchmarking framework /
[cadCAD](https://cadcad.org) execution mode that leverages [Apache Spark](https://spark.apache.org/) and
[Apache Kafka Producer](https://kafka.apache.org/documentation/#producerapi) for optimizing Kafka cluster configurations
and debugging real-time data transformations. *distroduce* leverages cadCAD's user-defined event simulation template and
framework to simulate messages sent to Kafka clusters. This enables rapid and iterative design, debugging, and message
publish benchmarking of Kafka clusters and real-time data processing using Kafka Streams and Spark (Structured)
Streaming.

**Introduction:**
## How?: *A Tail of Two Clusters*
***Distributed Produce*** is a Spark Application used as a cadCAD Execution Mode that distributes Kafka Producers,
message simulation, and message publishing to worker nodes of an EMR cluster. Messages published from these workers are
sent to Kafka topics on a Kafka cluster from a Spark bootstrapped EMR cluster.

***cadCAD*** is a Python library that assists in the processes of designing, testing and validating complex systems through
simulation. At its core, cadCAD is a differential games engine that supports parameter sweeping and Monte Carlo analyses
and can be easily integrated with other scientific computing Python modules and data science workflows.
## Why?: *Use Case*
* **IoT Event / Device Simulation:** Competes with *AWS IoT Device Simulator* and *Azure IoT Solution Acceleration:
Device Simulation*. Unlike these products, *Distributed Produce* enables user-defined state updates and agent actions,
as well as message publish benchmarking
* **Development Environment for Real-Time Data Processing / Routing:**

**Description:**
## Get Started:

cadCAD (complex adaptive systems computer-aided design) is a Python-based, unified modeling framework for stochastic
dynamical systems and differential games for research, validation, and Computer Aided Design of economic systems created
by BlockScience. It is capable of modeling systems at all levels of abstraction from Agent Based Modeling (ABM) to
System Dynamics (SD), and enabling smooth integration of computational social science simulations with empirical data
science workflows.

An economic system is treated as a state-based model and defined through a set of endogenous and exogenous state
variables which are updated through mechanisms and environmental processes, respectively. Behavioral models, which may
be deterministic or stochastic, provide the evolution of the system within the action space of the mechanisms.
Mathematical formulations of these economic games treat agent utility as derived from the state rather than directly from
an action, creating a rich, dynamic modeling framework. Simulations may be run with a range of initial conditions and
parameters for states, behaviors, mechanisms, and environmental processes to understand and visualize network behavior
under various conditions. Support for A/B testing policies, Monte Carlo analysis, and other common numerical methods is
provided.

For example, the cadCAD tool allows us to represent a company’s or community’s current business model along with a desired
future state and helps make informed, rigorously tested decisions on how to get from today’s stage to the future state.
It allows us to use code to solidify our conceptualized ideas and see if the outcome meets our expectations. We can
iteratively refine our work until we have constructed a model that closely reflects reality at the start of the model,
and see how it evolves. We can then use these results to inform business decisions.
#### Documentation:
* ##### [Tutorials](tutorials)
* ##### [System Model Configuration](documentation/Simulation_Configuration.md)
* ##### [System Simulation Execution](documentation/Simulation_Execution.md)
* ##### [Policy Aggregation](documentation/Policy_Aggregation.md)
* ##### [System Model Parameter Sweep](documentation/System_Model_Parameter_Sweep.md)

#### 0. Installation:

**Python 3.6.5** :: Anaconda, Inc.

**Option A:** Build From Source
### 0. Set Up Local Development Environment: see [Kafka Quickstart](https://kafka.apache.org/quickstart)
**a.** Install `pyspark`
```bash
pip3 install -r requirements.txt
python3 setup.py sdist bdist_wheel
pip3 install dist/*.whl
pip3 install pyspark
```

**Option B:** Proprietary Build Access

***IMPORTANT NOTE:*** Tokens are issued **ONLY** to those with access to proprietary builds of cadCAD and to BlockScience employees.
Replace \<TOKEN\> with an issued token in the script below.
**b.** Install & Unzip Kafka, Create Kafka `test` topic, and Start Consumer
```bash
pip3 install pandas pathos fn funcy tabulate
pip3 install cadCAD --extra-index-url https://<TOKEN>@repo.fury.io/blockscience/
sh distroduce/configuration/launch_local_kafka.sh
```
**c.** Run Simulation locally
```bash
zip -rq distroduce/dist/distroduce.zip distroduce/
spark-submit --py-files distroduce/dist/distroduce.zip distroduce/local_messaging_sim.py `hostname | xargs`
```
### 1. Write cadCAD Simulation:
* **Simulation Description:**
To demonstrate *Distributed Produce*, I implemented a simulation of two users interacting over a messaging service.
* **Resources**
* [cadCAD Documentation](https://github.com/BlockScience/cadCAD/tree/master/documentation)
* [cadCAD Tutorials](https://github.com/BlockScience/cadCAD/tree/master/tutorials)
* **Terminology:**
* ***[Initial Conditions](https://github.com/BlockScience/cadCAD/tree/master/documentation#state-variables)*** -
State Variables and their initial values (Start event of Simulation)
```python
initial_conditions = {
    'state_variable_1': 0,
    'state_variable_2': 0,
    'state_variable_3': 1.5,
    'timestamp': '2019-01-01 00:00:00'
}
```
* ***[Policy Functions:](https://github.com/BlockScience/cadCAD/tree/master/documentation#Policy-Functions)*** -
compute one or more signals to be passed to State Update Functions
```python
def policy_function_1(_params, substep, sH, s, kafkaConfig):
    ...
    return {'signal_1': signal_value}
```
#### 1. [Configure System Model](documentation/Simulation_Configuration.md)
Parameters:
* **_params** : `dict` - [System parameters](https://github.com/BlockScience/cadCAD/blob/master/documentation/System_Model_Parameter_Sweep.md)
* **substep** : `int` - Current [substep](https://github.com/BlockScience/cadCAD/tree/master/documentation#Substep)
* **sH** : `list[list[dict]]` - Historical values of all state variables for the simulation. See
[Historical State Access](https://github.com/BlockScience/cadCAD/blob/master/documentation/Historically_State_Access.md) for details
* **s** : `dict` - Current state of the system, where the `dict_keys` are the names of the state variables and the
`dict_values` are their current values.
* **kafkaConfig** : `dict` - Configuration for the `kafka-python`
[Producer](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html)

#### 2. [Execute Simulations:](documentation/Simulation_Execution.md)
* ***[State Update Functions](https://github.com/BlockScience/cadCAD/tree/master/documentation#state-update-functions):*** -
define how state variables change over time
```python
def state_update_function_A(_params, substep, sH, s, actions, kafkaConfig):
    ...
    return 'state_variable_name', new_value
```
Parameters:
* **_params** : `dict` - [System parameters](https://github.com/BlockScience/cadCAD/blob/master/documentation/System_Model_Parameter_Sweep.md)
* **substep** : `int` - Current [substep](https://github.com/BlockScience/cadCAD/tree/master/documentation#Substep)
* **sH** : `list[list[dict]]` - Historical values of all state variables for the simulation. See
[Historical State Access](https://github.com/BlockScience/cadCAD/blob/master/documentation/Historically_State_Access.md) for details
* **s** : `dict` - Current state of the system, where the `dict_keys` are the names of the state variables and the
`dict_values` are their current values.
* **actions** : `dict` - Aggregation of the signals of all policy functions in the current substep
* **kafkaConfig** : `dict` - Configuration for the `kafka-python`
[Producer](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html)
* ***[Partial State Update Block](https://github.com/BlockScience/cadCAD/tree/master/documentation#State-Variables) (PSUB):*** -
a set of State Update Functions and Policy Functions that update state records; a minimal configuration sketch follows below

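For orientation, here is a minimal sketch of what a Partial State Update Block configuration might look like. It assumes the standard cadCAD `partial_state_update_blocks` layout with `policies` and `variables` keys; the function and variable names below are illustrative, not taken from this diff.

```python
# Hypothetical PSUB: one block wiring two policy functions to one state update function
partial_state_update_blocks = {
    'PSUB_1': {
        'policies': {
            'policy_1': policy_function_1,  # each policy returns a dict of signals
            'policy_2': policy_function_2
        },
        'variables': {
            'state_variable_1': state_update_function_A  # consumes the aggregated signals via `actions`
        }
    }
}
```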
##### Single Process Execution:
Example System Model Configurations:
* [System Model A](documentation/examples/sys_model_A.py): `/documentation/examples/sys_model_A.py`
* [System Model B](documentation/examples/sys_model_B.py): `/documentation/examples/sys_model_B.py`

**Note:** State Update and Policy Functions now have an additional, undocumented parameter: `kafkaConfig`

Example Simulation Executions:
* [System Model A](documentation/examples/sys_model_A_exec.py): `/documentation/examples/sys_model_A_exec.py`
* [System Model B](documentation/examples/sys_model_B_exec.py): `/documentation/examples/sys_model_B_exec.py`

**a.** **Define Policy Functions:**
* [Example:](distroduce/action_policies.py) Two users interacting on separate chat clients and entering / exiting chat

**b.** **Define State Update Functions:**
* [Example:](distroduce/state_updates.py) Used for logging and maintaining state of user actions defined by policies

**c.** **Define Initial Conditions & Partial State Update Block:**
* **Initial Conditions:** [Example](distroduce/messaging_sim.py)
* **Partial State Update Block (PSUB):** [Example](distroduce/simulation.py)

**d.** **Create Simulation Executor:** Used for running a simulation
* [Local](distroduce/local_messaging_sim.py)
* [EMR](distroduce/messaging_sim.py)

### 2. [Configure EMR Cluster](distroduce/configuration/cluster.py)

### 3. Launch EMR Cluster:
**Option A:** Preconfigured Launch
```bash
python3 distroduce/emr/launch.py
```
**Option B:** Custom Launch - [Example](distroduce/emr/launch.py)
```python
import pandas as pd
from tabulate import tabulate
from cadCAD.engine import ExecutionMode, ExecutionContext, Executor
from documentation.examples import sys_model_A
from cadCAD import configs

exec_mode = ExecutionMode()

# Single Process Execution using a Single System Model Configuration:
# sys_model_A
sys_model_A = [configs[0]]  # sys_model_A
single_proc_ctx = ExecutionContext(context=exec_mode.single_proc)
sys_model_A_simulation = Executor(exec_context=single_proc_ctx, configs=sys_model_A)

sys_model_A_raw_result, sys_model_A_tensor_field = sys_model_A_simulation.execute()
sys_model_A_result = pd.DataFrame(sys_model_A_raw_result)
print()
print("Tensor Field: sys_model_A")
print(tabulate(sys_model_A_tensor_field, headers='keys', tablefmt='psql'))
print("Result: System Events DataFrame")
print(tabulate(sys_model_A_result, headers='keys', tablefmt='psql'))
print()
from distroduce.emr.launch import launch_cluster
from distroduce.configuration.cluster import ec2_attributes, bootstrap_actions, instance_groups, configurations
region = 'us-east-1'
cluster_name = 'distibuted_produce'
launch_cluster(cluster_name, region, ec2_attributes, bootstrap_actions, instance_groups, configurations)
```
##### Multiple Simulations (Concurrent):
###### Multiple Simulation Execution (Multi Process Execution)
System Model Configurations:
* [System Model A](documentation/examples/sys_model_A.py): `/documentation/examples/sys_model_A.py`
* [System Model B](documentation/examples/sys_model_B.py): `/documentation/examples/sys_model_B.py`

[Example Simulation Executions:](documentation/examples/sys_model_AB_exec.py)
`/documentation/examples/sys_model_AB_exec.py`

```python
import pandas as pd
from tabulate import tabulate
from cadCAD.engine import ExecutionMode, ExecutionContext, Executor
from documentation.examples import sys_model_A, sys_model_B
from cadCAD import configs

exec_mode = ExecutionMode()

# # Multiple Processes Execution using Multiple System Model Configurations:
# # sys_model_A & sys_model_B
multi_proc_ctx = ExecutionContext(context=exec_mode.multi_proc)
sys_model_AB_simulation = Executor(exec_context=multi_proc_ctx, configs=configs)

i = 0
config_names = ['sys_model_A', 'sys_model_B']
for sys_model_AB_raw_result, sys_model_AB_tensor_field in sys_model_AB_simulation.execute():
    sys_model_AB_result = pd.DataFrame(sys_model_AB_raw_result)
    print()
    print(f"Tensor Field: {config_names[i]}")
    print(tabulate(sys_model_AB_tensor_field, headers='keys', tablefmt='psql'))
    print("Result: System Events DataFrame:")
    print(tabulate(sys_model_AB_result, headers='keys', tablefmt='psql'))
    print()
    i += 1
```
##### Parameter Sweep Simulation (Concurrent):
[Example:](documentation/examples/param_sweep.py)
`/documentation/examples/param_sweep.py`

```python
import pandas as pd
from tabulate import tabulate
# The following imports NEED to be in the exact order
from cadCAD.engine import ExecutionMode, ExecutionContext, Executor
from documentation.examples import param_sweep
from cadCAD import configs

exec_mode = ExecutionMode()
multi_proc_ctx = ExecutionContext(context=exec_mode.multi_proc)
run = Executor(exec_context=multi_proc_ctx, configs=configs)

for raw_result, tensor_field in run.execute():
    result = pd.DataFrame(raw_result)
    print()
    print("Tensor Field:")
    print(tabulate(tensor_field, headers='keys', tablefmt='psql'))
    print("Output:")
    print(tabulate(result, headers='keys', tablefmt='psql'))
    print()
```
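The swept parameters themselves live in the referenced `param_sweep` module. As a rough, editor-added illustration only (the parameter names below are hypothetical and `config_sim` is assumed from `cadCAD.configuration.utils`), a sweep configuration generally looks like this:

```python
from cadCAD.configuration.utils import config_sim

# Each list-valued entry under 'M' fans out into one configuration per value.
sim_config = config_sim({
    'N': 2,                      # Monte Carlo runs per configuration
    'T': range(100),             # timesteps
    'M': {'alpha': [0.1, 0.5]}   # hypothetical swept parameter
})
```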
### 4. Execute Benchmark(s):
* **Step 1:** SSH onto the master node
* **Step 2:** Spark Submit
```bash
spark-submit --master yarn --py-files distroduce.zip messaging_sim.py `hostname | xargs`
```
@@ -0,0 +1,38 @@
```python
text = r'''
Complex Adaptive Dynamics
o i e
m d s
p e i
u d g
t n
e
r

'''

block_letters = r'''
 __________ ____
________ __ _____/ ____/ | / __ \
/ ___/ __` / __ / / / /| | / / / /
/ /__/ /_/ / /_/ / /___/ ___ |/ /_/ /
\___/\__,_/\__,_/\____/_/ |_/_____/
by BlockScience
'''

production = r'''
 __________ ____
________ __ _____/ ____/ | / __ \
/ ___/ __` / __ / / / /| | / / / /
/ /__/ /_/ / /_/ / /___/ ___ |/ /_/ /
\___/\__,_/\__,_/\____/_/ |_/_____/
by BlockScience
======================================
Complex Adaptive Dynamics
o i e
m d s
p e i
u d g
t n
e
r
'''
```
@@ -1,15 +0,0 @@
```
Complex Adaptive Dynamics
o i e
m d s
p e i
u d g
t n
e
r

 __________ ____
________ __ _____/ ____/ | / __ \
/ ___/ __` / __ / / / /| | / / / /
/ /__/ /_/ / /_/ / /___/ ___ |/ /_/ /
\___/\__,_/\__,_/\____/_/ |_/_____/
by BlockScience
```
@@ -1,2 +1,20 @@
```python
name = "cadCAD"
configs = []

print(r'''
 __________ ____
________ __ _____/ ____/ | / __ \
/ ___/ __` / __ / / / /| | / / / /
/ /__/ /_/ / /_/ / /___/ ___ |/ /_/ /
\___/\__,_/\__,_/\____/_/ |_/_____/
by BlockScience
======================================
Complex Adaptive Dynamics
o i e
m d s
p e i
u d g
t n
e
r
''')
```
@@ -1,7 +1,9 @@
```python
from copy import deepcopy
from typing import Dict, Callable, List, Tuple
from functools import reduce
import pandas as pd
from pandas.core.frame import DataFrame
# import cloudpickle, pickle

from cadCAD import configs
from cadCAD.utils import key_filter
```

@@ -11,9 +13,9 @@ from cadCAD.configuration.utils.depreciationHandler import sanitize_partial_stat
```python
class Configuration(object):
    def __init__(self, sim_config={}, initial_state={}, seeds={}, env_processes={},
    def __init__(self, user_id, sim_config={}, initial_state={}, seeds={}, env_processes={},
                 exogenous_states={}, partial_state_update_blocks={}, policy_ops=[lambda a, b: a + b],
                 **kwargs) -> None:
                 session_id=0, simulation_id=0, run_id=1, **kwargs) -> None:
        # print(exogenous_states)
        self.sim_config = sim_config
        self.initial_state = initial_state
```
@@ -24,11 +26,18 @@ class Configuration(object):
```python
        self.policy_ops = policy_ops
        self.kwargs = kwargs

        self.user_id = user_id
        self.session_id = session_id
        self.simulation_id = simulation_id
        self.run_id = run_id

        sanitize_config(self)


def append_configs(sim_configs={}, initial_state={}, seeds={}, raw_exogenous_states={}, env_processes={},
                   partial_state_update_blocks={}, policy_ops=[lambda a, b: a + b], _exo_update_per_ts: bool = True) -> None:
def append_configs(user_id='cadCAD_user', session_id=0,  # ToDo: change to string
                   sim_configs={}, initial_state={}, seeds={}, raw_exogenous_states={}, env_processes={},
                   partial_state_update_blocks={}, policy_ops=[lambda a, b: a + b], _exo_update_per_ts: bool = True
                   ) -> None:
    if _exo_update_per_ts is True:
        exogenous_states = exo_update_per_ts(raw_exogenous_states)
    else:
```
@@ -37,7 +46,27 @@ def append_configs(sim_configs={}, initial_state={}, seeds={}, raw_exogenous_sta
```python
    if isinstance(sim_configs, dict):
        sim_configs = [sim_configs]

    for sim_config in sim_configs:
    new_sim_configs = []
    for t in list(zip(sim_configs, list(range(len(sim_configs))))):
        sim_config, simulation_id = t[0], t[1]
        N = sim_config['N']
        if N > 1:
            for n in range(N):
                sim_config['simulation_id'] = simulation_id
                sim_config['run_id'] = n
                sim_config['N'] = 1
                new_sim_configs.append(deepcopy(sim_config))
            del sim_config
        else:
            sim_config['simulation_id'] = simulation_id
            sim_config['run_id'] = 0
            new_sim_configs.append(deepcopy(sim_config))

    print(new_sim_configs)
    print()

    # for sim_config in sim_configs:
    for sim_config in new_sim_configs:
        config = Configuration(
            sim_config=sim_config,
            initial_state=initial_state,
```
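To make the fan-out above concrete, here is an editor-added illustration (not part of the diff) of how a single sweep configuration with `N > 1` expands:

```python
# One input config with N = 3 ...
sim_configs = [{'N': 3, 'T': range(10), 'M': {}}]

# ... becomes three entries in new_sim_configs, each carrying N = 1 and
# simulation_id = 0, with run_id values 0, 1 and 2 respectively.
```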
@@ -45,10 +74,15 @@ def append_configs(sim_configs={}, initial_state={}, seeds={}, raw_exogenous_sta
```python
            exogenous_states=exogenous_states,
            env_processes=env_processes,
            partial_state_update_blocks=partial_state_update_blocks,
            policy_ops=policy_ops
            policy_ops=policy_ops,

            user_id=user_id,
            session_id=session_id,
            simulation_id=sim_config['simulation_id'],
            run_id=sim_config['run_id']
        )
        print(sim_configs)
        #for each sim config create new config
        # print(sim_configs)
        # for each sim config create new config
        configs.append(config)
```

@@ -68,7 +102,10 @@ class Identity:
```python
    def state_identity(self, k: str) -> Callable:
        return lambda var_dict, sub_step, sL, s, _input: (k, s[k])

    # state_identity = cloudpickle.dumps(state_identity)

    def apply_identity_funcs(self, identity: Callable, df: DataFrame, cols: List[str]) -> List[DataFrame]:
        # identity = pickle.loads(identity)
        def fillna_with_id_func(identity, df, col):
            return df[[col]].fillna(value=identity(col))
```
@@ -0,0 +1,30 @@
```python
from datetime import datetime

# ToDo: Model Async communication here
# ToDo: Configuration of Input
def configure_producer(msg, config):
    from kafka import KafkaProducer
    def send_messages(events):
        producer = KafkaProducer(**config)

        start_timestamp = datetime.now()
        for event in range(events):
            producer.send('test', msg(event)).get()
        delta = datetime.now() - start_timestamp

        return start_timestamp, delta.total_seconds()

    return send_messages

# def action(step):
#     step += 1
#     percent = step / 10000
#     print(str(datetime.now()) + " - " + str(percent) + ": " + str(step))

# def configure_consumer(config, elemental_action: function):
#     from kafka import KafkaConsumer
#     def receive_messages():
#         consumer = KafkaConsumer(**config)
#         return [elemental_action(i) for i, message in enumerate(consumer)]
#
#     return receive_messages
```
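For context, this is roughly how the closure above is exercised. Everything in this editor-added snippet (broker address, message builder, message count) is illustrative, not taken from the diff:

```python
# Hypothetical local usage of configure_producer
producer_config = {'bootstrap_servers': 'localhost:9092'}

def make_message(i):
    return f"event {i}".encode('utf-8')  # payload must already be bytes

send_messages = configure_producer(make_message, producer_config)
started_at, seconds = send_messages(1000)  # publishes 1000 messages to the hard-coded 'test' topic
print(f"started {started_at}, took {seconds}s")
```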
@@ -0,0 +1 @@
```python
from cadCAD.distroduce.executor.spark.jobs import distributed_produce
```
@@ -0,0 +1,33 @@
```python
from cadCAD.distroduce.configuration.kakfa import configure_producer
from pyspark.context import SparkContext

ascii_art = r'''
___ _ __ _ __ __ __
/ _ \ (_)___ / /_ ____ (_)/ / __ __ / /_ ___ ___/ /
/ // // /(_-</ __// __// // _ \/ // // __// -_)/ _ /
/____//_//___/\__//_/ _/_//_.__/\_,_/ \__/ \__/ \_,_/
/ _ \ ____ ___ ___/ /__ __ ____ ___
/ ___// __// _ \/ _ // // // __// -_)
/_/ /_/ \___/\_,_/ \_,_/ \__/ \__/
by Joshua E. Jodesty
'''

def distributed_produce(
        sc: SparkContext,
        spark_run,
        sim_time,
        sim_runs,
        rdd_parts,
        parameterized_message,
        prod_config
    ):
    print(ascii_art)
    message_counts = [sim_time] * sim_runs
    msg_rdd = sc.parallelize(message_counts).repartition(rdd_parts)
    parts = msg_rdd.getNumPartitions()
    print()
    print(f"RDD_{spark_run} - Partitions: {parts}")
    print()

    produce = configure_producer(parameterized_message, prod_config)
    return msg_rdd.map(produce).collect()
```
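A driver-side sketch of how this job might be invoked; the SparkContext setup, argument values, and broker address below are illustrative assumptions, not part of the diff:

```python
from pyspark import SparkContext
from cadCAD.distroduce.executor.spark.jobs import distributed_produce

sc = SparkContext(appName='distroduce_benchmark')

def parameterized_message(i):
    return f"simulated event {i}".encode('utf-8')

# 4 simulated runs of 10,000 messages each, spread across 8 RDD partitions
results = distributed_produce(
    sc, spark_run=1, sim_time=10_000, sim_runs=4, rdd_parts=8,
    parameterized_message=parameterized_message,
    prod_config={'bootstrap_servers': 'localhost:9092'}
)
# results: one (start_timestamp, seconds_elapsed) pair per simulated run
```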
@@ -0,0 +1,2 @@
```python
def flatten(l):
    return [item for sublist in l for item in sublist]
```
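A quick editor-added illustration of the helper, for reference:

```python
assert flatten([[1, 2], [3]]) == [1, 2, 3]
```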
@@ -1,7 +1,14 @@
```python
from pprint import pprint
from typing import Callable, Dict, List, Any, Tuple
from pathos.multiprocessing import ProcessingPool as PPool
from pathos.multiprocessing import ThreadPool as TPool
from pandas.core.frame import DataFrame
from pyspark.context import SparkContext
from pyspark import cloudpickle
import pickle
from fn.func import curried

from cadCAD.distroduce.configuration.kakfa import configure_producer
from cadCAD.utils import flatten
from cadCAD.configuration import Configuration, Processor
from cadCAD.configuration.utils import TensorFieldReport
```

@@ -16,6 +23,7 @@ EnvProcessesType = Dict[str, Callable]
```python
class ExecutionMode:
    single_proc = 'single_proc'
    multi_proc = 'multi_proc'
    dist_proc = 'dist_proc'


def single_proc_exec(
```
@@ -25,11 +33,18 @@ def single_proc_exec(
```python
        configs_structs: List[ConfigsType],
        env_processes_list: List[EnvProcessesType],
        Ts: List[range],
        Ns: List[int]
        Ns: List[int],
        userIDs,
        sessionIDs,
        simulationIDs,
        runIDs: List[int],
    ):
    l = [simulation_execs, states_lists, configs_structs, env_processes_list, Ts, Ns]
    simulation_exec, states_list, config, env_processes, T, N = list(map(lambda x: x.pop(), l))
    result = simulation_exec(var_dict_list, states_list, config, env_processes, T, N)
    params = [simulation_execs, states_lists, configs_structs, env_processes_list, Ts, Ns,
              userIDs, sessionIDs, simulationIDs, runIDs]
    simulation_exec, states_list, config, env_processes, T, N, user_id, session_id, simulation_id, run_id = \
        list(map(lambda x: x.pop(), params))
    result = simulation_exec(var_dict_list, states_list, config, env_processes, T, N,
                             user_id, session_id, simulation_id, run_id)
    return flatten(result)
```
@@ -40,27 +55,87 @@ def parallelize_simulations(
```python
        configs_structs: List[ConfigsType],
        env_processes_list: List[EnvProcessesType],
        Ts: List[range],
        Ns: List[int]
        Ns: List[int],
        userIDs,
        sessionIDs,
        simulationIDs,
        runIDs: List[int]
    ):
    l = list(zip(simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns))
    params = list(zip(simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns,
                      userIDs, sessionIDs, simulationIDs, runIDs))
    with PPool(len(configs_structs)) as p:
        results = p.map(lambda t: t[0](t[1], t[2], t[3], t[4], t[5], t[6]), l)
        results = p.map(lambda t: t[0](t[1], t[2], t[3], t[4], t[5], t[6], t[7], t[8], t[9], t[10]), params)
    return results

def distributed_simulations(
        simulation_execs: List[Callable],
        var_dict_list: List[VarDictType],
        states_lists: List[StatesListsType],
        configs_structs: List[ConfigsType],
        env_processes_list: List[EnvProcessesType],
        Ts: List[range],
        Ns: List[int],
        userIDs,
        sessionIDs,
        simulationIDs,
        runIDs: List[int],
        sc,
        kafkaConfig
    ):

    func_params_zipped = list(
        zip(userIDs, sessionIDs, simulationIDs, runIDs, simulation_execs, configs_structs, env_processes_list)
    )
    func_params_kv = [((t[0], t[1], t[2], t[3]), (t[4], t[5], t[6])) for t in func_params_zipped]

    def simulate(k, v):
        from kafka import KafkaProducer
        prod_config = kafkaConfig['producer_config']
        kafkaConfig['producer'] = KafkaProducer(**prod_config)
        (sim_exec, config, env_procs) = [f[1] for f in func_params_kv if f[0] == k][0]
        results = sim_exec(
            v['var_dict'], v['states_lists'], config, env_procs, v['Ts'], v['Ns'],
            k[0], k[1], k[2], k[3], kafkaConfig
        )

        return results

    val_params = list(zip(userIDs, sessionIDs, simulationIDs, runIDs, var_dict_list, states_lists, Ts, Ns))
    val_params_kv = [
        (
            (t[0], t[1], t[2], t[3]),
            {'var_dict': t[4], 'states_lists': t[5], 'Ts': t[6], 'Ns': t[7]}
        ) for t in val_params
    ]
    results_rdd = sc.parallelize(val_params_kv).coalesce(35)

    return list(results_rdd.map(lambda x: simulate(*x)).collect())


class ExecutionContext:
    def __init__(self, context: str = ExecutionMode.multi_proc) -> None:
    def __init__(self, context=ExecutionMode.multi_proc, method=None, kafka_config=None) -> None:
        self.name = context
        self.method = None

        if context == 'single_proc':
            self.method = single_proc_exec
        elif context == 'multi_proc':
            self.method = parallelize_simulations
        elif context == 'dist_proc':
            def distroduce_proc(
                    simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns,
                    userIDs, sessionIDs, simulationIDs, runIDs,
                    sc, kafkaConfig=kafka_config
                ):
                return method(
                    simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns,
                    userIDs, sessionIDs, simulationIDs, runIDs,
                    sc, kafkaConfig
                )

            self.method = distroduce_proc


class Executor:
    def __init__(self, exec_context: ExecutionContext, configs: List[Configuration]) -> None:
    def __init__(self, exec_context: ExecutionContext, configs: List[Configuration], spark_context: SparkContext = None) -> None:
        self.sc = spark_context
        self.SimExecutor = SimExecutor
        self.exec_method = exec_context.method
        self.exec_context = exec_context.name
```
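Since the README examples above only cover `single_proc` and `multi_proc`, here is an editor-added sketch of how the new distributed mode appears to be wired together. It assumes `distributed_simulations` is importable from `cadCAD.engine`, and the topic and broker values are illustrative:

```python
from pyspark.context import SparkContext
from cadCAD.engine import ExecutionMode, ExecutionContext, Executor, distributed_simulations
from cadCAD import configs

sc = SparkContext(appName='distroduce')
kafka_config = {
    'send_topic': 'test',                                        # key read by the action policies
    'producer_config': {'bootstrap_servers': 'localhost:9092'}   # passed to kafka.KafkaProducer on each worker
}

dist_ctx = ExecutionContext(context=ExecutionMode().dist_proc,
                            method=distributed_simulations,
                            kafka_config=kafka_config)
run = Executor(exec_context=dist_ctx, configs=configs, spark_context=sc)
results = run.execute()  # list of (records, tensor_field) pairs, one per configuration
```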
@@ -70,50 +145,59 @@ class Executor:
```python
        config_proc = Processor()
        create_tensor_field = TensorFieldReport(config_proc).create_tensor_field


        print(r'''
 __________ ____
________ __ _____/ ____/ | / __ \
/ ___/ __` / __ / / / /| | / / / /
/ /__/ /_/ / /_/ / /___/ ___ |/ /_/ /
\___/\__,_/\__,_/\____/_/ |_/_____/
by BlockScience
        ''')
        print(f'Execution Mode: {self.exec_context + ": " + str(self.configs)}')
        print(f'Configurations: {self.configs}')

        var_dict_list, states_lists, Ts, Ns, eps, configs_structs, env_processes_list, partial_state_updates, simulation_execs = \
            [], [], [], [], [], [], [], [], []
        userIDs, sessionIDs, simulationIDs, runIDs, \
            var_dict_list, states_lists, \
            Ts, Ns, \
            eps, configs_structs, env_processes_list, \
            partial_state_updates, simulation_execs = \
            [], [], [], [], [], [], [], [], [], [], [], [], []
        config_idx = 0

        for x in self.configs:
            userIDs.append(x.user_id)
            sessionIDs.append(x.session_id)
            simulationIDs.append(x.simulation_id)
            runIDs.append(x.run_id)

            Ts.append(x.sim_config['T'])
            Ns.append(x.sim_config['N'])

            var_dict_list.append(x.sim_config['M'])
            states_lists.append([x.initial_state])
            eps.append(list(x.exogenous_states.values()))
            configs_structs.append(config_proc.generate_config(x.initial_state, x.partial_state_updates, eps[config_idx]))
            # print(env_processes_list)
            env_processes_list.append(x.env_processes)
            partial_state_updates.append(x.partial_state_updates)
            simulation_execs.append(SimExecutor(x.policy_ops).simulation)

            config_idx += 1

        final_result = None

        if self.exec_context == ExecutionMode.single_proc:
            tensor_field = create_tensor_field(partial_state_updates.pop(), eps.pop())
            result = self.exec_method(simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns)
            result = self.exec_method(
                simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns,
                userIDs, sessionIDs, simulationIDs, runIDs
            )
            final_result = result, tensor_field
        elif self.exec_context == ExecutionMode.multi_proc:
            # if len(self.configs) > 1:
            simulations = self.exec_method(simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns)
        else:
            if self.exec_context == ExecutionMode.multi_proc:
                # if len(self.configs) > 1:
                simulations = self.exec_method(
                    simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns,
                    userIDs, sessionIDs, simulationIDs, runIDs
                )
            elif self.exec_context == ExecutionMode.dist_proc:
                simulations = self.exec_method(
                    simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns,
                    userIDs, sessionIDs, simulationIDs, runIDs, self.sc
                )

            results = []
            for result, partial_state_updates, ep in list(zip(simulations, partial_state_updates, eps)):
                results.append((flatten(result), create_tensor_field(partial_state_updates, ep)))

            final_result = results

        return final_result
```
@@ -1,5 +1,5 @@
```python
from typing import Any, Callable, Dict, List, Tuple
from pathos.pools import ThreadPool as TPool
# from pathos.pools import ThreadPool as TPool
from copy import deepcopy
from functools import reduce
```

@@ -27,13 +27,14 @@ class Executor:
```python
            sub_step: int,
            sL: List[Dict[str, Any]],
            s: Dict[str, Any],
            funcs: List[Callable]
            funcs: List[Callable],
            kafkaConfig
        ) -> Dict[str, Any]:

        ops = self.policy_ops

        def get_col_results(sweep_dict, sub_step, sL, s, funcs):
            return list(map(lambda f: f(sweep_dict, sub_step, sL, s), funcs))
            return list(map(lambda f: f(sweep_dict, sub_step, sL, s, kafkaConfig), funcs))

        def compose(init_reduction_funct, funct_list, val_list):
            result, i = None, 0
```

@@ -105,18 +106,18 @@ class Executor:
```python
            policy_funcs: List[Callable],
            env_processes: Dict[str, Callable],
            time_step: int,
            run: int
            run: int,
            kafkaConfig
        ) -> List[Dict[str, Any]]:

        last_in_obj: Dict[str, Any] = deepcopy(sL[-1])
        _input: Dict[str, Any] = self.policy_update_exception(
            self.get_policy_input(sweep_dict, sub_step, sH, last_in_obj, policy_funcs)
            self.get_policy_input(sweep_dict, sub_step, sH, last_in_obj, policy_funcs, kafkaConfig)
        )


        def generate_record(state_funcs):
            for f in state_funcs:
                yield self.state_update_exception(f(sweep_dict, sub_step, sH, last_in_obj, _input))
                yield self.state_update_exception(f(sweep_dict, sub_step, sH, last_in_obj, _input, kafkaConfig))

        def transfer_missing_fields(source, destination):
            for k in source:
```

@@ -128,7 +129,6 @@ class Executor:
```python
        last_in_copy: Dict[str, Any] = transfer_missing_fields(last_in_obj, dict(generate_record(state_funcs)))
        last_in_copy: Dict[str, Any] = self.apply_env_proc(sweep_dict, env_processes, last_in_copy)
        last_in_copy['substep'], last_in_copy['timestep'], last_in_copy['run'] = sub_step, time_step, run

        sL.append(last_in_copy)
        del last_in_copy
```

@@ -142,7 +142,8 @@ class Executor:
```python
            configs: List[Tuple[List[Callable], List[Callable]]],
            env_processes: Dict[str, Callable],
            time_step: int,
            run: int
            run: int,
            kafkaConfig
        ) -> List[Dict[str, Any]]:

        sub_step = 0
```

@@ -159,11 +160,9 @@ class Executor:
```python
        states_list: List[Dict[str, Any]] = [genesis_states]

        sub_step += 1

        for [s_conf, p_conf] in configs:  # tensor field

            states_list: List[Dict[str, Any]] = self.partial_state_update(
                sweep_dict, sub_step, states_list, simulation_list, s_conf, p_conf, env_processes, time_step, run
                sweep_dict, sub_step, states_list, simulation_list, s_conf, p_conf, env_processes, time_step, run, kafkaConfig
            )
            sub_step += 1
```

@@ -179,15 +178,15 @@ class Executor:
```python
            configs: List[Tuple[List[Callable], List[Callable]]],
            env_processes: Dict[str, Callable],
            time_seq: range,
            run: int
            run: int,
            kafkaConfig
        ) -> List[List[Dict[str, Any]]]:

        time_seq: List[int] = [x + 1 for x in time_seq]
        simulation_list: List[List[Dict[str, Any]]] = [states_list]

        for time_step in time_seq:
            pipe_run: List[Dict[str, Any]] = self.state_update_pipeline(
                sweep_dict, simulation_list, configs, env_processes, time_step, run
                sweep_dict, simulation_list, configs, env_processes, time_step, run, kafkaConfig
            )

            _, *pipe_run = pipe_run
```

@@ -202,33 +201,53 @@ class Executor:
```python
            configs: List[Tuple[List[Callable], List[Callable]]],
            env_processes: Dict[str, Callable],
            time_seq: range,
            runs: int
            runs: int,
            user_id,
            session_id,
            simulation_id,
            run_id,
            kafkaConfig
        ) -> List[List[Dict[str, Any]]]:

        def execute_run(sweep_dict, states_list, configs, env_processes, time_seq, run) -> List[Dict[str, Any]]:
            run += 1

            def generate_init_sys_metrics(genesis_states_list):
                for d in genesis_states_list:
                    d['run'], d['substep'], d['timestep'] = run, 0, 0
                    d['user_id'], d['simulation_id'], d['session_id'], d['run_id'] = user_id, simulation_id, session_id, run_id
                    yield d

            states_list_copy: List[Dict[str, Any]] = list(generate_init_sys_metrics(deepcopy(states_list)))

            first_timestep_per_run: List[Dict[str, Any]] = self.run_pipeline(
                sweep_dict, states_list_copy, configs, env_processes, time_seq, run
                sweep_dict, states_list_copy, configs, env_processes, time_seq, run, kafkaConfig
            )
            del states_list_copy

            return first_timestep_per_run

        tp = TPool(runs)
        pipe_run: List[List[Dict[str, Any]]] = flatten(
            tp.map(
                lambda run: execute_run(sweep_dict, states_list, configs, env_processes, time_seq, run),
                list(range(runs))
            )
        pipe_run = flatten(
            [execute_run(sweep_dict, states_list, configs, env_processes, time_seq, run) for run in range(runs)]
        )

        tp.clear()
        return pipe_run

        # ******** Only has one run
        # pipe_run: List[List[Dict[str, Any]]] = flatten(
        #     sc.parallelize(list(range(runs))).map(
        #         lambda run: execute_run(sweep_dict, states_list, configs, env_processes, time_seq, run)
        #     ).collect()
        # )

        # print(type(run_id))
        # print(runs)
        # tp = TPool(runs)
        # pipe_run: List[List[Dict[str, Any]]] = flatten(
        #     tp.map(
        #         lambda run: execute_run(sweep_dict, states_list, configs, env_processes, time_seq, run),
        #         list(range(runs))
        #     )
        # )
        #
        # tp.clear()
        r
```
@@ -17,6 +17,14 @@ def append_dict(dict, new_dict):
```python
    return dict


def arrange_cols(df, reverse):
    session_metrics = ['user_id', 'session_id', 'simulation_id', 'run_id']
    sys_metrics = ['run', 'timestep', 'substep']
    result_cols = list(set(df.columns) - set(session_metrics) - set(sys_metrics))
    result_cols.sort(reverse=reverse)
    return df[session_metrics + sys_metrics + result_cols]


class IndexCounter:
    def __init__(self):
        self.i = 0
```
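An editor-added illustration of what `arrange_cols` does to a results DataFrame; the non-metric column names here are hypothetical:

```python
import pandas as pd

df = pd.DataFrame([{'msg_counts': 1, 'send_times': 0.0,
                    'user_id': 'cadCAD_user', 'session_id': 0, 'simulation_id': 0, 'run_id': 0,
                    'run': 1, 'timestep': 0, 'substep': 0}])
df = arrange_cols(df, reverse=False)
# Column order: user_id, session_id, simulation_id, run_id, run, timestep, substep, msg_counts, send_times
```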
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,145 @@
```
___ _ __ _ __ __ __
/ _ \ (_)___ / /_ ____ (_)/ / __ __ / /_ ___ ___/ /
/ // // /(_-</ __// __// // _ \/ // // __// -_)/ _ /
/____//_//___/\__//_/ _/_//_.__/\_,_/ \__/ \__/ \_,_/
/ _ \ ____ ___ ___/ /__ __ ____ ___
/ ___// __// _ \/ _ // // // __// -_)
/_/ /_/ \___/\_,_/ \_,_/ \__/ \__/
by Joshua E. Jodesty

```
## What?: *Description*
***Distributed Produce*** (**[distroduce](distroduce)**) is a distributed message simulation and throughput benchmarking
framework / [cadCAD](https://cadcad.org) execution mode that leverages [Apache Spark](https://spark.apache.org/) and
[Apache Kafka Producer](https://kafka.apache.org/documentation/#producerapi) for optimizing Kafka cluster configurations
and debugging real-time data transformations. *distroduce* leverages cadCAD's user-defined event simulation template and
framework to simulate messages sent to Kafka clusters. This enables rapid and iterative design, debugging, and message
publish benchmarking of Kafka clusters and real-time data processing using Kafka Streams and Spark (Structured)
Streaming.

## How?: *A Tail of Two Clusters*
***Distributed Produce*** is a Spark Application used as a cadCAD Execution Mode that distributes Kafka Producers,
message simulation, and message publishing to worker nodes of an [AWS EMR](https://aws.amazon.com/emr/) cluster.
Messages published from these workers are sent to Kafka topics on a Kafka cluster from a Spark bootstrapped EMR cluster.

## Why?: *Use Case*
* **IoT Event / Device Simulation:** Competes with *AWS IoT Device Simulator* and *Azure IoT Solution Acceleration:
Device Simulation*. Unlike these products, *Distributed Produce* enables user-defined state updates and agent actions,
as well as message publish benchmarking
* **Development Environment for Real-Time Data Processing / Routing:**

## Get Started:

### 0. Set Up Local Development Environment: see [Kafka Quickstart](https://kafka.apache.org/quickstart)
**a.** Install `pyspark`
```bash
pip3 install pyspark
```
**b.** Install & Unzip Kafka, Create Kafka `test` topic, and Start Consumer
```bash
sh distroduce/configuration/launch_local_kafka.sh
```
**c.** Run Simulation locally
```bash
zip -rq distroduce/dist/distroduce.zip distroduce/
spark-submit --py-files distroduce/dist/distroduce.zip distroduce/local_messaging_sim.py `hostname | xargs`
```

### 1. Write cadCAD Simulation:
* **Simulation Description:**
To demonstrate *Distributed Produce*, I implemented a simulation of two users interacting over a messaging service.
* **cadCAD Resources:**
* [Documentation](https://github.com/BlockScience/cadCAD/tree/master/documentation)
* [Tutorials](https://github.com/BlockScience/cadCAD/tree/master/tutorials)
* **Terminology:**
* ***[Initial Conditions](https://github.com/BlockScience/cadCAD/tree/master/documentation#state-variables)*** - State Variables and their initial values (Start event of Simulation)
```python
initial_conditions = {
    'state_variable_1': 0,
    'state_variable_2': 0,
    'state_variable_3': 1.5,
    'timestamp': '2019-01-01 00:00:00'
}
```
* ***[Policy Functions:](https://github.com/BlockScience/cadCAD/tree/master/documentation#Policy-Functions)*** -
compute one or more signals to be passed to State Update Functions
```python
def policy_function_1(_params, substep, sH, s, kafkaConfig):
    ...
    return {'signal_1': signal_value}
```

Parameters:
* **_params** : `dict` - [System parameters](https://github.com/BlockScience/cadCAD/blob/master/documentation/System_Model_Parameter_Sweep.md)
* **substep** : `int` - Current [substep](https://github.com/BlockScience/cadCAD/tree/master/documentation#Substep)
* **sH** : `list[list[dict]]` - Historical values of all state variables for the simulation. See
[Historical State Access](https://github.com/BlockScience/cadCAD/blob/master/documentation/Historically_State_Access.md) for details
* **s** : `dict` - Current state of the system, where the `dict_keys` are the names of the state variables and the
`dict_values` are their current values.
* **kafkaConfig** : `dict` - Configuration for the `kafka-python`
[Producer](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html)

* ***[State Update Functions](https://github.com/BlockScience/cadCAD/tree/master/documentation#state-update-functions):*** -
define how state variables change over time
```python
def state_update_function_A(_params, substep, sH, s, actions, kafkaConfig):
    ...
    return 'state_variable_name', new_value
```
Parameters:
* **_params** : `dict` - [System parameters](https://github.com/BlockScience/cadCAD/blob/master/documentation/System_Model_Parameter_Sweep.md)
* **substep** : `int` - Current [substep](https://github.com/BlockScience/cadCAD/tree/master/documentation#Substep)
* **sH** : `list[list[dict]]` - Historical values of all state variables for the simulation. See
[Historical State Access](https://github.com/BlockScience/cadCAD/blob/master/documentation/Historically_State_Access.md) for details
* **s** : `dict` - Current state of the system, where the `dict_keys` are the names of the state variables and the
`dict_values` are their current values.
* **actions** : `dict` - Aggregation of the signals of all policy functions in the current substep
* **kafkaConfig** : `dict` - Configuration for the `kafka-python`
[Producer](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html)
* ***[Partial State Update Block](https://github.com/BlockScience/cadCAD/tree/master/documentation#State-Variables) (PSUB):*** -
a set of State Update Functions and Policy Functions that update state records


**Note:** State Update and Policy Functions now have an additional, undocumented parameter: `kafkaConfig`

**a.** **Define Policy Functions:**
* [Example:](distroduce/action_policies.py) Two users interacting on separate chat clients and entering / exiting chat

**b.** **Define State Update Functions:**
* [Example:](distroduce/state_updates.py) Used for logging and maintaining state of user actions defined by policies

**c.** **Define Initial Conditions & Partial State Update Block:**
* **Initial Conditions:** [Example](distroduce/messaging_sim.py)
* **Partial State Update Block (PSUB):** [Example](distroduce/simulation.py) (a sketch follows below)
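The PSUB for this messaging simulation is defined in [distroduce/simulation.py](distroduce/simulation.py). As an editor-added orientation aid, a block wiring the chat policies to a state update function might look roughly like this; the `policies`/`variables` layout is the standard cadCAD one, and the state update function name is hypothetical:

```python
from distroduce.action_policies import enter_action, message_actions, exit_action

partial_state_update_blocks = {
    'chat_round': {
        'policies': {
            'user_a': message_actions('client_a', 'room_1', 'hi!', 'user_a', 'user_b'),
            'user_b': message_actions('client_b', 'room_1', 'hello!', 'user_b', 'user_a')
        },
        'variables': {
            'total_msg_count': update_message_count  # hypothetical state update function
        }
    }
}
```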
**d.** **Create Simulation Executor:** Used for running a simulation
|
||||
* [Local](distroduce/local_messaging_sim.py)
|
||||
* [EMR](distroduce/messaging_sim.py)
|
||||
|
||||
### 2. [Configure EMR Cluster](distroduce/configuration/cluster.py)
|
||||
|
||||
### 3. Launch EMR Cluster:
|
||||
**Option A:** Preconfigured Launch
|
||||
```bash
|
||||
python3 distroduce/emr/launch.py
|
||||
```
|
||||
**Option B:** Custom Launch - [Example](distroduce/emr/launch.py)
|
||||
```python
|
||||
from distroduce.emr.launch import launch_cluster
|
||||
from distroduce.configuration.cluster import ec2_attributes, bootstrap_actions, instance_groups, configurations
|
||||
region = 'us-east-1'
|
||||
cluster_name = 'distibuted_produce'
|
||||
launch_cluster(cluster_name, region, ec2_attributes, bootstrap_actions, instance_groups, configurations)
|
||||
```
|
||||
|
||||
### 4. Execute Benchmark(s) on EMR:
|
||||
* **Step 1:** ssh unto master node
|
||||
```bash
|
||||
zip -rq distroduce/dist/distroduce.zip distroduce/
|
||||
```
|
||||
* **Step 2:** ssh unto master node
|
||||
* **Step 3:** Spark Submit
|
||||
```
|
||||
spark-submit --master yarn --py-files distroduce.zip messaging_sim.py `hostname | xargs`
|
||||
```
|
||||
|
|
@@ -0,0 +1,72 @@
```python
from datetime import datetime
from kafka import KafkaProducer


'''
Function that produces a single action taken by a user and its accompanying message.

The results will be combined with a user-defined aggregation function (UDAF)
given to the `policy_ops` argument of `cadCAD.configuration.append_configs`.
Current UDAF: `policy_ops=[lambda a, b: a + b]`
'''
def messages(client_id, room, action, _input, sender, receiver=None):
    return {
        'types': [action],
        'messages': [
            {
                'client': client_id, 'room': room, 'action': action,
                'sender': sender, 'receiver': receiver,
                'input': _input,
                'created': datetime.now()
            }
        ]
    }
```
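To illustrate the aggregation mentioned in the docstring above, here is an editor-added example; it assumes cadCAD reduces each signal key of the policy outputs with the given `policy_ops` operator:

```python
# Two policy outputs whose list-valued signals are combined with `lambda a, b: a + b`,
# i.e. list concatenation per key:
a = {'types': ['enter'], 'messages': [{'action': 'enter'}]}
b = {'types': ['send'],  'messages': [{'action': 'send'}]}

merged = {k: a[k] + b[k] for k in a}  # -> {'types': ['enter', 'send'], 'messages': [<two dicts>]}
```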
```python
'''
Policy representing a user entering a chat room and its accompanying message
'''
def enter_action(state, room, user):
    def f(_g, step, sL, s, kafkaConfig):
        msgs = messages(state, room, 'enter', f"{user} enters {room}", user)
        msgs['send_times'] = [0.000000]
        msgs['msg_counts'] = [len(msgs['messages'])]
        return msgs
    return f


'''
Policy representing a user sending a message to a receiver within a chat room
and its accompanying message.

A Kafka Producer is used to send messages to a Kafka cluster.
The Kafka Producer is configured via `cadCAD.engine.ExecutionContext`.
'''
def message_actions(state, room, message_input, sender, receiver):
    msgs = messages(state, room, 'send', message_input, sender, receiver)
    msgs_list = msgs['messages']
    def send_action(_g, step, sL, s, kafkaConfig):
        start_time = datetime.now()
        for msg in msgs_list:
            producer: KafkaProducer = kafkaConfig['producer']
            topic: str = kafkaConfig['send_topic']
            encoded_msg = str(msg).encode('utf-8')
            producer.send(topic, encoded_msg)
        msgs['send_times'] = [(datetime.now() - start_time).total_seconds()]
        msgs['msg_counts'] = [len(msgs_list)]
        return msgs

    return send_action


'''
Policy representing a user exiting a chat room and its accompanying message
'''
def exit_action(state, room, user):
    def f(_g, step, sL, s, kafkaConfig):
        msgs = messages(state, room, 'exit', f"{user} exited {room}", user)
        msgs_list = msgs['messages']
        msgs['send_times'] = [0.000000]
        msgs['msg_counts'] = [len(msgs_list)]
        return msgs
    return f
```
@@ -0,0 +1,3 @@
```bash
#!/bin/bash
pip3 install -r requirements.txt
zip -rq distroduce/dist/distroduce.zip distroduce/
```
@@ -0,0 +1,79 @@
```python
ec2_attributes = {
    "KeyName": "joshua-IAM-keypair",
    "InstanceProfile": "EMR_EC2_DefaultRole",
    "SubnetId": "subnet-0034e615b047fd112",
    "EmrManagedSlaveSecurityGroup": "sg-08e546ae27d86d6a3",
    "EmrManagedMasterSecurityGroup": "sg-08e546ae27d86d6a3"
}

bootstrap_actions = [
    {
        "Path": "s3://insightde/emr/bootstraps/distroduce.sh",
        "Name": "bootstrap"
    }
]

instance_groups = [
    {
        "InstanceCount": 5,
        "EbsConfiguration": {
            "EbsBlockDeviceConfigs": [
                {
                    "VolumeSpecification": {"SizeInGB": 32, "VolumeType": "gp2"},
                    "VolumesPerInstance": 2
                }
            ]
        },
        "InstanceGroupType": "CORE",
        "InstanceType": "m4.xlarge",
        "Name": "Core - 2"
    },
    {
        "InstanceCount": 1,
        "EbsConfiguration": {
            "EbsBlockDeviceConfigs": [
                {
                    "VolumeSpecification": {"SizeInGB": 32, "VolumeType": "gp2"},
                    "VolumesPerInstance": 2
                }
            ]
        },
        "InstanceGroupType": "MASTER",
        "InstanceType": "m4.xlarge",
        "Name": "Master - 1"
    }
]

configurations = [
    {
        "Classification": "spark-env",
        "Properties": {},
        "Configurations": [
            {
                "Classification": "export",
                "Properties": {
                    "PYSPARK_PYTHON": "/usr/bin/python3",
                    "PYSPARK_DRIVER_PYTHON": "/usr/bin/python3"
                }
            }
        ]
    },
    {
        "Classification": "spark-defaults",
        "Properties": {
            "spark.sql.execution.arrow.enabled": "true"
        }
    },
    {
        "Classification": "spark",
        "Properties": {
            "maximizeResourceAllocation": "true"
        }
    }
]
```
@@ -0,0 +1,23 @@
```bash
#!/bin/bash
cd ~
yes | sudo python3 -m pip install --upgrade pip
yes | sudo python3 -m pip install pathos kafka-python
wget https://raw.githubusercontent.com/JEJodesty/cadCAD/dev/dist/cadCAD-0.0.2-py3-none-any.whl
yes | sudo python3 -m pip install cadCAD-0.0.2-py3-none-any.whl

# check for master node
IS_MASTER=false
if grep -i isMaster /mnt/var/lib/info/instance.json | grep -i true;
then
  IS_MASTER=true
  PRIVATE_IP=`hostname -I | xargs`
  wget https://raw.githubusercontent.com/JEJodesty/cadCAD/dev/distroduce/dist/distroduce.zip
  wget https://raw.githubusercontent.com/JEJodesty/cadCAD/dev/distroduce/messaging_sim.py
  sudo sed -i -e '$a\export PYSPARK_PYTHON=/usr/bin/python3' /etc/spark/conf/spark-env.sh
  wget http://apache.spinellicreations.com/kafka/2.3.0/kafka_2.12-2.3.0.tgz
  tar -xzf kafka_2.12-2.3.0.tgz
  cd kafka_2.12-2.3.0
  bin/zookeeper-server-start.sh config/zookeeper.properties &
  bin/kafka-server-start.sh config/server.properties &
  bin/kafka-topics.sh --create --bootstrap-server ${PRIVATE_IP}:9092 --replication-factor 1 --partitions 1 --topic test
fi
```
@ -0,0 +1,83 @@
|
|||
#!/bin/bash
|
||||
# SSH into all machines
|
||||
ssh -i ~/.ssh/joshua-IAM-keypair.pem hadoop@
|
||||
sudo python3 -m pip install --upgrade pip
|
||||
sudo python3 -m pip install pathos kafka-python
|
||||
|
||||
# SetUp Window: head node
|
||||
cd ~
|
||||
#sudo python3 -m pip install --upgrade pip
|
||||
#sudo python3 -m pip install pathos kafka-python
|
||||
sudo sed -i -e '$a\export PYSPARK_PYTHON=/usr/bin/python3' /etc/spark/conf/spark-env.sh
|
||||
wget http://apache.spinellicreations.com/kafka/2.3.0/kafka_2.12-2.3.0.tgz
|
||||
tar -xzf kafka_2.12-2.3.0.tgz
|
||||
cd kafka_2.12-2.3.0
|
||||
bin/zookeeper-server-start.sh config/zookeeper.properties &
|
||||
bin/kafka-server-start.sh config/server.properties &
|
||||
# get ip
|
||||
bin/kafka-topics.sh --create --bootstrap-server 10.0.0.9:9092 --replication-factor 1 --partitions 1 --topic test
|
||||
# bin/kafka-topics.sh --list --bootstrap-server 10.0.0.9:9092
|
||||
|
||||
# Consume (Window): head node
|
||||
bin/kafka-console-consumer.sh --bootstrap-server 10.0.0.9:9092 --topic test --from-beginning
|
||||
# DELETE
|
||||
# bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic test
|
||||
|
||||
|
||||
# local
|
||||
python3 setup.py sdist bdist_wheel
|
||||
pip3 install dist/*.whl
|
||||
# SCP: all nodes
|
||||
# alternatively, stage the wheel in S3
|
||||
# git clone specific file
|
||||
scp -i ~/.ssh/joshua-IAM-keypair.pem dist/*.whl hadoop@ec2-18-206-12-181.compute-1.amazonaws.com:/home/hadoop/
|
||||
scp -i ~/.ssh/joshua-IAM-keypair.pem dist/*.whl hadoop@ec2-34-239-171-181.compute-1.amazonaws.com:/home/hadoop/
|
||||
scp -i ~/.ssh/joshua-IAM-keypair.pem dist/*.whl hadoop@ec2-3-230-154-170.compute-1.amazonaws.com:/home/hadoop/
|
||||
scp -i ~/.ssh/joshua-IAM-keypair.pem dist/*.whl hadoop@ec2-18-232-52-219.compute-1.amazonaws.com:/home/hadoop/
|
||||
scp -i ~/.ssh/joshua-IAM-keypair.pem dist/*.whl hadoop@ec2-34-231-70-210.compute-1.amazonaws.com:/home/hadoop/
|
||||
scp -i ~/.ssh/joshua-IAM-keypair.pem dist/*.whl hadoop@ec2-34-231-243-101.compute-1.amazonaws.com:/home/hadoop/
|
||||
|
||||
|
||||
sudo python3 -m pip install *.whl
|
||||
|
||||
|
||||
# SCP head node
|
||||
# ToDo: zip build for cadCAD OR give py library after whl install
|
||||
scp -i ~/.ssh/joshua-IAM-keypair.pem distroduce/dist/distroduce.zip hadoop@ec2-18-206-12-181.compute-1.amazonaws.com:/home/hadoop/
|
||||
scp -i ~/.ssh/joshua-IAM-keypair.pem distroduce/messaging_sim.py hadoop@ec2-18-206-12-181.compute-1.amazonaws.com:/home/hadoop/
|
||||
#scp -i ~/.ssh/joshua-IAM-keypair.pem dist/distroduce.zip hadoop@ec2-18-232-54-233.compute-1.amazonaws.com:/home/hadoop/
|
||||
#scp -i ~/.ssh/joshua-IAM-keypair.pem examples/event_bench/main.py hadoop@ec2-18-232-54-233.compute-1.amazonaws.com:/home/hadoop/
|
||||
|
||||
|
||||
# Run Window: Head Node
|
||||
#spark-submit --master yarn messaging_app.py
|
||||
spark-submit --master yarn --py-files distroduce.zip main.py
|
||||
|
||||
# Cluster Config
|
||||
#[
|
||||
# {
|
||||
# "Classification": "spark-env",
|
||||
# "Configurations": [
|
||||
# {
|
||||
# "Classification": "export",
|
||||
# "ConfigurationProperties": {
|
||||
# "PYSPARK_PYTHON": "/usr/bin/python3",
|
||||
# "PYSPARK_DRIVER_PYTHON": "/usr/bin/python3"
|
||||
# }
|
||||
# }
|
||||
# ]
|
||||
# },
|
||||
# {
|
||||
# "Classification": "spark-defaults",
|
||||
# "ConfigurationProperties": {
|
||||
# "spark.sql.execution.arrow.enabled": "true"
|
||||
# }
|
||||
# },
|
||||
# {
|
||||
# "Classification": "spark",
|
||||
# "Properties": {
|
||||
# "maximizeResourceAllocation": "true"
|
||||
# }
|
||||
# }
|
||||
#]
|
||||
|
||||
|
|
@ -0,0 +1,16 @@
|
|||
#!/bin/bash
|
||||
aws emr create-cluster \
|
||||
--applications Name=Hadoop Name=Hive Name=Spark \
|
||||
--ec2-attributes '{"KeyName":"joshua-IAM-keypair","InstanceProfile":"EMR_EC2_DefaultRole","SubnetId":"subnet-0034e615b047fd112","EmrManagedSlaveSecurityGroup":"sg-08e546ae27d86d6a3","EmrManagedMasterSecurityGroup":"sg-08e546ae27d86d6a3"}' \
|
||||
--release-label emr-5.26.0 \
|
||||
--log-uri 's3n://aws-logs-251682129355-us-east-1/elasticmapreduce/' \
|
||||
--instance-groups '[{"InstanceCount":5,"EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"SizeInGB":32,"VolumeType":"gp2"},"VolumesPerInstance":2}]},"InstanceGroupType":"CORE","InstanceType":"m4.xlarge","Name":"Core - 2"},{"InstanceCount":1,"EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"SizeInGB":32,"VolumeType":"gp2"},"VolumesPerInstance":2}]},"InstanceGroupType":"MASTER","InstanceType":"m4.xlarge","Name":"Master - 1"}]' \
|
||||
--configurations '[{"Classification":"spark-env","Properties":{},"Configurations":[{"Classification":"export","Properties":{}}]},{"Classification":"spark-defaults","Properties":{}},{"Classification":"spark","Properties":{"maximizeResourceAllocation":"true"}}]' \
|
||||
--auto-scaling-role EMR_AutoScaling_DefaultRole \
|
||||
--bootstrap-actions '[{"Path":"s3://insightde/emr/bootstraps/distroduce.sh","Name":"bootstrap"}]' \
|
||||
--ebs-root-volume-size 10 \
|
||||
--service-role EMR_DefaultRole \
|
||||
--enable-debugging \
|
||||
--name 'distributed_produce' \
|
||||
--scale-down-behavior TERMINATE_AT_TASK_COMPLETION \
|
||||
--region us-east-1
|
||||
|
|
@ -0,0 +1,8 @@
|
|||
#!/bin/bash
|
||||
wget http://apache.spinellicreations.com/kafka/2.3.0/kafka_2.12-2.3.0.tgz
|
||||
tar -xzf kafka_2.12-2.3.0.tgz
|
||||
cd kafka_2.12-2.3.0
|
||||
bin/zookeeper-server-start.sh config/zookeeper.properties &
|
||||
bin/kafka-server-start.sh config/server.properties &
|
||||
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
|
||||
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
|
||||
|
|
@ -0,0 +1,3 @@
|
|||
#!/bin/bash
|
||||
#spark-submit --master yarn event_bench.py
|
||||
spark-submit --py-files dist/distroduce.zip examples/event_bench/main.py
|
||||
|
|
@ -0,0 +1,3 @@
|
|||
#!/bin/bash
|
||||
PRIVATE_IP=`hostname -I | xargs`
|
||||
spark-submit --master yarn --py-files distroduce.zip messaging_sim.py $PRIVATE_IP
|
||||
|
|
@ -0,0 +1,4 @@
|
|||
#!/bin/bash
|
||||
scp -i ~/.ssh/joshua-IAM-keypair.pem ~/Projects/event-bench/event_bench.py \
|
||||
hadoop@ec2-3-230-158-62.compute-1.amazonaws.com:/home/hadoop/
|
||||
|
||||
Binary file not shown.
|
|
@ -0,0 +1,39 @@
|
|||
import os, json, boto3
|
||||
from typing import List
|
||||
|
||||
|
||||
def launch_cluster(name, region, ec2_attributes, bootstrap_actions, instance_groups, configurations):
|
||||
def log_uri(name, region):
|
||||
return f's3n://{name}-{region}/elasticmapreduce/'
|
||||
|
||||
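# Shells out to the locally installed AWS CLI; the boto3 import above is only
# needed by the commented-out benchmark helper below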
os.system(f"""
|
||||
aws emr create-cluster \
|
||||
--applications Name=Hadoop Name=Hive Name=Spark \
|
||||
--ec2-attributes '{json.dumps(ec2_attributes)}' \
|
||||
--release-label emr-5.26.0 \
|
||||
--log-uri '{str(log_uri(name, region))}' \
|
||||
--instance-groups '{json.dumps(instance_groups)}' \
|
||||
--configurations '{json.dumps(configurations)}' \
|
||||
--auto-scaling-role EMR_AutoScaling_DefaultRole \
|
||||
--bootstrap-actions '{json.dumps(bootstrap_actions)}' \
|
||||
--ebs-root-volume-size 10 \
|
||||
--service-role EMR_DefaultRole \
|
||||
--enable-debugging \
|
||||
--name '{name}' \
|
||||
--scale-down-behavior TERMINATE_AT_TASK_COMPLETION \
|
||||
--region {region}
|
||||
""")
|
||||
|
||||
|
||||
# def benchmark(names: List[str], region, ec2_attributes, bootstrap_actions, instance_groups, configurations):
|
||||
# current_dir = os.path.dirname(__file__)
|
||||
# s3 = boto3.client('s3')
|
||||
# bucket = 'insightde'
|
||||
#
|
||||
# file = 'distroduce.sh'
|
||||
# abs_path = os.path.join(current_dir, file)
|
||||
# key = f'emr/bootstraps/{file}'
|
||||
#
|
||||
# s3.upload_file(abs_path, bucket, key)
|
||||
# for name in names:
|
||||
# launch_cluster(name, region, ec2_attributes, bootstrap_actions, instance_groups, configurations)
|
||||
|
|
@ -0,0 +1,5 @@
|
|||
from distroduce.emr import launch_cluster
|
||||
from distroduce.configuration.cluster import ec2_attributes, bootstrap_actions, instance_groups, configurations
|
||||
region = 'us-east-1'
|
||||
cluster_name = 'distributed_produce'
|
||||
launch_cluster(cluster_name, region, ec2_attributes, bootstrap_actions, instance_groups, configurations)
|
||||
|
|
@ -0,0 +1 @@
|
|||
from distroduce.executor.spark.jobs import distributed_produce
|
||||
|
|
@ -0,0 +1,44 @@
|
|||
from distroduce.spark.session import sc
|
||||
|
||||
|
||||
def distributed_produce(
|
||||
simulation_execs,
|
||||
var_dict_list,
|
||||
states_lists,
|
||||
configs_structs,
|
||||
env_processes_list,
|
||||
Ts,
|
||||
Ns,
|
||||
userIDs,
|
||||
sessionIDs,
|
||||
simulationIDs,
|
||||
runIDs,
|
||||
spark_context=sc,
|
||||
kafka_config=None
|
||||
):
|
||||
func_params_zipped = list(
|
||||
zip(userIDs, sessionIDs, simulationIDs, runIDs, simulation_execs, configs_structs, env_processes_list)
|
||||
)
|
||||
func_params_kv = [((t[0], t[1], t[2], t[3]), (t[4], t[5], t[6])) for t in func_params_zipped]
|
||||
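# Runs on a Spark worker: the KafkaProducer is constructed inside the task because
# producer clients hold live connections and cannot be pickled from the driver to executors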
def simulate(k, v):
|
||||
from kafka import KafkaProducer
|
||||
prod_config = kafka_config['producer_config']
|
||||
kafka_config['producer'] = KafkaProducer(**prod_config)
|
||||
(sim_exec, config, env_procs) = [f[1] for f in func_params_kv if f[0] == k][0]
|
||||
results = sim_exec(
|
||||
v['var_dict'], v['states_lists'], config, env_procs, v['Ts'], v['Ns'],
|
||||
k[0], k[1], k[2], k[3], kafka_config
|
||||
)
|
||||
|
||||
return results
|
||||
|
||||
val_params = list(zip(userIDs, sessionIDs, simulationIDs, runIDs, var_dict_list, states_lists, Ts, Ns))
|
||||
val_params_kv = [
|
||||
(
|
||||
(t[0], t[1], t[2], t[3]),
|
||||
{'var_dict': t[4], 'states_lists': t[5], 'Ts': t[6], 'Ns': t[7]}
|
||||
) for t in val_params
|
||||
]
|
||||
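# coalesce(1) collapses every keyed simulation into a single partition (one executor);
# drop it or repartition to fan simulations out across the cluster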
results_rdd = spark_context.parallelize(val_params_kv).coalesce(1)
|
||||
|
||||
return list(results_rdd.map(lambda x: simulate(*x)).collect())
|
||||
|
|
@ -0,0 +1,46 @@
|
|||
import sys
|
||||
from datetime import datetime
|
||||
|
||||
from cadCAD import configs
|
||||
from cadCAD.configuration.utils import config_sim
|
||||
from cadCAD.engine import ExecutionMode, ExecutionContext, Executor
|
||||
|
||||
from distroduce.simulation import main, sim_composition
|
||||
from distroduce.spark.session import sc
|
||||
from distroduce.executor.spark import distributed_produce
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Initial States
|
||||
initial_conditions = {
|
||||
'record_creation': datetime.now(),
|
||||
'client_a': {'users': [], 'messages': [], 'msg_count': 0, 'send_time': 0.0},
|
||||
'client_b': {'users': [], 'messages': [], 'msg_count': 0, 'send_time': 0.0},
|
||||
'total_msg_count': 0,
|
||||
'total_send_time': 0.000000
|
||||
}
|
||||
|
||||
'''
|
||||
Simulation Configuration:
|
||||
N = Simulation Runs
|
||||
T = Timesteps for each Partial State Update Block
|
||||
'''
|
||||
sim_config = config_sim(
|
||||
{
|
||||
"N": 1,
|
||||
"T": range(10),
|
||||
}
|
||||
)
|
||||
|
||||
# Configuration for Kafka Producer
|
||||
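# sys.argv[1] is the Kafka bootstrap host; the spark-submit wrapper script passes the head node's private IP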
kafkaConfig = {
|
||||
'send_topic': 'test',
|
||||
'producer_config': {
|
||||
'bootstrap_servers': f'{sys.argv[1]}:9092',
|
||||
'acks': 'all'
|
||||
}
|
||||
}
|
||||
exec_mode = ExecutionMode()
|
||||
dist_proc_ctx = ExecutionContext(context=exec_mode.dist_proc, method=distributed_produce, kafka_config=kafkaConfig)
|
||||
run = Executor(exec_context=dist_proc_ctx, configs=configs, spark_context=sc)
|
||||
|
||||
main(run, sim_config, initial_conditions, sim_composition)
|
||||
|
|
@ -0,0 +1,46 @@
|
|||
import sys
|
||||
from datetime import datetime
|
||||
|
||||
from cadCAD import configs
|
||||
from cadCAD.configuration.utils import config_sim
|
||||
from cadCAD.engine import ExecutionMode, ExecutionContext, Executor
|
||||
|
||||
from distroduce.simulation import main, sim_composition
|
||||
from distroduce.spark.session import sc
|
||||
from distroduce.executor.spark import distributed_produce
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Initial States
|
||||
initial_conditions = {
|
||||
'record_creation': datetime.now(),
|
||||
'client_a': {'users': [], 'messages': [], 'msg_count': 0, 'send_time': 0.0},
|
||||
'client_b': {'users': [], 'messages': [], 'msg_count': 0, 'send_time': 0.0},
|
||||
'total_msg_count': 0,
|
||||
'total_send_time': 0.000000
|
||||
}
|
||||
|
||||
'''
|
||||
Simulation Configuration:
|
||||
N = Simulation Runs
|
||||
T = Timesteps for each Partial State Update Block
|
||||
'''
|
||||
sim_config = config_sim(
|
||||
{
|
||||
"N": 1,
|
||||
"T": range(5000),
|
||||
}
|
||||
)
|
||||
|
||||
# Configuration for Kafka Producer
|
||||
kafkaConfig = {
|
||||
'send_topic': 'test',
|
||||
'producer_config': {
|
||||
'bootstrap_servers': f'{sys.argv[1]}:9092',
|
||||
'acks': 'all'
|
||||
}
|
||||
}
|
||||
exec_mode = ExecutionMode()
|
||||
dist_proc_ctx = ExecutionContext(context=exec_mode.dist_proc, method=distributed_produce, kafka_config=kafkaConfig)
|
||||
run = Executor(exec_context=dist_proc_ctx, configs=configs, spark_context=sc)
|
||||
|
||||
main(run, sim_config, initial_conditions, sim_composition)
|
||||
|
|
@ -0,0 +1,87 @@
|
|||
import sys
|
||||
from pprint import pprint
|
||||
|
||||
import pandas as pd
|
||||
from tabulate import tabulate
|
||||
|
||||
from cadCAD.utils import arrange_cols
|
||||
from cadCAD.configuration import append_configs
|
||||
|
||||
from distroduce.action_policies import enter_action, message_actions, exit_action
|
||||
from distroduce.state_updates import send_message, count_messages, add_send_time, current_time
|
||||
|
||||
# State Updates
|
||||
variables = {
|
||||
'client_a': send_message('client_a'),
|
||||
'client_b': send_message('client_b'),
|
||||
'total_msg_count': count_messages,
|
||||
'total_send_time': add_send_time,
|
||||
'record_creation': current_time('record_creation')
|
||||
}
|
||||
|
||||
# Action Policies
|
||||
policy_group_1 = {
|
||||
"action_1": enter_action('server', 'room_1', 'A'),
|
||||
"action_2": enter_action('server', 'room_1', 'B')
|
||||
}
|
||||
|
||||
policy_group_2 = {
|
||||
"action_1": message_actions('client_A', 'room_1', "Hi B", 'A', 'B'),
|
||||
"action_2": message_actions('client_B', 'room_1', "Hi A", 'B', 'A')
|
||||
}
|
||||
|
||||
policy_group_3 = {
|
||||
"action_1": message_actions('client_A', 'room_1', "Bye B", 'A', 'B'),
|
||||
"action_2": message_actions('client_B', 'room_1', "Bye A", 'B', 'A')
|
||||
}
|
||||
|
||||
policy_group_4 = {
|
||||
"action_1": exit_action('server', 'room_1', 'A'),
|
||||
"action_2": exit_action('server', 'room_1', 'B')
|
||||
}
|
||||
|
||||
policy_groups = [policy_group_1, policy_group_2, policy_group_3, policy_group_4]
|
||||
# Partial State Update Blocks
|
||||
sim_composition = [{'policies': policy_group, 'variables': variables} for policy_group in policy_groups]
|
||||
|
||||
|
||||
def main(executor, sim_config, initial_conditions, sim_composition):
|
||||
append_configs(
|
||||
user_id='Joshua',
|
||||
sim_configs=sim_config,
|
||||
initial_state=initial_conditions,
|
||||
partial_state_update_blocks=sim_composition,
|
||||
policy_ops=[lambda a, b: a + b]
|
||||
)
|
||||
pprint(sim_composition)
|
||||
|
||||
i = 0
|
||||
for raw_result, tensor_field in executor.execute():
|
||||
result = arrange_cols(pd.DataFrame(raw_result), False)
|
||||
metrics_result = result[
|
||||
[
|
||||
'run_id', 'timestep', 'substep',
|
||||
'record_creation', 'total_msg_count', 'total_send_time'
|
||||
]
|
||||
]
|
||||
msgs_result = result[
|
||||
[
|
||||
'run_id', 'timestep', 'substep',
|
||||
'record_creation',
|
||||
'client_a', 'client_b'
|
||||
]
|
||||
]
|
||||
print()
|
||||
if i == 0:
|
||||
print(tabulate(tensor_field, headers='keys', tablefmt='psql'))
|
||||
last = metrics_result.tail(1).copy()  # copy to avoid pandas SettingWithCopyWarning
|
||||
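# headline throughput metric: messages published per second of cumulative producer send time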
last['msg_per_sec'] = last['total_msg_count'] / last['total_send_time']
|
||||
print("Messages Output: Head")
|
||||
print(tabulate(msgs_result.head(5), headers='keys', tablefmt='psql'))
|
||||
print("Metrics Output: Head")
|
||||
print(tabulate(metrics_result.head(5), headers='keys', tablefmt='psql'))
|
||||
print("Metrics Output: Tail")
|
||||
print(tabulate(metrics_result.tail(5), headers='keys', tablefmt='psql'))
|
||||
print(tabulate(last, headers='keys', tablefmt='psql'))
|
||||
print()
|
||||
i += 1
|
||||
|
|
@ -0,0 +1,16 @@
|
|||
from pyspark.sql import SparkSession
|
||||
from pyspark.context import SparkContext
|
||||
import os
|
||||
|
||||
# os.environ['PYSPARK_PYTHON'] = '/usr/local/bin/python3'
|
||||
# os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/local/bin/python3'
|
||||
# os.environ['OBJC_DISABLE_INITIALIZE_FORK_SAFETY'] = 'YES'
|
||||
|
||||
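# Importing this module creates (or reuses) the application-wide SparkSession;
# the executor jobs and driver scripts share this `sc`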
spark = SparkSession\
|
||||
.builder\
|
||||
.appName("distroduce")\
|
||||
.getOrCreate()
|
||||
|
||||
sc: SparkContext = spark.sparkContext
|
||||
print(f"Spark UI: {sc.uiWebUrl}")
|
||||
print()
|
||||
|
|
@ -0,0 +1,60 @@
|
|||
from copy import deepcopy
|
||||
from datetime import datetime
|
||||
from functools import reduce
|
||||
|
||||
add = lambda a, b: a + b
|
||||
|
||||
|
||||
'''
|
||||
Function that maintains the state of users in a chat room
|
||||
'''
|
||||
def update_users(users, actions, action_types=['send','enter','exit']):
|
||||
users = deepcopy(users)
|
||||
for action_type in action_types:
|
||||
if action_type in actions['types']:
|
||||
for msg in actions['messages']:
|
||||
if msg['action'] == 'send' and action_type == 'send':
|
||||
continue
|
||||
elif msg['action'] == 'enter' and action_type == 'enter':
|
||||
for user in msg['sender']:
|
||||
users.append(user) # register_entered
|
||||
elif msg['action'] == 'exit' and action_type == 'exit':
|
||||
for user in msg['sender']:
|
||||
users.remove(user) # remove_exited
|
||||
return users
|
||||
|
||||
|
||||
'''
|
||||
State Update used to count sent messages
|
||||
'''
|
||||
def count_messages(_g, step, sL, s, actions, kafkaConfig):
|
||||
return 'total_msg_count', s['total_msg_count'] + reduce(add, actions['msg_counts'])
|
||||
|
||||
|
||||
'''
|
||||
State Update used to log sent-message events
|
||||
'''
|
||||
def send_message(state):
|
||||
return lambda _g, step, sL, s, actions, kafkaConfig: (
|
||||
state,
|
||||
{
|
||||
'users': update_users(s[state]['users'], actions),
|
||||
'messages': actions['messages'],
|
||||
'msg_counts': reduce(add, actions['msg_counts']),
|
||||
'send_times': reduce(add, actions['send_times'])
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
'''
|
||||
State Update used to sum the time taken for the Kafka Producer to send messages between users
|
||||
'''
|
||||
def add_send_time(_g, step, sL, s, actions, kafkaConfig):
|
||||
return 'total_send_time', s['total_send_time'] + reduce(add, actions['send_times'])
|
||||
|
||||
|
||||
'''
|
||||
State Update used to record the creation time of the event record
|
||||
'''
|
||||
def current_time(state):
|
||||
return lambda _g, step, sL, s, actions, kafkaConfig: (state, datetime.now())
|
||||
|
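A minimal usage sketch (not part of this diff) of the aggregated policy input these state updates expect. The key names ('types', 'messages', 'msg_counts', 'send_times', 'action', 'sender') are taken from the reducers above; the concrete values and the import path are assumptions, with policy_ops=[lambda a, b: a + b] in simulation.py doing the per-key aggregation.

from distroduce.state_updates import update_users, count_messages

# hypothetical aggregated policy input, shaped after the reducers above
actions = {
    'types': ['enter', 'send'],
    'messages': [
        {'action': 'enter', 'sender': ['A']},   # user A joins the room
        {'action': 'send', 'sender': ['A']},    # A publishes one message
    ],
    'msg_counts': [0, 1],
    'send_times': [0.0, 0.002],
}

users = update_users([], actions)                                        # -> ['A']
key, total = count_messages({}, 1, [], {'total_msg_count': 0}, actions, None)
print(users, key, total)                                                 # ['A'] total_msg_count 1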
|
@ -4,3 +4,5 @@ pathos
|
|||
fn
|
||||
tabulate
|
||||
funcy
|
||||
pyspark
|
||||
kafka-python
|
||||
|
|
|
|||
13
setup.py
13
setup.py
|
|
@ -19,7 +19,7 @@ provided.
|
|||
"""
|
||||
|
||||
setup(name='cadCAD',
|
||||
version='0.3.0',
|
||||
version='0.0.2',
|
||||
description="cadCAD: a differential games based simulation software package for research, validation, and \
|
||||
Computer Aided Design of economic systems",
|
||||
long_description=long_description,
|
||||
|
|
@ -27,5 +27,14 @@ setup(name='cadCAD',
|
|||
author='Joshua E. Jodesty',
|
||||
author_email='joshua@block.science, joshua.jodesty@gmail.com',
|
||||
license='LICENSE.txt',
|
||||
packages=find_packages()
|
||||
packages=find_packages(),
|
||||
install_requires=[
|
||||
"pandas",
|
||||
"wheel",
|
||||
"pathos",
|
||||
"fn",
|
||||
"tabulate",
|
||||
"funcy",
|
||||
"kafka-python"
|
||||
]
|
||||
)
|
||||
|
|
@ -152,6 +152,7 @@ sim_config = config_sim(
|
|||
)
|
||||
|
||||
append_configs(
|
||||
user_id='user_a',
|
||||
sim_configs=sim_config,
|
||||
initial_state=genesis_states,
|
||||
env_processes=env_processes,
|
||||
|
|
|
|||
|
|
@ -140,6 +140,7 @@ sim_config = config_sim(
|
|||
)
|
||||
|
||||
append_configs(
|
||||
user_id='user_b',
|
||||
sim_configs=sim_config,
|
||||
initial_state=genesis_states,
|
||||
env_processes=env_processes,
|
||||
|
|
|
|||
|
|
@ -144,6 +144,7 @@ sim_config = config_sim(
|
|||
# New Convention
|
||||
partial_state_update_blocks = psub_list(psu_block, psu_steps)
|
||||
append_configs(
|
||||
user_id='user_a',
|
||||
sim_configs=sim_config,
|
||||
initial_state=genesis_states,
|
||||
seeds=seeds,
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ from typing import List
|
|||
from tabulate import tabulate
|
||||
# The following imports NEED to be in this exact order
|
||||
from cadCAD.engine import ExecutionMode, ExecutionContext, Executor
|
||||
from cadCAD.utils import arrange_cols
|
||||
from simulations.regression_tests import config1
|
||||
from cadCAD import configs
|
||||
|
||||
|
|
@ -14,8 +15,11 @@ first_config = configs # only contains config1
|
|||
single_proc_ctx = ExecutionContext(context=exec_mode.single_proc)
|
||||
run = Executor(exec_context=single_proc_ctx, configs=first_config)
|
||||
|
||||
# print(set(result.columns) - set(['user_id', 'session_id', 'simulation_id', 'run_id']) - set(['run', 'timestep', 'substep']))
|
||||
# print(['run', 'timestep', 'substep'])
|
||||
|
||||
raw_result, tensor_field = run.execute()
|
||||
result = pd.DataFrame(raw_result)
|
||||
result = arrange_cols(pd.DataFrame(raw_result), False)
|
||||
print()
|
||||
print("Tensor Field: config1")
|
||||
# print(raw_result)
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ import pandas as pd
|
|||
from tabulate import tabulate
|
||||
# The following imports NEED to be in this exact order
|
||||
from cadCAD.engine import ExecutionMode, ExecutionContext, Executor
|
||||
from cadCAD.utils import arrange_cols
|
||||
from simulations.regression_tests import config2
|
||||
from cadCAD import configs
|
||||
|
||||
|
|
@ -9,15 +10,15 @@ exec_mode = ExecutionMode()
|
|||
|
||||
print("Simulation Execution: Single Configuration")
|
||||
print()
|
||||
first_config = configs # only contains config2
|
||||
single_proc_ctx = ExecutionContext(context=exec_mode.single_proc)
|
||||
run = Executor(exec_context=single_proc_ctx, configs=first_config)
|
||||
single_proc_ctx = ExecutionContext(context=exec_mode.multi_proc)
|
||||
run = Executor(exec_context=single_proc_ctx, configs=configs)
|
||||
|
||||
raw_result, tensor_field = run.execute()
|
||||
result = pd.DataFrame(raw_result)
|
||||
print()
|
||||
print("Tensor Field: config1")
|
||||
print(tabulate(tensor_field, headers='keys', tablefmt='psql'))
|
||||
print("Output:")
|
||||
print(tabulate(result, headers='keys', tablefmt='psql'))
|
||||
print()
|
||||
for raw_result, tensor_field in run.execute():
|
||||
result = arrange_cols(pd.DataFrame(raw_result), False)
|
||||
print()
|
||||
# print("Tensor Field: " + config_names[i])
|
||||
print(tabulate(tensor_field, headers='keys', tablefmt='psql'))
|
||||
print("Output:")
|
||||
print(tabulate(result, headers='keys', tablefmt='psql'))
|
||||
print()
|
||||
# i += 1
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ import pandas as pd
|
|||
from tabulate import tabulate
|
||||
# The following imports NEED to be in this exact order
|
||||
from cadCAD.engine import ExecutionMode, ExecutionContext, Executor
|
||||
from cadCAD.utils import arrange_cols
|
||||
from simulations.regression_tests import config1, config2
|
||||
from cadCAD import configs
|
||||
|
||||
|
|
@ -15,9 +16,9 @@ run = Executor(exec_context=multi_proc_ctx, configs=configs)
|
|||
i = 0
|
||||
config_names = ['config1', 'config2']
|
||||
for raw_result, tensor_field in run.execute():
|
||||
result = pd.DataFrame(raw_result)
|
||||
result = arrange_cols(pd.DataFrame(raw_result), False)
|
||||
print()
|
||||
print(f"Tensor Field: {config_names[i]}")
|
||||
# print(f"Tensor Field: {config_names[i]}")
|
||||
print(tabulate(tensor_field, headers='keys', tablefmt='psql'))
|
||||
print("Output:")
|
||||
print(tabulate(result, headers='keys', tablefmt='psql'))
|
||||
|
|
|
|||
|
|
@ -14,11 +14,11 @@ multi_proc_ctx = ExecutionContext(context=exec_mode.multi_proc)
|
|||
run = Executor(exec_context=multi_proc_ctx, configs=configs)
|
||||
|
||||
i = 0
|
||||
config_names = ['sweep_config_A', 'sweep_config_B']
|
||||
# config_names = ['sweep_config_A', 'sweep_config_B']
|
||||
for raw_result, tensor_field in run.execute():
|
||||
result = pd.DataFrame(raw_result)
|
||||
print()
|
||||
print("Tensor Field: " + config_names[i])
|
||||
# print("Tensor Field: " + config_names[i])
|
||||
print(tabulate(tensor_field, headers='keys', tablefmt='psql'))
|
||||
print("Output:")
|
||||
print(tabulate(result, headers='keys', tablefmt='psql'))
|
||||
|
|
|
|||