Joshua E. Jodesty 2019-09-25 14:33:22 -04:00
parent 662046f34f
commit 71c334d5f6
8 changed files with 139 additions and 25 deletions

View File

@ -10,17 +10,32 @@ by BlockScience
through simulation, with support for Monte Carlo methods, A/B testing and parameter sweeping.
# Getting Started
## 1. Install cadCAD
cadCAD requires [Python 3](https://www.python.org/downloads/)
cadCAD can be installed using Python's package manager, [pip](https://pypi.org/project/cadCAD/):
```bash
pip install cadCAD
```
## 2. Learn the basics
Check out our tutorials (available both as [Jupyter Notebooks](tutorials) and
[videos](https://www.youtube.com/watch?v=uJEiYHRWA9g&list=PLmWm8ksQq4YKtdRV-SoinhV6LbQMgX1we)) to familiarize yourself
with some system modelling concepts and cadCAD terminology. Alternatively, go straight to the
[documentation](documentation).
## 1. Installation:
Requires [Python 3](https://www.python.org/downloads/)
## 3. Connect
**Option A: Install Using [pip](https://pypi.org/project/cadCAD/)**
```bash
pip3 install cadCAD
```
**Option B: Build From Source**
```bash
pip3 install -r requirements.txt
python3 setup.py sdist bdist_wheel
pip3 install dist/*.whl
```
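With either option, the install can be sanity-checked from a Python shell. The snippet below is a minimal check, not an official test; it relies only on the `name` and `configs` attributes that `cadCAD/__init__.py` exposes:

```python
# Minimal post-install check (illustrative sketch only).
import cadCAD

print(cadCAD.name)     # "cadCAD"
print(cadCAD.configs)  # [] until a simulation configuration has been registered
```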
## 2. Learn the basics
**Tutorials:** available both as [Jupyter Notebooks](tutorials) and [videos](https://www.youtube.com/watch?v=uJEiYHRWA9g&list=PLmWm8ksQq4YKtdRV-SoinhV6LbQMgX1we).
Use them to familiarize yourself with basic system modelling concepts and cadCAD terminology.
## 3. Documentation:
* [System Model Configuration](documentation/Simulation_Configuration.md)
* [System Simulation Execution](documentation/Simulation_Execution.md)
* [Policy Aggregation](documentation/Policy_Aggregation.md)
* [System Model Parameter Sweep](documentation/System_Model_Parameter_Sweep.md)
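For orientation, the pieces above fit together roughly as follows. This is a minimal sketch rather than official documentation; it assumes two system models have already been registered into `cadCAD.configs`, here by importing the regression-test configuration modules that ship with this repository:

```python
# Minimal execution sketch (illustrative; assumes the repository's regression-test
# configuration modules are importable and register two Configurations).
import pandas as pd
from simulations.regression_tests import config1, config2  # registers Configurations as a side effect
from cadCAD import configs
from cadCAD.engine import ExecutionMode, ExecutionContext, Executor

exec_mode = ExecutionMode()
multi_proc_ctx = ExecutionContext(context=exec_mode.multi_proc)
run = Executor(exec_context=multi_proc_ctx, configs=configs)

for raw_result, tensor_field in run.execute():
    print(pd.DataFrame(raw_result).head())
```

Each registered `Configuration` yields one `(raw_result, tensor_field)` pair from `run.execute()`.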
## 4. Connect
Find other cadCAD users at our [Discourse](https://community.cadcad.org/). We are a small but rapidly growing community.

View File

@ -1,2 +1,11 @@
name = "cadCAD"
configs = []
print(r'''
                  __________    ____
  ________ __ _____/ ____/   |  / __ \
 / ___/ __` / __  / /   / /| | / / / /
/ /__/ /_/ / /_/ / /___/ ___ |/ /_/ /
\___/\__,_/\__,_/\____/_/  |_/_____/
by BlockScience
''')

View File

@ -3,6 +3,7 @@ from typing import Dict, Callable, List, Tuple
from functools import reduce
import pandas as pd
from pandas.core.frame import DataFrame
# import cloudpickle, pickle
from cadCAD import configs
from cadCAD.utils import key_filter
@ -101,7 +102,10 @@ class Identity:
    def state_identity(self, k: str) -> Callable:
        return lambda var_dict, sub_step, sL, s, _input: (k, s[k])
    # state_identity = cloudpickle.dumps(state_identity)
    def apply_identity_funcs(self, identity: Callable, df: DataFrame, cols: List[str]) -> List[DataFrame]:
        # identity = pickle.loads(identity)
        def fillna_with_id_func(identity, df, col):
            return df[[col]].fillna(value=identity(col))

View File

@ -12,7 +12,6 @@ ascii_art = r'''
by Joshua E. Jodesty
'''
def distributed_produce(
    sc: SparkContext,
    spark_run,

View File

@ -2,6 +2,7 @@ from pprint import pprint
from typing import Callable, Dict, List, Any, Tuple
from pathos.multiprocessing import ProcessingPool as PPool
from pandas.core.frame import DataFrame
from pyspark.context import SparkContext
from cadCAD.utils import flatten
from cadCAD.configuration import Configuration, Processor
@ -17,6 +18,7 @@ EnvProcessesType = Dict[str, Callable]
class ExecutionMode:
    single_proc = 'single_proc'
    multi_proc = 'multi_proc'
    dist_proc = 'dist_proc'
def single_proc_exec(
@ -52,7 +54,7 @@ def parallelize_simulations(
    userIDs,
    sessionIDs,
    simulationIDs,
    runIDs: List[int],
    runIDs: List[int]
):
    params = list(zip(simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns,
                      userIDs, sessionIDs, simulationIDs, runIDs))
@ -60,6 +62,38 @@ def parallelize_simulations(
        results = p.map(lambda t: t[0](t[1], t[2], t[3], t[4], t[5], t[6], t[7], t[8], t[9], t[10]), params)
    return results
def distributed_simulations(
    sc: SparkContext,
    simulation_execs: List[Callable],
    var_dict_list: List[VarDictType],
    states_lists: List[StatesListsType],
    configs_structs: List[ConfigsType],
    env_processes_list: List[EnvProcessesType],
    Ts: List[range],
    Ns: List[int],
    userIDs,
    sessionIDs,
    simulationIDs,
    runIDs: List[int]
):
    # NOTE: despite accepting a SparkContext, this first pass still fans the simulations
    # out with pathos' ProcessingPool; the sc.parallelize(...) attempt below is commented
    # out because the lambda-based state update functions fail to pickle (see notes below).
    params = list(zip(simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns,
                      userIDs, sessionIDs, simulationIDs, runIDs))
    pprint(runIDs)
    with PPool(len(configs_structs)) as p:
        results = p.map(lambda t: t[0](t[1], t[2], t[3], t[4], t[5], t[6], t[7], t[8], t[9], t[10]), params)
    return results
# _pickle.PicklingError: Can't pickle <function <lambda> at 0x1115149e0>: attribute lookup <lambda> on
# simulations.regression_tests.config1 failed
# simulation_execs, env_processes_list
# Configuration Layer: configs_structs
# AttributeError: Can't pickle local object 'Identity.state_identity.<locals>.<lambda>'
# pprint(configs_structs)
# sc.parallelize([configs_structs])
# exit()
# result = sc.parallelize(params) \
# .map(lambda t: t[0](t[1], t[2], t[3], t[4], t[5], t[6], t[7], t[8], t[9], t[10])) \
# .collect()
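The commented-out notes above record why the `sc.parallelize(...)` path is disabled: the state update and policy functions are built from lambdas and nested closures (e.g. `Identity.state_identity.<locals>.<lambda>`), which the standard-library pickler cannot serialize. The sketch below is illustrative only, not code from this commit; it reproduces the failure and shows the `cloudpickle` workaround that this commit adds to `requirements.txt`:

```python
# Illustration only: why cloudpickle was added to requirements.txt.
import pickle
import cloudpickle

state_update = lambda var_dict, sub_step, sL, s, _input: ('x', s['x'])

try:
    pickle.dumps(state_update)               # standard pickle cannot serialize lambdas
except pickle.PicklingError as e:
    print(f"pickle failed: {e}")

payload = cloudpickle.dumps(state_update)    # cloudpickle handles lambdas and closures
restored = pickle.loads(payload)             # the payload loads with plain pickle
print(restored({}, 0, [], {'x': 1}, {}))     # ('x', 1)
```

The commented `cloudpickle`/`pickle` stubs in the configuration diff above point in the same direction.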
class ExecutionContext:
    def __init__(self, context: str = ExecutionMode.multi_proc) -> None:
@ -70,10 +104,13 @@ class ExecutionContext:
            self.method = single_proc_exec
        elif context == 'multi_proc':
            self.method = parallelize_simulations
        elif context == 'dist_proc':
            self.method = distributed_simulations
class Executor:
    def __init__(self, exec_context: ExecutionContext, configs: List[Configuration]) -> None:
    def __init__(self, exec_context: ExecutionContext, configs: List[Configuration], spark_context: SparkContext = None) -> None:
        self.sc = spark_context
        self.SimExecutor = SimExecutor
        self.exec_method = exec_context.method
        self.exec_context = exec_context.name
@ -83,14 +120,6 @@ class Executor:
        config_proc = Processor()
        create_tensor_field = TensorFieldReport(config_proc).create_tensor_field
        print(r'''
                  __________    ____
  ________ __ _____/ ____/   |  / __ \
 / ___/ __` / __  / /   / /| | / / / /
/ /__/ /_/ / /_/ / /___/ ___ |/ /_/ /
\___/\__,_/\__,_/\____/_/  |_/_____/
by BlockScience
        ''')
        print(f'Execution Mode: {self.exec_context + ": " + str(self.configs)}')
        print(f'Configurations: {self.configs}')
@ -141,5 +170,16 @@ class Executor:
                    results.append((flatten(result), create_tensor_field(partial_state_updates, ep)))
                final_result = results
        elif self.exec_context == ExecutionMode.dist_proc:
            # if len(self.configs) > 1:
            simulations = self.exec_method(
                self.sc,
                simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns,
                userIDs, sessionIDs, simulationIDs, runIDs
            )
            results = []
            for result, partial_state_updates, ep in list(zip(simulations, partial_state_updates, eps)):
                results.append((flatten(result), create_tensor_field(partial_state_updates, ep)))
            # NOTE: results are assembled above, but final_result is still left as None
            # for the distributed path in this first pass.
            final_result = None

        return final_result

View File

@ -5,4 +5,5 @@ fn
tabulate
funcy
pyspark
kafka-python
cloudpickle

View File

@ -0,0 +1,31 @@
from pprint import pprint
import pandas as pd
from tabulate import tabulate
from simulations.test_executions.spark.session import spark_context as sc
from simulations.regression_tests import config1, config2
from cadCAD.engine import ExecutionMode, ExecutionContext, Executor
from cadCAD.utils import arrange_cols
from cadCAD import configs
exec_mode = ExecutionMode()
print("Simulation Execution: Distributed Execution")
dist_proc_ctx = ExecutionContext(context=exec_mode.dist_proc)
run = Executor(exec_context=dist_proc_ctx, configs=configs, spark_context=sc)
# pprint(dist_proc_ctx)
# print(configs)
i = 0
config_names = ['config1', 'config2']
for raw_result, tensor_field in run.execute():
    result = arrange_cols(pd.DataFrame(raw_result), False)
    print()
    # print(f"Tensor Field: {config_names[i]}")
    print(tabulate(tensor_field, headers='keys', tablefmt='psql'))
    print("Output:")
    print(tabulate(result, headers='keys', tablefmt='psql'))
    print()
    i += 1

View File

@ -0,0 +1,15 @@
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
import os
os.environ['PYSPARK_PYTHON'] = '/usr/local/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/local/bin/python3'
spark = SparkSession\
    .builder\
    .appName("distroduce")\
    .getOrCreate()
spark_context: SparkContext = spark.sparkContext
print(f"Spark UI: {spark_context.uiWebUrl}")
print()
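One note on the hard-coded interpreter paths above: if the local Python does not live at `/usr/local/bin/python3`, PySpark will fail to start its workers. A suggested variant, an assumption on my part rather than part of this commit, derives both settings from the running interpreter:

```python
# Suggested variant (assumption, not in this commit): point PySpark at the
# interpreter that is actually running, so driver and workers always match.
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
```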