Compare commits

...

29 Commits

Author SHA1 Message Date
Joshua E. Jodesty 8503ec1d6b semi-final _ 2019-10-03 18:50:34 -04:00
Joshua E. Jodesty c7c68a1abe semi-final to pr 2019-10-03 15:31:31 -04:00
Joshua E. Jodesty 8d5657cb6f readme pt. 3: new build 2019-10-03 12:33:14 -04:00
Joshua E. Jodesty ea5df50659 readme pt. 3: new build 2019-10-03 12:29:34 -04:00
Joshua E. Jodesty 8cf3b09582 readme pt. 2: new build 2019-10-03 11:28:58 -04:00
Joshua E. Jodesty 0a9b980ad7 readme pt. 1 2019-10-03 09:57:41 -04:00
Joshua E. Jodesty 2a8c1d5e8f added cluster launch 2019-10-02 22:05:32 -04:00
Joshua E. Jodesty 2f0316e5f0 fixed bootstrap & parameterized private_ip 2019-10-02 11:34:57 -04:00
Joshua E. Jodesty 0c90e2048f fixed bootstrap & parameterized private_ip 2019-10-02 11:33:24 -04:00
Joshua E. Jodesty 8897c1f973 fixed bootstrap & parameterized private_ip 2019-10-02 11:32:45 -04:00
Joshua E. Jodesty 5f25cb95fb MI6 2019-10-02 10:58:19 -04:00
Joshua E. Jodesty 6c235f9914 MI5 2019-10-01 21:09:22 -04:00
Joshua E. Jodesty cdfc4b474f deleted init integration - 'distributed_produce' 2019-10-01 12:03:43 -04:00
Joshua E. Jodesty 9fc12e7cce about to delete init integration - 'distributed_produce' 2019-10-01 12:03:07 -04:00
Joshua E. Jodesty 239cf94cae cleanup pt. 2: deleted more complicated messaging simulation 2019-10-01 08:25:44 -04:00
Joshua E. Jodesty 964c0b9123 cleanup pt. 1 2019-10-01 08:16:07 -04:00
Joshua E. Jodesty ff44b0bacf parameterized execution mone 2019-10-01 07:52:56 -04:00
Joshua E. Jodesty ca2a9db8ff missing chat config 2019-09-27 12:43:30 -04:00
Joshua E. Jodesty 9b8b1ba1a0 integration works knock on wood pt.2 2019-09-26 14:53:21 -04:00
Joshua E. Jodesty a7b01c1d07 integration works knock on wood pt.2 2019-09-26 14:51:05 -04:00
Joshua E. Jodesty cef6e652b3 integration works knock on wood 2019-09-26 14:16:13 -04:00
Joshua E. Jodesty 50f9dd9c40 ERROR: objc[5187]: +[__NSPlaceholderDictionary initialize] 2019-09-26 10:50:24 -04:00
Joshua E. Jodesty db2bc8c8b8 distroduce integration pt 1 in prog 2019-09-25 21:56:39 -04:00
Joshua E. Jodesty 71c334d5f6 in prog 2019-09-25 14:33:22 -04:00
Joshua E. Jodesty 662046f34f init distroduce 2019-09-24 12:15:43 -04:00
Joshua E. Jodesty b9c7775d07 what? 2019-09-24 12:05:43 -04:00
Joshua E. Jodesty fd0de2d1c0 dev2 2019-09-24 12:00:55 -04:00
Joshua E. Jodesty cae3ccb119 merge update desc from fork staging 2019-09-07 19:14:39 -04:00
Joshua E. Jodesty 903069f23b init 2019-09-07 17:39:10 -04:00
53 changed files with 1286 additions and 255 deletions

8
.gitignore vendored
View File

@ -3,7 +3,7 @@ jupyter notebook
.ipynb_checkpoints
.DS_Store
.idea
.pytest_cache/
.pytest_cache
notebooks
*.egg-info
__pycache__
@ -27,4 +27,8 @@ testing/udo_test.py
Simulation.md
monkeytype.sqlite3
monkeytype.sqlite3
distributed_produce/bash/
distributed_produce/notes.txt
notes.txt

279
README.md
View File

@ -1,173 +1,142 @@
```
__________ ____
________ __ _____/ ____/ | / __ \
/ ___/ __` / __ / / / /| | / / / /
/ /__/ /_/ / /_/ / /___/ ___ |/ /_/ /
\___/\__,_/\__,_/\____/_/ |_/_____/
by BlockScience
___ _ __ _ __ __ __
/ _ \ (_)___ / /_ ____ (_)/ / __ __ / /_ ___ ___/ /
/ // // /(_-</ __// __// // _ \/ // // __// -_)/ _ /
/____//_//___/\__//_/ _/_//_.__/\_,_/ \__/ \__/ \_,_/
/ _ \ ____ ___ ___/ /__ __ ____ ___
/ ___// __// _ \/ _ // // // __// -_)
/_/ /_/ \___/\_,_/ \_,_/ \__/ \__/
by Joshua E. Jodesty
```
## What?: *Description*
***Distributed Produce*** (**[distroduce](distroduce)**) is a message simulation and throughput benchmarking framework /
[cadCAD](https://cadcad.org) execution mode that leverages [Apache Spark](https://spark.apache.org/) and
[Apache Kafka Producer](https://kafka.apache.org/documentation/#producerapi) for optimizing Kafka cluster configurations
and debugging real-time data transformations. *distroduce* leverages cadCAD's user-defined event simulation template and
framework to simulate messages sent to Kafka clusters. This enables rapid and iterative design, debugging, and message
publish benchmarking of Kafka clusters and real-time data processing using Kafka Streams and Spark (Structured)
Streaming.
**Introduction:**
## How?: *A Tale of Two Clusters*
***Distributed Produce*** is a Spark Application used as a cadCAD Execution Mode that distributes Kafka Producers,
message simulation, and message publishing to worker nodes of an EMR cluster. Messages published from these workers are
sent to Kafka topics on a Kafka cluster from a Spark bootstrapped EMR cluster.
***cadCAD*** is a Python library that assists in the processes of designing, testing and validating complex systems through
simulation. At its core, cadCAD is a differential games engine that supports parameter sweeping and Monte Carlo analyses
and can be easily integrated with other scientific computing Python modules and data science workflows.
## Why?: *Use Case*
* **IoT Event / Device Simulation:** Competes with *AWS IoT Device Simulator* and *Azure IoT Solution Acceleration:
Device Simulation*. Unlike these products, *Distributed Produce* enables user-defined state updates and agent actions,
as well as message publish benchmarking.
* **Development Environment for Real-Time Data Processing / Routing:**
**Description:**
## Get Started:
cadCAD (complex adaptive systems computer-aided design) is a Python-based, unified modeling framework for stochastic
dynamical systems and differential games for research, validation, and Computer Aided Design of economic systems created
by BlockScience. It is capable of modeling systems at all levels of abstraction from Agent Based Modeling (ABM) to
System Dynamics (SD), and enabling smooth integration of computational social science simulations with empirical data
science workflows.
An economic system is treated as a state-based model and defined through a set of endogenous and exogenous state
variables which are updated through mechanisms and environmental processes, respectively. Behavioral models, which may
be deterministic or stochastic, provide the evolution of the system within the action space of the mechanisms.
Mathematical formulations of these economic games treat agent utility as derived from the state rather than direct from
an action, creating a rich, dynamic modeling framework. Simulations may be run with a range of initial conditions and
parameters for states, behaviors, mechanisms, and environmental processes to understand and visualize network behavior
under various conditions. Support for A/B testing policies, Monte Carlo analysis, and other common numerical methods is
provided.
For example, the cadCAD tool allows us to represent a company's or community's current business model along with a desired
future state and helps make informed, rigorously tested decisions on how to get from today's stage to the future state.
It allows us to use code to solidify our conceptualized ideas and see if the outcome meets our expectations. We can
iteratively refine our work until we have constructed a model that closely reflects reality at the start of the model,
and see how it evolves. We can then use these results to inform business decisions.
#### Documentation:
* ##### [Tutorials](tutorials)
* ##### [System Model Configuration](documentation/Simulation_Configuration.md)
* ##### [System Simulation Execution](documentation/Simulation_Execution.md)
* ##### [Policy Aggregation](documentation/Policy_Aggregation.md)
* ##### [System Model Parameter Sweep](documentation/System_Model_Parameter_Sweep.md)
#### 0. Installation:
**Python 3.6.5** :: Anaconda, Inc.
**Option A:** Build From Source
### 0. Set Up Local Development Environment: see [Kafka Quickstart](https://kafka.apache.org/quickstart)
**a.** Install `pyspark`
```bash
pip3 install -r requirements.txt
python3 setup.py sdist bdist_wheel
pip3 install dist/*.whl
```
**Option B:** Proprietary Build Access
***IMPORTANT NOTE:*** Tokens are issued **ONLY** to BlockScience employees and those with access to proprietary builds of cadCAD.
Replace \<TOKEN\> with an issued token in the script below.
pip3 install pyspark
```
**b.** Install & Unzip Kafka, Create Kafka `test` topic, and Start Consumer
```bash
pip3 install pandas pathos fn funcy tabulate
pip3 install cadCAD --extra-index-url https://<TOKEN>@repo.fury.io/blockscience/
sh distroduce/configuration/launch_local_kafka.sh
```
**c.** Run Simulation locally
```bash
zip -rq distroduce/dist/distroduce.zip distroduce/
spark-submit --py-files distroduce/dist/distroduce.zip distroduce/local_messaging_sim.py `hostname | xargs`
```
### 1. Write cadCAD Simulation:
* **Simulation Description:**
To demonstrate *Distributed Produce*, I implemented a simulation of two users interacting over a messaging service.
* **Resources**
* [cadCAD Documentation](https://github.com/BlockScience/cadCAD/tree/master/documentation)
* [cadCAD Tutorials](https://github.com/BlockScience/cadCAD/tree/master/tutorials)
* **Terminology:**
* ***[Initial Conditions](https://github.com/BlockScience/cadCAD/tree/master/documentation#state-variables)*** -
State Variables and their initial values (Start event of Simulation)
```python
initial_conditions = {
    'state_variable_1': 0,
    'state_variable_2': 0,
    'state_variable_3': 1.5,
    'timestamp': '2019-01-01 00:00:00'
}
```
* ***[Policy Functions:](https://github.com/BlockScience/cadCAD/tree/master/documentation#Policy-Functions)*** -
computes one or more signals to be passed to State Update Functions
```python
def policy_function_A(_params, substep, sH, s, kafkaConfig):
    ...
    return {'signal_name': signal_value}
```
Parameters:
* **_params** : `dict` - [System parameters](https://github.com/BlockScience/cadCAD/blob/master/documentation/System_Model_Parameter_Sweep.md)
* **substep** : `int` - Current [substep](https://github.com/BlockScience/cadCAD/tree/master/documentation#Substep)
* **sH** : `list[list[dict]]` - Historical values of all state variables for the simulation. See
[Historical State Access](https://github.com/BlockScience/cadCAD/blob/master/documentation/Historically_State_Access.md) for details
* **s** : `dict` - Current state of the system, where the `dict_keys` are the names of the state variables and the
`dict_values` are their current values.
* **kafkaConfig:** `kafka.KafkaProducer` - Configuration for `kafka-python`
[Producer](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html)
* ***[State Update Functions](https://github.com/BlockScience/cadCAD/tree/master/documentation#state-update-functions):*** -
defines how state variables change over time
```python
def state_update_function_A(_params, substep, sH, s, actions, kafkaConfig):
    ...
    return 'state_variable_name', new_value
```
Parameters:
* **_params** : `dict` - [System parameters](https://github.com/BlockScience/cadCAD/blob/master/documentation/System_Model_Parameter_Sweep.md)
* **substep** : `int` - Current [substep](https://github.com/BlockScience/cadCAD/tree/master/documentation#Substep)
* **sH** : `list[list[dict]]` - Historical values of all state variables for the simulation. See
[Historical State Access](https://github.com/BlockScience/cadCAD/blob/master/documentation/Historically_State_Access.md) for details
* **s** : `dict` - Current state of the system, where the `dict_keys` are the names of the state variables and the
`dict_values` are their current values.
* **actions** : `dict` - Aggregation of the signals of all policy functions in the current substep
* **kafkaConfig:** `kafka.KafkaProducer` - Configuration for `kafka-python`
[Producer](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html)
* ***[Partial State Update Block](https://github.com/BlockScience/cadCAD/tree/master/documentation#State-Variables) (PSUB):*** -
a set of State Update Functions and Policy Functions that update state records (a minimal sketch follows the diagram below)
![](https://i.imgur.com/9rlX9TG.png)
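A minimal sketch of a PSUB using the example functions above. The `'policies'` / `'variables'` key names below follow contemporary cadCAD tutorials and are an assumption rather than something taken from this repo; see [System Model Configuration](documentation/Simulation_Configuration.md) for the authoritative structure.
```python
# Sketch only: block and key names are illustrative, not taken from this repo.
partial_state_update_blocks = {
    'messaging_round': {
        'policies': {
            'user_actions': policy_function_A             # signals computed this substep
        },
        'variables': {
            'state_variable_1': state_update_function_A   # applies the aggregated signals
        }
    }
}
```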
#### 1. [Configure System Model](documentation/Simulation_Configuration.md)
**Note:** State Update and Policy Functions now have the additional / undocumented parameter `kafkaConfig`
#### 2. [Execute Simulations:](documentation/Simulation_Execution.md)
**a.** **Define Policy Functions:**
* [Example:](distroduce/action_policies.py) Two users interacting on separate chat clients and entering / exiting chat
##### Single Process Execution:
Example System Model Configurations:
* [System Model A](documentation/examples/sys_model_A.py):
`/documentation/examples/sys_model_A.py`
* [System Model B](documentation/examples/sys_model_B.py):
`/documentation/examples/sys_model_B.py`
**b.** **Define State Update Functions:**
* [Example:](distroduce/state_updates.py) Used for logging and maintaining state of user actions defined by policies
Example Simulation Executions:
* [System Model A](documentation/examples/sys_model_A_exec.py):
`/documentation/examples/sys_model_A_exec.py`
* [System Model B](documentation/examples/sys_model_B_exec.py):
`/documentation/examples/sys_model_B_exec.py`
**c.** **Define Initial Conditions & Partial State Update Block:**
* **Initial Conditions:** [Example](distroduce/messaging_sim.py)
* **Partial State Update Block (PSUB):** [Example](distroduce/simulation.py)
**d.** **Create Simulation Executor:** Used for running a simulation (a minimal sketch follows the links below)
* [Local](distroduce/local_messaging_sim.py)
* [EMR](distroduce/messaging_sim.py)
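A minimal sketch of building the executor, based on `distroduce/local_messaging_sim.py` (shown later in this diff); the `localhost:9092` broker address is a local-development assumption, not a prescription.
```python
from cadCAD import configs
from cadCAD.engine import ExecutionMode, ExecutionContext, Executor
from distroduce.executor.spark import distributed_produce
from distroduce.spark.session import sc

# Kafka Producer configuration passed through the execution context (local defaults assumed)
kafkaConfig = {
    'send_topic': 'test',
    'producer_config': {'bootstrap_servers': 'localhost:9092', 'acks': 'all'}
}

exec_mode = ExecutionMode()
dist_proc_ctx = ExecutionContext(context=exec_mode.dist_proc, method=distributed_produce, kafka_config=kafkaConfig)
run = Executor(exec_context=dist_proc_ctx, configs=configs, spark_context=sc)
```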
### 2. [Configure EMR Cluster](distroduce/configuration/cluster.py)
### 3. Launch EMR Cluster:
**Option A:** Preconfigured Launch
```bash
python3 distroduce/emr/launch.py
```
**Option B:** Custom Launch - [Example](distroduce/emr/launch.py)
```python
import pandas as pd
from tabulate import tabulate
from cadCAD.engine import ExecutionMode, ExecutionContext, Executor
from documentation.examples import sys_model_A
from cadCAD import configs
exec_mode = ExecutionMode()
# Single Process Execution using a Single System Model Configuration:
# sys_model_A
sys_model_A = [configs[0]] # sys_model_A
single_proc_ctx = ExecutionContext(context=exec_mode.single_proc)
sys_model_A_simulation = Executor(exec_context=single_proc_ctx, configs=sys_model_A)
sys_model_A_raw_result, sys_model_A_tensor_field = sys_model_A_simulation.execute()
sys_model_A_result = pd.DataFrame(sys_model_A_raw_result)
print()
print("Tensor Field: sys_model_A")
print(tabulate(sys_model_A_tensor_field, headers='keys', tablefmt='psql'))
print("Result: System Events DataFrame")
print(tabulate(sys_model_A_result, headers='keys', tablefmt='psql'))
print()
from distroduce.emr.launch import launch_cluster
from distroduce.configuration.cluster import ec2_attributes, bootstrap_actions, instance_groups, configurations
region = 'us-east-1'
cluster_name = 'distibuted_produce'
launch_cluster(cluster_name, region, ec2_attributes, bootstrap_actions, instance_groups, configurations)
```
##### Multiple Simulations (Concurrent):
###### Multiple Simulation Execution (Multi Process Execution)
System Model Configurations:
* [System Model A](documentation/examples/sys_model_A.py):
`/documentation/examples/sys_model_A.py`
* [System Model B](documentation/examples/sys_model_B.py):
`/documentation/examples/sys_model_B.py`
[Example Simulation Executions:](documentation/examples/sys_model_AB_exec.py)
`/documentation/examples/sys_model_AB_exec.py`
```python
import pandas as pd
from tabulate import tabulate
from cadCAD.engine import ExecutionMode, ExecutionContext, Executor
from documentation.examples import sys_model_A, sys_model_B
from cadCAD import configs
exec_mode = ExecutionMode()
# # Multiple Processes Execution using Multiple System Model Configurations:
# # sys_model_A & sys_model_B
multi_proc_ctx = ExecutionContext(context=exec_mode.multi_proc)
sys_model_AB_simulation = Executor(exec_context=multi_proc_ctx, configs=configs)
i = 0
config_names = ['sys_model_A', 'sys_model_B']
for sys_model_AB_raw_result, sys_model_AB_tensor_field in sys_model_AB_simulation.execute():
    sys_model_AB_result = pd.DataFrame(sys_model_AB_raw_result)
    print()
    print(f"Tensor Field: {config_names[i]}")
    print(tabulate(sys_model_AB_tensor_field, headers='keys', tablefmt='psql'))
    print("Result: System Events DataFrame:")
    print(tabulate(sys_model_AB_result, headers='keys', tablefmt='psql'))
    print()
    i += 1
```
##### Parameter Sweep Simulation (Concurrent):
[Example:](documentation/examples/param_sweep.py)
`/documentation/examples/param_sweep.py`
```python
import pandas as pd
from tabulate import tabulate
# The following imports NEED to be in the exact order
from cadCAD.engine import ExecutionMode, ExecutionContext, Executor
from documentation.examples import param_sweep
from cadCAD import configs
exec_mode = ExecutionMode()
multi_proc_ctx = ExecutionContext(context=exec_mode.multi_proc)
run = Executor(exec_context=multi_proc_ctx, configs=configs)
for raw_result, tensor_field in run.execute():
    result = pd.DataFrame(raw_result)
    print()
    print("Tensor Field:")
    print(tabulate(tensor_field, headers='keys', tablefmt='psql'))
    print("Output:")
    print(tabulate(result, headers='keys', tablefmt='psql'))
    print()
```
### 4. Execute Benchmark(s):
* **Step 1:** SSH onto the master node
* **Step 2:** Spark Submit
```
spark-submit --master yarn --py-files distroduce.zip messaging_sim.py `hostname | xargs`
```

38
ascii_art.py Normal file
View File

@ -0,0 +1,38 @@
text = r'''
Complex Adaptive Dynamics
o i e
m d s
p e i
u d g
t n
e
r
'''
block_letters = r'''
__________ ____
________ __ _____/ ____/ | / __ \
/ ___/ __` / __ / / / /| | / / / /
/ /__/ /_/ / /_/ / /___/ ___ |/ /_/ /
\___/\__,_/\__,_/\____/_/ |_/_____/
by BlockScience
'''
production = r'''
__________ ____
________ __ _____/ ____/ | / __ \
/ ___/ __` / __ / / / /| | / / / /
/ /__/ /_/ / /_/ / /___/ ___ |/ /_/ /
\___/\__,_/\__,_/\____/_/ |_/_____/
by BlockScience
======================================
Complex Adaptive Dynamics
o i e
m d s
p e i
u d g
t n
e
r
'''

View File

@ -1,15 +0,0 @@
Complex Adaptive Dynamics
o i e
m d s
p e i
u d g
t n
e
r
__________ ____
________ __ _____/ ____/ | / __ \
/ ___/ __` / __ / / / /| | / / / /
/ /__/ /_/ / /_/ / /___/ ___ |/ /_/ /
\___/\__,_/\__,_/\____/_/ |_/_____/
by BlockScience

View File

@ -1,2 +1,20 @@
name = "cadCAD"
configs = []
configs = []
print(r'''
__________ ____
________ __ _____/ ____/ | / __ \
/ ___/ __` / __ / / / /| | / / / /
/ /__/ /_/ / /_/ / /___/ ___ |/ /_/ /
\___/\__,_/\__,_/\____/_/ |_/_____/
by BlockScience
======================================
Complex Adaptive Dynamics
o i e
m d s
p e i
u d g
t n
e
r
''')

View File

@ -1,7 +1,9 @@
from copy import deepcopy
from typing import Dict, Callable, List, Tuple
from functools import reduce
import pandas as pd
from pandas.core.frame import DataFrame
# import cloudpickle, pickle
from cadCAD import configs
from cadCAD.utils import key_filter
@ -11,9 +13,9 @@ from cadCAD.configuration.utils.depreciationHandler import sanitize_partial_stat
class Configuration(object):
def __init__(self, sim_config={}, initial_state={}, seeds={}, env_processes={},
def __init__(self, user_id, sim_config={}, initial_state={}, seeds={}, env_processes={},
exogenous_states={}, partial_state_update_blocks={}, policy_ops=[lambda a, b: a + b],
**kwargs) -> None:
session_id=0, simulation_id=0, run_id=1, **kwargs) -> None:
# print(exogenous_states)
self.sim_config = sim_config
self.initial_state = initial_state
@ -24,11 +26,18 @@ class Configuration(object):
self.policy_ops = policy_ops
self.kwargs = kwargs
self.user_id = user_id
self.session_id = session_id
self.simulation_id = simulation_id
self.run_id = run_id
sanitize_config(self)
def append_configs(sim_configs={}, initial_state={}, seeds={}, raw_exogenous_states={}, env_processes={},
partial_state_update_blocks={}, policy_ops=[lambda a, b: a + b], _exo_update_per_ts: bool = True) -> None:
def append_configs(user_id='cadCAD_user', session_id=0, #ToDo: change to string
sim_configs={}, initial_state={}, seeds={}, raw_exogenous_states={}, env_processes={},
partial_state_update_blocks={}, policy_ops=[lambda a, b: a + b], _exo_update_per_ts: bool = True
) -> None:
if _exo_update_per_ts is True:
exogenous_states = exo_update_per_ts(raw_exogenous_states)
else:
@ -37,7 +46,27 @@ def append_configs(sim_configs={}, initial_state={}, seeds={}, raw_exogenous_sta
if isinstance(sim_configs, dict):
sim_configs = [sim_configs]
for sim_config in sim_configs:
new_sim_configs = []
for t in list(zip(sim_configs, list(range(len(sim_configs))))):
sim_config, simulation_id = t[0], t[1]
N = sim_config['N']
if N > 1:
for n in range(N):
sim_config['simulation_id'] = simulation_id
sim_config['run_id'] = n
sim_config['N'] = 1
new_sim_configs.append(deepcopy(sim_config))
del sim_config
else:
sim_config['simulation_id'] = simulation_id
sim_config['run_id'] = 0
new_sim_configs.append(deepcopy(sim_config))
print(new_sim_configs)
print()
# for sim_config in sim_configs:
for sim_config in new_sim_configs:
config = Configuration(
sim_config=sim_config,
initial_state=initial_state,
@ -45,10 +74,15 @@ def append_configs(sim_configs={}, initial_state={}, seeds={}, raw_exogenous_sta
exogenous_states=exogenous_states,
env_processes=env_processes,
partial_state_update_blocks=partial_state_update_blocks,
policy_ops=policy_ops
policy_ops=policy_ops,
user_id=user_id,
session_id=session_id,
simulation_id=sim_config['simulation_id'],
run_id=sim_config['run_id']
)
print(sim_configs)
#for each sim config create new config
# print(sim_configs)
# for each sim config create new config
configs.append(config)
@ -68,7 +102,10 @@ class Identity:
def state_identity(self, k: str) -> Callable:
return lambda var_dict, sub_step, sL, s, _input: (k, s[k])
# state_identity = cloudpickle.dumps(state_identity)
def apply_identity_funcs(self, identity: Callable, df: DataFrame, cols: List[str]) -> List[DataFrame]:
# identity = pickle.loads(identity)
def fillna_with_id_func(identity, df, col):
return df[[col]].fillna(value=identity(col))

View File

View File

@ -0,0 +1,30 @@
from datetime import datetime
# ToDo: Model Async communication here
# ToDo: Configuration of Input
def configure_producer(msg, config):
from kafka import KafkaProducer
def send_messages(events):
producer = KafkaProducer(**config)
start_timestamp = datetime.now()
for event in range(events):
producer.send('test', msg(event)).get()
delta = datetime.now() - start_timestamp
return start_timestamp, delta.total_seconds()
return send_messages
# def action(step):
# step += 1
# percent = step / 10000
# print(str(datetime.now()) + " - " + str(percent) + ": " + str(step))
# def configure_consumer(config, elemental_action: function):
# from kafka import KafkaConsumer
# def receive_messages():
# consumer = KafkaConsumer(**config)
# return [elemental_action(i) for i, message in enumerate(consumer)]
#
# return receive_messages
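A hypothetical usage sketch for the `configure_producer` helper above; the broker address and message builder are illustrative and assume the local `test` topic created by `launch_local_kafka.sh`.
```python
producer_config = {'bootstrap_servers': 'localhost:9092', 'acks': 'all'}
make_message = lambda i: f'{{"event": {i}}}'.encode('utf-8')  # event index -> message bytes

send_messages = configure_producer(make_message, producer_config)
started_at, seconds = send_messages(1000)  # publishes 1000 messages to the 'test' topic
print(f'published 1000 messages in {seconds:.3f}s (started {started_at})')
```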

View File

@ -0,0 +1 @@
from cadCAD.distroduce.executor.spark.jobs import distributed_produce

View File

@ -0,0 +1,33 @@
from cadCAD.distroduce.configuration.kakfa import configure_producer
from pyspark.context import SparkContext
ascii_art = r'''
___ _ __ _ __ __ __
/ _ \ (_)___ / /_ ____ (_)/ / __ __ / /_ ___ ___/ /
/ // // /(_-</ __// __// // _ \/ // // __// -_)/ _ /
/____//_//___/\__//_/ _/_//_.__/\_,_/ \__/ \__/ \_,_/
/ _ \ ____ ___ ___/ /__ __ ____ ___
/ ___// __// _ \/ _ // // // __// -_)
/_/ /_/ \___/\_,_/ \_,_/ \__/ \__/
by Joshua E. Jodesty
'''
def distributed_produce(
sc: SparkContext,
spark_run,
sim_time,
sim_runs,
rdd_parts,
parameterized_message,
prod_config
):
print(ascii_art)
message_counts = [sim_time] * sim_runs
msg_rdd = sc.parallelize(message_counts).repartition(rdd_parts)
parts = msg_rdd.getNumPartitions()
print()
print(f"RDD_{spark_run} - Partitions: {parts}")
print()
produce = configure_producer(parameterized_message, prod_config)
return msg_rdd.map(produce).collect()

View File

@ -0,0 +1,2 @@
def flatten(l):
return [item for sublist in l for item in sublist]

View File

@ -1,7 +1,14 @@
from pprint import pprint
from typing import Callable, Dict, List, Any, Tuple
from pathos.multiprocessing import ProcessingPool as PPool
from pathos.multiprocessing import ThreadPool as TPool
from pandas.core.frame import DataFrame
from pyspark.context import SparkContext
from pyspark import cloudpickle
import pickle
from fn.func import curried
from cadCAD.distroduce.configuration.kakfa import configure_producer
from cadCAD.utils import flatten
from cadCAD.configuration import Configuration, Processor
from cadCAD.configuration.utils import TensorFieldReport
@ -16,6 +23,7 @@ EnvProcessesType = Dict[str, Callable]
class ExecutionMode:
single_proc = 'single_proc'
multi_proc = 'multi_proc'
dist_proc = 'dist_proc'
def single_proc_exec(
@ -25,11 +33,18 @@ def single_proc_exec(
configs_structs: List[ConfigsType],
env_processes_list: List[EnvProcessesType],
Ts: List[range],
Ns: List[int]
Ns: List[int],
userIDs,
sessionIDs,
simulationIDs,
runIDs: List[int],
):
l = [simulation_execs, states_lists, configs_structs, env_processes_list, Ts, Ns]
simulation_exec, states_list, config, env_processes, T, N = list(map(lambda x: x.pop(), l))
result = simulation_exec(var_dict_list, states_list, config, env_processes, T, N)
params = [simulation_execs, states_lists, configs_structs, env_processes_list, Ts, Ns,
userIDs, sessionIDs, simulationIDs, runIDs]
simulation_exec, states_list, config, env_processes, T, N, user_id, session_id, simulation_id, run_id = \
list(map(lambda x: x.pop(), params))
result = simulation_exec(var_dict_list, states_list, config, env_processes, T, N,
user_id, session_id, simulation_id, run_id)
return flatten(result)
@ -40,27 +55,87 @@ def parallelize_simulations(
configs_structs: List[ConfigsType],
env_processes_list: List[EnvProcessesType],
Ts: List[range],
Ns: List[int]
Ns: List[int],
userIDs,
sessionIDs,
simulationIDs,
runIDs: List[int]
):
l = list(zip(simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns))
params = list(zip(simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns,
userIDs, sessionIDs, simulationIDs, runIDs))
with PPool(len(configs_structs)) as p:
results = p.map(lambda t: t[0](t[1], t[2], t[3], t[4], t[5], t[6]), l)
results = p.map(lambda t: t[0](t[1], t[2], t[3], t[4], t[5], t[6], t[7], t[8], t[9], t[10]), params)
return results
def distributed_simulations(
simulation_execs: List[Callable],
var_dict_list: List[VarDictType],
states_lists: List[StatesListsType],
configs_structs: List[ConfigsType],
env_processes_list: List[EnvProcessesType],
Ts: List[range],
Ns: List[int],
userIDs,
sessionIDs,
simulationIDs,
runIDs: List[int],
sc,
kafkaConfig
):
func_params_zipped = list(
zip(userIDs, sessionIDs, simulationIDs, runIDs, simulation_execs, configs_structs, env_processes_list)
)
func_params_kv = [((t[0], t[1], t[2], t[3]), (t[4], t[5], t[6])) for t in func_params_zipped]
def simulate(k, v):
from kafka import KafkaProducer
prod_config = kafkaConfig['producer_config']
kafkaConfig['producer'] = KafkaProducer(**prod_config)
(sim_exec, config, env_procs) = [f[1] for f in func_params_kv if f[0] == k][0]
results = sim_exec(
v['var_dict'], v['states_lists'], config, env_procs, v['Ts'], v['Ns'],
k[0], k[1], k[2], k[3], kafkaConfig
)
return results
val_params = list(zip(userIDs, sessionIDs, simulationIDs, runIDs, var_dict_list, states_lists, Ts, Ns))
val_params_kv = [
(
(t[0], t[1], t[2], t[3]),
{'var_dict': t[4], 'states_lists': t[5], 'Ts': t[6], 'Ns': t[7]}
) for t in val_params
]
results_rdd = sc.parallelize(val_params_kv).coalesce(35)
return list(results_rdd.map(lambda x: simulate(*x)).collect())
class ExecutionContext:
def __init__(self, context: str = ExecutionMode.multi_proc) -> None:
def __init__(self, context=ExecutionMode.multi_proc, method=None, kafka_config=None) -> None:
self.name = context
self.method = None
if context == 'single_proc':
self.method = single_proc_exec
elif context == 'multi_proc':
self.method = parallelize_simulations
elif context == 'dist_proc':
def distroduce_proc(
simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns,
userIDs, sessionIDs, simulationIDs, runIDs,
sc, kafkaConfig=kafka_config
):
return method(
simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns,
userIDs, sessionIDs, simulationIDs, runIDs,
sc, kafkaConfig
)
self.method = distroduce_proc
class Executor:
def __init__(self, exec_context: ExecutionContext, configs: List[Configuration]) -> None:
def __init__(self, exec_context: ExecutionContext, configs: List[Configuration], spark_context: SparkContext = None) -> None:
self.sc = spark_context
self.SimExecutor = SimExecutor
self.exec_method = exec_context.method
self.exec_context = exec_context.name
@ -70,50 +145,59 @@ class Executor:
config_proc = Processor()
create_tensor_field = TensorFieldReport(config_proc).create_tensor_field
print(r'''
__________ ____
________ __ _____/ ____/ | / __ \
/ ___/ __` / __ / / / /| | / / / /
/ /__/ /_/ / /_/ / /___/ ___ |/ /_/ /
\___/\__,_/\__,_/\____/_/ |_/_____/
by BlockScience
''')
print(f'Execution Mode: {self.exec_context + ": " + str(self.configs)}')
print(f'Configurations: {self.configs}')
var_dict_list, states_lists, Ts, Ns, eps, configs_structs, env_processes_list, partial_state_updates, simulation_execs = \
[], [], [], [], [], [], [], [], []
userIDs, sessionIDs, simulationIDs, runIDs, \
var_dict_list, states_lists, \
Ts, Ns, \
eps, configs_structs, env_processes_list, \
partial_state_updates, simulation_execs = \
[], [], [], [], [], [], [], [], [], [], [], [], []
config_idx = 0
for x in self.configs:
userIDs.append(x.user_id)
sessionIDs.append(x.session_id)
simulationIDs.append(x.simulation_id)
runIDs.append(x.run_id)
Ts.append(x.sim_config['T'])
Ns.append(x.sim_config['N'])
var_dict_list.append(x.sim_config['M'])
states_lists.append([x.initial_state])
eps.append(list(x.exogenous_states.values()))
configs_structs.append(config_proc.generate_config(x.initial_state, x.partial_state_updates, eps[config_idx]))
# print(env_processes_list)
env_processes_list.append(x.env_processes)
partial_state_updates.append(x.partial_state_updates)
simulation_execs.append(SimExecutor(x.policy_ops).simulation)
config_idx += 1
final_result = None
if self.exec_context == ExecutionMode.single_proc:
tensor_field = create_tensor_field(partial_state_updates.pop(), eps.pop())
result = self.exec_method(simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns)
result = self.exec_method(
simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns,
userIDs, sessionIDs, simulationIDs, runIDs
)
final_result = result, tensor_field
elif self.exec_context == ExecutionMode.multi_proc:
# if len(self.configs) > 1:
simulations = self.exec_method(simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns)
else:
if self.exec_context == ExecutionMode.multi_proc:
# if len(self.configs) > 1:
simulations = self.exec_method(
simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns,
userIDs, sessionIDs, simulationIDs, runIDs
)
elif self.exec_context == ExecutionMode.dist_proc:
simulations = self.exec_method(
simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns,
userIDs, sessionIDs, simulationIDs, runIDs, self.sc
)
results = []
for result, partial_state_updates, ep in list(zip(simulations, partial_state_updates, eps)):
results.append((flatten(result), create_tensor_field(partial_state_updates, ep)))
final_result = results
return final_result

View File

@ -1,5 +1,5 @@
from typing import Any, Callable, Dict, List, Tuple
from pathos.pools import ThreadPool as TPool
# from pathos.pools import ThreadPool as TPool
from copy import deepcopy
from functools import reduce
@ -27,13 +27,14 @@ class Executor:
sub_step: int,
sL: List[Dict[str, Any]],
s: Dict[str, Any],
funcs: List[Callable]
funcs: List[Callable],
kafkaConfig
) -> Dict[str, Any]:
ops = self.policy_ops
def get_col_results(sweep_dict, sub_step, sL, s, funcs):
return list(map(lambda f: f(sweep_dict, sub_step, sL, s), funcs))
return list(map(lambda f: f(sweep_dict, sub_step, sL, s, kafkaConfig), funcs))
def compose(init_reduction_funct, funct_list, val_list):
result, i = None, 0
@ -105,18 +106,18 @@ class Executor:
policy_funcs: List[Callable],
env_processes: Dict[str, Callable],
time_step: int,
run: int
run: int,
kafkaConfig
) -> List[Dict[str, Any]]:
last_in_obj: Dict[str, Any] = deepcopy(sL[-1])
_input: Dict[str, Any] = self.policy_update_exception(
self.get_policy_input(sweep_dict, sub_step, sH, last_in_obj, policy_funcs)
self.get_policy_input(sweep_dict, sub_step, sH, last_in_obj, policy_funcs, kafkaConfig)
)
def generate_record(state_funcs):
for f in state_funcs:
yield self.state_update_exception(f(sweep_dict, sub_step, sH, last_in_obj, _input))
yield self.state_update_exception(f(sweep_dict, sub_step, sH, last_in_obj, _input, kafkaConfig))
def transfer_missing_fields(source, destination):
for k in source:
@ -128,7 +129,6 @@ class Executor:
last_in_copy: Dict[str, Any] = transfer_missing_fields(last_in_obj, dict(generate_record(state_funcs)))
last_in_copy: Dict[str, Any] = self.apply_env_proc(sweep_dict, env_processes, last_in_copy)
last_in_copy['substep'], last_in_copy['timestep'], last_in_copy['run'] = sub_step, time_step, run
sL.append(last_in_copy)
del last_in_copy
@ -142,7 +142,8 @@ class Executor:
configs: List[Tuple[List[Callable], List[Callable]]],
env_processes: Dict[str, Callable],
time_step: int,
run: int
run: int,
kafkaConfig
) -> List[Dict[str, Any]]:
sub_step = 0
@ -159,11 +160,9 @@ class Executor:
states_list: List[Dict[str, Any]] = [genesis_states]
sub_step += 1
for [s_conf, p_conf] in configs: # tensor field
states_list: List[Dict[str, Any]] = self.partial_state_update(
sweep_dict, sub_step, states_list, simulation_list, s_conf, p_conf, env_processes, time_step, run
sweep_dict, sub_step, states_list, simulation_list, s_conf, p_conf, env_processes, time_step, run, kafkaConfig
)
sub_step += 1
@ -179,15 +178,15 @@ class Executor:
configs: List[Tuple[List[Callable], List[Callable]]],
env_processes: Dict[str, Callable],
time_seq: range,
run: int
run: int,
kafkaConfig
) -> List[List[Dict[str, Any]]]:
time_seq: List[int] = [x + 1 for x in time_seq]
simulation_list: List[List[Dict[str, Any]]] = [states_list]
for time_step in time_seq:
pipe_run: List[Dict[str, Any]] = self.state_update_pipeline(
sweep_dict, simulation_list, configs, env_processes, time_step, run
sweep_dict, simulation_list, configs, env_processes, time_step, run, kafkaConfig
)
_, *pipe_run = pipe_run
@ -202,33 +201,53 @@ class Executor:
configs: List[Tuple[List[Callable], List[Callable]]],
env_processes: Dict[str, Callable],
time_seq: range,
runs: int
runs: int,
user_id,
session_id,
simulation_id,
run_id,
kafkaConfig
) -> List[List[Dict[str, Any]]]:
def execute_run(sweep_dict, states_list, configs, env_processes, time_seq, run) -> List[Dict[str, Any]]:
run += 1
def generate_init_sys_metrics(genesis_states_list):
for d in genesis_states_list:
d['run'], d['substep'], d['timestep'] = run, 0, 0
d['user_id'], d['simulation_id'], d['session_id'], d['run_id'] = user_id, simulation_id, session_id, run_id
yield d
states_list_copy: List[Dict[str, Any]] = list(generate_init_sys_metrics(deepcopy(states_list)))
first_timestep_per_run: List[Dict[str, Any]] = self.run_pipeline(
sweep_dict, states_list_copy, configs, env_processes, time_seq, run
sweep_dict, states_list_copy, configs, env_processes, time_seq, run, kafkaConfig
)
del states_list_copy
return first_timestep_per_run
tp = TPool(runs)
pipe_run: List[List[Dict[str, Any]]] = flatten(
tp.map(
lambda run: execute_run(sweep_dict, states_list, configs, env_processes, time_seq, run),
list(range(runs))
)
pipe_run = flatten(
[execute_run(sweep_dict, states_list, configs, env_processes, time_seq, run) for run in range(runs)]
)
tp.clear()
return pipe_run
# ******** Only has one run
# pipe_run: List[List[Dict[str, Any]]] = flatten(
# sc.parallelize(list(range(runs))).map(
# lambda run: execute_run(sweep_dict, states_list, configs, env_processes, time_seq, run)
# ).collect()
# )
# print(type(run_id))
# print(runs)
# tp = TPool(runs)
# pipe_run: List[List[Dict[str, Any]]] = flatten(
# tp.map(
# lambda run: execute_run(sweep_dict, states_list, configs, env_processes, time_seq, run),
# list(range(runs))
# )
# )
#
# tp.clear()
r

View File

@ -17,6 +17,14 @@ def append_dict(dict, new_dict):
return dict
def arrange_cols(df, reverse):
session_metrics = ['user_id', 'session_id', 'simulation_id', 'run_id']
sys_metrics = ['run', 'timestep', 'substep']
result_cols = list(set(df.columns) - set(session_metrics) - set(sys_metrics))
result_cols.sort(reverse=reverse)
return df[session_metrics + sys_metrics + result_cols]
class IndexCounter:
def __init__(self):
self.i = 0

BIN
dist/cadCAD-0.0.2-py3-none-any.whl vendored Normal file

Binary file not shown.

BIN
dist/cadCAD-0.0.2.tar.gz vendored Normal file

Binary file not shown.

Binary file not shown.

Binary file not shown.

145
distroduce/README.md Normal file
View File

@ -0,0 +1,145 @@
```
___ _ __ _ __ __ __
/ _ \ (_)___ / /_ ____ (_)/ / __ __ / /_ ___ ___/ /
/ // // /(_-</ __// __// // _ \/ // // __// -_)/ _ /
/____//_//___/\__//_/ _/_//_.__/\_,_/ \__/ \__/ \_,_/
/ _ \ ____ ___ ___/ /__ __ ____ ___
/ ___// __// _ \/ _ // // // __// -_)
/_/ /_/ \___/\_,_/ \_,_/ \__/ \__/
by Joshua E. Jodesty
```
## What?: *Description*
***Distributed Produce*** (**[distroduce](distroduce)**) is a distributed message simulation and throughput benchmarking
framework / [cadCAD](https://cadcad.org) execution mode that leverages [Apache Spark](https://spark.apache.org/) and
[Apache Kafka Producer](https://kafka.apache.org/documentation/#producerapi) for optimizing Kafka cluster configurations
and debugging real-time data transformations. *distroduce* leverages cadCAD's user-defined event simulation template and
framework to simulate messages sent to Kafka clusters. This enables rapid and iterative design, debugging, and message
publish benchmarking of Kafka clusters and real-time data processing using Kafka Streams and Spark (Structured)
Streaming.
## How?: *A Tale of Two Clusters*
***Distributed Produce*** is a Spark Application used as a cadCAD Execution Mode that distributes Kafka Producers,
message simulation, and message publishing to worker nodes of an [AWS EMR](https://aws.amazon.com/emr/) cluster.
Messages published from these workers are sent to Kafka topics on a Kafka cluster from a Spark bootstrapped EMR cluster.
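The essence of the pattern is an RDD whose elements are simulated runs, where each Spark task creates its own Kafka Producer and publishes the messages its run generates. Below is a conceptual, self-contained sketch only; names such as `publish_simulated_run` are illustrative and not part of distroduce, whose actual job is `distributed_produce` in `distroduce/executor/spark/jobs.py` (shown later in this diff).
```python
# Conceptual sketch: assumes pyspark and kafka-python are installed and a broker is
# reachable at localhost:9092 with a 'test' topic (see launch_local_kafka.sh).
from pyspark import SparkContext
from kafka import KafkaProducer

def publish_simulated_run(run_id, producer_config, topic='test', n_events=100):
    # Each Spark task builds its own producer and publishes its run's messages.
    producer = KafkaProducer(**producer_config)
    for i in range(n_events):
        msg = {'run': run_id, 'event': i}  # stand-in for a cadCAD-generated message
        producer.send(topic, str(msg).encode('utf-8'))
    producer.flush()
    return run_id, n_events

if __name__ == '__main__':
    sc = SparkContext.getOrCreate()
    config = {'bootstrap_servers': 'localhost:9092', 'acks': 'all'}
    print(sc.parallelize(range(4)).map(lambda r: publish_simulated_run(r, config)).collect())
```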
## Why?: *Use Case*
* **IoT Event / Device Simulation:** Competes with *AWS IoT Device Simulator* and *Azure IoT Solution Acceleration:
Device Simulation*. Unlike these products, *Distributed Produce* enables user-defined state updates and agent actions,
as well as message publish benchmarking.
* **Development Environment for Real-Time Data Processing / Routing:**
## Get Started:
### 0. Set Up Local Development Environment: see [Kafka Quickstart](https://kafka.apache.org/quickstart)
**a.** Install `pyspark`
```bash
pip3 install pyspark
```
**b.** Install & Unzip Kafka, Create Kafka `test` topic, and Start Consumer
```bash
sh distroduce/configuration/launch_local_kafka.sh
```
**c.** Run Simulation locally
```bash
zip -rq distroduce/dist/distroduce.zip distroduce/
spark-submit --py-files distroduce/dist/distroduce.zip distroduce/local_messaging_sim.py `hostname | xargs`
```
### 1. Write cadCAD Simulation:
* **Simulation Description:**
To demonstrate *Distributed Produce*, I implemented a simulation of two users interacting over a messaging service.
* **cadCAD Resources:**
* [Documentation](https://github.com/BlockScience/cadCAD/tree/master/documentation)
* [Tutorials](https://github.com/BlockScience/cadCAD/tree/master/tutorials)
* **Terminology:**
* ***[Initial Conditions](https://github.com/BlockScience/cadCAD/tree/master/documentation#state-variables)*** - State Variables and their initial values (Start event of Simulation)
```python
initial_conditions = {
    'state_variable_1': 0,
    'state_variable_2': 0,
    'state_variable_3': 1.5,
    'timestamp': '2019-01-01 00:00:00'
}
```
* ***[Policy Functions:](https://github.com/BlockScience/cadCAD/tree/master/documentation#Policy-Functions)*** -
computes one or more signals to be passed to State Update Functions
```python
def policy_function_A(_params, substep, sH, s, kafkaConfig):
    ...
    return {'signal_name': signal_value}
```
Parameters:
* **_params** : `dict` - [System parameters](https://github.com/BlockScience/cadCAD/blob/master/documentation/System_Model_Parameter_Sweep.md)
* **substep** : `int` - Current [substep](https://github.com/BlockScience/cadCAD/tree/master/documentation#Substep)
* **sH** : `list[list[dict]]` - Historical values of all state variables for the simulation. See
[Historical State Access](https://github.com/BlockScience/cadCAD/blob/master/documentation/Historically_State_Access.md) for details
* **s** : `dict` - Current state of the system, where the `dict_keys` are the names of the state variables and the
`dict_values` are their current values.
* **kafkaConfig:** `kafka.KafkaProducer` - Configuration for `kafka-python`
[Producer](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html)
* ***[State Update Functions](https://github.com/BlockScience/cadCAD/tree/master/documentation#state-update-functions):*** -
defines how state variables change over time
```python
def state_update_function_A(_params, substep, sH, s, actions, kafkaConfig):
    ...
    return 'state_variable_name', new_value
```
Parameters:
* **_params** : `dict` - [System parameters](https://github.com/BlockScience/cadCAD/blob/master/documentation/System_Model_Parameter_Sweep.md)
* **substep** : `int` - Current [substep](https://github.com/BlockScience/cadCAD/tree/master/documentation#Substep)
* **sH** : `list[list[dict]]` - Historical values of all state variables for the simulation. See
[Historical State Access](https://github.com/BlockScience/cadCAD/blob/master/documentation/Historically_State_Access.md) for details
* **s** : `dict` - Current state of the system, where the `dict_keys` are the names of the state variables and the
`dict_values` are their current values.
* **actions** : `dict` - Aggregation of the signals of all policy functions in the current substep
* **kafkaConfig:** `kafka.KafkaProducer` - Configuration for `kafka-python`
[Producer](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html)
* ***[Partial State Update Block](https://github.com/BlockScience/cadCAD/tree/master/documentation#State-Variables) (PSUB):*** -
a set of State Update Functions and Policy Functions that update state records
![](https://i.imgur.com/9rlX9TG.png)
**Note:** State Update and Policy Functions now have the additional / undocumented parameter `kafkaConfig`
**a.** **Define Policy Functions:**
* [Example:](distroduce/action_policies.py) Two users interacting on separate chat clients and entering / exiting chat
**b.** **Define State Update Functions:**
* [Example:](distroduce/state_updates.py) Used for logging and maintaining state of user actions defined by policies
**c.** **Define Initial Conditions & Partial State Update Block:**
* **Initial Conditions:** [Example](distroduce/messaging_sim.py)
* **Partial State Update Block (PSUB):** [Example](distroduce/simulation.py)
**d.** **Create Simulation Executor:** Used for running a simulation
* [Local](distroduce/local_messaging_sim.py)
* [EMR](distroduce/messaging_sim.py)
### 2. [Configure EMR Cluster](distroduce/configuration/cluster.py)
### 3. Launch EMR Cluster:
**Option A:** Preconfigured Launch
```bash
python3 distroduce/emr/launch.py
```
**Option B:** Custom Launch - [Example](distroduce/emr/launch.py)
```python
from distroduce.emr.launch import launch_cluster
from distroduce.configuration.cluster import ec2_attributes, bootstrap_actions, instance_groups, configurations
region = 'us-east-1'
cluster_name = 'distibuted_produce'
launch_cluster(cluster_name, region, ec2_attributes, bootstrap_actions, instance_groups, configurations)
```
### 4. Execute Benchmark(s) on EMR:
* **Step 1:** Zip the `distroduce` package
```bash
zip -rq distroduce/dist/distroduce.zip distroduce/
```
* **Step 2:** SSH onto the master node
* **Step 3:** Spark Submit
```
spark-submit --master yarn --py-files distroduce.zip messaging_sim.py `hostname | xargs`
```

0
distroduce/__init__.py Normal file
View File

View File

@ -0,0 +1,72 @@
from datetime import datetime
from kafka import KafkaProducer
'''
Function that produces a single action taken by a user and its accompanying message.
The results will be combined with a user-defined aggregation function (UDAF)
given to the `policy_ops` argument of `cadCAD.configuration.append_configs`.
Current UDAF: `policy_ops=[lambda a, b: a + b]`
'''
def messages(client_id, room, action, _input, sender, receiver=None):
return {
'types': [action],
'messages': [
{
'client': client_id, 'room': room, 'action': action,
'sender': sender, 'receiver': receiver,
'input': _input,
'created': datetime.now()
}
]
}
'''
Policy representing a user entering a chat room and its accompanying message
'''
def enter_action(state, room, user):
def f(_g, step, sL, s, kafkaConfig):
msgs = messages(state, room, 'enter', f"{user} enters {room}", user)
msgs['send_times'] = [0.000000]
msgs['msg_counts'] = [len(msgs['messages'])]
return msgs
return f
'''
Policy representing a user sending a message to a receiver within a chat room
and its accompanying message
A Kafka Producer is used to send messages to a Kafka cluster.
The Kafka Producer is configured via `cadCAD.engine.ExecutionContext`.
'''
def message_actions(state, room, message_input, sender, receiver):
msgs = messages(state, room, 'send', message_input, sender, receiver)
msgs_list = msgs['messages']
def send_action(_g, step, sL, s, kafkaConfig):
start_time = datetime.now()
for msg in msgs_list:
producer: KafkaProducer = kafkaConfig['producer']
topic: str = kafkaConfig['send_topic']
encoded_msg = str(msg).encode('utf-8')
producer.send(topic, encoded_msg)
msgs['send_times'] = [(datetime.now() - start_time).total_seconds()]
msgs['msg_counts'] = [len(msgs_list)]
return msgs
return send_action
'''
Policy representing a user exiting a chat room and its accompanying message
'''
def exit_action(state, room, user):
def f(_g, step, sL, s, kafkaConfig):
msgs = messages(state, room, 'exit', f"{user} exited {room}", user)
msgs_list = msgs['messages']
msgs['send_times'] = [0.000000]
msgs['msg_counts'] = [len(msgs_list)]
return msgs
return f
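To illustrate the aggregation described in the docstring at the top of this file: with `policy_ops=[lambda a, b: a + b]`, the dicts returned by the policy functions for a substep are combined key by key, so the lists built by `messages()` concatenate. A hypothetical sketch (not the engine's actual code):
```python
# Illustration only: two policy outputs for the same substep, reduced key-wise with '+'.
a = {'types': ['enter'], 'messages': [{'action': 'enter'}], 'send_times': [0.0], 'msg_counts': [1]}
b = {'types': ['send'],  'messages': [{'action': 'send'}],  'send_times': [0.1], 'msg_counts': [1]}
aggregated = {k: a[k] + b[k] for k in a}  # key-wise '+', i.e. list concatenation
assert aggregated['types'] == ['enter', 'send'] and aggregated['msg_counts'] == [1, 1]
```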

View File

@ -0,0 +1,3 @@
#!/bin/bash
pip3 install -r requirements.txt
zip -rq distroduce/dist/distroduce.zip distroduce/

View File

@ -0,0 +1,79 @@
ec2_attributes = {
"KeyName":"joshua-IAM-keypair",
"InstanceProfile":"EMR_EC2_DefaultRole",
"SubnetId":"subnet-0034e615b047fd112",
"EmrManagedSlaveSecurityGroup":"sg-08e546ae27d86d6a3",
"EmrManagedMasterSecurityGroup":"sg-08e546ae27d86d6a3"
}
bootstrap_actions = [
{
"Path":"s3://insightde/emr/bootstraps/distroduce.sh",
"Name":"bootstrap"
}
]
instance_groups = [
{
"InstanceCount":5,
"EbsConfiguration":
{"EbsBlockDeviceConfigs":
[
{
"VolumeSpecification":
{"SizeInGB":32,"VolumeType":"gp2"},
"VolumesPerInstance":2
}
]
},
"InstanceGroupType":"CORE",
"InstanceType":"m4.xlarge",
"Name":"Core - 2"
},
{
"InstanceCount":1,
"EbsConfiguration":
{
"EbsBlockDeviceConfigs":
[
{
"VolumeSpecification":
{"SizeInGB":32,"VolumeType":"gp2"},
"VolumesPerInstance":2
}
]
},
"InstanceGroupType":"MASTER",
"InstanceType":"m4.xlarge",
"Name":"Master - 1"
}
]
configurations = [
{
"Classification":"spark-env",
"Properties":{},
"Configurations":
[
{
"Classification":"export",
"Properties":{
"PYSPARK_PYTHON": "/usr/bin/python3",
"PYSPARK_DRIVER_PYTHON": "/usr/bin/python3"
}
}
]
},
{
"Classification":"spark-defaults",
"Properties":{
"spark.sql.execution.arrow.enabled": "true"
}
},
{
"Classification":"spark",
"Properties":{
"maximizeResourceAllocation":"true"
}
}
]

View File

@ -0,0 +1,23 @@
#!/bin/bash
cd ~
yes | sudo python3 -m pip install --upgrade pip
yes | sudo python3 -m pip install pathos kafka-python
wget https://raw.githubusercontent.com/JEJodesty/cadCAD/dev/dist/cadCAD-0.0.2-py3-none-any.whl
yes | sudo python3 -m pip install cadCAD-0.0.2-py3-none-any.whl
# check for master node
IS_MASTER=false
if grep -i isMaster /mnt/var/lib/info/instance.json | grep -i true;
then
IS_MASTER=true
PRIVATE_IP=`hostname -I | xargs`
wget https://raw.githubusercontent.com/JEJodesty/cadCAD/dev/distroduce/dist/distroduce.zip
wget https://raw.githubusercontent.com/JEJodesty/cadCAD/dev/distroduce/messaging_sim.py
sudo sed -i -e '$a\export PYSPARK_PYTHON=/usr/bin/python3' /etc/spark/conf/spark-env.sh
wget http://apache.spinellicreations.com/kafka/2.3.0/kafka_2.12-2.3.0.tgz
tar -xzf kafka_2.12-2.3.0.tgz
cd kafka_2.12-2.3.0
bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties &
bin/kafka-topics.sh --create --bootstrap-server ${PRIVATE_IP}:9092 --replication-factor 1 --partitions 1 --topic test
fi

View File

@ -0,0 +1,83 @@
#!/bin/bash
# SSH into all machines
ssh -i ~/.ssh/joshua-IAM-keypair.pem hadoop@
sudo python3 -m pip install --upgrade pip
sudo python3 -m pip install pathos kafka-python
# SetUp Window: head node
cd ~
#sudo python3 -m pip install --upgrade pip
#sudo python3 -m pip install pathos kafka-python
sudo sed -i -e '$a\export PYSPARK_PYTHON=/usr/bin/python3' /etc/spark/conf/spark-env.sh
wget http://apache.spinellicreations.com/kafka/2.3.0/kafka_2.12-2.3.0.tgz
tar -xzf kafka_2.12-2.3.0.tgz
cd kafka_2.12-2.3.0
bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties &
# get ip
bin/kafka-topics.sh --create --bootstrap-server 10.0.0.9:9092 --replication-factor 1 --partitions 1 --topic test
# bin/kafka-topics.sh --list --bootstrap-server 10.0.0.9:9092
# Consume (Window): head node
bin/kafka-console-consumer.sh --bootstrap-server 10.0.0.9:9092 --topic test --from-beginning
# DELETE
# bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic test
# local
python3 setup.py sdist bdist_wheel
pip3 install dist/*.whl
# SCP: all nodes
# S3 duuhhhh
# git clone specific file
scp -i ~/.ssh/joshua-IAM-keypair.pem dist/*.whl hadoop@ec2-18-206-12-181.compute-1.amazonaws.com:/home/hadoop/
scp -i ~/.ssh/joshua-IAM-keypair.pem dist/*.whl hadoop@ec2-34-239-171-181.compute-1.amazonaws.com:/home/hadoop/
scp -i ~/.ssh/joshua-IAM-keypair.pem dist/*.whl hadoop@ec2-3-230-154-170.compute-1.amazonaws.com:/home/hadoop/
scp -i ~/.ssh/joshua-IAM-keypair.pem dist/*.whl hadoop@ec2-18-232-52-219.compute-1.amazonaws.com:/home/hadoop/
scp -i ~/.ssh/joshua-IAM-keypair.pem dist/*.whl hadoop@ec2-34-231-70-210.compute-1.amazonaws.com:/home/hadoop/
scp -i ~/.ssh/joshua-IAM-keypair.pem dist/*.whl hadoop@ec2-34-231-243-101.compute-1.amazonaws.com:/home/hadoop/
sudo python3 -m pip install *.whl
# SCP head node
# ToDo: zip build for cadCAD OR give py library after whl install
scp -i ~/.ssh/joshua-IAM-keypair.pem distroduce/dist/distroduce.zip hadoop@ec2-18-206-12-181.compute-1.amazonaws.com:/home/hadoop/
scp -i ~/.ssh/joshua-IAM-keypair.pem distroduce/messaging_sim.py hadoop@ec2-18-206-12-181.compute-1.amazonaws.com:/home/hadoop/
#scp -i ~/.ssh/joshua-IAM-keypair.pem dist/distroduce.zip hadoop@ec2-18-232-54-233.compute-1.amazonaws.com:/home/hadoop/
#scp -i ~/.ssh/joshua-IAM-keypair.pem examples/event_bench/main.py hadoop@ec2-18-232-54-233.compute-1.amazonaws.com:/home/hadoop/
# Run Window: Head Node
#spark-submit --master yarn messaging_app.py
spark-submit --master yarn --py-files distroduce.zip main.py
# Cluster Config
#[
# {
# "Classification": "spark-env",
# "Configurations": [
# {
# "Classification": "export",
# "ConfigurationProperties": {
# "PYSPARK_PYTHON": "/usr/bin/python3",
# "PYSPARK_DRIVER_PYTHON": "/usr/bin/python3"
# }
# }
# ]
# },
# {
# "Classification": "spark-defaults",
# "ConfigurationProperties": {
# "spark.sql.execution.arrow.enabled": "true"
# }
# },
# {
# "Classification": "spark",
# "Properties": {
# "maximizeResourceAllocation": "true"
# }
# }
#]

View File

@ -0,0 +1,16 @@
#!/bin/bash
aws emr create-cluster \
--applications Name=Hadoop Name=Hive Name=Spark \
--ec2-attributes '{"KeyName":"joshua-IAM-keypair","InstanceProfile":"EMR_EC2_DefaultRole","SubnetId":"subnet-0034e615b047fd112","EmrManagedSlaveSecurityGroup":"sg-08e546ae27d86d6a3","EmrManagedMasterSecurityGroup":"sg-08e546ae27d86d6a3"}' \
--release-label emr-5.26.0 \
--log-uri 's3n://aws-logs-251682129355-us-east-1/elasticmapreduce/' \
--instance-groups '[{"InstanceCount":5,"EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"SizeInGB":32,"VolumeType":"gp2"},"VolumesPerInstance":2}]},"InstanceGroupType":"CORE","InstanceType":"m4.xlarge","Name":"Core - 2"},{"InstanceCount":1,"EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"SizeInGB":32,"VolumeType":"gp2"},"VolumesPerInstance":2}]},"InstanceGroupType":"MASTER","InstanceType":"m4.xlarge","Name":"Master - 1"}]' \
--configurations '[{"Classification":"spark-env","Properties":{},"Configurations":[{"Classification":"export","Properties":{}}]},{"Classification":"spark-defaults","Properties":{}},{"Classification":"spark","Properties":{"maximizeResourceAllocation":"true"}}]' \
--auto-scaling-role EMR_AutoScaling_DefaultRole \
--bootstrap-actions '[{"Path":"s3://insightde/emr/bootstraps/distroduce.sh","Name":"bootstrap"}]' \
--ebs-root-volume-size 10 \
--service-role EMR_DefaultRole \
--enable-debugging \
--name 'distibuted_produce' \
--scale-down-behavior TERMINATE_AT_TASK_COMPLETION \
--region us-east-1

View File

@ -0,0 +1,8 @@
#!/bin/bash
wget http://apache.spinellicreations.com/kafka/2.3.0/kafka_2.12-2.3.0.tgz
tar -xzf kafka_2.12-2.3.0.tgz
cd kafka_2.12-2.3.0
bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties &
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

View File

@ -0,0 +1,3 @@
#!/bin/bash
#spark-submit --master yarn event_bench.py
spark-submit --py-files dist/distroduce.zip examples/event_bench/main.py

View File

@ -0,0 +1,3 @@
#!/bin/bash
PRIVATE_IP=`hostname -I | xargs`
spark-submit --master yarn --py-files distroduce.zip messaging_sim.py $PRIVATE_IP

View File

@ -0,0 +1,4 @@
#!/bin/bash
scp -i ~/.ssh/joshua-IAM-keypair.pem ~/Projects/event-bench/event_bench.py \
hadoop@ec2-3-230-158-62.compute-1.amazonaws.com:/home/hadoop/

BIN
distroduce/dist/distroduce.zip vendored Normal file

Binary file not shown.

View File

@ -0,0 +1,39 @@
import os, json, boto3
from typing import List
def launch_cluster(name, region, ec2_attributes, bootstrap_actions, instance_groups, configurations):
def log_uri(name, region):
return f's3n://{name}-{region}/elasticmapreduce/'
os.system(f"""
aws emr create-cluster \
--applications Name=Hadoop Name=Hive Name=Spark \
--ec2-attributes '{json.dumps(ec2_attributes)}' \
--release-label emr-5.26.0 \
--log-uri '{str(log_uri(name, region))}' \
--instance-groups '{json.dumps(instance_groups)}' \
--configurations '{json.dumps(configurations)}' \
--auto-scaling-role EMR_AutoScaling_DefaultRole \
--bootstrap-actions '{json.dumps(bootstrap_actions)}' \
--ebs-root-volume-size 10 \
--service-role EMR_DefaultRole \
--enable-debugging \
--name '{name}' \
--scale-down-behavior TERMINATE_AT_TASK_COMPLETION \
--region {region}
""")
# def benchmark(names: List[str], region, ec2_attributes, bootstrap_actions, instance_groups, configurations):
# current_dir = os.path.dirname(__file__)
# s3 = boto3.client('s3')
# bucket = 'insightde'
#
# file = 'distroduce.sh'
# abs_path = os.path.join(current_dir, file)
# key = f'emr/bootstraps/{file}'
#
# s3.upload_file(abs_path, bucket, key)
# for name in names:
# launch_cluster(name, region, ec2_attributes, bootstrap_actions, instance_groups, configurations)

5
distroduce/emr/launch.py Normal file
View File

@ -0,0 +1,5 @@
from distroduce.emr import launch_cluster
from distroduce.configuration.cluster import ec2_attributes, bootstrap_actions, instance_groups, configurations
region = 'us-east-1'
cluster_name = 'distibuted_produce'
launch_cluster(cluster_name, region, ec2_attributes, bootstrap_actions, instance_groups, configurations)

View File

View File

@ -0,0 +1 @@
from distroduce.executor.spark.jobs import distributed_produce

View File

@ -0,0 +1,44 @@
from distroduce.spark.session import sc
def distributed_produce(
simulation_execs,
var_dict_list,
states_lists,
configs_structs,
env_processes_list,
Ts,
Ns,
userIDs,
sessionIDs,
simulationIDs,
runIDs,
spark_context=sc,
kafka_config=None
):
func_params_zipped = list(
zip(userIDs, sessionIDs, simulationIDs, runIDs, simulation_execs, configs_structs, env_processes_list)
)
func_params_kv = [((t[0], t[1], t[2], t[3]), (t[4], t[5], t[6])) for t in func_params_zipped]
def simulate(k, v):
from kafka import KafkaProducer
prod_config = kafka_config['producer_config']
kafka_config['producer'] = KafkaProducer(**prod_config)
(sim_exec, config, env_procs) = [f[1] for f in func_params_kv if f[0] == k][0]
results = sim_exec(
v['var_dict'], v['states_lists'], config, env_procs, v['Ts'], v['Ns'],
k[0], k[1], k[2], k[3], kafka_config
)
return results
val_params = list(zip(userIDs, sessionIDs, simulationIDs, runIDs, var_dict_list, states_lists, Ts, Ns))
val_params_kv = [
(
(t[0], t[1], t[2], t[3]),
{'var_dict': t[4], 'states_lists': t[5], 'Ts': t[6], 'Ns': t[7]}
) for t in val_params
]
results_rdd = spark_context.parallelize(val_params_kv).coalesce(1)
return list(results_rdd.map(lambda x: simulate(*x)).collect())
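
`distributed_produce` keys both sides of the problem by the same `(user_id, session_id, simulation_id, run_id)` tuple: `func_params_kv` carries each run's simulation executor, config structure, and environment processes, while `val_params_kv` carries its initial variables and `T`/`N` settings. Each Spark task then joins the two by key inside `simulate`, builds its own `KafkaProducer`, and runs the simulation. A toy, Spark-free illustration of the pairing (the ids are made up for this example):

```python
# Toy illustration of the (key, value) pairing built above; ids are invented.
userIDs       = ['Joshua', 'Joshua']
sessionIDs    = [1, 1]
simulationIDs = [0, 0]
runIDs        = [1, 2]

keys = list(zip(userIDs, sessionIDs, simulationIDs, runIDs))
# keys == [('Joshua', 1, 0, 1), ('Joshua', 1, 0, 2)]
# func_params_kv maps each key to (simulation_exec, config_struct, env_processes);
# val_params_kv maps the same key to {'var_dict', 'states_lists', 'Ts', 'Ns'};
# simulate(k, v) looks the function side up by key inside every Spark task.
```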


@ -0,0 +1,46 @@
import sys
from datetime import datetime
from cadCAD import configs
from cadCAD.configuration.utils import config_sim
from cadCAD.engine import ExecutionMode, ExecutionContext, Executor
from distroduce.simulation import main, sim_composition
from distroduce.spark.session import sc
from distroduce.executor.spark import distributed_produce
if __name__ == "__main__":
# Initial States
initial_conditions = {
'record_creation': datetime.now(),
'client_a': {'users': [], 'messages': [], 'msg_count': 0, 'send_time': 0.0},
'client_b': {'users': [], 'messages': [], 'msg_count': 0, 'send_time': 0.0},
'total_msg_count': 0,
'total_send_time': 0.000000
}
'''
Simulation Configuration:
N = number of simulation runs
T = range of timesteps per run (each timestep executes every Partial State Update Block once)
'''
sim_config = config_sim(
{
"N": 1,
"T": range(10),
}
)
# Configuration for Kafka Producer
kafkaConfig = {
'send_topic': 'test',
'producer_config': {
'bootstrap_servers': f'{sys.argv[1]}:9092',
'acks': 'all'
}
}
exec_mode = ExecutionMode()
dist_proc_ctx = ExecutionContext(context=exec_mode.dist_proc, method=distributed_produce, kafka_config=kafkaConfig)
run = Executor(exec_context=dist_proc_ctx, configs=configs, spark_context=sc)
main(run, sim_config, initial_conditions, sim_composition)


@ -0,0 +1,46 @@
import sys
from datetime import datetime
from cadCAD import configs
from cadCAD.configuration.utils import config_sim
from cadCAD.engine import ExecutionMode, ExecutionContext, Executor
from distroduce.simulation import main, sim_composition
from distroduce.spark.session import sc
from distroduce.executor.spark import distributed_produce
if __name__ == "__main__":
# Initial States
initial_conditions = {
'record_creation': datetime.now(),
'client_a': {'users': [], 'messages': [], 'msg_count': 0, 'send_time': 0.0},
'client_b': {'users': [], 'messages': [], 'msg_count': 0, 'send_time': 0.0},
'total_msg_count': 0,
'total_send_time': 0.000000
}
'''
Simulation Configuration:
N = number of simulation runs
T = range of timesteps per run (each timestep executes every Partial State Update Block once)
'''
sim_config = config_sim(
{
"N": 1,
"T": range(5000),
}
)
# Configuration for Kafka Producer
kafkaConfig = {
'send_topic': 'test',
'producer_config': {
'bootstrap_servers': f'{sys.argv[1]}:9092',
'acks': 'all'
}
}
exec_mode = ExecutionMode()
dist_proc_ctx = ExecutionContext(context=exec_mode.dist_proc, method=distributed_produce, kafka_config=kafkaConfig)
run = Executor(exec_context=dist_proc_ctx, configs=configs, spark_context=sc)
main(run, sim_config, initial_conditions, sim_composition)

distroduce/simulation.py (new file)

@ -0,0 +1,87 @@
import sys
from pprint import pprint
import pandas as pd
from tabulate import tabulate
from cadCAD.utils import arrange_cols
from cadCAD.configuration import append_configs
from distroduce.action_policies import enter_action, message_actions, exit_action
from distroduce.state_updates import send_message, count_messages, add_send_time, current_time
# State Updates
variables = {
'client_a': send_message('client_a'),
'client_b': send_message('client_b'),
'total_msg_count': count_messages,
'total_send_time': add_send_time,
'record_creation': current_time('record_creation')
}
# Action Policies
policy_group_1 = {
"action_1": enter_action('server', 'room_1', 'A'),
"action_2": enter_action('server', 'room_1', 'B')
}
policy_group_2 = {
"action_1": message_actions('client_A', 'room_1', "Hi B", 'A', 'B'),
"action_2": message_actions('client_B', 'room_1', "Hi A", 'B', 'A')
}
policy_group_3 = {
"action_1": message_actions('client_A', 'room_1', "Bye B", 'A', 'B'),
"action_2": message_actions('client_B', 'room_1', "Bye A", 'B', 'A')
}
policy_group_4 = {
"action_1": exit_action('server', 'room_1', 'A'),
"action_2": exit_action('server', 'room_1', 'B')
}
policy_groups = [policy_group_1, policy_group_2, policy_group_3, policy_group_4]
# Partial State Update Blocks: one block per policy group, all sharing the same state updates
sim_composition = [{'policies': policy_group, 'variables': variables} for policy_group in policy_groups]
def main(executor, sim_config, initial_conditions, sim_composition):
append_configs(
user_id='Joshua',
sim_configs=sim_config,
initial_state=initial_conditions,
partial_state_update_blocks=sim_composition,
policy_ops=[lambda a, b: a + b]
)
pprint(sim_composition)
i = 0
for raw_result, tensor_field in executor.execute():
result = arrange_cols(pd.DataFrame(raw_result), False)
metrics_result = result[
[
'run_id', 'timestep', 'substep',
'record_creation', 'total_msg_count', 'total_send_time'
]
]
msgs_result = result[
[
'run_id', 'timestep', 'substep',
'record_creation',
'client_a', 'client_b'
]
]
print()
if i == 0:
print(tabulate(tensor_field, headers='keys', tablefmt='psql'))
last = metrics_result.tail(1)
last['msg_per_sec'] = last['total_msg_count'] / last['total_send_time']
print("Messages Output: Head")
print(tabulate(msgs_result.head(5), headers='keys', tablefmt='psql'))
print("Metrics Output: Head")
print(tabulate(metrics_result.head(5), headers='keys', tablefmt='psql'))
print("Metrics Output: Tail")
print(tabulate(metrics_result.tail(5), headers='keys', tablefmt='psql'))
print(tabulate(last, headers='keys', tablefmt='psql'))
print()
i += 1
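
A note on the `policy_ops=[lambda a, b: a + b]` argument passed to `append_configs` above: it is the reduction cadCAD applies, key by key, when combining the signal dictionaries returned by the policies inside one policy group. Addition works here because each signal value is either a list (message dicts) or a number (counts and send times). A small illustration of that reduction, with signal shapes assumed for the example since `distroduce.action_policies` is not included in this compare view:

```python
from functools import reduce

# Assumed signal shapes, for illustration only.
signal_a = {'messages': [{'action': 'send', 'sender': ['A']}], 'msg_counts': 1, 'send_times': 0.002}
signal_b = {'messages': [{'action': 'send', 'sender': ['B']}], 'msg_counts': 1, 'send_times': 0.003}

add = lambda a, b: a + b
combined = {key: reduce(add, (signal_a[key], signal_b[key])) for key in signal_a}
# combined['messages'] concatenates both message lists,
# combined['msg_counts'] == 2, combined['send_times'] == 0.005
```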


@ -0,0 +1,16 @@
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
import os
# os.environ['PYSPARK_PYTHON'] = '/usr/local/bin/python3'
# os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/local/bin/python3'
# os.environ['OBJC_DISABLE_INITIALIZE_FORK_SAFETY'] = 'YES'
spark = SparkSession\
.builder\
.appName("distroduce")\
.getOrCreate()
sc: SparkContext = spark.sparkContext
print(f"Spark UI: {sc.uiWebUrl}")
print()
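
For development without a YARN cluster, the same session can be built against a local master; this is only a sketch for local testing, not how the EMR run scripts above submit the job:

```python
from pyspark.sql import SparkSession

# Local-mode variant for development; the EMR scripts submit with --master yarn instead.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("distroduce-local")
    .getOrCreate()
)
sc = spark.sparkContext
```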


@ -0,0 +1,60 @@
from copy import deepcopy
from datetime import datetime
from functools import reduce
add = lambda a, b: a + b
'''
Function that maintains the state of users in a chat room
'''
def update_users(users, actions, action_types=['send','enter','exit']):
users = deepcopy(users)
for action_type in action_types:
if action_type in actions['types']:
for msg in actions['messages']:
if msg['action'] == 'send' and action_type == 'send':
continue
elif msg['action'] == 'enter' and action_type == 'enter':
for user in msg['sender']:
users.append(user) # register_entered
elif msg['action'] == 'exit' and action_type == 'exit':
for user in msg['sender']:
users.remove(user) # remove_exited
return users
'''
State Update used to count sent messages
'''
def count_messages(_g, step, sL, s, actions, kafkaConfig):
return 'total_msg_count', s['total_msg_count'] + reduce(add, actions['msg_counts'])
'''
State Update used to log message events
'''
def send_message(state):
return lambda _g, step, sL, s, actions, kafkaConfig: (
state,
{
'users': update_users(s[state]['users'], actions),
'messages': actions['messages'],
'msg_counts': reduce(add, actions['msg_counts']),
'send_times': reduce(add, actions['send_times'])
}
)
'''
State Update used to sum the time taken for the Kafka Producer to send messages between users
'''
def add_send_time(_g, step, sL, s, actions, kafkaConfig):
return 'total_send_time', s['total_send_time'] + reduce(add, actions['send_times'])
'''
State Update used to record when the event record was created
'''
def current_time(state):
return lambda _g, step, sL, s, actions, kafkaConfig: (state, datetime.now())
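
For reference, `update_users` reacts only to 'enter' and 'exit' messages and leaves 'send' messages to `send_message`; a minimal usage example (the actions dict is hand-written to match the shape the function expects):

```python
from distroduce.state_updates import update_users

# Hand-written actions dict matching the shape update_users expects.
actions = {
    'types': ['enter', 'send', 'exit'],
    'messages': [
        {'action': 'enter', 'sender': ['A']},                  # A joins the room
        {'action': 'send',  'sender': ['A'], 'text': 'Hi B'},  # ignored here; handled by send_message
        {'action': 'exit',  'sender': ['A']}                   # A leaves again
    ]
}

print(update_users(['B'], actions))  # -> ['B']: A entered and exited within the same step
```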


@ -3,4 +3,6 @@ wheel
pathos
fn
tabulate
funcy
funcy
pyspark
kafka-python


@ -19,7 +19,7 @@ provided.
"""
setup(name='cadCAD',
version='0.3.0',
version='0.0.2',
description="cadCAD: a differential games based simulation software package for research, validation, and \
Computer Aided Design of economic systems",
long_description=long_description,
@ -27,5 +27,14 @@ setup(name='cadCAD',
author='Joshua E. Jodesty',
author_email='joshua@block.science, joshua.jodesty@gmail.com',
license='LICENSE.txt',
packages=find_packages()
packages=find_packages(),
install_requires=[
"pandas",
"wheel",
"pathos",
"fn",
"tabulate",
"funcy",
"kafka-python"
]
)

simulations/__init__.py (new, empty file)


@ -152,6 +152,7 @@ sim_config = config_sim(
)
append_configs(
user_id='user_a',
sim_configs=sim_config,
initial_state=genesis_states,
env_processes=env_processes,


@ -140,6 +140,7 @@ sim_config = config_sim(
)
append_configs(
user_id='user_b',
sim_configs=sim_config,
initial_state=genesis_states,
env_processes=env_processes,


@ -144,6 +144,7 @@ sim_config = config_sim(
# New Convention
partial_state_update_blocks = psub_list(psu_block, psu_steps)
append_configs(
user_id='user_a',
sim_configs=sim_config,
initial_state=genesis_states,
seeds=seeds,


@ -3,6 +3,7 @@ from typing import List
from tabulate import tabulate
# The following imports NEED to be in the exact order
from cadCAD.engine import ExecutionMode, ExecutionContext, Executor
from cadCAD.utils import arrange_cols
from simulations.regression_tests import config1
from cadCAD import configs
@ -14,8 +15,11 @@ first_config = configs # only contains config1
single_proc_ctx = ExecutionContext(context=exec_mode.single_proc)
run = Executor(exec_context=single_proc_ctx, configs=first_config)
# print(set(result.columns) - set(['user_id', 'session_id', 'simulation_id', 'run_id']) - set(['run', 'timestep', 'substep']))
# print(['run', 'timestep', 'substep'])
raw_result, tensor_field = run.execute()
result = pd.DataFrame(raw_result)
result = arrange_cols(pd.DataFrame(raw_result), False)
print()
print("Tensor Field: config1")
# print(raw_result)


@ -2,6 +2,7 @@ import pandas as pd
from tabulate import tabulate
# The following imports NEED to be in the exact order
from cadCAD.engine import ExecutionMode, ExecutionContext, Executor
from cadCAD.utils import arrange_cols
from simulations.regression_tests import config2
from cadCAD import configs
@ -9,15 +10,15 @@ exec_mode = ExecutionMode()
print("Simulation Execution: Single Configuration")
print()
first_config = configs # only contains config2
single_proc_ctx = ExecutionContext(context=exec_mode.single_proc)
run = Executor(exec_context=single_proc_ctx, configs=first_config)
single_proc_ctx = ExecutionContext(context=exec_mode.multi_proc)
run = Executor(exec_context=single_proc_ctx, configs=configs)
raw_result, tensor_field = run.execute()
result = pd.DataFrame(raw_result)
print()
print("Tensor Field: config1")
print(tabulate(tensor_field, headers='keys', tablefmt='psql'))
print("Output:")
print(tabulate(result, headers='keys', tablefmt='psql'))
print()
for raw_result, tensor_field in run.execute():
result = arrange_cols(pd.DataFrame(raw_result), False)
print()
# print("Tensor Field: " + config_names[i])
print(tabulate(tensor_field, headers='keys', tablefmt='psql'))
print("Output:")
print(tabulate(result, headers='keys', tablefmt='psql'))
print()
# i += 1


@ -2,6 +2,7 @@ import pandas as pd
from tabulate import tabulate
# The following imports NEED to be in the exact order
from cadCAD.engine import ExecutionMode, ExecutionContext, Executor
from cadCAD.utils import arrange_cols
from simulations.regression_tests import config1, config2
from cadCAD import configs
@ -15,9 +16,9 @@ run = Executor(exec_context=multi_proc_ctx, configs=configs)
i = 0
config_names = ['config1', 'config2']
for raw_result, tensor_field in run.execute():
result = pd.DataFrame(raw_result)
result = arrange_cols(pd.DataFrame(raw_result), False)
print()
print(f"Tensor Field: {config_names[i]}")
# print(f"Tensor Field: {config_names[i]}")
print(tabulate(tensor_field, headers='keys', tablefmt='psql'))
print("Output:")
print(tabulate(result, headers='keys', tablefmt='psql'))


@ -14,11 +14,11 @@ multi_proc_ctx = ExecutionContext(context=exec_mode.multi_proc)
run = Executor(exec_context=multi_proc_ctx, configs=configs)
i = 0
config_names = ['sweep_config_A', 'sweep_config_B']
# config_names = ['sweep_config_A', 'sweep_config_B']
for raw_result, tensor_field in run.execute():
result = pd.DataFrame(raw_result)
print()
print("Tensor Field: " + config_names[i])
# print("Tensor Field: " + config_names[i])
print(tabulate(tensor_field, headers='keys', tablefmt='psql'))
print("Output:")
print(tabulate(result, headers='keys', tablefmt='psql'))