multithreading mech_pipeline again, block_pipeline later

This commit is contained in:
Joshua E. Jodesty 2018-12-11 01:07:23 -05:00
parent 181b7cf986
commit a0266641f7
2 changed files with 28 additions and 17 deletions

View File

@ -1,6 +1,7 @@
from pathos.threading import ThreadPool from pathos.threading import ThreadPool
from copy import deepcopy from copy import deepcopy
from fn.op import foldr, call from fn.op import foldr, call
import numpy as np
import pprint import pprint
pp = pprint.PrettyPrinter(indent=4) pp = pprint.PrettyPrinter(indent=4)
@ -72,9 +73,7 @@ class Executor:
# make env proc trigger field agnostic # make env proc trigger field agnostic
self.apply_env_proc(env_processes, last_in_copy, last_in_copy['timestamp']) # mutating last_in_copy self.apply_env_proc(env_processes, last_in_copy, last_in_copy['timestamp']) # mutating last_in_copy
# print()
# pp.pprint(last_in_copy)
# exit()
def set_sys_metrics(m_step, t_step, run): def set_sys_metrics(m_step, t_step, run):
last_in_copy["mech_step"], last_in_copy["time_step"], last_in_copy['run'] = m_step, t_step, run last_in_copy["mech_step"], last_in_copy["time_step"], last_in_copy['run'] = m_step, t_step, run
@ -90,12 +89,9 @@ class Executor:
del last_in_copy del last_in_copy
# print()
# pp.pprint(sL)
# exit()
return sL return sL
def mech_pipeline(self, states_list, configs, env_processes, t_step, run): def mech_pipeline(self, states_list, configs, env_processes, t_step, run):
m_step = 0 m_step = 0
states_list_copy = deepcopy(states_list) states_list_copy = deepcopy(states_list)
@ -110,20 +106,27 @@ class Executor:
for config in configs: for config in configs:
s_conf, b_conf = config[0], config[1] s_conf, b_conf = config[0], config[1]
last_states = states_list[-1] last_states = states_list[-1]
dropped_right_sL = drop_right(states_list, 1)
print()
# print(states_list)
if isinstance(last_states, list): if isinstance(last_states, list):
x = list(map(lambda last_state_dict: dropped_right_sL.append(last_state_dict), last_states)) pool = ThreadPool(nodes=len(last_states)) # ToDo: Optimize
print(x) dropped_right_sL = drop_right(states_list, 1)
def multithreaded_mech_step(mod_states_list):
return self.mech_step(m_step, mod_states_list, s_conf, b_conf, env_processes, t_step, run)
states_lists = pool.map(
lambda last_state_dict: dropped_right_sL + [last_state_dict],
last_states
)
print()
pp.pprint(configs)
else:
states_lists = self.mech_step(m_step, states_list, s_conf, b_conf, env_processes, t_step, run)
# states_list = self.mech_step(m_step, states_list, s_conf, b_conf, env_processes, t_step, run)
m_step += 1 m_step += 1
t_step += 1 t_step += 1
print()
# print(states_list)
exit() exit()
return states_list return states_list
@ -132,9 +135,17 @@ class Executor:
def block_pipeline(self, states_list, configs, env_processes, time_seq, run): def block_pipeline(self, states_list, configs, env_processes, time_seq, run):
time_seq = [x + 1 for x in time_seq] time_seq = [x + 1 for x in time_seq]
simulation_list = [states_list] simulation_list = [states_list]
print(len(configs))
for time_step in time_seq: for time_step in time_seq:
pipe_run = self.mech_pipeline(simulation_list[-1], configs, env_processes, time_step, run) # print(simulation_list)
if len(simulation_list) == 1:
pipe_run = self.mech_pipeline(simulation_list[-1], configs, env_processes, time_step, run)
exit()
# elif np.array(pipe_run[-1]) == 2:
# pipe_run = self.mech_pipeline(simulation_list[-1], configs, env_processes, time_step, run)
# print(pipe_run)
_, *pipe_run = pipe_run _, *pipe_run = pipe_run
# print(pipe_run)
simulation_list.append(pipe_run) simulation_list.append(pipe_run)
return simulation_list return simulation_list

View File

@ -33,7 +33,7 @@ def flatten(l):
return flattenDict(l) return flattenDict(l)
def drop_right(l, n=1): def drop_right(l, n):
return l[:len(l)-n] return l[:len(l)-n]
# def flatmap(f, items): # def flatmap(f, items):