Compare commits

..

40 Commits

Author SHA1 Message Date
Markus Buhatem Koch f5ca0857f3
Merge pull request #76 from BlockScience/staging
Update README.md
2019-10-02 15:22:25 -03:00
Markus Buhatem Koch d9ff2997a1
Update README.md
`System Model Configuration` is now the base MD in the documentation folder
2019-10-02 15:22:02 -03:00
Joshua E. Jodesty ddabdbb381
Merge pull request #75 from BlockScience/staging
Returning Build From Source and Made documentation prominent
2019-09-26 15:05:48 -04:00
Joshua E. Jodesty 17316277ff
Merge pull request #74 from JEJodesty/staging
Staging
2019-09-26 15:02:15 -04:00
Joshua E. Jodesty c326f3c8c0 docs 2019-09-26 14:57:00 -04:00
Joshua E. Jodesty 422b0fb671 documentation & build from source is important for contributors 2019-09-26 09:16:20 -04:00
Joshua E. Jodesty 319f74a89c documentation is important for contributors . . . 2019-09-24 20:22:01 -04:00
Joshua E. Jodesty 154a653c7f
Merge pull request #73 from BlockScience/staging
deploying 0.3.1
2019-09-21 11:05:42 -04:00
Markus Buhatem Koch 3683aa30dc
Merge pull request #72 from markusbkoch/master
Update README
2019-09-19 22:46:42 -03:00
Markus Buhatem Koch 837aad310e
Update README.md 2019-09-19 22:33:54 -03:00
Markus Buhatem Koch 4e5dca0cf9
Rename Simulation_Configuration.md to README.md 2019-09-19 22:33:06 -03:00
Markus Buhatem Koch 4fe3419b23
Update README.md 2019-09-19 22:32:40 -03:00
Joshua E. Jodesty 7c870e584b
Merge pull request #71 from JEJodesty/staging
Merging dist containing tp.clear()
2019-09-16 23:21:21 -04:00
Joshua E. Jodesty b515dd3fdb new dist 2019-09-16 23:18:10 -04:00
Joshua E. Jodesty 5048976f71
Merge pull request #69 from markusbkoch/markusbkoch-patch-1
merge setup.py update
2019-09-16 23:08:50 -04:00
Markus Buhatem Koch 4e1f730c27
update version number 2019-09-11 17:20:33 -03:00
Markus Buhatem Koch 9f96821b89
add dependencies to setup.py
so that pypi installs dependencies automatically (https://packaging.python.org/discussions/install-requires-vs-requirements/)
2019-09-11 17:10:20 -03:00
Markus Buhatem Koch 460b1ff67c
Merge pull request #1 from BlockScience/master
merge
2019-09-11 22:03:02 +02:00
Joshua E. Jodesty b8fa090222
Merge pull request #68 from BlockScience/staging
contribution draft rename
2019-09-07 20:57:38 -04:00
Joshua E. Jodesty 01285e6320
Merge pull request #67 from JEJodesty/master
contribution draft rename
2019-09-07 20:56:48 -04:00
Joshua E. Jodesty d3ef3d23f5 contribution draft 2019-09-07 20:55:48 -04:00
Joshua E. Jodesty ee8b3de331
Merge pull request #66 from BlockScience/staging
contributing.md
2019-09-07 20:53:30 -04:00
Joshua E. Jodesty ce3eacd971
Merge pull request #65 from JEJodesty/master
contributing.md
2019-09-07 20:52:17 -04:00
Joshua E. Jodesty 86e683b268 contribution draft 2019-09-07 20:50:48 -04:00
Joshua E. Jodesty a2346046f3 contribution draft 2019-09-07 20:48:51 -04:00
Joshua E. Jodesty 0619764aef
Merge pull request #64 from BlockScience/staging
Staging
2019-09-07 19:55:33 -04:00
Joshua E. Jodesty faae27f21e
Merge pull request #63 from JEJodesty/master
Open Source cadCAD 0.3.0!!!!!!
2019-09-07 19:54:38 -04:00
Joshua E. Jodesty dd872c3878 open sourced 0.3.0 2019-09-07 19:50:48 -04:00
Joshua E. Jodesty 130f85f0ef
Merge pull request #62 from JEJodesty/master
Hell
2019-09-07 19:31:42 -04:00
Joshua E. Jodesty bc4ab3113d behind gates of hell pt. 1 2019-09-07 19:30:06 -04:00
Joshua E. Jodesty c57e2d9840 behind gates of hell 2019-09-07 19:28:44 -04:00
Joshua E. Jodesty 81d666ce3e hell gates pt. 7 2019-09-07 19:27:20 -04:00
Joshua E. Jodesty f00b14d52e
Merge pull request #61 from BlockScience/staging
Update Description
2019-09-07 19:07:28 -04:00
Joshua E. Jodesty 5d0b1c4aec
Merge pull request #60 from JEJodesty/staging
Update Description
2019-09-07 19:05:47 -04:00
Joshua E. Jodesty 9a12b5d0d6
Merge pull request #58 from JEJodesty/staging
Staging
2019-09-05 16:18:18 -04:00
Markus Buhatem Koch f931945eaf
Merge pull request #57 from BlockScience/staging
add link to readme
2019-08-28 09:13:56 -03:00
Markus Buhatem Koch d56b5c1c5f
Merge pull request #56 from BlockScience/staging
Tutorial part 6
2019-08-28 09:12:18 -03:00
Joshua E. Jodesty 4f58a169c5
Merge pull request #55 from BlockScience/staging
restart threads
2019-08-27 15:28:54 -04:00
Joshua E. Jodesty de9a708d43
Merge pull request #54 from BlockScience/staging
Open Sourcing cadCad Pt. 2
2019-08-22 19:57:37 -04:00
Joshua E. Jodesty f9996163d0
Merge pull request #53 from BlockScience/staging
Open Sourcing cadCAD!!!!
2019-08-22 18:29:47 -04:00
55 changed files with 162 additions and 1288 deletions

8
.gitignore vendored

@ -3,7 +3,7 @@ jupyter notebook
.ipynb_checkpoints
.DS_Store
.idea
.pytest_cache
.pytest_cache/
notebooks
*.egg-info
__pycache__
@ -27,8 +27,4 @@ testing/udo_test.py
Simulation.md
monkeytype.sqlite3
distributed_produce/bash/
distributed_produce/notes.txt
notes.txt
monkeytype.sqlite3

21
CONTRIBUTING.md Normal file

@ -0,0 +1,21 @@
# Contributing to cadCAD (Draft)
:+1::tada: First off, thanks for taking the time to contribute! :tada::+1:
The following is a set of guidelines for contributing to cadCAD. These are mostly guidelines, not rules.
Use your best judgment, and feel free to propose changes to this document in a pull request.
### Pull Requests:
Pull Request (PR) presented as "->".
General Template:
fork/branch -> BlockScience/staging
Contributing a new feature:
fork/feature -> BlockScience/staging
Contributing to an existing feature:
fork/feature -> BlockScience/feature
Thanks! :heart:

172
README.md

@ -1,142 +1,50 @@
```
___ _ __ _ __ __ __
/ _ \ (_)___ / /_ ____ (_)/ / __ __ / /_ ___ ___/ /
/ // // /(_-</ __// __// // _ \/ // // __// -_)/ _ /
/____//_//___/\__//_/ _/_//_.__/\_,_/ \__/ \__/ \_,_/
/ _ \ ____ ___ ___/ /__ __ ____ ___
/ ___// __// _ \/ _ // // // __// -_)
/_/ /_/ \___/\_,_/ \_,_/ \__/ \__/
by Joshua E. Jodesty
__________ ____
________ __ _____/ ____/ | / __ \
/ ___/ __` / __ / / / /| | / / / /
/ /__/ /_/ / /_/ / /___/ ___ |/ /_/ /
\___/\__,_/\__,_/\____/_/ |_/_____/
by BlockScience
======================================
Complex Adaptive Dynamics
o i e
m d s
p e i
u d g
t n
e
r
```
## What?: *Description*
***Distributed Produce*** (**[distroduce](distroduce)**) is a message simulation and throughput benchmarking framework /
[cadCAD](https://cadcad.org) execution mode that leverages [Apache Spark](https://spark.apache.org/) and
[Apache Kafka Producer](https://kafka.apache.org/documentation/#producerapi) for optimizing Kafka cluster configurations
and debugging real-time data transformations. *distroduce* leverages cadCAD's user-defined event simulation template and
framework to simulate messages sent to Kafka clusters. This enables rapid and iterative design, debugging, and message
publish benchmarking of Kafka clusters and real-time data processing using Kafka Streams and Spark (Structured)
Streaming.
***cadCAD*** is a Python package that assists in the processes of designing, testing and validating complex systems through simulation, with support for Monte Carlo methods, A/B testing and parameter sweeping.
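As a rough illustration of that claim, the sketch below shows how Monte Carlo runs (`N`), the timestep range (`T`), and swept parameters (`M`) are declared together. It is a minimal sketch modeled on the `config_sim` usage that appears later in this diff, not an excerpt from the repository; the parameter name `beta` and its values are placeholders.
```python
# Minimal sketch (illustrative, not from the repo): declaring Monte Carlo runs,
# timesteps, and a parameter sweep with cadCAD's config_sim helper.
from cadCAD.configuration.utils import config_sim

sim_config = config_sim({
    "N": 3,                       # Monte Carlo runs
    "T": range(100),              # timesteps per run
    "M": {"beta": [0.1, 0.2]},    # swept parameter (see System_Model_Parameter_Sweep.md)
})
```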
## How?: *A Tail of Two Clusters*
***Distributed Produce*** is a Spark Application used as a cadCAD Execution Mode that distributes Kafka Producers,
message simulation, and message publishing to worker nodes of an EMR cluster. Messages published from these workers are
sent to Kafka topics on a Kafka cluster from a Spark bootstrapped EMR cluster.
# Getting Started
## 1. Installation:
Requires [Python 3](https://www.python.org/downloads/)
## Why?: *Use Cases*
* **IoT Event / Device Simulation:** Competes with *AWS IoT Device Simulator* and *Azure IoT Solution Accelerators:
Device Simulation*. Unlike these products, *Distributed Produce* enables user-defined state updates and agent actions,
as well as message publish benchmarking
* **Development Environment for Real-Time Data Processing / Routing:**
## Get Started:
### 0. Set Up Local Development Environment: see [Kafka Quickstart](https://kafka.apache.org/quickstart)
**a.** Install `pyspark`
**Option A: Install Using [pip](https://pypi.org/project/cadCAD/)**
```bash
pip3 install pyspark
```
**b.** Install & Unzip Kafka, Create Kafka `test` topic, and Start Consumer
```bash
sh distroduce/configuration/launch_local_kafka.sh
```
**c.** Run Simulation locally
```bash
zip -rq distroduce/dist/distroduce.zip distroduce/
spark-submit --py-files distroduce/dist/distroduce.zip distroduce/local_messaging_sim.py `hostname | xargs`
pip3 install cadCAD
```
### 1. Write cadCAD Simulation:
* **Simulation Description:**
To demonstrate *Distributed Produce*, I implemented a simulation of two users interacting over a messaging service.
* **Resources**
* [cadCAD Documentation](https://github.com/BlockScience/cadCAD/tree/master/documentation)
* [cadCAD Tutorials](https://github.com/BlockScience/cadCAD/tree/master/tutorials)
* **Terminology:**
* ***[Initial Conditions](https://github.com/BlockScience/cadCAD/tree/master/documentation#state-variables)*** -
State Variables and their initial values (Start event of Simulation)
```python
initial_conditions = {
'state_variable_1': 0,
'state_variable_2': 0,
'state_variable_3': 1.5,
'timestamp': '2019-01-01 00:00:00'
}
```
* ***[Policy Functions:](https://github.com/BlockScience/cadCAD/tree/master/documentation#Policy-Functions)*** -
compute one or more signals to be passed to State Update Functions
```python
def policy_function_A(_params, substep, sH, s, kafkaConfig):
    ...
    return {'signal_name': signal_value}
```
Parameters:
* **_params** : `dict` - [System parameters](https://github.com/BlockScience/cadCAD/blob/master/documentation/System_Model_Parameter_Sweep.md)
* **substep** : `int` - Current [substep](https://github.com/BlockScience/cadCAD/tree/master/documentation#Substep)
* **sH** : `list[list[dict]]` - Historical values of all state variables for the simulation. See
[Historical State Access](https://github.com/BlockScience/cadCAD/blob/master/documentation/Historically_State_Access.md) for details
* **s** : `dict` - Current state of the system, where the `dict_keys` are the names of the state variables and the
`dict_values` are their current values.
* **kafkaConfig:** `kafka.KafkaProducer` - Configuration for `kafka-python`
[Producer](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html)
* ***[State Update Functions](https://github.com/BlockScience/cadCAD/tree/master/documentation#state-update-functions):*** -
update state variables over time
```python
def state_update_function_A(_params, substep, sH, s, actions, kafkaConfig):
...
return 'state_variable_name', new_value
```
Parameters:
* **_params** : `dict` - [System parameters](https://github.com/BlockScience/cadCAD/blob/master/documentation/System_Model_Parameter_Sweep.md)
* **substep** : `int` - Current [substep](https://github.com/BlockScience/cadCAD/tree/master/documentation#Substep)
* **sH** : `list[list[dict]]` - Historical values of all state variables for the simulation. See
[Historical State Access](https://github.com/BlockScience/cadCAD/blob/master/documentation/Historically_State_Access.md) for details
* **s** : `dict` - Current state of the system, where the `dict_keys` are the names of the state variables and the
`dict_values` are their current values.
* **actions** : `dict` - Aggregation of the signals of all policy functions in the current substep
* **kafkaConfig:** `kafka.KafkaProducer` - Configuration for `kafka-python`
[Producer](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html)
* ***[Partial State Update Block](https://github.com/BlockScience/cadCAD/tree/master/documentation#State-Variables) (PSUB):*** -
a set of State Update Functions and Policy Functions that update state records
![](https://i.imgur.com/9rlX9TG.png)
**Note:** State Update and Policy Functions now have the additional / undocumented parameter `kafkaConfig`
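To make the PSUB structure concrete, the sketch below pairs one placeholder policy function with one placeholder state update function, following the signatures documented above (including the extra `kafkaConfig` parameter) and the `{'policies': ..., 'variables': ...}` shape used in `distroduce/simulation.py` later in this diff.
```python
# Sketch of a Partial State Update Block (PSUB); all names are placeholders.
def policy_a(_params, substep, sH, s, kafkaConfig):
    return {'signal_1': 1}  # signals are aggregated across policies via policy_ops

def update_state_variable_1(_params, substep, sH, s, actions, kafkaConfig):
    return 'state_variable_1', s['state_variable_1'] + actions['signal_1']

partial_state_update_blocks = [
    {
        'policies': {'action_1': policy_a},
        'variables': {'state_variable_1': update_state_variable_1},
    },
]
```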
**a.** **Define Policy Functions:**
* [Example:](distroduce/action_policies.py) Two users interacting on separate chat clients and entering / exiting chat
**b.** **Define State Update Functions:**
* [Example:](distroduce/state_updates.py) Used for logging and maintaining state of user actions defined by policies
**c.** **Define Initial Conditions & Partial State Update Block:**
* **Initial Conditions:** [Example](distroduce/messaging_sim.py)
* **Partial State Update Block (PSUB):** [Example](distroduce/simulation.py)
**d.** **Create Simulation Executor:** Used for running a simulation
* [Local](distroduce/local_messaging_sim.py)
* [EMR](distroduce/messaging_sim.py)
### 2. [Configure EMR Cluster](distroduce/configuration/cluster.py)
### 3. Launch EMR Cluster:
**Option A:** Preconfigured Launch
```bash
python3 distroduce/emr/launch.py
**Option B:** Build From Source
```
**Option B:** Custom Launch - [Example](distroduce/emr/launch.py)
```python
from distroduce.emr.launch import launch_cluster
from distroduce.configuration.cluster import ec2_attributes, bootstrap_actions, instance_groups, configurations
region = 'us-east-1'
cluster_name = 'distibuted_produce'
launch_cluster(cluster_name, region, ec2_attributes, bootstrap_actions, instance_groups, configurations)
pip3 install -r requirements.txt
python3 setup.py sdist bdist_wheel
pip3 install dist/*.whl
```
### 4. Execute Benchmark(s):
* **Step 1:** SSH onto the master node
* **Step 2:** Spark Submit
```
spark-submit --master yarn --py-files distroduce.zip messaging_sim.py `hostname | xargs`
```
## 2. Learn the basics
**Tutorials:** available both as [Jupyter Notebooks](tutorials)
and [videos](https://www.youtube.com/watch?v=uJEiYHRWA9g&list=PLmWm8ksQq4YKtdRV-SoinhV6LbQMgX1we)
Familiarize yourself with some system modelling concepts and cadCAD terminology.
## 3. Documentation:
* [System Model Configuration](documentation)
* [System Simulation Execution](documentation/Simulation_Execution.md)
* [Policy Aggregation](documentation/Policy_Aggregation.md)
* [System Model Parameter Sweep](documentation/System_Model_Parameter_Sweep.md)
## 4. Connect
Find other cadCAD users at our [Discourse](https://community.cadcad.org/). We are a small but rapidly growing community.

View File

@ -1,38 +0,0 @@
text = r'''
Complex Adaptive Dynamics
o i e
m d s
p e i
u d g
t n
e
r
'''
block_letters = r'''
__________ ____
________ __ _____/ ____/ | / __ \
/ ___/ __` / __ / / / /| | / / / /
/ /__/ /_/ / /_/ / /___/ ___ |/ /_/ /
\___/\__,_/\__,_/\____/_/ |_/_____/
by BlockScience
'''
production = r'''
__________ ____
________ __ _____/ ____/ | / __ \
/ ___/ __` / __ / / / /| | / / / /
/ /__/ /_/ / /_/ / /___/ ___ |/ /_/ /
\___/\__,_/\__,_/\____/_/ |_/_____/
by BlockScience
======================================
Complex Adaptive Dynamics
o i e
m d s
p e i
u d g
t n
e
r
'''

15
ascii_art.txt Normal file

@ -0,0 +1,15 @@
Complex Adaptive Dynamics
o i e
m d s
p e i
u d g
t n
e
r
__________ ____
________ __ _____/ ____/ | / __ \
/ ___/ __` / __ / / / /| | / / / /
/ /__/ /_/ / /_/ / /___/ ___ |/ /_/ /
\___/\__,_/\__,_/\____/_/ |_/_____/
by BlockScience

View File

@ -1,20 +1,2 @@
name = "cadCAD"
configs = []
print(r'''
__________ ____
________ __ _____/ ____/ | / __ \
/ ___/ __` / __ / / / /| | / / / /
/ /__/ /_/ / /_/ / /___/ ___ |/ /_/ /
\___/\__,_/\__,_/\____/_/ |_/_____/
by BlockScience
======================================
Complex Adaptive Dynamics
o i e
m d s
p e i
u d g
t n
e
r
''')
configs = []

View File

@ -1,9 +1,7 @@
from copy import deepcopy
from typing import Dict, Callable, List, Tuple
from functools import reduce
import pandas as pd
from pandas.core.frame import DataFrame
# import cloudpickle, pickle
from cadCAD import configs
from cadCAD.utils import key_filter
@ -13,9 +11,9 @@ from cadCAD.configuration.utils.depreciationHandler import sanitize_partial_stat
class Configuration(object):
def __init__(self, user_id, sim_config={}, initial_state={}, seeds={}, env_processes={},
def __init__(self, sim_config={}, initial_state={}, seeds={}, env_processes={},
exogenous_states={}, partial_state_update_blocks={}, policy_ops=[lambda a, b: a + b],
session_id=0, simulation_id=0, run_id=1, **kwargs) -> None:
**kwargs) -> None:
# print(exogenous_states)
self.sim_config = sim_config
self.initial_state = initial_state
@ -26,18 +24,11 @@ class Configuration(object):
self.policy_ops = policy_ops
self.kwargs = kwargs
self.user_id = user_id
self.session_id = session_id
self.simulation_id = simulation_id
self.run_id = run_id
sanitize_config(self)
def append_configs(user_id='cadCAD_user', session_id=0, #ToDo: change to string
sim_configs={}, initial_state={}, seeds={}, raw_exogenous_states={}, env_processes={},
partial_state_update_blocks={}, policy_ops=[lambda a, b: a + b], _exo_update_per_ts: bool = True
) -> None:
def append_configs(sim_configs={}, initial_state={}, seeds={}, raw_exogenous_states={}, env_processes={},
partial_state_update_blocks={}, policy_ops=[lambda a, b: a + b], _exo_update_per_ts: bool = True) -> None:
if _exo_update_per_ts is True:
exogenous_states = exo_update_per_ts(raw_exogenous_states)
else:
@ -46,27 +37,7 @@ def append_configs(user_id='cadCAD_user', session_id=0, #ToDo: change to string
if isinstance(sim_configs, dict):
sim_configs = [sim_configs]
new_sim_configs = []
for t in list(zip(sim_configs, list(range(len(sim_configs))))):
sim_config, simulation_id = t[0], t[1]
N = sim_config['N']
if N > 1:
for n in range(N):
sim_config['simulation_id'] = simulation_id
sim_config['run_id'] = n
sim_config['N'] = 1
new_sim_configs.append(deepcopy(sim_config))
del sim_config
else:
sim_config['simulation_id'] = simulation_id
sim_config['run_id'] = 0
new_sim_configs.append(deepcopy(sim_config))
print(new_sim_configs)
print()
# for sim_config in sim_configs:
for sim_config in new_sim_configs:
for sim_config in sim_configs:
config = Configuration(
sim_config=sim_config,
initial_state=initial_state,
@ -74,15 +45,10 @@ def append_configs(user_id='cadCAD_user', session_id=0, #ToDo: change to string
exogenous_states=exogenous_states,
env_processes=env_processes,
partial_state_update_blocks=partial_state_update_blocks,
policy_ops=policy_ops,
user_id=user_id,
session_id=session_id,
simulation_id=sim_config['simulation_id'],
run_id=sim_config['run_id']
policy_ops=policy_ops
)
# print(sim_configs)
# for each sim config create new config
print(sim_configs)
#for each sim config create new config
configs.append(config)
@ -102,10 +68,7 @@ class Identity:
def state_identity(self, k: str) -> Callable:
return lambda var_dict, sub_step, sL, s, _input: (k, s[k])
# state_identity = cloudpickle.dumps(state_identity)
def apply_identity_funcs(self, identity: Callable, df: DataFrame, cols: List[str]) -> List[DataFrame]:
# identity = pickle.loads(identity)
def fillna_with_id_func(identity, df, col):
return df[[col]].fillna(value=identity(col))
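For orientation, after this change `append_configs` no longer takes the user, session, simulation, or run identifiers; a minimal call against the new signature (a sketch based on the signature above and the `append_configs` usage in `distroduce/simulation.py` later in this diff, with placeholder values) looks like:
```python
# Sketch of the post-change append_configs call; argument values are placeholders.
from cadCAD.configuration import append_configs
from cadCAD.configuration.utils import config_sim

append_configs(
    sim_configs=config_sim({"N": 1, "T": range(10)}),
    initial_state={'state_variable_1': 0},
    partial_state_update_blocks=partial_state_update_blocks,  # e.g. the PSUB sketch above
    policy_ops=[lambda a, b: a + b],                          # aggregates policy signals
)
```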

View File

@ -1,30 +0,0 @@
from datetime import datetime
# ToDo: Model Async communication here
# ToDo: Configuration of Input
def configure_producer(msg, config):
from kafka import KafkaProducer
def send_messages(events):
producer = KafkaProducer(**config)
start_timestamp = datetime.now()
for event in range(events):
producer.send('test', msg(event)).get()
delta = datetime.now() - start_timestamp
return start_timestamp, delta.total_seconds()
return send_messages
# def action(step):
# step += 1
# percent = step / 10000
# print(str(datetime.now()) + " - " + str(percent) + ": " + str(step))
# def configure_consumer(config, elemental_action: function):
# from kafka import KafkaConsumer
# def receive_messages():
# consumer = KafkaConsumer(**config)
# return [elemental_action(i) for i, message in enumerate(consumer)]
#
# return receive_messages
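The removed `configure_producer` above is a closure factory: it captures a message builder and a producer configuration, and returns a function that publishes a given number of messages to the `test` topic and reports when it started and how long it took. A hypothetical use, assuming a local broker on `localhost:9092`, would be:
```python
# Hypothetical usage of the removed configure_producer helper; assumes a local
# Kafka broker on localhost:9092 with a 'test' topic already created.
def build_msg(i):
    return f"event {i}".encode("utf-8")  # KafkaProducer expects bytes by default

send_messages = configure_producer(build_msg, {"bootstrap_servers": "localhost:9092"})
started_at, seconds_elapsed = send_messages(100)  # publish 100 messages and time them
print(f"published 100 messages in {seconds_elapsed:.3f}s (started {started_at})")
```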

View File

@ -1 +0,0 @@
from cadCAD.distroduce.executor.spark.jobs import distributed_produce

View File

@ -1,33 +0,0 @@
from cadCAD.distroduce.configuration.kakfa import configure_producer
from pyspark.context import SparkContext
ascii_art = r'''
___ _ __ _ __ __ __
/ _ \ (_)___ / /_ ____ (_)/ / __ __ / /_ ___ ___/ /
/ // // /(_-</ __// __// // _ \/ // // __// -_)/ _ /
/____//_//___/\__//_/ _/_//_.__/\_,_/ \__/ \__/ \_,_/
/ _ \ ____ ___ ___/ /__ __ ____ ___
/ ___// __// _ \/ _ // // // __// -_)
/_/ /_/ \___/\_,_/ \_,_/ \__/ \__/
by Joshua E. Jodesty
'''
def distributed_produce(
sc: SparkContext,
spark_run,
sim_time,
sim_runs,
rdd_parts,
parameterized_message,
prod_config
):
print(ascii_art)
message_counts = [sim_time] * sim_runs
msg_rdd = sc.parallelize(message_counts).repartition(rdd_parts)
parts = msg_rdd.getNumPartitions()
print()
print(f"RDD_{spark_run} - Partitions: {parts}")
print()
produce = configure_producer(parameterized_message, prod_config)
return msg_rdd.map(produce).collect()

View File

@ -1,2 +0,0 @@
def flatten(l):
return [item for sublist in l for item in sublist]

View File

@ -1,14 +1,7 @@
from pprint import pprint
from typing import Callable, Dict, List, Any, Tuple
from pathos.multiprocessing import ProcessingPool as PPool
from pathos.multiprocessing import ThreadPool as TPool
from pandas.core.frame import DataFrame
from pyspark.context import SparkContext
from pyspark import cloudpickle
import pickle
from fn.func import curried
from cadCAD.distroduce.configuration.kakfa import configure_producer
from cadCAD.utils import flatten
from cadCAD.configuration import Configuration, Processor
from cadCAD.configuration.utils import TensorFieldReport
@ -23,7 +16,6 @@ EnvProcessesType = Dict[str, Callable]
class ExecutionMode:
single_proc = 'single_proc'
multi_proc = 'multi_proc'
dist_proc = 'dist_proc'
def single_proc_exec(
@ -33,18 +25,11 @@ def single_proc_exec(
configs_structs: List[ConfigsType],
env_processes_list: List[EnvProcessesType],
Ts: List[range],
Ns: List[int],
userIDs,
sessionIDs,
simulationIDs,
runIDs: List[int],
Ns: List[int]
):
params = [simulation_execs, states_lists, configs_structs, env_processes_list, Ts, Ns,
userIDs, sessionIDs, simulationIDs, runIDs]
simulation_exec, states_list, config, env_processes, T, N, user_id, session_id, simulation_id, run_id = \
list(map(lambda x: x.pop(), params))
result = simulation_exec(var_dict_list, states_list, config, env_processes, T, N,
user_id, session_id, simulation_id, run_id)
l = [simulation_execs, states_lists, configs_structs, env_processes_list, Ts, Ns]
simulation_exec, states_list, config, env_processes, T, N = list(map(lambda x: x.pop(), l))
result = simulation_exec(var_dict_list, states_list, config, env_processes, T, N)
return flatten(result)
@ -55,87 +40,27 @@ def parallelize_simulations(
configs_structs: List[ConfigsType],
env_processes_list: List[EnvProcessesType],
Ts: List[range],
Ns: List[int],
userIDs,
sessionIDs,
simulationIDs,
runIDs: List[int]
Ns: List[int]
):
params = list(zip(simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns,
userIDs, sessionIDs, simulationIDs, runIDs))
l = list(zip(simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns))
with PPool(len(configs_structs)) as p:
results = p.map(lambda t: t[0](t[1], t[2], t[3], t[4], t[5], t[6], t[7], t[8], t[9], t[10]), params)
results = p.map(lambda t: t[0](t[1], t[2], t[3], t[4], t[5], t[6]), l)
return results
def distributed_simulations(
simulation_execs: List[Callable],
var_dict_list: List[VarDictType],
states_lists: List[StatesListsType],
configs_structs: List[ConfigsType],
env_processes_list: List[EnvProcessesType],
Ts: List[range],
Ns: List[int],
userIDs,
sessionIDs,
simulationIDs,
runIDs: List[int],
sc,
kafkaConfig
):
func_params_zipped = list(
zip(userIDs, sessionIDs, simulationIDs, runIDs, simulation_execs, configs_structs, env_processes_list)
)
func_params_kv = [((t[0], t[1], t[2], t[3]), (t[4], t[5], t[6])) for t in func_params_zipped]
def simulate(k, v):
from kafka import KafkaProducer
prod_config = kafkaConfig['producer_config']
kafkaConfig['producer'] = KafkaProducer(**prod_config)
(sim_exec, config, env_procs) = [f[1] for f in func_params_kv if f[0] == k][0]
results = sim_exec(
v['var_dict'], v['states_lists'], config, env_procs, v['Ts'], v['Ns'],
k[0], k[1], k[2], k[3], kafkaConfig
)
return results
val_params = list(zip(userIDs, sessionIDs, simulationIDs, runIDs, var_dict_list, states_lists, Ts, Ns))
val_params_kv = [
(
(t[0], t[1], t[2], t[3]),
{'var_dict': t[4], 'states_lists': t[5], 'Ts': t[6], 'Ns': t[7]}
) for t in val_params
]
results_rdd = sc.parallelize(val_params_kv).coalesce(35)
return list(results_rdd.map(lambda x: simulate(*x)).collect())
class ExecutionContext:
def __init__(self, context=ExecutionMode.multi_proc, method=None, kafka_config=None) -> None:
def __init__(self, context: str = ExecutionMode.multi_proc) -> None:
self.name = context
self.method = None
if context == 'single_proc':
self.method = single_proc_exec
elif context == 'multi_proc':
self.method = parallelize_simulations
elif context == 'dist_proc':
def distroduce_proc(
simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns,
userIDs, sessionIDs, simulationIDs, runIDs,
sc, kafkaConfig=kafka_config
):
return method(
simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns,
userIDs, sessionIDs, simulationIDs, runIDs,
sc, kafkaConfig
)
self.method = distroduce_proc
class Executor:
def __init__(self, exec_context: ExecutionContext, configs: List[Configuration], spark_context: SparkContext = None) -> None:
self.sc = spark_context
def __init__(self, exec_context: ExecutionContext, configs: List[Configuration]) -> None:
self.SimExecutor = SimExecutor
self.exec_method = exec_context.method
self.exec_context = exec_context.name
@ -145,59 +70,50 @@ class Executor:
config_proc = Processor()
create_tensor_field = TensorFieldReport(config_proc).create_tensor_field
print(r'''
__________ ____
________ __ _____/ ____/ | / __ \
/ ___/ __` / __ / / / /| | / / / /
/ /__/ /_/ / /_/ / /___/ ___ |/ /_/ /
\___/\__,_/\__,_/\____/_/ |_/_____/
by BlockScience
''')
print(f'Execution Mode: {self.exec_context + ": " + str(self.configs)}')
print(f'Configurations: {self.configs}')
userIDs, sessionIDs, simulationIDs, runIDs, \
var_dict_list, states_lists, \
Ts, Ns, \
eps, configs_structs, env_processes_list, \
partial_state_updates, simulation_execs = \
[], [], [], [], [], [], [], [], [], [], [], [], []
var_dict_list, states_lists, Ts, Ns, eps, configs_structs, env_processes_list, partial_state_updates, simulation_execs = \
[], [], [], [], [], [], [], [], []
config_idx = 0
for x in self.configs:
userIDs.append(x.user_id)
sessionIDs.append(x.session_id)
simulationIDs.append(x.simulation_id)
runIDs.append(x.run_id)
Ts.append(x.sim_config['T'])
Ns.append(x.sim_config['N'])
var_dict_list.append(x.sim_config['M'])
states_lists.append([x.initial_state])
eps.append(list(x.exogenous_states.values()))
configs_structs.append(config_proc.generate_config(x.initial_state, x.partial_state_updates, eps[config_idx]))
# print(env_processes_list)
env_processes_list.append(x.env_processes)
partial_state_updates.append(x.partial_state_updates)
simulation_execs.append(SimExecutor(x.policy_ops).simulation)
config_idx += 1
final_result = None
if self.exec_context == ExecutionMode.single_proc:
tensor_field = create_tensor_field(partial_state_updates.pop(), eps.pop())
result = self.exec_method(
simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns,
userIDs, sessionIDs, simulationIDs, runIDs
)
result = self.exec_method(simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns)
final_result = result, tensor_field
else:
if self.exec_context == ExecutionMode.multi_proc:
# if len(self.configs) > 1:
simulations = self.exec_method(
simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns,
userIDs, sessionIDs, simulationIDs, runIDs
)
elif self.exec_context == ExecutionMode.dist_proc:
simulations = self.exec_method(
simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns,
userIDs, sessionIDs, simulationIDs, runIDs, self.sc
)
elif self.exec_context == ExecutionMode.multi_proc:
# if len(self.configs) > 1:
simulations = self.exec_method(simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns)
results = []
for result, partial_state_updates, ep in list(zip(simulations, partial_state_updates, eps)):
results.append((flatten(result), create_tensor_field(partial_state_updates, ep)))
final_result = results
return final_result
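Seen from the caller's side, these removals mean an execution now only chooses between the single- and multi-process contexts, with no Spark context or Kafka configuration. A minimal sketch of driving the new `Executor` (names follow the signatures above and the `executor.execute()` usage later in this diff):
```python
# Sketch of running cadCAD after this change: no spark_context, no kafkaConfig.
from cadCAD import configs
from cadCAD.engine import ExecutionMode, ExecutionContext, Executor

exec_mode = ExecutionMode()
exec_context = ExecutionContext(context=exec_mode.single_proc)  # or exec_mode.multi_proc
executor = Executor(exec_context=exec_context, configs=configs)

# In single_proc mode the result is a (records, tensor_field) pair;
# multi_proc returns one such pair per configuration.
raw_result, tensor_field = executor.execute()
```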

View File

@ -1,5 +1,5 @@
from typing import Any, Callable, Dict, List, Tuple
# from pathos.pools import ThreadPool as TPool
from pathos.pools import ThreadPool as TPool
from copy import deepcopy
from functools import reduce
@ -27,14 +27,13 @@ class Executor:
sub_step: int,
sL: List[Dict[str, Any]],
s: Dict[str, Any],
funcs: List[Callable],
kafkaConfig
funcs: List[Callable]
) -> Dict[str, Any]:
ops = self.policy_ops
def get_col_results(sweep_dict, sub_step, sL, s, funcs):
return list(map(lambda f: f(sweep_dict, sub_step, sL, s, kafkaConfig), funcs))
return list(map(lambda f: f(sweep_dict, sub_step, sL, s), funcs))
def compose(init_reduction_funct, funct_list, val_list):
result, i = None, 0
@ -106,18 +105,18 @@ class Executor:
policy_funcs: List[Callable],
env_processes: Dict[str, Callable],
time_step: int,
run: int,
kafkaConfig
run: int
) -> List[Dict[str, Any]]:
last_in_obj: Dict[str, Any] = deepcopy(sL[-1])
_input: Dict[str, Any] = self.policy_update_exception(
self.get_policy_input(sweep_dict, sub_step, sH, last_in_obj, policy_funcs, kafkaConfig)
self.get_policy_input(sweep_dict, sub_step, sH, last_in_obj, policy_funcs)
)
def generate_record(state_funcs):
for f in state_funcs:
yield self.state_update_exception(f(sweep_dict, sub_step, sH, last_in_obj, _input, kafkaConfig))
yield self.state_update_exception(f(sweep_dict, sub_step, sH, last_in_obj, _input))
def transfer_missing_fields(source, destination):
for k in source:
@ -129,6 +128,7 @@ class Executor:
last_in_copy: Dict[str, Any] = transfer_missing_fields(last_in_obj, dict(generate_record(state_funcs)))
last_in_copy: Dict[str, Any] = self.apply_env_proc(sweep_dict, env_processes, last_in_copy)
last_in_copy['substep'], last_in_copy['timestep'], last_in_copy['run'] = sub_step, time_step, run
sL.append(last_in_copy)
del last_in_copy
@ -142,8 +142,7 @@ class Executor:
configs: List[Tuple[List[Callable], List[Callable]]],
env_processes: Dict[str, Callable],
time_step: int,
run: int,
kafkaConfig
run: int
) -> List[Dict[str, Any]]:
sub_step = 0
@ -160,9 +159,11 @@ class Executor:
states_list: List[Dict[str, Any]] = [genesis_states]
sub_step += 1
for [s_conf, p_conf] in configs: # tensor field
states_list: List[Dict[str, Any]] = self.partial_state_update(
sweep_dict, sub_step, states_list, simulation_list, s_conf, p_conf, env_processes, time_step, run, kafkaConfig
sweep_dict, sub_step, states_list, simulation_list, s_conf, p_conf, env_processes, time_step, run
)
sub_step += 1
@ -178,15 +179,15 @@ class Executor:
configs: List[Tuple[List[Callable], List[Callable]]],
env_processes: Dict[str, Callable],
time_seq: range,
run: int,
kafkaConfig
run: int
) -> List[List[Dict[str, Any]]]:
time_seq: List[int] = [x + 1 for x in time_seq]
simulation_list: List[List[Dict[str, Any]]] = [states_list]
for time_step in time_seq:
pipe_run: List[Dict[str, Any]] = self.state_update_pipeline(
sweep_dict, simulation_list, configs, env_processes, time_step, run, kafkaConfig
sweep_dict, simulation_list, configs, env_processes, time_step, run
)
_, *pipe_run = pipe_run
@ -201,53 +202,33 @@ class Executor:
configs: List[Tuple[List[Callable], List[Callable]]],
env_processes: Dict[str, Callable],
time_seq: range,
runs: int,
user_id,
session_id,
simulation_id,
run_id,
kafkaConfig
runs: int
) -> List[List[Dict[str, Any]]]:
def execute_run(sweep_dict, states_list, configs, env_processes, time_seq, run) -> List[Dict[str, Any]]:
run += 1
def generate_init_sys_metrics(genesis_states_list):
for d in genesis_states_list:
d['run'], d['substep'], d['timestep'] = run, 0, 0
d['user_id'], d['simulation_id'], d['session_id'], d['run_id'] = user_id, simulation_id, session_id, run_id
yield d
states_list_copy: List[Dict[str, Any]] = list(generate_init_sys_metrics(deepcopy(states_list)))
first_timestep_per_run: List[Dict[str, Any]] = self.run_pipeline(
sweep_dict, states_list_copy, configs, env_processes, time_seq, run, kafkaConfig
sweep_dict, states_list_copy, configs, env_processes, time_seq, run
)
del states_list_copy
return first_timestep_per_run
pipe_run = flatten(
[execute_run(sweep_dict, states_list, configs, env_processes, time_seq, run) for run in range(runs)]
tp = TPool(runs)
pipe_run: List[List[Dict[str, Any]]] = flatten(
tp.map(
lambda run: execute_run(sweep_dict, states_list, configs, env_processes, time_seq, run),
list(range(runs))
)
)
tp.clear()
return pipe_run
# ******** Only has one run
# pipe_run: List[List[Dict[str, Any]]] = flatten(
# sc.parallelize(list(range(runs))).map(
# lambda run: execute_run(sweep_dict, states_list, configs, env_processes, time_seq, run)
# ).collect()
# )
# print(type(run_id))
# print(runs)
# tp = TPool(runs)
# pipe_run: List[List[Dict[str, Any]]] = flatten(
# tp.map(
# lambda run: execute_run(sweep_dict, states_list, configs, env_processes, time_seq, run),
# list(range(runs))
# )
# )
#
# tp.clear()

View File

@ -17,14 +17,6 @@ def append_dict(dict, new_dict):
return dict
def arrange_cols(df, reverse):
session_metrics = ['user_id', 'session_id', 'simulation_id', 'run_id']
sys_metrics = ['run', 'timestep', 'substep']
result_cols = list(set(df.columns) - set(session_metrics) - set(sys_metrics))
result_cols.sort(reverse=reverse)
return df[session_metrics + sys_metrics + result_cols]
class IndexCounter:
def __init__(self):
self.i = 0

Binary file not shown.

Binary file not shown.

BIN
dist/cadCAD-0.3.1-py3-none-any.whl vendored Normal file

Binary file not shown.

BIN
dist/cadCAD-0.3.1.tar.gz vendored Normal file

Binary file not shown.

View File

@ -1,145 +0,0 @@
```
___ _ __ _ __ __ __
/ _ \ (_)___ / /_ ____ (_)/ / __ __ / /_ ___ ___/ /
/ // // /(_-</ __// __// // _ \/ // // __// -_)/ _ /
/____//_//___/\__//_/ _/_//_.__/\_,_/ \__/ \__/ \_,_/
/ _ \ ____ ___ ___/ /__ __ ____ ___
/ ___// __// _ \/ _ // // // __// -_)
/_/ /_/ \___/\_,_/ \_,_/ \__/ \__/
by Joshua E. Jodesty
```
## What?: *Description*
***Distributed Produce*** (**[distroduce](distroduce)**) is a distributed message simulation and throughput benchmarking
framework / [cadCAD](https://cadcad.org) execution mode that leverages [Apache Spark](https://spark.apache.org/) and
[Apache Kafka Producer](https://kafka.apache.org/documentation/#producerapi) for optimizing Kafka cluster configurations
and debugging real-time data transformations. *distroduce* leverages cadCAD's user-defined event simulation template and
framework to simulate messages sent to Kafka clusters. This enables rapid and iterative design, debugging, and message
publish benchmarking of Kafka clusters and real-time data processing using Kafka Streams and Spark (Structured)
Streaming.
## How?: *A Tail of Two Clusters*
***Distributed Produce*** is a Spark Application used as a cadCAD Execution Mode that distributes Kafka Producers,
message simulation, and message publishing to worker nodes of an [AWS EMR](https://aws.amazon.com/emr/) cluster.
Messages published from these workers are sent to Kafka topics on a Kafka cluster from a Spark bootstrapped EMR cluster.
## Why?: *Use Cases*
* **IoT Event / Device Simulation:** Competes with *AWS IoT Device Simulator* and *Azure IoT Solution Accelerators:
Device Simulation*. Unlike these products, *Distributed Produce* enables user-defined state updates and agent actions,
as well as message publish benchmarking
* **Development Environment for Real-Time Data Processing / Routing:**
## Get Started:
### 0. Set Up Local Development Environment: see [Kafka Quickstart](https://kafka.apache.org/quickstart)
**a.** Install `pyspark`
```bash
pip3 install pyspark
```
**b.** Install & Unzip Kafka, Create Kafka `test` topic, and Start Consumer
```bash
sh distroduce/configuration/launch_local_kafka.sh
```
**c.** Run Simulation locally
```bash
zip -rq distroduce/dist/distroduce.zip distroduce/
spark-submit --py-files distroduce/dist/distroduce.zip distroduce/local_messaging_sim.py `hostname | xargs`
```
### 1. Write cadCAD Simulation:
* **Simulation Description:**
To demonstrate *Distributed Produce*, I implemented a simulation of two users interacting over a messaging service.
* **cadCAD Resources:**
* [Documentation](https://github.com/BlockScience/cadCAD/tree/master/documentation)
* [Tutorials](https://github.com/BlockScience/cadCAD/tree/master/tutorials)
* **Terminology:**
* ***[Initial Conditions](https://github.com/BlockScience/cadCAD/tree/master/documentation#state-variables)*** - State Variables and their initial values (Start event of Simulation)
```python
initial_conditions = {
'state_variable_1': 0,
'state_variable_2': 0,
'state_variable_3': 1.5,
'timestamp': '2019-01-01 00:00:00'
}
```
* ***[Policy Functions:](https://github.com/BlockScience/cadCAD/tree/master/documentation#Policy-Functions)*** -
compute one or more signals to be passed to State Update Functions
```python
def policy_function_A(_params, substep, sH, s, kafkaConfig):
    ...
    return {'signal_name': signal_value}
```
Parameters:
* **_params** : `dict` - [System parameters](https://github.com/BlockScience/cadCAD/blob/master/documentation/System_Model_Parameter_Sweep.md)
* **substep** : `int` - Current [substep](https://github.com/BlockScience/cadCAD/tree/master/documentation#Substep)
* **sH** : `list[list[dict]]` - Historical values of all state variables for the simulation. See
[Historical State Access](https://github.com/BlockScience/cadCAD/blob/master/documentation/Historically_State_Access.md) for details
* **s** : `dict` - Current state of the system, where the `dict_keys` are the names of the state variables and the
`dict_values` are their current values.
* **kafkaConfig:** `kafka.KafkaProducer` - Configuration for `kafka-python`
[Producer](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html)
* ***[State Update Functions](https://github.com/BlockScience/cadCAD/tree/master/documentation#state-update-functions):*** -
update state variables over time
```python
def state_update_function_A(_params, substep, sH, s, actions, kafkaConfig):
...
return 'state_variable_name', new_value
```
Parameters:
* **_params** : `dict` - [System parameters](https://github.com/BlockScience/cadCAD/blob/master/documentation/System_Model_Parameter_Sweep.md)
* **substep** : `int` - Current [substep](https://github.com/BlockScience/cadCAD/tree/master/documentation#Substep)
* **sH** : `list[list[dict]]` - Historical values of all state variables for the simulation. See
[Historical State Access](https://github.com/BlockScience/cadCAD/blob/master/documentation/Historically_State_Access.md) for details
* **s** : `dict` - Current state of the system, where the `dict_keys` are the names of the state variables and the
`dict_values` are their current values.
* **actions** : `dict` - Aggregation of the signals of all policy functions in the current substep
* **kafkaConfig:** `kafka.KafkaProducer` - Configuration for `kafka-python`
[Producer](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html)
* ***[Partial State Update Block](https://github.com/BlockScience/cadCAD/tree/master/documentation#State-Variables) (PSUB):*** -
a set of State Update Functions and Policy Functions that update state records
![](https://i.imgur.com/9rlX9TG.png)
**Note:** State Update and Policy Functions now have the additional / undocumented parameter `kafkaConfig`
**a.** **Define Policy Functions:**
* [Example:](distroduce/action_policies.py) Two users interacting on separate chat clients and entering / exiting chat
**b.** **Define State Update Functions:**
* [Example:](distroduce/state_updates.py) Used for logging and maintaining state of user actions defined by policies
**c.** **Define Initial Conditions & Partial State Update Block:**
* **Initial Conditions:** [Example](distroduce/messaging_sim.py)
* **Partial State Update Block (PSUB):** [Example](distroduce/simulation.py)
**d.** **Create Simulation Executor:** Used for running a simulation
* [Local](distroduce/local_messaging_sim.py)
* [EMR](distroduce/messaging_sim.py)
### 2. [Configure EMR Cluster](distroduce/configuration/cluster.py)
### 3. Launch EMR Cluster:
**Option A:** Preconfigured Launch
```bash
python3 distroduce/emr/launch.py
```
**Option B:** Custom Launch - [Example](distroduce/emr/launch.py)
```python
from distroduce.emr.launch import launch_cluster
from distroduce.configuration.cluster import ec2_attributes, bootstrap_actions, instance_groups, configurations
region = 'us-east-1'
cluster_name = 'distibuted_produce'
launch_cluster(cluster_name, region, ec2_attributes, bootstrap_actions, instance_groups, configurations)
```
### 4. Execute Benchmark(s) on EMR:
* **Step 1:** SSH onto the master node
```bash
zip -rq distroduce/dist/distroduce.zip distroduce/
```
* **Step 2:** SSH onto the master node
* **Step 3:** Spark Submit
```
spark-submit --master yarn --py-files distroduce.zip messaging_sim.py `hostname | xargs`
```

View File

View File

@ -1,72 +0,0 @@
from datetime import datetime
from kafka import KafkaProducer
'''
Function that produces a single action taken by a user and its accompanying message.
The results will be combined with a user-defined aggregation function (UDAF)
given to the `policy_ops` argument of `cadCAD.configuration.append_configs`.
Current UDAF: `policy_ops=[lambda a, b: a + b]`
'''
def messages(client_id, room, action, _input, sender, receiver=None):
return {
'types': [action],
'messages': [
{
'client': client_id, 'room': room, 'action': action,
'sender': sender, 'receiver': receiver,
'input': _input,
'created': datetime.now()
}
]
}
'''
Policy representing a user entering a chat room and its accompanying message
'''
def enter_action(state, room, user):
def f(_g, step, sL, s, kafkaConfig):
msgs = messages(state, room, 'enter', f"{user} enters {room}", user)
msgs['send_times'] = [0.000000]
msgs['msg_counts'] = [len(msgs['messages'])]
return msgs
return f
'''
Policy representing a user sending a message to a receiver within a chat room
and its accompanying message
A Kafka Producer is used to send messages to a Kafka cluster.
The Kafka Producer is configured via `cadCAD.engine.ExecutionContext`.
'''
def message_actions(state, room, message_input, sender, receiver):
msgs = messages(state, room, 'send', message_input, sender, receiver)
msgs_list = msgs['messages']
def send_action(_g, step, sL, s, kafkaConfig):
start_time = datetime.now()
for msg in msgs_list:
producer: KafkaProducer = kafkaConfig['producer']
topic: str = kafkaConfig['send_topic']
encoded_msg = str(msg).encode('utf-8')
producer.send(topic, encoded_msg)
msgs['send_times'] = [(datetime.now() - start_time).total_seconds()]
msgs['msg_counts'] = [len(msgs_list)]
return msgs
return send_action
'''
Policy representing a user exiting a chat room and its accompanying message
'''
def exit_action(state, room, user):
def f(_g, step, sL, s, kafkaConfig):
msgs = messages(state, room, 'exit', f"{user} exited {room}", user)
msgs_list = msgs['messages']
msgs['send_times'] = [0.000000]
msgs['msg_counts'] = [len(msgs_list)]
return msgs
return f

View File

@ -1,3 +0,0 @@
#!/bin/bash
pip3 install -r requirements.txt
zip -rq distroduce/dist/distroduce.zip distroduce/

View File

@ -1,79 +0,0 @@
ec2_attributes = {
"KeyName":"joshua-IAM-keypair",
"InstanceProfile":"EMR_EC2_DefaultRole",
"SubnetId":"subnet-0034e615b047fd112",
"EmrManagedSlaveSecurityGroup":"sg-08e546ae27d86d6a3",
"EmrManagedMasterSecurityGroup":"sg-08e546ae27d86d6a3"
}
bootstrap_actions = [
{
"Path":"s3://insightde/emr/bootstraps/distroduce.sh",
"Name":"bootstrap"
}
]
instance_groups = [
{
"InstanceCount":5,
"EbsConfiguration":
{"EbsBlockDeviceConfigs":
[
{
"VolumeSpecification":
{"SizeInGB":32,"VolumeType":"gp2"},
"VolumesPerInstance":2
}
]
},
"InstanceGroupType":"CORE",
"InstanceType":"m4.xlarge",
"Name":"Core - 2"
},
{
"InstanceCount":1,
"EbsConfiguration":
{
"EbsBlockDeviceConfigs":
[
{
"VolumeSpecification":
{"SizeInGB":32,"VolumeType":"gp2"},
"VolumesPerInstance":2
}
]
},
"InstanceGroupType":"MASTER",
"InstanceType":"m4.xlarge",
"Name":"Master - 1"
}
]
configurations = [
{
"Classification":"spark-env",
"Properties":{},
"Configurations":
[
{
"Classification":"export",
"Properties":{
"PYSPARK_PYTHON": "/usr/bin/python3",
"PYSPARK_DRIVER_PYTHON": "/usr/bin/python3"
}
}
]
},
{
"Classification":"spark-defaults",
"Properties":{
"spark.sql.execution.arrow.enabled": "true"
}
},
{
"Classification":"spark",
"Properties":{
"maximizeResourceAllocation":"true"
}
}
]

View File

@ -1,23 +0,0 @@
#!/bin/bash
cd ~
yes | sudo python3 -m pip install --upgrade pip
yes | sudo python3 -m pip install pathos kafka-python
wget https://raw.githubusercontent.com/JEJodesty/cadCAD/dev/dist/cadCAD-0.0.2-py3-none-any.whl
yes | sudo python3 -m pip install cadCAD-0.0.2-py3-none-any.whl
# check for master node
IS_MASTER=false
if grep -i isMaster /mnt/var/lib/info/instance.json | grep -i true;
then
IS_MASTER=true
PRIVATE_IP=`hostname -I | xargs`
wget https://raw.githubusercontent.com/JEJodesty/cadCAD/dev/distroduce/dist/distroduce.zip
wget https://raw.githubusercontent.com/JEJodesty/cadCAD/dev/distroduce/messaging_sim.py
sudo sed -i -e '$a\export PYSPARK_PYTHON=/usr/bin/python3' /etc/spark/conf/spark-env.sh
wget http://apache.spinellicreations.com/kafka/2.3.0/kafka_2.12-2.3.0.tgz
tar -xzf kafka_2.12-2.3.0.tgz
cd kafka_2.12-2.3.0
bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties &
bin/kafka-topics.sh --create --bootstrap-server ${PRIVATE_IP}:9092 --replication-factor 1 --partitions 1 --topic test
fi

View File

@ -1,83 +0,0 @@
#!/bin/bash
# SSH into all machines
ssh -i ~/.ssh/joshua-IAM-keypair.pem hadoop@
sudo python3 -m pip install --upgrade pip
sudo python3 -m pip install pathos kafka-python
# SetUp Window: head node
cd ~
#sudo python3 -m pip install --upgrade pip
#sudo python3 -m pip install pathos kafka-python
sudo sed -i -e '$a\export PYSPARK_PYTHON=/usr/bin/python3' /etc/spark/conf/spark-env.sh
wget http://apache.spinellicreations.com/kafka/2.3.0/kafka_2.12-2.3.0.tgz
tar -xzf kafka_2.12-2.3.0.tgz
cd kafka_2.12-2.3.0
bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties &
# get ip
bin/kafka-topics.sh --create --bootstrap-server 10.0.0.9:9092 --replication-factor 1 --partitions 1 --topic test
# bin/kafka-topics.sh --list --bootstrap-server 10.0.0.9:9092
# Consume (Window): head node
bin/kafka-console-consumer.sh --bootstrap-server 10.0.0.9:9092 --topic test --from-beginning
# DELETE
# bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic test
# local
python3 setup.py sdist bdist_wheel
pip3 install dist/*.whl
# SCP: all nodes
# S3 duuhhhh
# git clone specific file
scp -i ~/.ssh/joshua-IAM-keypair.pem dist/*.whl hadoop@ec2-18-206-12-181.compute-1.amazonaws.com:/home/hadoop/
scp -i ~/.ssh/joshua-IAM-keypair.pem dist/*.whl hadoop@ec2-34-239-171-181.compute-1.amazonaws.com:/home/hadoop/
scp -i ~/.ssh/joshua-IAM-keypair.pem dist/*.whl hadoop@ec2-3-230-154-170.compute-1.amazonaws.com:/home/hadoop/
scp -i ~/.ssh/joshua-IAM-keypair.pem dist/*.whl hadoop@ec2-18-232-52-219.compute-1.amazonaws.com:/home/hadoop/
scp -i ~/.ssh/joshua-IAM-keypair.pem dist/*.whl hadoop@ec2-34-231-70-210.compute-1.amazonaws.com:/home/hadoop/
scp -i ~/.ssh/joshua-IAM-keypair.pem dist/*.whl hadoop@ec2-34-231-243-101.compute-1.amazonaws.com:/home/hadoop/
sudo python3 -m pip install *.whl
# SCP head node
# ToDo: zip build for cadCAD OR give py library after whl install
scp -i ~/.ssh/joshua-IAM-keypair.pem distroduce/dist/distroduce.zip hadoop@ec2-18-206-12-181.compute-1.amazonaws.com:/home/hadoop/
scp -i ~/.ssh/joshua-IAM-keypair.pem distroduce/messaging_sim.py hadoop@ec2-18-206-12-181.compute-1.amazonaws.com:/home/hadoop/
#scp -i ~/.ssh/joshua-IAM-keypair.pem dist/distroduce.zip hadoop@ec2-18-232-54-233.compute-1.amazonaws.com:/home/hadoop/
#scp -i ~/.ssh/joshua-IAM-keypair.pem examples/event_bench/main.py hadoop@ec2-18-232-54-233.compute-1.amazonaws.com:/home/hadoop/
# Run Window: Head Node
#spark-submit --master yarn messaging_app.py
spark-submit --master yarn --py-files distroduce.zip main.py
# Cluster Config
#[
# {
# "Classification": "spark-env",
# "Configurations": [
# {
# "Classification": "export",
# "ConfigurationProperties": {
# "PYSPARK_PYTHON": "/usr/bin/python3",
# "PYSPARK_DRIVER_PYTHON": "/usr/bin/python3"
# }
# }
# ]
# },
# {
# "Classification": "spark-defaults",
# "ConfigurationProperties": {
# "spark.sql.execution.arrow.enabled": "true"
# }
# },
# {
# "Classification": "spark",
# "Properties": {
# "maximizeResourceAllocation": "true"
# }
# }
#]

View File

@ -1,16 +0,0 @@
#!/bin/bash
aws emr create-cluster \
--applications Name=Hadoop Name=Hive Name=Spark \
--ec2-attributes '{"KeyName":"joshua-IAM-keypair","InstanceProfile":"EMR_EC2_DefaultRole","SubnetId":"subnet-0034e615b047fd112","EmrManagedSlaveSecurityGroup":"sg-08e546ae27d86d6a3","EmrManagedMasterSecurityGroup":"sg-08e546ae27d86d6a3"}' \
--release-label emr-5.26.0 \
--log-uri 's3n://aws-logs-251682129355-us-east-1/elasticmapreduce/' \
--instance-groups '[{"InstanceCount":5,"EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"SizeInGB":32,"VolumeType":"gp2"},"VolumesPerInstance":2}]},"InstanceGroupType":"CORE","InstanceType":"m4.xlarge","Name":"Core - 2"},{"InstanceCount":1,"EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"SizeInGB":32,"VolumeType":"gp2"},"VolumesPerInstance":2}]},"InstanceGroupType":"MASTER","InstanceType":"m4.xlarge","Name":"Master - 1"}]' \
--configurations '[{"Classification":"spark-env","Properties":{},"Configurations":[{"Classification":"export","Properties":{}}]},{"Classification":"spark-defaults","Properties":{}},{"Classification":"spark","Properties":{"maximizeResourceAllocation":"true"}}]' \
--auto-scaling-role EMR_AutoScaling_DefaultRole \
--bootstrap-actions '[{"Path":"s3://insightde/emr/bootstraps/distroduce.sh","Name":"bootstrap"}]' \
--ebs-root-volume-size 10 \
--service-role EMR_DefaultRole \
--enable-debugging \
--name 'distibuted_produce' \
--scale-down-behavior TERMINATE_AT_TASK_COMPLETION \
--region us-east-1

View File

@ -1,8 +0,0 @@
#!/bin/bash
wget http://apache.spinellicreations.com/kafka/2.3.0/kafka_2.12-2.3.0.tgz
tar -xzf kafka_2.12-2.3.0.tgz
cd kafka_2.12-2.3.0
bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties &
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

View File

@ -1,3 +0,0 @@
#!/bin/bash
#spark-submit --master yarn event_bench.py
spark-submit --py-files dist/distroduce.zip examples/event_bench/main.py

View File

@ -1,3 +0,0 @@
#!/bin/bash
PRIVATE_IP=`hostname -I | xargs`
spark-submit --master yarn --py-files distroduce.zip messaging_sim.py $PRIVATE_IP

View File

@ -1,4 +0,0 @@
#!/bin/bash
scp -i ~/.ssh/joshua-IAM-keypair.pem ~/Projects/event-bench/event_bench.py \
hadoop@ec2-3-230-158-62.compute-1.amazonaws.com:/home/hadoop/

Binary file not shown.

View File

@ -1,39 +0,0 @@
import os, json, boto3
from typing import List
def launch_cluster(name, region, ec2_attributes, bootstrap_actions, instance_groups, configurations):
def log_uri(name, region):
return f's3n://{name}-{region}/elasticmapreduce/'
os.system(f"""
aws emr create-cluster \
--applications Name=Hadoop Name=Hive Name=Spark \
--ec2-attributes '{json.dumps(ec2_attributes)}' \
--release-label emr-5.26.0 \
--log-uri '{str(log_uri(name, region))}' \
--instance-groups '{json.dumps(instance_groups)}' \
--configurations '{json.dumps(configurations)}' \
--auto-scaling-role EMR_AutoScaling_DefaultRole \
--bootstrap-actions '{json.dumps(bootstrap_actions)}' \
--ebs-root-volume-size 10 \
--service-role EMR_DefaultRole \
--enable-debugging \
--name '{name}' \
--scale-down-behavior TERMINATE_AT_TASK_COMPLETION \
--region {region}
""")
# def benchmark(names: List[str], region, ec2_attributes, bootstrap_actions, instance_groups, configurations):
# current_dir = os.path.dirname(__file__)
# s3 = boto3.client('s3')
# bucket = 'insightde'
#
# file = 'distroduce.sh'
# abs_path = os.path.join(current_dir, file)
# key = f'emr/bootstraps/{file}'
#
# s3.upload_file(abs_path, bucket, key)
# for name in names:
# launch_cluster(name, region, ec2_attributes, bootstrap_actions, instance_groups, configurations)

View File

@ -1,5 +0,0 @@
from distroduce.emr import launch_cluster
from distroduce.configuration.cluster import ec2_attributes, bootstrap_actions, instance_groups, configurations
region = 'us-east-1'
cluster_name = 'distibuted_produce'
launch_cluster(cluster_name, region, ec2_attributes, bootstrap_actions, instance_groups, configurations)

View File

@ -1 +0,0 @@
from distroduce.executor.spark.jobs import distributed_produce

View File

@ -1,44 +0,0 @@
from distroduce.spark.session import sc
def distributed_produce(
simulation_execs,
var_dict_list,
states_lists,
configs_structs,
env_processes_list,
Ts,
Ns,
userIDs,
sessionIDs,
simulationIDs,
runIDs,
spark_context=sc,
kafka_config=None
):
func_params_zipped = list(
zip(userIDs, sessionIDs, simulationIDs, runIDs, simulation_execs, configs_structs, env_processes_list)
)
func_params_kv = [((t[0], t[1], t[2], t[3]), (t[4], t[5], t[6])) for t in func_params_zipped]
def simulate(k, v):
from kafka import KafkaProducer
prod_config = kafka_config['producer_config']
kafka_config['producer'] = KafkaProducer(**prod_config)
(sim_exec, config, env_procs) = [f[1] for f in func_params_kv if f[0] == k][0]
results = sim_exec(
v['var_dict'], v['states_lists'], config, env_procs, v['Ts'], v['Ns'],
k[0], k[1], k[2], k[3], kafka_config
)
return results
val_params = list(zip(userIDs, sessionIDs, simulationIDs, runIDs, var_dict_list, states_lists, Ts, Ns))
val_params_kv = [
(
(t[0], t[1], t[2], t[3]),
{'var_dict': t[4], 'states_lists': t[5], 'Ts': t[6], 'Ns': t[7]}
) for t in val_params
]
results_rdd = spark_context.parallelize(val_params_kv).coalesce(1)
return list(results_rdd.map(lambda x: simulate(*x)).collect())

View File

@ -1,46 +0,0 @@
import sys
from datetime import datetime
from cadCAD import configs
from cadCAD.configuration.utils import config_sim
from cadCAD.engine import ExecutionMode, ExecutionContext, Executor
from distroduce.simulation import main, sim_composition
from distroduce.spark.session import sc
from distroduce.executor.spark import distributed_produce
if __name__ == "__main__":
# Initial States
initial_conditions = {
'record_creation': datetime.now(),
'client_a': {'users': [], 'messages': [], 'msg_count': 0, 'send_time': 0.0},
'client_b': {'users': [], 'messages': [], 'msg_count': 0, 'send_time': 0.0},
'total_msg_count': 0,
'total_send_time': 0.000000
}
'''
Simulation Configuration:
N = Simulation Runs
T = Timesteps for each Partial State Update Block
'''
sim_config = config_sim(
{
"N": 1,
"T": range(10),
}
)
# Configuration for Kafka Producer
kafkaConfig = {
'send_topic': 'test',
'producer_config': {
'bootstrap_servers': f'{sys.argv[1]}:9092',
'acks': 'all'
}
}
exec_mode = ExecutionMode()
dist_proc_ctx = ExecutionContext(context=exec_mode.dist_proc, method=distributed_produce, kafka_config=kafkaConfig)
run = Executor(exec_context=dist_proc_ctx, configs=configs, spark_context=sc)
main(run, sim_config, initial_conditions, sim_composition)

View File

@ -1,46 +0,0 @@
import sys
from datetime import datetime
from cadCAD import configs
from cadCAD.configuration.utils import config_sim
from cadCAD.engine import ExecutionMode, ExecutionContext, Executor
from distroduce.simulation import main, sim_composition
from distroduce.spark.session import sc
from distroduce.executor.spark import distributed_produce
if __name__ == "__main__":
    # Initial States
    initial_conditions = {
        'record_creation': datetime.now(),
        'client_a': {'users': [], 'messages': [], 'msg_count': 0, 'send_time': 0.0},
        'client_b': {'users': [], 'messages': [], 'msg_count': 0, 'send_time': 0.0},
        'total_msg_count': 0,
        'total_send_time': 0.000000
    }

    '''
    Simulation Configuration:
    N = Simulation Runs
    T = Timesteps for each Partial State Update Block
    '''
    sim_config = config_sim(
        {
            "N": 1,
            "T": range(5000),
        }
    )

    # Configuration for Kafka Producer
    kafkaConfig = {
        'send_topic': 'test',
        'producer_config': {
            'bootstrap_servers': f'{sys.argv[1]}:9092',
            'acks': 'all'
        }
    }

    exec_mode = ExecutionMode()
    dist_proc_ctx = ExecutionContext(context=exec_mode.dist_proc, method=distributed_produce, kafka_config=kafkaConfig)
    run = Executor(exec_context=dist_proc_ctx, configs=configs, spark_context=sc)
    main(run, sim_config, initial_conditions, sim_composition)

@ -1,87 +0,0 @@
import sys
from pprint import pprint
import pandas as pd
from tabulate import tabulate
from cadCAD.utils import arrange_cols
from cadCAD.configuration import append_configs
from distroduce.action_policies import enter_action, message_actions, exit_action
from distroduce.state_updates import send_message, count_messages, add_send_time, current_time
# State Updates
variables = {
    'client_a': send_message('client_a'),
    'client_b': send_message('client_b'),
    'total_msg_count': count_messages,
    'total_send_time': add_send_time,
    'record_creation': current_time('record_creation')
}

# Action Policies
policy_group_1 = {
    "action_1": enter_action('server', 'room_1', 'A'),
    "action_2": enter_action('server', 'room_1', 'B')
}
policy_group_2 = {
    "action_1": message_actions('client_A', 'room_1', "Hi B", 'A', 'B'),
    "action_2": message_actions('client_B', 'room_1', "Hi A", 'B', 'A')
}
policy_group_3 = {
    "action_1": message_actions('client_A', 'room_1', "Bye B", 'A', 'B'),
    "action_2": message_actions('client_B', 'room_1', "Bye A", 'B', 'A')
}
policy_group_4 = {
    "action_1": exit_action('server', 'room_1', 'A'),
    "action_2": exit_action('server', 'room_1', 'B')
}
policy_groups = [policy_group_1, policy_group_2, policy_group_3, policy_group_4]

# Partial State Update Blocks
sim_composition = [{'policies': policy_group, 'variables': variables} for policy_group in policy_groups]

def main(executor, sim_config, initial_conditions, sim_composition):
    append_configs(
        user_id='Joshua',
        sim_configs=sim_config,
        initial_state=initial_conditions,
        partial_state_update_blocks=sim_composition,
        policy_ops=[lambda a, b: a + b]
    )
    pprint(sim_composition)

    i = 0
    for raw_result, tensor_field in executor.execute():
        result = arrange_cols(pd.DataFrame(raw_result), False)
        metrics_result = result[
            [
                'run_id', 'timestep', 'substep',
                'record_creation', 'total_msg_count', 'total_send_time'
            ]
        ]
        msgs_result = result[
            [
                'run_id', 'timestep', 'substep',
                'record_creation',
                'client_a', 'client_b'
            ]
        ]
        print()
        if i == 0:
            print(tabulate(tensor_field, headers='keys', tablefmt='psql'))
        # Overall throughput for the run: messages sent per second of producer send time
        last = metrics_result.tail(1)
        last['msg_per_sec'] = last['total_msg_count'] / last['total_send_time']
        print("Messages Output: Head")
        print(tabulate(msgs_result.head(5), headers='keys', tablefmt='psql'))
        print("Metrics Output: Head")
        print(tabulate(metrics_result.head(5), headers='keys', tablefmt='psql'))
        print("Metrics Output: Tail")
        print(tabulate(metrics_result.tail(5), headers='keys', tablefmt='psql'))
        print(tabulate(last, headers='keys', tablefmt='psql'))
        print()
        i += 1
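The final tabulated row gives an end-to-end throughput figure, `msg_per_sec = total_msg_count / total_send_time`. As a purely illustrative example, a run that accumulates 20,000 messages over 4.0 seconds of total producer send time would report 5,000 messages per second.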

@ -1,16 +0,0 @@
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
import os
# os.environ['PYSPARK_PYTHON'] = '/usr/local/bin/python3'
# os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/local/bin/python3'
# os.environ['OBJC_DISABLE_INITIALIZE_FORK_SAFETY'] = 'YES'
spark = SparkSession \
    .builder \
    .appName("distroduce") \
    .getOrCreate()
sc: SparkContext = spark.sparkContext
print(f"Spark UI: {sc.uiWebUrl}")
print()

@ -1,60 +0,0 @@
from copy import deepcopy
from datetime import datetime
from functools import reduce
add = lambda a, b: a + b
'''
Function that maintains the state of users in a chat room
'''
def update_users(users, actions, action_types=['send', 'enter', 'exit']):
    users = deepcopy(users)
    for action_type in action_types:
        if action_type in actions['types']:
            for msg in actions['messages']:
                if msg['action'] == 'send' and action_type == 'send':
                    continue
                elif msg['action'] == 'enter' and action_type == 'enter':
                    for user in msg['sender']:
                        users.append(user)  # register entered users
                elif msg['action'] == 'exit' and action_type == 'exit':
                    for user in msg['sender']:
                        users.remove(user)  # remove exited users
    return users

'''
State Update used to count sent messages
'''
def count_messages(_g, step, sL, s, actions, kafkaConfig):
    return 'total_msg_count', s['total_msg_count'] + reduce(add, actions['msg_counts'])

'''
State Update used to log message events
'''
def send_message(state):
    return lambda _g, step, sL, s, actions, kafkaConfig: (
        state,
        {
            'users': update_users(s[state]['users'], actions),
            'messages': actions['messages'],
            'msg_counts': reduce(add, actions['msg_counts']),
            'send_times': reduce(add, actions['send_times'])
        }
    )

'''
State Update used to sum the time taken by the Kafka Producer to send messages between users
'''
def add_send_time(_g, step, sL, s, actions, kafkaConfig):
    return 'total_send_time', s['total_send_time'] + reduce(add, actions['send_times'])

'''
State Update used to record each event record's creation time
'''
def current_time(state):
    return lambda _g, step, sL, s, actions, kafkaConfig: (state, datetime.now())
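Each state update above follows the same callable shape: it receives `(_g, step, sL, s, actions, kafkaConfig)` and returns a `(state_key, new_value)` pair, where `s` is the current state dict and `actions` is the aggregated policy input. A minimal self-contained sketch of an additional update in that shape; the name `total_enters` is illustrative only and not part of this changeset:

    def total_enters(_g, step, sL, s, actions, kafkaConfig):
        # Count the 'enter' events aggregated into this substep and accumulate them
        entered = sum(1 for msg in actions['messages'] if msg['action'] == 'enter')
        return 'total_enters', s.get('total_enters', 0) + entered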

@ -3,6 +3,4 @@ wheel
pathos
fn
tabulate
funcy
pyspark
kafka-python
funcy

@ -7,7 +7,6 @@ by BlockScience. It is capable of modeling systems at all levels of abstraction
System Dynamics (SD), and enabling smooth integration of computational social science simulations with empirical data
science workflows.
An economic system is treated as a state-based model and defined through a set of endogenous and exogenous state
variables which are updated through mechanisms and environmental processes, respectively. Behavioral models, which may
be deterministic or stochastic, provide the evolution of the system within the action space of the mechanisms.
@ -19,7 +18,7 @@ provided.
"""
setup(name='cadCAD',
version='0.0.2',
version='0.3.1',
description="cadCAD: a differential games based simulation software package for research, validation, and \
Computer Aided Design of economic systems",
long_description=long_description,
@ -34,7 +33,6 @@ setup(name='cadCAD',
"pathos",
"fn",
"tabulate",
"funcy",
"kafka-python"
"funcy"
]
)
)
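With the version bumped to 0.3.1 and `kafka-python` dropped from `install_requires`, the remaining dependencies are installed automatically alongside the package. A typical install of the released version (availability of this exact release on PyPI is assumed here, not shown in this changeset):

    pip install cadCAD==0.3.1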

@ -152,7 +152,6 @@ sim_config = config_sim(
)
append_configs(
user_id='user_a',
sim_configs=sim_config,
initial_state=genesis_states,
env_processes=env_processes,

@ -140,7 +140,6 @@ sim_config = config_sim(
)
append_configs(
user_id='user_b',
sim_configs=sim_config,
initial_state=genesis_states,
env_processes=env_processes,

@ -144,7 +144,6 @@ sim_config = config_sim(
# New Convention
partial_state_update_blocks = psub_list(psu_block, psu_steps)
append_configs(
user_id='user_a',
sim_configs=sim_config,
initial_state=genesis_states,
seeds=seeds,

@ -3,7 +3,6 @@ from typing import List
from tabulate import tabulate
# The following imports NEED to be in the exact order
from cadCAD.engine import ExecutionMode, ExecutionContext, Executor
from cadCAD.utils import arrange_cols
from simulations.regression_tests import config1
from cadCAD import configs
@ -15,11 +14,8 @@ first_config = configs # only contains config1
single_proc_ctx = ExecutionContext(context=exec_mode.single_proc)
run = Executor(exec_context=single_proc_ctx, configs=first_config)
# print(set(result.columns) - set(['user_id', 'session_id', 'simulation_id', 'run_id']) - set(['run', 'timestep', 'substep']))
# print(['run', 'timestep', 'substep'])
raw_result, tensor_field = run.execute()
result = arrange_cols(pd.DataFrame(raw_result), False)
result = pd.DataFrame(raw_result)
print()
print("Tensor Field: config1")
# print(raw_result)

@ -2,7 +2,6 @@ import pandas as pd
from tabulate import tabulate
# The following imports NEED to be in the exact order
from cadCAD.engine import ExecutionMode, ExecutionContext, Executor
from cadCAD.utils import arrange_cols
from simulations.regression_tests import config2
from cadCAD import configs
@ -10,15 +9,15 @@ exec_mode = ExecutionMode()
print("Simulation Execution: Single Configuration")
print()
single_proc_ctx = ExecutionContext(context=exec_mode.multi_proc)
run = Executor(exec_context=single_proc_ctx, configs=configs)
first_config = configs # only contains config2
single_proc_ctx = ExecutionContext(context=exec_mode.single_proc)
run = Executor(exec_context=single_proc_ctx, configs=first_config)
for raw_result, tensor_field in run.execute():
result = arrange_cols(pd.DataFrame(raw_result), False)
print()
# print("Tensor Field: " + config_names[i])
print(tabulate(tensor_field, headers='keys', tablefmt='psql'))
print("Output:")
print(tabulate(result, headers='keys', tablefmt='psql'))
print()
# i += 1
raw_result, tensor_field = run.execute()
result = pd.DataFrame(raw_result)
print()
print("Tensor Field: config1")
print(tabulate(tensor_field, headers='keys', tablefmt='psql'))
print("Output:")
print(tabulate(result, headers='keys', tablefmt='psql'))
print()

@ -2,7 +2,6 @@ import pandas as pd
from tabulate import tabulate
# The following imports NEED to be in the exact order
from cadCAD.engine import ExecutionMode, ExecutionContext, Executor
from cadCAD.utils import arrange_cols
from simulations.regression_tests import config1, config2
from cadCAD import configs
@ -16,9 +15,9 @@ run = Executor(exec_context=multi_proc_ctx, configs=configs)
i = 0
config_names = ['config1', 'config2']
for raw_result, tensor_field in run.execute():
result = arrange_cols(pd.DataFrame(raw_result), False)
result = pd.DataFrame(raw_result)
print()
# print(f"Tensor Field: {config_names[i]}")
print(f"Tensor Field: {config_names[i]}")
print(tabulate(tensor_field, headers='keys', tablefmt='psql'))
print("Output:")
print(tabulate(result, headers='keys', tablefmt='psql'))

@ -14,11 +14,11 @@ multi_proc_ctx = ExecutionContext(context=exec_mode.multi_proc)
run = Executor(exec_context=multi_proc_ctx, configs=configs)
i = 0
# config_names = ['sweep_config_A', 'sweep_config_B']
config_names = ['sweep_config_A', 'sweep_config_B']
for raw_result, tensor_field in run.execute():
result = pd.DataFrame(raw_result)
print()
# print("Tensor Field: " + config_names[i])
print("Tensor Field: " + config_names[i])
print(tabulate(tensor_field, headers='keys', tablefmt='psql'))
print("Output:")
print(tabulate(result, headers='keys', tablefmt='psql'))