diff --git a/cadCAD/engine/__init__.py b/cadCAD/engine/__init__.py index 3f46a3f..6093e6f 100644 --- a/cadCAD/engine/__init__.py +++ b/cadCAD/engine/__init__.py @@ -79,11 +79,49 @@ def distributed_simulations( runIDs: List[int], sc: SparkContext = None ): + # params = list(zip(simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns, + # userIDs, sessionIDs, simulationIDs, runIDs)) + # lambda t: t[0](t[1], t[2], t[3], t[4], t[5], t[6], t[7], t[8], t[9], t[10]) + func_params = list( + zip(userIDs, sessionIDs, simulationIDs, runIDs, simulation_execs, configs_structs, env_processes_list) + ) + + func_params_dict = dict( + [((t[0], t[1], t[2], t[3]), {'sim_exec': t[4], 'config': t[5], 'env_procs': t[6]}) for t in func_params] + ) + + pprint(func_params_dict) + + val_params = list(zip(userIDs, sessionIDs, simulationIDs, runIDs, var_dict_list, states_lists, Ts, Ns)) + val_params_list = [ + ( + {'user_id': t[0], 'session_id': t[1], 'sim_id': t[2], 'run_id': t[3]}, + {'var_dict': t[4], 'states_lists': t[5], 'Ts': t[6], 'Ns': t[7]} + ) for t in val_params + ] + + # pprint(val_params_dict) + def simulate(k, v): + (sim_exec, config, env_procs) = func_params_dict(k) + results = sim_exec( + v['var_dict'], v['states_lists'], config, env_procs, v['Ts'], v['Ns'], + k['user_id'], k['session_id'], k['sim_id'], k['run_id'] + ) + + return results + + + vals_rdd = sc.parallelize(val_params_list).map(lambda x: simulate(x[0], x[1])) #.map(lambda x: tuple(x[0].values())) + pprint(vals_rdd.take(1)) + # vals_rdd.foreach(print) + # .map( + # lambda t: {(t[0], t[1], t[2], t[3]): {'var_dict': t[4], 'states_lists': t[5], 'Ts': t[6], 'Ns': t[7]}} + # ) + # params = list(zip(simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, Ns, # userIDs, sessionIDs, simulationIDs, runIDs)) # simulation_execs, configs_structs, env_processes_list - func_params = list(zip(userIDs, sessionIDs, simulationIDs, runIDs, simulation_execs, configs_structs, env_processes_list)) # val_params = list(zip(userIDs, sessionIDs, simulationIDs, runIDs, var_dict_list, states_lists, Ts, Ns)) # vals_rdd = sc.parallelize(val_params).map( # lambda t: {'user_id': t[0], 'session_id': t[1], 'simulation_id': t[2], 'run_id': t[3], @@ -105,16 +143,13 @@ def distributed_simulations( # func_params # ))) - func_keys = [(t[0], t[1], t[2], t[3]) for t in func_params] - func_values = [(t[4], t[5], t[6], t[7]) for t in func_params] + # func_keys = [(t[0], t[1], t[2], t[3]) for t in func_params] + # func_values = [(t[4], t[5], t[6], t[7]) for t in func_params] - d = {} - for key in func_keys: - d[key] = # func_params_dicts = dict([{(t[0], t[1], t[2], t[3]): {'sim_exec': t[4], 'configs': t[5], 'env_proc': t[6]}} for t in func_params]) - pprint(func_params_dicts) + # pprint(func_params_dicts) # val_params = list(zip(userIDs, sessionIDs, simulationIDs, runIDs, var_dict_list, states_lists, Ts, Ns)) # vals_rdd = sc.parallelize(val_params).map(