diff --git a/cadCAD/__init__.py b/cadCAD/__init__.py
index 7346dd1..e2a753b 100644
--- a/cadCAD/__init__.py
+++ b/cadCAD/__init__.py
@@ -1,5 +1,20 @@
 name = "cadCAD"
 configs = []
-from ascii_art import production
-print(production)
\ No newline at end of file
+print(r'''
+                  ___________    ____
+  ________ __ ___/ / ____/   |  / __ \
+ / ___/ __` / __  / /   / /| | / / / /
+/ /__/ /_/ / /_/ / /___/ ___ |/ /_/ /
+\___/\__,_/\__,_/\____/_/  |_/_____/
+by BlockScience
+======================================
+       Complex Adaptive Dynamics
+       o       i        e
+       m       d        s
+       p       e        i
+       u       d        g
+       t       n
+       e
+       r
+''')
\ No newline at end of file
diff --git a/cadCAD/engine/__init__.py b/cadCAD/engine/__init__.py
index 2f9f781..e06b716 100644
--- a/cadCAD/engine/__init__.py
+++ b/cadCAD/engine/__init__.py
@@ -6,7 +6,9 @@ from pandas.core.frame import DataFrame
 from pyspark.context import SparkContext
 from pyspark import cloudpickle
 import pickle
+from fn.func import curried
+from cadCAD.distroduce.configuration.kakfa import configure_producer
 from cadCAD.utils import flatten
 from cadCAD.configuration import Configuration, Processor
 from cadCAD.configuration.utils import TensorFieldReport
@@ -65,7 +67,6 @@ def parallelize_simulations(
         results = p.map(lambda t: t[0](t[1], t[2], t[3], t[4], t[5], t[6], t[7], t[8], t[9], t[10]), params)
     return results
 
-
 def distributed_simulations(
     simulation_execs: List[Callable],
     var_dict_list: List[VarDictType],
@@ -78,7 +79,8 @@ def distributed_simulations(
     sessionIDs,
     simulationIDs,
     runIDs: List[int],
-    sc: SparkContext = None
+    sc,
+    kafkaConfig
 ):
 
     func_params_zipped = list(
@@ -86,11 +88,13 @@
     )
     func_params_kv = [((t[0], t[1], t[2], t[3]), (t[4], t[5], t[6])) for t in func_params_zipped]
     def simulate(k, v):
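+        # NOTE (editorial assumption): the producer is created inside the task
+        # body because KafkaProducer instances hold live sockets and cannot be
+        # pickled by Spark; only the serializable producer_config dict is shipped.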
+        from kafka import KafkaProducer
+        prod_config = kafkaConfig['producer_config']
+        kafkaConfig['producer'] = KafkaProducer(**prod_config)
         (sim_exec, config, env_procs) = [f[1] for f in func_params_kv if f[0] == k][0]
-        print(env_procs)
         results = sim_exec(
             v['var_dict'], v['states_lists'], config, env_procs, v['Ts'], v['Ns'],
-            k[0], k[1], k[2], k[3]
+            k[0], k[1], k[2], k[3], kafkaConfig
         )
 
         return results
@@ -102,22 +106,36 @@
             {'var_dict': t[4], 'states_lists': t[5], 'Ts': t[6], 'Ns': t[7]}
         ) for t in val_params
     ]
-    results_rdd = sc.parallelize(val_params_kv).map(lambda x: simulate(*x))
+    results_rdd = sc.parallelize(val_params_kv).coalesce(35)
 
-    return list(results_rdd.collect())
+    return list(results_rdd.map(lambda x: simulate(*x)).collect())
 
 
 class ExecutionContext:
-    def __init__(self, context: str = ExecutionMode.multi_proc) -> None:
+    def __init__(self,
+                 context=ExecutionMode.multi_proc,
+                 # spark_context=None,
+                 # kafka_config=None,
+                 # spark_data_transformation=None,
+                 method=None) -> None:
         self.name = context
-        self.method = None
+        # self.method = method
+
+        # def dist_proc_closure(simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns,
+        #                       userIDs, sessionIDs, simulationIDs, runIDs,
+        #                       sc=spark_context, kafkaConfig=kafka_config):
+        #     return distributed_simulations(
+        #         simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns,
+        #         userIDs, sessionIDs, simulationIDs, runIDs,
+        #         spark_context, spark_data_transformation, kafka_config
+        #     )
 
         if context == 'single_proc':
             self.method = single_proc_exec
         elif context == 'multi_proc':
             self.method = parallelize_simulations
         elif context == 'dist_proc':
-            self.method = distributed_simulations
+            self.method = method
 
 
 class Executor:
@@ -176,7 +194,6 @@
                     simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns,
                     userIDs, sessionIDs, simulationIDs, runIDs
                 )
-
             elif self.exec_context == ExecutionMode.dist_proc:
                 simulations = self.exec_method(
                     simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns,
diff --git a/cadCAD/engine/simulation.py b/cadCAD/engine/simulation.py
index 24949c0..55573ae 100644
--- a/cadCAD/engine/simulation.py
+++ b/cadCAD/engine/simulation.py
@@ -1,11 +1,8 @@
-from pprint import pprint
 from typing import Any, Callable, Dict, List, Tuple
-from pathos.pools import ThreadPool as TPool
+# from pathos.pools import ThreadPool as TPool
 from copy import deepcopy
 from functools import reduce
 
-from pyspark import SparkContext
-
 from cadCAD.engine.utils import engine_exception
 from cadCAD.utils import flatten
 
@@ -30,13 +27,14 @@ class Executor:
         sub_step: int,
         sL: List[Dict[str, Any]],
         s: Dict[str, Any],
-        funcs: List[Callable]
+        funcs: List[Callable],
+        kafkaConfig
     ) -> Dict[str, Any]:
 
         ops = self.policy_ops
 
         def get_col_results(sweep_dict, sub_step, sL, s, funcs):
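+            # NOTE (editorial assumption): each policy f must now accept
+            # kafkaConfig as a trailing positional argument; legacy policies
+            # without it will break here under dist_proc.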
-            return list(map(lambda f: f(sweep_dict, sub_step, sL, s), funcs))
+            return list(map(lambda f: f(sweep_dict, sub_step, sL, s, kafkaConfig), funcs))
 
         def compose(init_reduction_funct, funct_list, val_list):
             result, i = None, 0
@@ -108,18 +106,18 @@ class Executor:
         policy_funcs: List[Callable],
         env_processes: Dict[str, Callable],
         time_step: int,
-        run: int
+        run: int,
+        kafkaConfig
     ) -> List[Dict[str, Any]]:
 
         last_in_obj: Dict[str, Any] = deepcopy(sL[-1])
 
         _input: Dict[str, Any] = self.policy_update_exception(
-            self.get_policy_input(sweep_dict, sub_step, sH, last_in_obj, policy_funcs)
+            self.get_policy_input(sweep_dict, sub_step, sH, last_in_obj, policy_funcs, kafkaConfig)
         )
-
         def generate_record(state_funcs):
             for f in state_funcs:
-                yield self.state_update_exception(f(sweep_dict, sub_step, sH, last_in_obj, _input))
+                yield self.state_update_exception(f(sweep_dict, sub_step, sH, last_in_obj, _input, kafkaConfig))
 
         def transfer_missing_fields(source, destination):
             for k in source:
@@ -131,7 +129,6 @@ class Executor:
         last_in_copy: Dict[str, Any] = transfer_missing_fields(last_in_obj, dict(generate_record(state_funcs)))
         last_in_copy: Dict[str, Any] = self.apply_env_proc(sweep_dict, env_processes, last_in_copy)
         last_in_copy['substep'], last_in_copy['timestep'], last_in_copy['run'] = sub_step, time_step, run
-
         sL.append(last_in_copy)
         del last_in_copy
 
@@ -145,7 +142,8 @@ class Executor:
         configs: List[Tuple[List[Callable], List[Callable]]],
         env_processes: Dict[str, Callable],
         time_step: int,
-        run: int
+        run: int,
+        kafkaConfig
     ) -> List[Dict[str, Any]]:
 
         sub_step = 0
@@ -162,11 +160,9 @@ class Executor:
         states_list: List[Dict[str, Any]] = [genesis_states]
 
         sub_step += 1
-
         for [s_conf, p_conf] in configs:  # tensor field
-
             states_list: List[Dict[str, Any]] = self.partial_state_update(
-                sweep_dict, sub_step, states_list, simulation_list, s_conf, p_conf, env_processes, time_step, run
+                sweep_dict, sub_step, states_list, simulation_list, s_conf, p_conf, env_processes, time_step, run, kafkaConfig
             )
 
             sub_step += 1
@@ -182,15 +178,15 @@ class Executor:
         configs: List[Tuple[List[Callable], List[Callable]]],
         env_processes: Dict[str, Callable],
         time_seq: range,
-        run: int
+        run: int,
+        kafkaConfig
     ) -> List[List[Dict[str, Any]]]:
-
         time_seq: List[int] = [x + 1 for x in time_seq]
         simulation_list: List[List[Dict[str, Any]]] = [states_list]
 
         for time_step in time_seq:
             pipe_run: List[Dict[str, Any]] = self.state_update_pipeline(
-                sweep_dict, simulation_list, configs, env_processes, time_step, run
+                sweep_dict, simulation_list, configs, env_processes, time_step, run, kafkaConfig
             )
             _, *pipe_run = pipe_run
@@ -210,12 +206,11 @@ class Executor:
         session_id,
         simulation_id,
         run_id,
-        sc: SparkContext = None
+        kafkaConfig
     ) -> List[List[Dict[str, Any]]]:
 
         def execute_run(sweep_dict, states_list, configs, env_processes, time_seq, run) -> List[Dict[str, Any]]:
             run += 1
-
            def generate_init_sys_metrics(genesis_states_list):
                 for d in genesis_states_list:
                     d['run'], d['substep'], d['timestep'] = run, 0, 0
@@ -225,15 +220,11 @@ class Executor:
             states_list_copy: List[Dict[str, Any]] = list(generate_init_sys_metrics(deepcopy(states_list)))
 
             first_timestep_per_run: List[Dict[str, Any]] = self.run_pipeline(
-                sweep_dict, states_list_copy, configs, env_processes, time_seq, run
+                sweep_dict, states_list_copy, configs, env_processes, time_seq, run, kafkaConfig
             )
             del states_list_copy
 
             return first_timestep_per_run
-        #
-        # pprint()
-        #
-        # exit()
 
         pipe_run = flatten(
             [execute_run(sweep_dict, states_list, configs, env_processes, time_seq, run) for run in range(runs)]
diff --git a/dist/cadCAD-0.0.2-py3-none-any.whl b/dist/cadCAD-0.0.2-py3-none-any.whl
new file mode 100644
index 0000000..bb931c5
Binary files /dev/null and b/dist/cadCAD-0.0.2-py3-none-any.whl differ
diff --git a/dist/cadCAD-0.0.2.tar.gz b/dist/cadCAD-0.0.2.tar.gz
new file mode 100644
index 0000000..ee8ebc9
Binary files /dev/null and b/dist/cadCAD-0.0.2.tar.gz differ
diff --git a/dist/cadCAD-0.3.0-py3-none-any.whl b/dist/cadCAD-0.3.0-py3-none-any.whl
deleted file mode 100644
index 08d2459..0000000
Binary files a/dist/cadCAD-0.3.0-py3-none-any.whl and /dev/null differ
diff --git a/dist/cadCAD-0.3.0.tar.gz b/dist/cadCAD-0.3.0.tar.gz
deleted file mode 100644
index 9c8e0fb..0000000
Binary files a/dist/cadCAD-0.3.0.tar.gz and /dev/null differ
diff --git a/distributed_produce/examples/event_bench/main.py b/distributed_produce/examples/event_bench/main.py
index c4d1edf..e4a96a2 100644
--- a/distributed_produce/examples/event_bench/main.py
+++ b/distributed_produce/examples/event_bench/main.py
@@ -11,7 +11,7 @@ from distributed_produce.examples.event_bench.spark.session import spark_context
 
 def main(sc, spark_runs, rdd_parts, sim_time, sim_runs, parameterized_message):
     publish_times, spark_job_times = [], []
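+    # NOTE (editorial): acks='all' makes the producer wait until the full
+    # in-sync replica set acknowledges each record, trading publish latency
+    # for durability; acks=0 was fire-and-forget.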
-    prod_config = {'bootstrap_servers': 'localhost:9092', 'acks': 0}
+    prod_config = {'bootstrap_servers': 'localhost:9092', 'acks': 'all'}
     exec_spark_job = lambda run: distributed_produce(
         sc, run, sim_time, sim_runs, rdd_parts, parameterized_message, prod_config
     )
diff --git a/requirements.txt b/requirements.txt
index 5944b38..a46529d 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -6,4 +6,3 @@ tabulate
 funcy
 pyspark
 kafka-python
-cloudpickle
\ No newline at end of file
diff --git a/setup.py b/setup.py
index 56e3976..71eb94c 100644
--- a/setup.py
+++ b/setup.py
@@ -19,7 +19,7 @@ provided.
 """
 
 setup(name='cadCAD',
-      version='0.3.0',
+      version='0.0.2',
       description="cadCAD: a differential games based simulation software package for research, validation, and \
         Computer Aided Design of economic systems",
       long_description=long_description,
@@ -27,5 +27,14 @@ setup(name='cadCAD',
       author='Joshua E. Jodesty',
       author_email='joshua@block.science, joshua.jodesty@gmail.com',
       license='LICENSE.txt',
-      packages=find_packages()
+      packages=find_packages(),
+      install_requires=[
+          "pandas",
+          "wheel",
+          "pathos",
+          "fn",
+          "tabulate",
+          "funcy",
+          "kafka-python"
+      ]
 )
\ No newline at end of file
diff --git a/simulations/distributed/config1.py b/simulations/distributed/config1.py
index 1e32d9f..8bf81da 100644
--- a/simulations/distributed/config1.py
+++ b/simulations/distributed/config1.py
@@ -1,124 +1,151 @@
-import numpy as np
-from datetime import timedelta
+import random
+from copy import deepcopy
+from datetime import timedelta, datetime
 
 from cadCAD.configuration import append_configs
 from cadCAD.configuration.utils import bound_norm_random, config_sim, time_step, env_trigger
+from cadCAD.utils.sys_config import update_timestamp
 
-seeds = {
-    'z': np.random.RandomState(1),
-    'a': np.random.RandomState(2),
-    'b': np.random.RandomState(3),
-    'c': np.random.RandomState(4)
-}
+def choose_rnd(x: list, choices):
+    def choose(x, choices):
+        for n in list(range(choices)):
+            elem = random.choice(x)
+            x.remove(elem)
+            yield elem
+
+    copied_list = deepcopy(x)
+    results = list(choose(copied_list, choices))
+    del copied_list
+    return results
+
+
+def message(sender, receiver, _input):
+    return {'sender': sender, 'receiver': receiver, 'input': _input, 'sent_time': datetime.now()}
+
+def enter_room_msgs(room, users):
+    return [message(user, room, f"{user} enters chat-room") for user in users]
+
+def exit_room_msgs(room, users):
+    return [message(user, room, f"{user} exited chat-room") for user in users]
+
+rooms = ['room_1', 'room_2']
+user_group_1 = ['A', 'B', 'C', 'D']
+user_group_2 = ['E', 'F', 'G', 'H']
+random.shuffle(user_group_1)  # shuffle in place: random.shuffle mutates its argument and returns None
+random.shuffle(user_group_2)
+room_1_messages = enter_room_msgs('room_1', user_group_1)
+room_2_messages = enter_room_msgs('room_2', user_group_2)
+def initialize_conditions():
+    users = user_group_1 + user_group_2
+    messages = sorted(room_1_messages + room_2_messages, key=lambda i: i['sent_time'])
+    room_1_session = {'room': 'room_1', 'users': user_group_1, 'messages': room_1_messages}
+    return {
+        'client_a': room_1_session,
+        'client_b': room_1_session,
+        'server': {'rooms': rooms, 'users': users, 'messages': messages},
+        'record_creation': datetime.now()
+    }
+
+
+def send_message(room, sender, receiver, _input):
+    return lambda _g, step, sL, s: {
+        'types': ['send'],
+        'events': [
+            {
+                'type': 'send',
+                'room': room,
+                'user': sender,
+                'sent': [message(sender, receiver, _input)]
+            }
+        ]
+    }
+
+
+def exit_room(room, sender):
+    return lambda _g, step, sL, s: {
+        'types': ['exit'],
+        'events': [
+            {
+                'type': 'exit',
+                'room': room,
+                'user': sender,
+                'sent': exit_room_msgs(room, [sender])
+            }
+        ]
+    }
 
 
 # Policies per Mechanism
-def p1m1(_g, step, sL, s):
-    return {'param1': 1}
-def p2m1(_g, step, sL, s):
-    return {'param1': 1, 'param2': 4}
+# ToDo: randomize client choices at runtime
+[alpha, omega] = choose_rnd(user_group_1, 2)
+a_msg1 = send_message('room_1', alpha, omega, f'Hello {omega}')
+b_msg1 = send_message('room_1', omega, alpha, f'Hello {alpha}')
 
-def p1m2(_g, step, sL, s):
-    return {'param1': 'a', 'param2': 2}
-def p2m2(_g, step, sL, s):
-    return {'param1': 'b', 'param2': 4}
+a_msg2 = send_message('room_1', alpha, omega, f'Bye {omega}')
+b_msg2 = send_message('room_1', omega, alpha, f'Bye {alpha}')
 
-def p1m3(_g, step, sL, s):
-    return {'param1': ['c'], 'param2': np.array([10, 100])}
-def p2m3(_g, step, sL, s):
-    return {'param1': ['d'], 'param2': np.array([20, 200])}
+a_msg3 = exit_room('room_1', alpha)
+b_msg3 = exit_room('room_1', omega)
+
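+# NOTE (editorial): iterating event['user'] character-by-character below only
+# works because user names here are single characters ('A'-'H'); longer names
+# would require event['user'] to be a list.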
+def remove_exited_users(users, actions):
+    users = deepcopy(users)
+    if 'exit' in actions['types']:
+        for event in actions['events']:
+            if event['type'] == 'exit':
+                for user in event['user']:
+                    users.remove(user)
+    return users
+
+# State Updates
+
+# {'room': 'room_1', 'users': user_group_1, 'messages': room_1_messages}
+def process_messages(_g, step, sL, s, actions):
+    users = s['users']  # message sends do not change room membership
+    return 'client', {'room': s['room'], 'users': users, 'messages': actions['sent']}
+
+def process_exits(_g, step, sL, s, actions):
+    users = remove_exited_users(s['users'], actions)
+    return 'server', {'rooms': s['room'], 'users': users, 'messages': actions['sent']}
 
-def s1m1(_g, step, sL, s, _input):
-    y = 's1'
-    x = s['s1'] + 1
-    return (y, x)
-def s2m1(_g, step, sL, s, _input):
-    y = 's2'
-    x = _input['param2']
-    return (y, x)
-
-def s1m2(_g, step, sL, s, _input):
-    y = 's1'
-    x = s['s1'] + 1
-    return (y, x)
-def s2m2(_g, step, sL, s, _input):
-    y = 's2'
-    x = _input['param2']
-    return (y, x)
+update_record_creation = update_timestamp(
+    'record_creation',
+    timedelta(days=0, minutes=0, seconds=30),
+    '%Y-%m-%d %H:%M:%S'
+)
 
-def s1m3(_g, step, sL, s, _input):
-    y = 's1'
-    x = s['s1'] + 1
-    return (y, x)
-def s2m3(_g, step, sL, s, _input):
-    y = 's2'
-    x = _input['param2']
-    return (y, x)
-
-def policies(_g, step, sL, s, _input):
-    y = 'policies'
-    x = _input
-    return (y, x)
-
-
-def update_timestamp(_g, step, sL, s, _input):
-    y = 'timestamp'
-    return y, time_step(dt_str=s[y], dt_format='%Y-%m-%d %H:%M:%S', _timedelta=timedelta(days=0, minutes=0, seconds=1))
-
-
-# Genesis States
-genesis_states = {
-    's1': 0.0,
-    's2': 0.0,
-    's3': 1.0,
-    's4': 1.0,
-    'timestamp': '2018-10-01 15:16:24'
-}
-
-
-# Environment Process
-# ToDo: Depreciation Waring for env_proc_trigger convention
-trigger_timestamps = ['2018-10-01 15:16:25', '2018-10-01 15:16:27', '2018-10-01 15:16:29']
-env_processes = {
-    "s3": [lambda _g, x: 5],
-    "s4": env_trigger(3)(trigger_field='timestamp', trigger_vals=trigger_timestamps, funct_list=[lambda _g, x: 10])
-}
-
-
-partial_state_update_block = [
-    {
-        "policies": {
-            "b1": p1m1,
-            "b2": p2m1
-        },
-        "variables": {
-            "s1": s1m1,
-            "s2": s2m1,
-            "timestamp": update_timestamp
-        }
-    },
-    {
-        "policies": {
-            "b1": p1m2,
-            "b2": p2m2
-        },
-        "variables": {
-            "s1": s1m2,
-            "s2": s2m2
-        }
-    },
-    {
-        "policies": {
-            "b1": p1m3,
-            "b2": p2m3
-        },
-        "variables": {
-            "s1": s1m3,
-            "s2": s2m3
-        }
-    }
-]
+# partial_state_update_block = [
+#     {
+#         "policies": {
+#             "b1": a_msg1,
+#             "b2": b_msg1
+#         },
+#         "variables": {
+#             "client_a": client_a_m1,
+#             "client_b": client_b_m1,
+#             "received": update_timestamp
+#         }
+#     },
+#     {
+#         "policies": {
+#             "b1": a_msg2,
+#             "b2": b_msg2
+#         },
+#         "variables": {
+#             "s1": s1m2,
+#             "s2": s2m2
+#         }
+#     },
+#     {
+#         "policies": {
+#             "b1": a_msg3,
+#             "b2": b_msg3
+#         },
+#         "variables": {
+#             "s1": s1m3,
+#             "s2": s2m3
+#         }
+#     }
+# ]
 
 
 sim_config = config_sim(
@@ -128,11 +155,11 @@
     }
 )
 
-append_configs(
-    user_id='user_a',
-    sim_configs=sim_config,
-    initial_state=genesis_states,
-    env_processes=env_processes,
-    partial_state_update_blocks=partial_state_update_block,
-    policy_ops=[lambda a, b: a + b]
-)
\ No newline at end of file
+# append_configs(
+#     user_id='user_a',
+#     sim_configs=sim_config,
+#     initial_state=genesis_states,
+#     env_processes=env_processes,
+#     partial_state_update_blocks=partial_state_update_block,
+#     policy_ops=[lambda a, b: a + b]
+# )
\ No newline at end of file
diff --git a/simulations/distributed/messaging.py b/simulations/distributed/messaging.py
new file mode 100644
index 0000000..a6082f4
--- /dev/null
+++ b/simulations/distributed/messaging.py
@@ -0,0 +1,132 @@
+from copy import deepcopy
+from datetime import timedelta, datetime
+import time
+
+from cadCAD.configuration import append_configs
+from cadCAD.configuration.utils import config_sim
+
+# session = enters('room_1', ['A', 'B'])
+initial_conditions = {
+    'record_creation': datetime.now(),
+    'client_a': {'users': [], 'messages': [], 'avg_send_time': 1},
+    'client_b': {'users': [], 'messages': [], 'avg_send_time': 1}
+}
+
+
+# Actions
+def message(client_id, room, action, _input, sender, receiver=None):
+    # start_time = datetime.now()
+    result = {
+        'types': [action],
+        'messages': [
+            {
+                'client': client_id, 'room': room, 'action': action,
+                'sender': sender, 'receiver': receiver,
+                'input': _input,
+                'created': datetime.now()
+            }
+        ]
+    }
+    # datetime.now() - start_time
+    return result
+
+def enter_action(state, room, user):
+    return lambda _g, step, sL, s: message(state, room, 'enter', f"{user} enters {room}", user)
+
+def message_action(state, room, _input, sender, receiver):
+    return lambda _g, step, sL, s: message(state, room, 'send', _input, sender, receiver)
+
+def exit_action(state, room, user):
+    return lambda _g, step, sL, s: message(state, room, 'exit', f"{user} exited {room}", user)
+
+# State Updates
+def update_users(users, actions, action_types=['send','enter','exit']):
+    users = deepcopy(users)
+    for action_type in action_types:
+        if action_type in actions['types']:
+            for msg in actions['messages']:
+                if msg['action'] == 'send' and action_type == 'send':
+                    continue
+                elif msg['action'] == 'enter' and action_type == 'enter':
+                    for user in msg['sender']:
+                        users.append(user)  # register_entered
+                elif msg['action'] == 'exit' and action_type == 'exit':
+                    for user in msg['sender']:
+                        users.remove(user)  # remove_exited
+    return users
+
+
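+# Closure-based state updates: send_message('client_a') returns an update
+# function permanently bound to that state key, so one definition serves
+# every client entry in the state.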
+def send_message(state):
+    return lambda _g, step, sL, s, actions: (
+        state,
+        {
+            'users': update_users(s[state]['users'], actions),
+            'messages': actions['messages'], 'avg_send_time': 1
+        }
+    )
+
+def current_time(state):
+    return lambda _g, step, sL, s, actions: (state, datetime.now())
+
+sim_composition = [
+    {
+        "policies": {
+            "b1": enter_action('server', 'room_1', 'A'),
+            "b2": enter_action('server', 'room_1', 'B')
+        },
+        "variables": {
+            'client_a': send_message('client_a'),
+            'client_b': send_message('client_b'),
+            'record_creation': current_time('record_creation')
+        }
+    },
+    {
+        "policies": {
+            "b1": message_action('client_A', 'room_1', "Hi B", 'A', 'B'),
+            "b2": message_action('client_B', 'room_1', "Hi A", 'B', 'A')
+        },
+        "variables": {
+            'client_a': send_message('client_a'),
+            'client_b': send_message('client_b'),
+            'record_creation': current_time('record_creation')
+        }
+    },
+    {
+        "policies": {
+            "b1": message_action('client_A', 'room_1', "Bye B", 'A', 'B'),
+            "b2": message_action('client_B', 'room_1', "Bye A", 'B', 'A')
+        },
+        "variables": {
+            'client_a': send_message('client_a'),
+            'client_b': send_message('client_b'),
+            'record_creation': current_time('record_creation')
+        }
+    },
+    {
+        "policies": {
+            "b1": exit_action('server', 'room_1', 'A'),
+            "b2": exit_action('server', 'room_1', 'B')
+        },
+        "variables": {
+            'client_a': send_message('client_a'),
+            'client_b': send_message('client_b'),
+            'record_creation': current_time('record_creation')
+        }
+    }
+]
+
+# N = 5 runs x T = 500 timesteps x 4 substeps = 10,000 events
+sim_config = config_sim(
+    {
+        "N": 5,
+        "T": range(500),
+    }
+)
+
+append_configs(
+    user_id='user_a',
+    sim_configs=sim_config,
+    initial_state=initial_conditions,
+    partial_state_update_blocks=sim_composition,
+    policy_ops=[lambda a, b: a + b]
+)
\ No newline at end of file
diff --git a/simulations/distributed/messaging_app.py b/simulations/distributed/messaging_app.py
new file mode 100644
index 0000000..fe3618c
--- /dev/null
+++ b/simulations/distributed/messaging_app.py
@@ -0,0 +1,274 @@
+from functools import reduce
+
+import pandas as pd
+from copy import deepcopy
+from datetime import datetime
+
+from tabulate import tabulate
+
+from cadCAD.configuration import append_configs
+from cadCAD.configuration.utils import config_sim
+
+from cadCAD.engine import ExecutionMode, ExecutionContext, Executor
+from cadCAD.utils import arrange_cols
+from cadCAD import configs
+
+from pyspark.sql import SparkSession
+from pyspark.context import SparkContext
+
+from kafka import KafkaProducer
+
+def count(start, step):
+    while True:
+        yield start
+        start += step
+
+spark = SparkSession\
+    .builder\
+    .appName("distroduce")\
+    .getOrCreate()
+
+sc: SparkContext = spark.sparkContext
+print(f"Spark UI: {sc.uiWebUrl}")
+print()
+
+# session = enters('room_1', ['A', 'B'])
+initial_conditions = {
+    'record_creation': datetime.now(),
+    'client_a': {'users': [], 'messages': [], 'msg_count': 0, 'send_time': 0.0},
+    'client_b': {'users': [], 'messages': [], 'msg_count': 0, 'send_time': 0.0},
+    'total_msg_count': 0,
+    'total_send_time': 0.000000
+}
+
+
+# Actions
+def messages(client_id, room, action, _input, sender, receiver=None):
+    return {
+        'types': [action],
+        'messages': [
+            {
+                'client': client_id, 'room': room, 'action': action,
+                'sender': sender, 'receiver': receiver,
+                'input': _input,
+                'created': datetime.now()
+            }
+        ]
+    }
+
+
+def enter_action(state, room, user):
+    def f(_g, step, sL, s, kafkaConfig):
+        msgs = messages(state, room, 'enter', f"{user} enters {room}", user)
+        msgs['send_times'] = [0.000000]
+        msgs['msg_counts'] = [len(msgs['messages'])]
+        return msgs
+    return f
+
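+# NOTE (editorial): timing around producer.send() measures enqueue time only;
+# kafka-python batches records on a background thread, so end-to-end delivery
+# latency is longer unless the producer is flushed.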
+def message_actions(state, room, _input, sender, receiver):
+    msgs = messages(state, room, 'send', _input, sender, receiver)
+    msgs_list = msgs['messages']
+    def send_action(_g, step, sL, s, kafkaConfig):
+        start_time = datetime.now()
+        for msg in msgs_list:
+            producer: KafkaProducer = kafkaConfig['producer']
+            topic: str = kafkaConfig['send_topic']
+            encoded_msg = str(msg).encode('utf-8')
+            producer.send(topic, encoded_msg)
+        msgs['send_times'] = [(datetime.now() - start_time).total_seconds()]
+        msgs['msg_counts'] = [len(msgs_list)]
+        return msgs
+
+    return send_action
+
+def exit_action(state, room, user):
+    def f(_g, step, sL, s, kafkaConfig):
+        msgs = messages(state, room, 'exit', f"{user} exited {room}", user)
+        msgs_list = msgs['messages']
+        msgs['send_times'] = [0.000000]
+        msgs['msg_counts'] = [len(msgs_list)]
+        return msgs
+    return f
+
+# State Updates
+def update_users(users, actions, action_types=['send','enter','exit']):
+    users = deepcopy(users)
+    for action_type in action_types:
+        if action_type in actions['types']:
+            for msg in actions['messages']:
+                if msg['action'] == 'send' and action_type == 'send':
+                    continue
+                elif msg['action'] == 'enter' and action_type == 'enter':
+                    for user in msg['sender']:
+                        users.append(user)  # register_entered
+                elif msg['action'] == 'exit' and action_type == 'exit':
+                    for user in msg['sender']:
+                        users.remove(user)  # remove_exited
+    return users
+
+add = lambda a, b: a + b
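+# policy_ops=[lambda a, b: a + b] merges the two policy dicts field-wise by
+# list concatenation, so msg_counts / send_times arrive below as lists that
+# reduce(add, ...) folds into per-substep totals.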
+def count_messages(_g, step, sL, s, actions, kafkaConfig):
+    return 'total_msg_count', s['total_msg_count'] + reduce(add, actions['msg_counts'])
+
+def add_send_time(_g, step, sL, s, actions, kafkaConfig):
+    return 'total_send_time', s['total_send_time'] + reduce(add, actions['send_times'])
+
+def send_message(state):
+    return lambda _g, step, sL, s, actions, kafkaConfig: (
+        state,
+        {
+            'users': update_users(s[state]['users'], actions),
+            'messages': actions['messages'],
+            'msg_counts': reduce(add, actions['msg_counts']),
+            'send_times': reduce(add, actions['send_times'])
+        }
+    )
+
+def current_time(state):
+    return lambda _g, step, sL, s, actions, kafkaConfig: (state, datetime.now())
+
+sim_composition = [
+    {
+        "policies": {
+            "b1": enter_action('server', 'room_1', 'A'),
+            "b2": enter_action('server', 'room_1', 'B')
+        },
+        "variables": {
+            'client_a': send_message('client_a'),
+            'client_b': send_message('client_b'),
+            'total_msg_count': count_messages,
+            'total_send_time': add_send_time,
+            'record_creation': current_time('record_creation')
+        }
+    },
+    {
+        "policies": {
+            "b1": message_actions('client_A', 'room_1', "Hi B", 'A', 'B'),
+            "b2": message_actions('client_B', 'room_1', "Hi A", 'B', 'A')
+        },
+        "variables": {
+            'client_a': send_message('client_a'),
+            'client_b': send_message('client_b'),
+            'total_msg_count': count_messages,
+            'total_send_time': add_send_time,
+            'record_creation': current_time('record_creation')
+        }
+    },
+    {
+        "policies": {
+            "b1": message_actions('client_A', 'room_1', "Bye B", 'A', 'B'),
+            "b2": message_actions('client_B', 'room_1', "Bye A", 'B', 'A')
+        },
+        "variables": {
+            'client_a': send_message('client_a'),
+            'client_b': send_message('client_b'),
+            'total_msg_count': count_messages,
+            'total_send_time': add_send_time,
+            'record_creation': current_time('record_creation')
+        }
+    },
+    {
+        "policies": {
+            "b1": exit_action('server', 'room_1', 'A'),
+            "b2": exit_action('server', 'room_1', 'B')
+        },
+        "variables": {
+            'client_a': send_message('client_a'),
+            'client_b': send_message('client_b'),
+            'total_msg_count': count_messages,
+            'total_send_time': add_send_time,
+            'record_creation': current_time('record_creation')
+        }
+    }
+]
+
+# N = 5 = 10000 / (500 x 4)
+# T = 500
+sim_config = config_sim(
+    {
+        "N": 1,
+        "T": range(10),
+        # "T": range(5000),
+    }
+)
+
+append_configs(
+    user_id='user_a',
+    sim_configs=sim_config,
+    initial_state=initial_conditions,
+    partial_state_update_blocks=sim_composition,
+    policy_ops=[lambda a, b: a + b]
+)
+
+exec_mode = ExecutionMode()
+
+print("Simulation Execution: Distributed Execution")
+kafka_config = {'send_topic': 'test', 'producer_config': {'bootstrap_servers': 'localhost:9092', 'acks': 'all'}}
+
+def distributed_simulations(
+        simulation_execs,
+        var_dict_list,
+        states_lists,
+        configs_structs,
+        env_processes_list,
+        Ts,
+        Ns,
+        userIDs,
+        sessionIDs,
+        simulationIDs,
+        runIDs,
+        sc=sc,
+        kafkaConfig=kafka_config
+    ):
+
+    func_params_zipped = list(
+        zip(userIDs, sessionIDs, simulationIDs, runIDs, simulation_execs, configs_structs, env_processes_list)
+    )
+    func_params_kv = [((t[0], t[1], t[2], t[3]), (t[4], t[5], t[6])) for t in func_params_zipped]
+    def simulate(k, v):
+        from kafka import KafkaProducer
+        prod_config = kafkaConfig['producer_config']
+        kafkaConfig['producer'] = KafkaProducer(**prod_config)
+        (sim_exec, config, env_procs) = [f[1] for f in func_params_kv if f[0] == k][0]
+        results = sim_exec(
+            v['var_dict'], v['states_lists'], config, env_procs, v['Ts'], v['Ns'],
+            k[0], k[1], k[2], k[3], kafkaConfig
+        )
+
+        return results
+
+    val_params = list(zip(userIDs, sessionIDs, simulationIDs, runIDs, var_dict_list, states_lists, Ts, Ns))
+    val_params_kv = [
+        (
+            (t[0], t[1], t[2], t[3]),
+            {'var_dict': t[4], 'states_lists': t[5], 'Ts': t[6], 'Ns': t[7]}
+        ) for t in val_params
+    ]
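+    # NOTE (editorial assumption): coalesce(35) caps the partition count, and
+    # with it the number of concurrent Kafka producers; tune it to the
+    # cluster's executor count.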
+    results_rdd = sc.parallelize(val_params_kv).coalesce(35)
+
+    return list(results_rdd.map(lambda x: simulate(*x)).collect())
+
+dist_proc_ctx = ExecutionContext(
+    context=exec_mode.dist_proc, method=distributed_simulations
+)
+run = Executor(exec_context=dist_proc_ctx, configs=configs, spark_context=sc)
+
+i = 0
+for raw_result, tensor_field in run.execute():
+    result = arrange_cols(pd.DataFrame(raw_result), False)[
+        [
+            'user_id', 'session_id', 'simulation_id', 'run_id', 'timestep', 'substep',
+            'record_creation', 'total_msg_count', 'total_send_time'
+        ]
+    ]
+    print()
+    if i == 0:
+        print(tabulate(tensor_field, headers='keys', tablefmt='psql'))
+    last = result.tail(1).copy()  # copy to avoid pandas SettingWithCopyWarning
+    last['msg_per_sec'] = last['total_msg_count'] / last['total_send_time']
+    print("Output:")
+    print(tabulate(result.head(5), headers='keys', tablefmt='psql'))
+    print(tabulate(result.tail(5), headers='keys', tablefmt='psql'))
+    print(tabulate(last, headers='keys', tablefmt='psql'))
+    print()
+    i += 1
\ No newline at end of file
diff --git a/simulations/distributed/messaging_test.py b/simulations/distributed/messaging_test.py
new file mode 100644
index 0000000..e4150f5
--- /dev/null
+++ b/simulations/distributed/messaging_test.py
@@ -0,0 +1,28 @@
+import pandas as pd
+from tabulate import tabulate
+
+from simulations.distributed.spark.session import spark_context as sc
+from simulations.distributed import messaging
+
+from cadCAD.engine import ExecutionMode, ExecutionContext, Executor
+from cadCAD.utils import arrange_cols
+from cadCAD import configs
+
+exec_mode = ExecutionMode()
+
+print("Simulation Execution: Distributed Execution")
+dist_proc_ctx = ExecutionContext(context=exec_mode.dist_proc)
+run = Executor(exec_context=dist_proc_ctx, configs=configs, spark_context=sc)
+# pprint(dist_proc_ctx)
+
+# print(configs)
+i = 0
+for raw_result, tensor_field in run.execute():
+    result = arrange_cols(pd.DataFrame(raw_result), False)
+    print()
+    print(tabulate(tensor_field, headers='keys', tablefmt='psql'))
+    print("Output:")
+    print(tabulate(result.head(1), headers='keys', tablefmt='psql'))
+    print(tabulate(result.tail(1), headers='keys', tablefmt='psql'))
+    print()
+    i += 1
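+# NOTE (editorial): under the reworked ExecutionContext, dist_proc only runs
+# when a method= argument is supplied (as messaging_app.py does with its local
+# distributed_simulations); without it, exec_method resolves to None.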