Community_Inclusion_Currencies/Simulation_param/model/parts/supportingFunctions.py

442 lines
16 KiB
Python

import numpy as np
from scipy.stats import gamma
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
default_kappa= 4
default_exit_tax = .02
#value function for a given state (R,S)
def invariant(R,S,kappa=default_kappa):
return (S**kappa)/R
#given a value function (parameterized by kappa)
#and an invariant coeficient V0
#return Supply S as a function of reserve R
def reserve(S, V0, kappa=default_kappa):
return (S**kappa)/V0
#given a value function (parameterized by kappa)
#and an invariant coeficient V0
#return Supply S as a function of reserve R
def supply(R, V0, kappa=default_kappa):
return (V0*R)**(1/kappa)
#given a value function (parameterized by kappa)
#and an invariant coeficient V0
#return a spot price P as a function of reserve R
def spot_price(R, V0, kappa=default_kappa):
return kappa*R**((kappa-1)/kappa)/V0**(1/kappa)
#for a given state (R,S)
#given a value function (parameterized by kappa)
#and an invariant coeficient V0
#deposit deltaR to Mint deltaS
#with realized price deltaR/deltaS
def mint(deltaR, R,S, V0, kappa=default_kappa):
deltaS = (V0*(R+deltaR))**(1/kappa)-S
if deltaS ==0:
realized_price = spot_price(R+deltaR, V0, kappa)
else:
realized_price = deltaR/deltaS
deltaS = round(deltaS,2)
return deltaS, realized_price
#for a given state (R,S)
#given a value function (parameterized by kappa)
#and an invariant coeficient V0
#burn deltaS to Withdraw deltaR
#with realized price deltaR/deltaS
def withdraw(deltaS, R,S, V0, kappa=default_kappa):
deltaR = R-((S-deltaS)**kappa)/V0
if deltaS ==0:
realized_price = spot_price(R+deltaR, V0, kappa)
else:
realized_price = deltaR/deltaS
deltaR = round(deltaR,2)
return deltaR, realized_price
def iterateEdges(network,edgeToIterate):
'''
Description:
Iterate through a network on a weighted edge and return
two dictionaries: the inflow and outflow for the given agents
in the format:
{'Agent':amount}
'''
outflows = {}
inflows = {}
for i,j in network.edges:
try:
amount = network[i][j][edgeToIterate]
if i in outflows:
outflows[i] = outflows[i] + amount
else:
outflows[i] = amount
if j in inflows:
inflows[j] = inflows[j] + amount
else:
inflows[j] = amount
except:
pass
return outflows,inflows
def inflowAndOutflowDictionaryMerge(inflow,outflow):
'''
Description:
Merge two dictionaries and return one dictionary with zero floor'''
merged = {}
inflowsKeys = [k for k,v in inflow.items() if k not in outflow]
for i in inflowsKeys:
merged[i] = inflow[i]
outflowsKeys = [k for k,v in outflow.items() if k not in inflow]
for i in outflowsKeys:
merged[i] = outflow[i]
overlapKeys = [k for k,v in inflow.items() if k in outflow]
for i in overlapKeys:
amt = outflow[i] - inflow[i]
if amt < 0:
merged[i] = 0
else:
merged[i] = amt
pass
return merged
def spendCalculation(agentToPay,agentToReceive,rankOrderDemand,maxSpendCurrency,maxSpendTokens,cicPercentage):
'''
Function to calculate if an agent can pay for demand given token and currency contraints
'''
if (rankOrderDemand[agentToReceive] * (1-cicPercentage)) > maxSpendCurrency[agentToPay]:
verdict_currency = 'No'
else:
verdict_currency = 'Enough'
if (rankOrderDemand[agentToReceive] * cicPercentage) > maxSpendTokens[agentToPay]:
verdict_cic = 'No'
else:
verdict_cic = 'Enough'
if verdict_currency == 'Enough'and verdict_cic == 'Enough':
spend = rankOrderDemand[agentToReceive]
elif maxSpendCurrency[agentToPay] > 0:
spend = maxSpendCurrency[agentToPay]
else:
spend = 0
return spend
def spendCalculationExternal(agentToPay,agentToReceive,rankOrderDemand,maxSpendCurrency):
'''
'''
if rankOrderDemand[agentToReceive] > maxSpendCurrency[agentToPay]:
verdict_currency = 'No'
else:
verdict_currency = 'Enough'
if verdict_currency == 'Enough':
spend = rankOrderDemand[agentToReceive]
elif maxSpendCurrency[agentToPay] > 0:
spend = maxSpendCurrency[agentToPay]
else:
spend = 0
return spend
def DictionaryMergeAddition(inflow,outflow):
'''
Description:
Merge two dictionaries and return one dictionary'''
merged = {}
inflowsKeys = [k for k,v in inflow.items() if k not in outflow]
for i in inflowsKeys:
merged[i] = inflow[i]
outflowsKeys = [k for k,v in outflow.items() if k not in inflow]
for i in outflowsKeys:
merged[i] = outflow[i]
overlapKeys = [k for k,v in inflow.items() if k in outflow]
for i in overlapKeys:
merged[i] = outflow[i] + inflow[i]
return merged
def mint_burn_logic_control(ideal,actual,variance,fiat,fiat_variance,ideal_fiat):
'''
Inventory control function to test if the current balance is in an acceptable range. Tolerance range
'''
if ideal - variance <= actual <= ideal + (2*variance):
decision = 'none'
amount = 0
else:
if (ideal + variance) > actual:
decision = 'mint'
amount = (ideal + variance) - actual
else:
pass
if actual > (ideal + variance):
decision = 'burn'
amount = actual - (ideal + variance)
else:
pass
if decision == 'mint':
if fiat < (ideal_fiat - fiat_variance):
if amount > fiat:
decision = 'none'
amount = 0
else:
pass
if decision == 'none':
if fiat < (ideal_fiat - fiat_variance):
decision = 'mint'
amount = (ideal_fiat-fiat_variance)
else:
pass
amount = round(amount,2)
return decision, amount
#NetworkX functions
def get_nodes_by_type(g, node_type_selection):
return [node for node in g.nodes if g.nodes[node]['type']== node_type_selection]
def get_edges_by_type(g, edge_type_selection):
return [edge for edge in g.edges if g.edges[edge]['type']== edge_type_selection]
def get_edges(g):
return [edge for edge in g.edges if g.edges[edge]]
def get_nodes(g):
'''
df.network.apply(lambda g: np.array([g.nodes[j]['balls'] for j in get_nodes(g)]))
'''
return [node for node in g.nodes if g.nodes[node]]
def aggregate_runs(df,aggregate_dimension):
'''
Function to aggregate the monte carlo runs along a single dimension.
Parameters:
df: dataframe name
aggregate_dimension: the dimension you would like to aggregate on, the standard one is timestep.
Example run:
mean_df,median_df,std_df,min_df = aggregate_runs(df,'timestep')
'''
df = df[df['substep'] == df.substep.max()]
mean_df = df.groupby(aggregate_dimension).mean().reset_index()
median_df = df.groupby(aggregate_dimension).median().reset_index()
std_df = df.groupby(aggregate_dimension).std().reset_index()
min_df = df.groupby(aggregate_dimension).min().reset_index()
return mean_df,median_df,std_df,min_df
def plot_averaged_runs(df,aggregate_dimension,x, y,run_count,lx=False,ly=False, suppMin=False):
'''
Function to plot the mean, median, etc of the monte carlo runs along a single variable.
Parameters:
df: dataframe name
aggregate_dimension: the dimension you would like to aggregate on, the standard one is timestep.
x = x axis variable for plotting
y = y axis variable for plotting
run_count = the number of monte carlo simulations
lx = True/False for if the x axis should be logged
ly = True/False for if the x axis should be logged
suppMin: True/False for if the miniumum value should be plotted
Note: Run aggregate_runs before using this function
Example run:
'''
mean_df,median_df,std_df,min_df = aggregate_runs(df,aggregate_dimension)
plt.figure(figsize=(10,6))
if not(suppMin):
plt.plot(mean_df[x].values, mean_df[y].values,
mean_df[x].values,median_df[y].values,
mean_df[x].values,mean_df[y].values+std_df[y].values,
mean_df[x].values,min_df[y].values)
plt.legend(['mean', 'median', 'mean+ 1*std', 'min'],bbox_to_anchor=(1.05, 1), loc=2, borderaxespad=0.)
else:
plt.plot(mean_df[x].values, mean_df[y].values,
mean_df[x].values,median_df[y].values,
mean_df[x].values,mean_df[y].values+std_df[y].values,
mean_df[x].values,mean_df[y].values-std_df[y].values)
plt.legend(['mean', 'median', 'mean+ 1*std', 'mean - 1*std'],bbox_to_anchor=(1.05, 1), loc=2, borderaxespad=0.)
plt.xlabel(x)
plt.ylabel(y)
title_text = 'Performance of ' + y + ' over all of ' + str(run_count) + ' Monte Carlo runs'
plt.title(title_text)
if lx:
plt.xscale('log')
if ly:
plt.yscale('log')
def plot_median_with_quantiles(df,aggregate_dimension,x, y):
'''
Function to plot the median and 1st and 3rd quartiles of the monte carlo runs along a single variable.
Parameters:
df: dataframe name
aggregate_dimension: the dimension you would like to aggregate on, the standard one is timestep.
x = x axis variable for plotting
y = y axis variable for plotting
Example run:
plot_median_with_quantiles(df,'timestep','timestep','AggregatedAgentSpend')
'''
df = df[df['substep'] == df.substep.max()]
firstQuantile = df.groupby(aggregate_dimension).quantile(0.25).reset_index()
thirdQuantile = df.groupby(aggregate_dimension).quantile(0.75).reset_index()
median_df = df.groupby(aggregate_dimension).median().reset_index()
fig, ax = plt.subplots(1,figsize=(10,6))
ax.plot(median_df[x].values, median_df[y].values, lw=2, label='Median', color='blue')
ax.fill_between(firstQuantile[x].values, firstQuantile[y].values, thirdQuantile[y].values, facecolor='black', alpha=0.2)
ax.set_title(y + ' Median')
ax.legend(loc='upper left')
ax.set_xlabel('Timestep')
ax.set_ylabel('Amount')
ax.grid()
def plot_median_with_quantiles_annotation(df,aggregate_dimension,x, y):
'''
Function to plot the median and 1st and 3rd quartiles of the monte carlo runs along a single variable.
Parameters:
df: dataframe name
aggregate_dimension: the dimension you would like to aggregate on, the standard one is timestep.
x = x axis variable for plotting
y = y axis variable for plotting
Example run:
plot_median_with_quantiles(df,'timestep','timestep','AggregatedAgentSpend')
'''
df = df[df['substep'] == df.substep.max()]
firstQuantile = df.groupby(aggregate_dimension).quantile(0.25).reset_index()
thirdQuantile = df.groupby(aggregate_dimension).quantile(0.75).reset_index()
median_df = df.groupby(aggregate_dimension).median().reset_index()
fig, ax = plt.subplots(1,figsize=(10,6))
ax.axvline(x=30,linewidth=2, color='r')
ax.annotate('Agents can withdraw and Red Cross Drip occurs', xy=(30,2), xytext=(35, 1),
arrowprops=dict(facecolor='black', shrink=0.05))
ax.axvline(x=60,linewidth=2, color='r')
ax.axvline(x=90,linewidth=2, color='r')
ax.plot(median_df[x].values, median_df[y].values, lw=2, label='Median', color='blue')
ax.fill_between(firstQuantile[x].values, firstQuantile[y].values, thirdQuantile[y].values, facecolor='black', alpha=0.2)
ax.set_title(y + ' Median')
ax.legend(loc='upper left')
ax.set_xlabel('Timestep')
ax.set_ylabel('Amount')
ax.grid()
def first_five_plot(df,aggregate_dimension,x,y,run_count):
'''
A function that generates timeseries plot of at most the first five Monte Carlo runs.
Parameters:
df: dataframe name
aggregate_dimension: the dimension you would like to aggregate on, the standard one is timestep.
x = x axis variable for plotting
y = y axis variable for plotting
run_count = the number of monte carlo simulations
Note: Run aggregate_runs before using this function
Example run:
first_five_plot(df,'timestep','timestep','revenue',run_count=100)
'''
mean_df,median_df,std_df,min_df = aggregate_runs(df,aggregate_dimension)
plt.figure(figsize=(10,6))
if run_count < 5:
runs = run_count
else:
runs = 5
for r in range(1,runs+1):
legend_name = 'Run ' + str(r)
plt.plot(df[df.run==r].timestep, df[df.run==r][y], label = legend_name )
plt.plot(mean_df[x], mean_df[y], label = 'Mean', color = 'black')
plt.legend(bbox_to_anchor=(1.05, 1), loc=2, borderaxespad=0.)
plt.xlabel(x)
plt.ylabel(y)
title_text = 'Performance of ' + y + ' over the First ' + str(runs) + ' Monte Carlo Runs'
plt.title(title_text)
#plt.savefig(y +'_FirstFiveRuns.jpeg')
def aggregate_runs_param_mc(df,aggregate_dimension):
'''
Function to aggregate the monte carlo runs along a single dimension.
Parameters:
df: dataframe name
aggregate_dimension: the dimension you would like to aggregate on, the standard one is timestep.
Example run:
mean_df,median_df,std_df,min_df = aggregate_runs(df,'timestep')
'''
df = df[df['substep'] == df.substep.max()]
mean_df = df.groupby(aggregate_dimension).mean().reset_index()
median_df = df.groupby(aggregate_dimension).median().reset_index()
#min_df = df.groupby(aggregate_dimension).min().reset_index()
#max_df = df.groupby(aggregate_dimension).max().reset_index()
return mean_df,median_df
def param_dfs(results,params,swept):
mean_df,median_df = aggregate_runs_param_mc(results[0]['result'],'timestep')
mean_df[swept] = params[0]
median_df[swept] = params[0]
#max_df[swept] = params[0]
#min_df[swept] = params[0]
for i in range(1,len(params)):
mean_df_intermediate,median_df_intermediate = aggregate_runs_param_mc(results[i]['result'],'timestep')
mean_df_intermediate[swept] = params[i]
median_df_intermediate[swept] = params[i]
#max_df_intermediate[swept] = params[i]
#min_df_intermediate[swept] = params[i]
mean_df= pd.concat([mean_df, mean_df_intermediate])
median_df= pd.concat([median_df, median_df_intermediate])
#max_df= pd.concat([max_df, max_df_intermediate])
#min_df= pd.concat([min_df, min_df_intermediate])
return mean_df,median_df
def param_plot(results,state_var_x, state_var_y, parameter, save_plot = False,**kwargs):
'''
Results (df) is the dataframe (concatenated list of results dictionaries)
length = intreger, number of parameter values
Enter state variable name as a string for x and y. Enter the swept parameter name as a string.
y_label kwarg for custom y-label and title reference
x_label kwarg for custom x-axis label
'''
sns.scatterplot(x=state_var_x, y = state_var_y, hue = parameter, style= parameter, palette = 'coolwarm',alpha=1, data = results, legend="full")
title_text = 'Effect of ' + parameter + ' Parameter Sweep on ' + state_var_y
for key, value in kwargs.items():
if key == 'y_label':
plt.ylabel(value)
title_text = 'Effect of ' + parameter + ' Parameter Sweep on ' + value
if key == 'x_label':
plt.xlabel(value)
plt.title(title_text)
if save_plot == True:
filename = state_var_y + state_var_x + parameter + 'plot.png'
# # plt.savefig('static/images/' + filename)
# plt.savefig(filename)
lgd = plt.legend(bbox_to_anchor=(1.05, 1), loc=2, borderaxespad=0.)
#title_text = 'Market Volatility versus Normalized Liquid Token Supply for All Runs'
plt.title(title_text)
plt.savefig('static/images/' + filename, bbox_extra_artists=(lgd,), bbox_inches='tight')