From 239cf94caed9b34b39f2f7e89199b4af2f18c5cd Mon Sep 17 00:00:00 2001 From: "Joshua E. Jodesty" Date: Tue, 1 Oct 2019 08:25:44 -0400 Subject: [PATCH] cleanup pt. 2: deleted more complicated messaging simulation --- simulations/distributed/config1.py | 165 ------------------- simulations/distributed/dist_prod_exec.py | 31 ---- simulations/distributed/messaging.py | 132 --------------- simulations/distributed/messaging_app.py | 86 +--------- simulations/distributed/messaging_test.py | 28 ---- simulations/distributed/multi_config_test.py | 36 ---- simulations/distributed/policies.py | 50 ++++++ simulations/distributed/state_updates.py | 40 +++++ 8 files changed, 92 insertions(+), 476 deletions(-) delete mode 100644 simulations/distributed/config1.py delete mode 100644 simulations/distributed/dist_prod_exec.py delete mode 100644 simulations/distributed/messaging.py delete mode 100644 simulations/distributed/messaging_test.py delete mode 100644 simulations/distributed/multi_config_test.py create mode 100644 simulations/distributed/policies.py create mode 100644 simulations/distributed/state_updates.py diff --git a/simulations/distributed/config1.py b/simulations/distributed/config1.py deleted file mode 100644 index 8bf81da..0000000 --- a/simulations/distributed/config1.py +++ /dev/null @@ -1,165 +0,0 @@ -import random -from copy import deepcopy -from datetime import timedelta, datetime - -from cadCAD.configuration import append_configs -from cadCAD.configuration.utils import bound_norm_random, config_sim, time_step, env_trigger -from cadCAD.utils.sys_config import update_timestamp - - -def choose_rnd(x: list, choices): - def choose(x, choices): - for n in list(range(choices)): - elem = random.choice(x) - x.remove(elem) - yield elem - - copied_list = deepcopy(x) - results = list(choose(copied_list, choices)) - del copied_list - return results - - -def message(sender, receiver, _input): - return {'sender': sender, 'receiver': receiver, 'input': _input, 'sent_time': datetime.now()} - -def enter_room_msgs(room, users): - return [message(user, room, f"{user} enters chat-room") for user in users] - -def exit_room_msgs(room, users): - return [message(user, room, f"{user} exited chat-room") for user in users] - -rooms = ['room_1', 'room_2'] -user_group_1 = ['A', 'B', 'C', 'D'] -user_group_2 = ['E', 'F', 'G', 'H'] -room_1_messages = enter_room_msgs('room_1', random.shuffle(user_group_1)) -room_2_messages = enter_room_msgs('room_2', random.shuffle(user_group_2)) -def intitialize_conditions(): - users = user_group_1 + user_group_2 - messages = sorted(room_1_messages + room_2_messages, key=lambda i: i['time']) - room_1_session = {'room': 'room_1', 'users': user_group_1, 'messages': room_1_messages} - return { - 'client_a': room_1_session, - 'client_b': room_1_session, - 'server': {'rooms': rooms, 'users': users, 'messages': messages}, - 'record_creation': datetime.now() - } - - -def send_message(room, sender, receiver, _input): - return lambda _g, step, sL, s: { - 'types': ['send'], - 'events': [ - { - 'type': 'send', - 'room': room, - 'user': sender, - 'sent': [message(sender, receiver, _input)] - } - ] - } - - -def exit_room(room, sender): - return lambda _g, step, sL, s: { - 'types': ['exit'], - 'events': [ - { - 'type': 'exit', - 'room': room, - 'user': sender, - 'sent': exit_room_msgs(sender, room) - } - ] - } - -# Policies per Mechanism -# ToDo Randomize client choices in runtime -[alpha, omega] = choose_rnd(user_group_1, 2) -a_msg1 = send_message('room_1', alpha, omega, f'Hello {omega}') -b_msg1 = send_message('room_1', omega, alpha, f'Hello {alpha}') - -a_msg2 = send_message('room_1', alpha, omega, f'Bye {omega}') -b_msg2 = send_message('room_1', omega, alpha, f'Bye {alpha}') - -a_msg3 = exit_room('room_1', alpha) -b_msg3 = exit_room('room_1', omega) - -def remove_exited_users(users, actions): - users = deepcopy(users) - if 'exit' in actions['types']: - for event in actions['events']: - if event['type'] == 'exit': - for user in event['user']: - users.remove(user) - return users - -# State Updates - -# {'room': 'room_1', 'users': user_group_1, 'messages': room_1_messages} -def process_messages(_g, step, sL, s, actions): - - return 'client', {'room': s['room'], 'users': users, 'messages': actions['sent']} - -def process_exits(_g, step, sL, s, actions): - users = remove_exited_users(s['users'], actions) - return 'server', {'rooms': s['room'], 'users': users, 'messages': actions['sent']} - - - -update_record_creation = update_timestamp( - 'record_creation', - timedelta(days=0, minutes=0, seconds=30), - '%Y-%m-%d %H:%M:%S' -) - -# partial_state_update_block = [ -# { -# "policies": { -# "b1": a_msg1, -# "b2": b_msg1 -# }, -# "variables": { -# "client_a": client_a_m1, -# "client_b": client_b_m1, -# "received": update_timestamp -# } -# }, -# { -# "policies": { -# "b1": a_msg2, -# "b2": b_msg2 -# }, -# "variables": { -# "s1": s1m2, -# "s2": s2m2 -# } -# }, -# { -# "policies": { -# "b1": a_msg3, -# "b2": b_msg3 -# }, -# "variables": { -# "s1": s1m3, -# "s2": s2m3 -# } -# } -# ] - - -sim_config = config_sim( - { - "N": 1, - "T": range(5), - } -) - -# append_configs( -# user_id='user_a', -# sim_configs=sim_config, -# initial_state=genesis_states, -# env_processes=env_processes, -# partial_state_update_blocks=partial_state_update_block, -# policy_ops=[lambda a, b: a + b] -# ) \ No newline at end of file diff --git a/simulations/distributed/dist_prod_exec.py b/simulations/distributed/dist_prod_exec.py deleted file mode 100644 index aaca497..0000000 --- a/simulations/distributed/dist_prod_exec.py +++ /dev/null @@ -1,31 +0,0 @@ -from pprint import pprint - -import pandas as pd -from tabulate import tabulate - -from simulations.distributed.spark.session import spark_context as sc -from simulations.regression_tests import config1, config2 - -from cadCAD.engine import ExecutionMode, ExecutionContext, Executor -from cadCAD.utils import arrange_cols -from cadCAD import configs - -exec_mode = ExecutionMode() - -print("Simulation Execution: Distributed Execution") -dist_proc_ctx = ExecutionContext(context=exec_mode.dist_proc) -run = Executor(exec_context=dist_proc_ctx, configs=configs, spark_context=sc) -# pprint(dist_proc_ctx) - -# print(configs) -i = 0 -config_names = ['config1', 'config2'] -for raw_result, tensor_field in run.execute(): - result = arrange_cols(pd.DataFrame(raw_result), False) - print() - # print(f"Tensor Field: {config_names[i]}") - print(tabulate(tensor_field, headers='keys', tablefmt='psql')) - print("Output:") - print(tabulate(result, headers='keys', tablefmt='psql')) - print() - i += 1 \ No newline at end of file diff --git a/simulations/distributed/messaging.py b/simulations/distributed/messaging.py deleted file mode 100644 index a6082f4..0000000 --- a/simulations/distributed/messaging.py +++ /dev/null @@ -1,132 +0,0 @@ -from copy import deepcopy -from datetime import timedelta, datetime -import time - -from cadCAD.configuration import append_configs -from cadCAD.configuration.utils import config_sim - -# session = enters('room_1', ['A', 'B']) -intitial_conditions = { - 'record_creation': datetime.now(), - 'client_a': {'users': [], 'messages': [], 'avg_send_time': 1}, - 'client_b': {'users': [], 'messages': [], 'avg_send_time': 1} -} - - -# Actions -def message(client_id, room, action, _input, sender, receiver=None): - # start_time = datetime.now() - result = { - 'types': [action], - 'messages': [ - { - 'client': client_id, 'room': room, 'action': action, - 'sender': sender, 'receiver': receiver, - 'input': _input, - 'creatred': datetime.now() - } - ] - } - # datetime.now() - start_time - return result - -def enter_action(state, room, user): - return lambda _g, step, sL, s: message(state, room, 'enter', f"{user} enters {room}", user) - -def message_action(state, room, _input, sender, receiver): - return lambda _g, step, sL, s: message(state, room, 'send', _input, sender, receiver) - -def exit_action(state, room, user): - return lambda _g, step, sL, s: message(state, room, 'exit', f"{user} exited {room}", user) - -# State Updates -def update_users(users, actions, action_types=['send','enter','exit']): - users = deepcopy(users) - for action_type in action_types: - if action_type in actions['types']: - for msg in actions['messages']: - if msg['action'] == 'send' and action_type == 'send': - continue - elif msg['action'] == 'enter' and action_type == 'enter': - for user in msg['sender']: - users.append(user) # register_entered - elif msg['action'] == 'exit' and action_type == 'exit': - for user in msg['sender']: - users.remove(user) # remove_exited - return users - - -def send_message(state): - return lambda _g, step, sL, s, actions: ( - state, - { - 'users': update_users(s[state]['users'], actions), - 'messages': actions['messages'], 'avg_send_time': 1 - } - ) - -def current_time(state): - return lambda _g, step, sL, s, actions: (state, datetime.now()) - -sim_composition = [ - { - "policies": { - "b1": enter_action('server', 'room_1', 'A'), - "b2": enter_action('server', 'room_1', 'B') - }, - "variables": { - 'client_a': send_message('client_a'), - 'client_b': send_message('client_b'), - 'record_creation': current_time('record_creation') - } - }, - { - "policies": { - "b1": message_action('client_A', 'room_1', "Hi B", 'A', 'B'), - "b2": message_action('client_B', 'room_1', "Hi A", 'B', 'A') - }, - "variables": { - 'client_a': send_message('client_a'), - 'client_b': send_message('client_b'), - 'record_creation': current_time('record_creation') - } - }, - { - "policies": { - "b1": message_action('client_A', 'room_1', "Bye B", 'A', 'B'), - "b2": message_action('client_B', 'room_1', "Bye A", 'B', 'A') - }, - "variables": { - 'client_a': send_message('client_a'), - 'client_b': send_message('client_b'), - 'record_creation': current_time('record_creation') - } - }, - { - "policies": { - "b1": exit_action('server', 'room_1', 'A'), - "b2": exit_action('server', 'room_1', 'B') - }, - "variables": { - 'client_a': send_message('client_a'), - 'client_b': send_message('client_b'), - 'record_creation': current_time('record_creation') - } - } -] - -# 5 = 10000 / (500 x 4) -sim_config = config_sim( - { - "N": 5, - "T": range(500), - } -) - -append_configs( - user_id='user_a', - sim_configs=sim_config, - initial_state=intitial_conditions, - partial_state_update_blocks=sim_composition, - policy_ops=[lambda a, b: a + b] -) \ No newline at end of file diff --git a/simulations/distributed/messaging_app.py b/simulations/distributed/messaging_app.py index 5e7cb66..db54a3f 100644 --- a/simulations/distributed/messaging_app.py +++ b/simulations/distributed/messaging_app.py @@ -19,7 +19,9 @@ from pyspark.context import SparkContext from kafka import KafkaProducer from simulations.distributed.executor.spark.jobs import distributed_simulations +from simulations.distributed.policies import enter_action, message_actions, exit_action from simulations.distributed.spark.session import sc +from simulations.distributed.state_updates import send_message, count_messages, add_send_time, current_time def count(start, step): @@ -36,91 +38,7 @@ intitial_conditions = { 'total_send_time': 0.000000 } - -# Actions -def messages(client_id, room, action, _input, sender, receiver=None): - return { - 'types': [action], - 'messages': [ - { - 'client': client_id, 'room': room, 'action': action, - 'sender': sender, 'receiver': receiver, - 'input': _input, - 'creatred': datetime.now() - } - ] - } - - -def enter_action(state, room, user): - def f(_g, step, sL, s, kafkaConfig): - msgs = messages(state, room, 'enter', f"{user} enters {room}", user) - msgs['send_times'] = [0.000000] - msgs['msg_counts'] = [len(msgs['messages'])] - return msgs - return f - -def message_actions(state, room, _input, sender, receiver): - msgs = messages(state, room, 'send', _input, sender, receiver) - msgs_list = msgs['messages'] - def send_action(_g, step, sL, s, kafkaConfig): - start_time = datetime.now() - for msg in msgs_list: - producer: KafkaProducer = kafkaConfig['producer'] - topic: str = kafkaConfig['send_topic'] - encoded_msg = str(msg).encode('utf-8') - producer.send(topic, encoded_msg) - msgs['send_times'] = [(datetime.now() - start_time).total_seconds()] - msgs['msg_counts'] = [len(msgs_list)] - return msgs - - return send_action - -def exit_action(state, room, user): - def f(_g, step, sL, s, kafkaConfig): - msgs = messages(state, room, 'exit', f"{user} exited {room}", user) - msgs_list = msgs['messages'] - msgs['send_times'] = [0.000000] - msgs['msg_counts'] = [len(msgs_list)] - return msgs - return f - # State Updates -def update_users(users, actions, action_types=['send','enter','exit']): - users = deepcopy(users) - for action_type in action_types: - if action_type in actions['types']: - for msg in actions['messages']: - if msg['action'] == 'send' and action_type == 'send': - continue - elif msg['action'] == 'enter' and action_type == 'enter': - for user in msg['sender']: - users.append(user) # register_entered - elif msg['action'] == 'exit' and action_type == 'exit': - for user in msg['sender']: - users.remove(user) # remove_exited - return users - -add = lambda a, b: a + b -def count_messages(_g, step, sL, s, actions, kafkaConfig): - return 'total_msg_count', s['total_msg_count'] + reduce(add, actions['msg_counts']) - -def add_send_time(_g, step, sL, s, actions, kafkaConfig): - return 'total_send_time', s['total_send_time'] + reduce(add, actions['send_times']) - -def send_message(state): - return lambda _g, step, sL, s, actions, kafkaConfig: ( - state, - { - 'users': update_users(s[state]['users'], actions), - 'messages': actions['messages'], - 'msg_counts': reduce(add, actions['msg_counts']), - 'send_times': reduce(add, actions['send_times']) - } - ) - -def current_time(state): - return lambda _g, step, sL, s, actions, kafkaConfig: (state, datetime.now()) sim_composition = [ { diff --git a/simulations/distributed/messaging_test.py b/simulations/distributed/messaging_test.py deleted file mode 100644 index e4150f5..0000000 --- a/simulations/distributed/messaging_test.py +++ /dev/null @@ -1,28 +0,0 @@ -import pandas as pd -from tabulate import tabulate - -from simulations.distributed.spark.session import spark_context as sc -from simulations.distributed import messaging - -from cadCAD.engine import ExecutionMode, ExecutionContext, Executor -from cadCAD.utils import arrange_cols -from cadCAD import configs - -exec_mode = ExecutionMode() - -print("Simulation Execution: Distributed Execution") -dist_proc_ctx = ExecutionContext(context=exec_mode.dist_proc) -run = Executor(exec_context=dist_proc_ctx, configs=configs, spark_context=sc) -# pprint(dist_proc_ctx) - -# print(configs) -i = 0 -for raw_result, tensor_field in run.execute(): - result = arrange_cols(pd.DataFrame(raw_result), False) - print() - print(tabulate(tensor_field, headers='keys', tablefmt='psql')) - print("Output:") - print(tabulate(result.head(1), headers='keys', tablefmt='psql')) - print(tabulate(result.tail(1), headers='keys', tablefmt='psql')) - print() - i += 1 diff --git a/simulations/distributed/multi_config_test.py b/simulations/distributed/multi_config_test.py deleted file mode 100644 index 8468102..0000000 --- a/simulations/distributed/multi_config_test.py +++ /dev/null @@ -1,36 +0,0 @@ -import pandas as pd -from tabulate import tabulate -# The following imports NEED to be in the exact order -from cadCAD.engine import ExecutionMode, ExecutionContext, Executor -from cadCAD.utils import arrange_cols -from simulations.regression_tests import config1, config2 -from cadCAD import configs - -from simulations.distributed.spark.session import spark - -exec_mode = ExecutionMode() - -print("Simulation Execution: Concurrent Execution") -multi_proc_ctx = ExecutionContext(context=exec_mode.multi_proc) -run = Executor(exec_context=multi_proc_ctx, configs=configs) - -# print(configs) -tf = None -i = 0 -config_names = ['config1', 'config2'] -for raw_result, tensor_field in run.execute(): - result = arrange_cols(pd.DataFrame(raw_result), False) - print() - # print(f"Tensor Field: {config_names[i]}") - tf = tensor_field - # print(tabulate(tensor_field, headers='keys', tablefmt='psql')) - print("Output:") - # print(tabulate(result, headers='keys', tablefmt='psql')) - print() - i += 1 - -spark.conf.set("spark.sql.execution.arrow.enabled", "true") - -df = spark.createDataFrame(tf) - -df.show() \ No newline at end of file diff --git a/simulations/distributed/policies.py b/simulations/distributed/policies.py new file mode 100644 index 0000000..2c90c98 --- /dev/null +++ b/simulations/distributed/policies.py @@ -0,0 +1,50 @@ +from datetime import datetime +from kafka import KafkaProducer + +# Actions +def messages(client_id, room, action, _input, sender, receiver=None): + return { + 'types': [action], + 'messages': [ + { + 'client': client_id, 'room': room, 'action': action, + 'sender': sender, 'receiver': receiver, + 'input': _input, + 'creatred': datetime.now() + } + ] + } + + +def enter_action(state, room, user): + def f(_g, step, sL, s, kafkaConfig): + msgs = messages(state, room, 'enter', f"{user} enters {room}", user) + msgs['send_times'] = [0.000000] + msgs['msg_counts'] = [len(msgs['messages'])] + return msgs + return f + +def message_actions(state, room, _input, sender, receiver): + msgs = messages(state, room, 'send', _input, sender, receiver) + msgs_list = msgs['messages'] + def send_action(_g, step, sL, s, kafkaConfig): + start_time = datetime.now() + for msg in msgs_list: + producer: KafkaProducer = kafkaConfig['producer'] + topic: str = kafkaConfig['send_topic'] + encoded_msg = str(msg).encode('utf-8') + producer.send(topic, encoded_msg) + msgs['send_times'] = [(datetime.now() - start_time).total_seconds()] + msgs['msg_counts'] = [len(msgs_list)] + return msgs + + return send_action + +def exit_action(state, room, user): + def f(_g, step, sL, s, kafkaConfig): + msgs = messages(state, room, 'exit', f"{user} exited {room}", user) + msgs_list = msgs['messages'] + msgs['send_times'] = [0.000000] + msgs['msg_counts'] = [len(msgs_list)] + return msgs + return f \ No newline at end of file diff --git a/simulations/distributed/state_updates.py b/simulations/distributed/state_updates.py new file mode 100644 index 0000000..0e71932 --- /dev/null +++ b/simulations/distributed/state_updates.py @@ -0,0 +1,40 @@ +from copy import deepcopy +from datetime import datetime +from functools import reduce + +# State Updates +def update_users(users, actions, action_types=['send','enter','exit']): + users = deepcopy(users) + for action_type in action_types: + if action_type in actions['types']: + for msg in actions['messages']: + if msg['action'] == 'send' and action_type == 'send': + continue + elif msg['action'] == 'enter' and action_type == 'enter': + for user in msg['sender']: + users.append(user) # register_entered + elif msg['action'] == 'exit' and action_type == 'exit': + for user in msg['sender']: + users.remove(user) # remove_exited + return users + +add = lambda a, b: a + b +def count_messages(_g, step, sL, s, actions, kafkaConfig): + return 'total_msg_count', s['total_msg_count'] + reduce(add, actions['msg_counts']) + +def add_send_time(_g, step, sL, s, actions, kafkaConfig): + return 'total_send_time', s['total_send_time'] + reduce(add, actions['send_times']) + +def send_message(state): + return lambda _g, step, sL, s, actions, kafkaConfig: ( + state, + { + 'users': update_users(s[state]['users'], actions), + 'messages': actions['messages'], + 'msg_counts': reduce(add, actions['msg_counts']), + 'send_times': reduce(add, actions['send_times']) + } + ) + +def current_time(state): + return lambda _g, step, sL, s, actions, kafkaConfig: (state, datetime.now()) \ No newline at end of file