diff --git a/README.md b/README.md
index 339c9a2..8950918 100644
--- a/README.md
+++ b/README.md
@@ -10,17 +10,32 @@ by BlockScience
 through simulation, with support for Monte Carlo methods, A/B testing and parameter sweeping.
 
 # Getting Started
-## 1. Install cadCAD
-cadCAD requires [Python 3](https://www.python.org/downloads/)
-cadCAD can be installed using Python’s package manager, [pip](https://pypi.org/project/cadCAD/)
-```bash
-pip install cadCAD
-```
-## 2. Learn the basics
-Check out our tutorials (available both as [Jupyter Notebooks](tutorials) and
-[videos](https://www.youtube.com/watch?v=uJEiYHRWA9g&list=PLmWm8ksQq4YKtdRV-SoinhV6LbQMgX1we)) to familiarize yourself
-with some system modelling concepts and cadCAD terminology. Alternatively, go straight to the
-[documentation](documentation).
+## 1. Installation
+Requires [Python 3](https://www.python.org/downloads/)
 
-## 3. Connect
+**Option A: Install Using [pip](https://pypi.org/project/cadCAD/)**
+```bash
+pip3 install cadCAD
+```
+
+**Option B: Build From Source**
+```bash
+pip3 install -r requirements.txt
+python3 setup.py sdist bdist_wheel
+pip3 install dist/*.whl
+```
+
+
+## 2. Learn the basics
+**Tutorials:** available both as [Jupyter Notebooks](tutorials) and [videos](https://www.youtube.com/watch?v=uJEiYHRWA9g&list=PLmWm8ksQq4YKtdRV-SoinhV6LbQMgX1we)
+
+Familiarize yourself with some system modelling concepts and cadCAD terminology.
+
+## 3. Documentation
+* [System Model Configuration](documentation/Simulation_Configuration.md)
+* [System Simulation Execution](documentation/Simulation_Execution.md)
+* [Policy Aggregation](documentation/Policy_Aggregation.md)
+* [System Model Parameter Sweep](documentation/System_Model_Parameter_Sweep.md)
+
+## 4. Connect
 Find other cadCAD users at our [Discourse](https://community.cadcad.org/). We are a small but rapidly growing
 community.
diff --git a/cadCAD/__init__.py b/cadCAD/__init__.py
index ce0b0f9..cba296c 100644
--- a/cadCAD/__init__.py
+++ b/cadCAD/__init__.py
@@ -1,2 +1,11 @@
 name = "cadCAD"
-configs = []
\ No newline at end of file
+configs = []
+
+print(r'''
+                 __________    ____
+  ________ __ _____/ ____/   |  / __ \
+ / ___/ __` / __  / /   / /| | / / / /
+/ /__/ /_/ / /_/ / /___/ ___ |/ /_/ /
+\___/\__,_/\__,_/\____/_/  |_/_____/
+by BlockScience
+''')
\ No newline at end of file
diff --git a/cadCAD/configuration/__init__.py b/cadCAD/configuration/__init__.py
index efa790b..32e4bc3 100644
--- a/cadCAD/configuration/__init__.py
+++ b/cadCAD/configuration/__init__.py
@@ -3,6 +3,7 @@ from typing import Dict, Callable, List, Tuple
 from functools import reduce
 import pandas as pd
 from pandas.core.frame import DataFrame
+# import cloudpickle, pickle
 
 from cadCAD import configs
 from cadCAD.utils import key_filter
@@ -101,7 +102,16 @@ class Identity:
     def state_identity(self, k: str) -> Callable:
         return lambda var_dict, sub_step, sL, s, _input: (k, s[k])
 
+    # state_identity = cloudpickle.dumps(state_identity)
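+    # Why cloudpickle (assumption, untested): the lambda above is created at
+    # runtime, so the standard pickler used when shipping work to Spark
+    # executors cannot serialize it by reference. cloudpickle serializes such
+    # closures by value, and its output loads with plain pickle, e.g.:
+    #   payload = cloudpickle.dumps(self.state_identity(k))  # on the driver
+    #   identity = pickle.loads(payload)                     # on the worker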
+
     def apply_identity_funcs(self, identity: Callable, df: DataFrame, cols: List[str]) -> List[DataFrame]:
+        # identity = pickle.loads(identity)
         def fillna_with_id_func(identity, df, col):
             return df[[col]].fillna(value=identity(col))
 
diff --git a/cadCAD/distroduce/executor/spark/jobs/__init__.py b/cadCAD/distroduce/executor/spark/jobs/__init__.py
index b22f7e4..9fb8d3a 100644
--- a/cadCAD/distroduce/executor/spark/jobs/__init__.py
+++ b/cadCAD/distroduce/executor/spark/jobs/__init__.py
@@ -12,7 +12,6 @@ ascii_art = r'''
 by Joshua E. Jodesty
 '''
 
-
 def distributed_produce(
     sc: SparkContext,
     spark_run,
diff --git a/cadCAD/engine/__init__.py b/cadCAD/engine/__init__.py
index 227feaf..803a4eb 100644
--- a/cadCAD/engine/__init__.py
+++ b/cadCAD/engine/__init__.py
@@ -2,6 +2,7 @@ from pprint import pprint
 from typing import Callable, Dict, List, Any, Tuple
 from pathos.multiprocessing import ProcessingPool as PPool
 from pandas.core.frame import DataFrame
+from pyspark.context import SparkContext
 
 from cadCAD.utils import flatten
 from cadCAD.configuration import Configuration, Processor
@@ -17,6 +18,7 @@ EnvProcessesType = Dict[str, Callable]
 class ExecutionMode:
     single_proc = 'single_proc'
     multi_proc = 'multi_proc'
+    dist_proc = 'dist_proc'
 
 
 def single_proc_exec(
@@ -52,7 +54,7 @@ def parallelize_simulations(
         userIDs,
         sessionIDs,
         simulationIDs,
-        runIDs: List[int],
+        runIDs: List[int]
     ):
     params = list(zip(simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns,
                       userIDs, sessionIDs, simulationIDs, runIDs))
@@ -60,6 +62,43 @@ def parallelize_simulations(
         results = p.map(lambda t: t[0](t[1], t[2], t[3], t[4], t[5], t[6], t[7], t[8], t[9], t[10]), params)
     return results
 
+def distributed_simulations(
+        sc: SparkContext,
+        simulation_execs: List[Callable],
+        var_dict_list: List[VarDictType],
+        states_lists: List[StatesListsType],
+        configs_structs: List[ConfigsType],
+        env_processes_list: List[EnvProcessesType],
+        Ts: List[range],
+        Ns: List[int],
+        userIDs,
+        sessionIDs,
+        simulationIDs,
+        runIDs: List[int]
+    ):
+    params = list(zip(simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns,
+                      userIDs, sessionIDs, simulationIDs, runIDs))
+    pprint(runIDs)
+    with PPool(len(configs_structs)) as p:
+        results = p.map(lambda t: t[0](t[1], t[2], t[3], t[4], t[5], t[6], t[7], t[8], t[9], t[10]), params)
+    return results
+
+    # _pickle.PicklingError: Can't pickle <function <lambda> at 0x1115149e0>:
+    #   attribute lookup on simulations.regression_tests.config1 failed
+    #   (simulation_execs, env_processes_list)
+    # Configuration Layer: configs_structs
+    # AttributeError: Can't pickle local object 'Identity.state_identity.<locals>.<lambda>'
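+    # Note (assumption, untested): sc.parallelize(params) requires every element
+    # of params to be picklable, and the failures above come from lambdas and
+    # locally defined functions in the configuration layer. Possible fixes are
+    # to cloudpickle those callables on the driver and pickle.loads them inside
+    # the map, or to hoist them to module-level functions.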
+    # pprint(configs_structs)
+    # sc.parallelize([configs_structs])
+    # exit()
+    # result = sc.parallelize(params) \
+    #     .map(lambda t: t[0](t[1], t[2], t[3], t[4], t[5], t[6], t[7], t[8], t[9], t[10])) \
+    #     .collect()
 
 class ExecutionContext:
     def __init__(self, context: str = ExecutionMode.multi_proc) -> None:
@@ -70,10 +109,13 @@ class ExecutionContext:
             self.method = single_proc_exec
         elif context == 'multi_proc':
             self.method = parallelize_simulations
+        elif context == 'dist_proc':
+            self.method = distributed_simulations
 
 
 class Executor:
-    def __init__(self, exec_context: ExecutionContext, configs: List[Configuration]) -> None:
+    def __init__(self, exec_context: ExecutionContext, configs: List[Configuration], spark_context: SparkContext = None) -> None:
+        self.sc = spark_context
         self.SimExecutor = SimExecutor
         self.exec_method = exec_context.method
         self.exec_context = exec_context.name
@@ -83,14 +125,6 @@ class Executor:
         config_proc = Processor()
         create_tensor_field = TensorFieldReport(config_proc).create_tensor_field
 
-        print(r'''
-                 __________    ____
-  ________ __ _____/ ____/   |  / __ \
- / ___/ __` / __  / /   / /| | / / / /
-/ /__/ /_/ / /_/ / /___/ ___ |/ /_/ /
-\___/\__,_/\__,_/\____/_/  |_/_____/
-by BlockScience
-        ''')
 
         print(f'Execution Mode: {self.exec_context + ": " + str(self.configs)}')
         print(f'Configurations: {self.configs}')
@@ -141,5 +175,16 @@ class Executor:
                 results.append((flatten(result), create_tensor_field(partial_state_updates, ep)))
             final_result = results
 
+        elif self.exec_context == ExecutionMode.dist_proc:
+            # if len(self.configs) > 1:
+            simulations = self.exec_method(
+                self.sc,
+                simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns,
+                userIDs, sessionIDs, simulationIDs, runIDs
+            )
+            results = []
+            for result, partial_state_updates, ep in list(zip(simulations, partial_state_updates, eps)):
+                results.append((flatten(result), create_tensor_field(partial_state_updates, ep)))
+            final_result = results
 
         return final_result
diff --git a/requirements.txt b/requirements.txt
index 640db81..5944b38 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -5,4 +5,5 @@ fn
 tabulate
 funcy
 pyspark
-kafka-python
\ No newline at end of file
+kafka-python
+cloudpickle
\ No newline at end of file
diff --git a/simulations/test_executions/dist_prod_exec.py b/simulations/test_executions/dist_prod_exec.py
new file mode 100644
index 0000000..548b549
--- /dev/null
+++ b/simulations/test_executions/dist_prod_exec.py
@@ -0,0 +1,32 @@
+from pprint import pprint
+
+import pandas as pd
+from tabulate import tabulate
+
+from simulations.test_executions.spark.session import spark_context as sc
+# importing config1 and config2 registers their Configurations in cadCAD.configs
+from simulations.regression_tests import config1, config2
+
+from cadCAD.engine import ExecutionMode, ExecutionContext, Executor
+from cadCAD.utils import arrange_cols
+from cadCAD import configs
+
+exec_mode = ExecutionMode()
+
+print("Simulation Execution: Distributed Execution")
+dist_proc_ctx = ExecutionContext(context=exec_mode.dist_proc)
+run = Executor(exec_context=dist_proc_ctx, configs=configs, spark_context=sc)
+# pprint(dist_proc_ctx)
+
+# print(configs)
+i = 0
+config_names = ['config1', 'config2']
+for raw_result, tensor_field in run.execute():
+    result = arrange_cols(pd.DataFrame(raw_result), False)
+    print()
+    # print(f"Tensor Field: {config_names[i]}")
+    print(tabulate(tensor_field, headers='keys', tablefmt='psql'))
+    print("Output:")
+    print(tabulate(result, headers='keys', tablefmt='psql'))
+    print()
+    i += 1
\ No newline at end of file
diff --git a/simulations/test_executions/spark/session/__init__.py b/simulations/test_executions/spark/session/__init__.py
new file mode 100644
index 0000000..0ad2588
--- /dev/null
+++ b/simulations/test_executions/spark/session/__init__.py
@@ -0,0 +1,18 @@
+from pyspark.sql import SparkSession
+from pyspark.context import SparkContext
+import os
+
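+# Note: the interpreter paths below are environment-specific (a typical
+# macOS / Homebrew layout is assumed here); point PYSPARK_PYTHON and
+# PYSPARK_DRIVER_PYTHON at the Python 3 that should run the driver and workers.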
+os.environ['PYSPARK_PYTHON'] = '/usr/local/bin/python3'
+os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/local/bin/python3'
+
+spark = SparkSession\
+    .builder\
+    .appName("distroduce")\
+    .getOrCreate()
+
+spark_context: SparkContext = spark.sparkContext
+print(f"Spark UI: {spark_context.uiWebUrl}")
+print()
\ No newline at end of file