Norvig Economy Simulation
import random | |
import numpy as np | |
import pandas as pd | |
import matplotlib.pyplot as plt | |
import networkx as nx | |
from string import ascii_lowercase, digits | |
COLW = 10 | |
chars = ascii_lowercase + digits | |
class Economy(object): | |
plt.style.use('bmh') | |
def __init__(self, population, corporations): | |
self.ids = list() | |
# Population starting wealth | |
self.population = {self.id_gen(): w for w in population} | |
# Corporations start with 0 units | |
self.corps = {self.id_gen(): 0 for _ in range(corporations)} | |
# Set up multi-directional network | |
self.G = nx.MultiDiGraph() | |
# Add population nodes | |
self.G.add_nodes_from(self.population.keys(), bipartite=0) | |
self.G.add_nodes_from(self.population.keys(), bipartite=1) | |
# Randomly assign actors to work for each company | |
for cid in self.corps.keys(): | |
# Equal number of people work for each company | |
eids = random.sample(self.population.keys(), int(len(self.population)/len(self.corps))) | |
for eid in eids: | |
self.G.add_edge(cid, eid) | |
self.n = len(self.population) | |
self.stages = [[self.population.copy(), self.corps.copy()]] | |
self.fig, self.axes = plt.subplots(nrows=2, ncols=2, figsize=(9, 4)) | |
self.stats = list() | |
def id_gen(self): | |
eid = '' | |
# Guarantee unique | |
while len(eid) == 0: | |
eid = ''.join(random.sample(chars, 5)) | |
if eid in self.ids: | |
eid = '' | |
self.ids.append(eid) | |
return eid | |
def transaction(self, a, c): | |
# Actor spends a random quantity of their wealth with the company | |
w = self.population[a] | |
r = np.random.uniform(0, self.population[a]) | |
# Add edge to network between participants, weighted by transaction size | |
self.G.add_edge(a, c, weight=r) | |
return w - r, self.corps[c] + r | |
def distribute(self, cid, revenue, pop): | |
while revenue > 1: | |
for eid in self.G[cid].keys(): | |
# pay = revenue / employees | |
pay = np.random.uniform(0, revenue) | |
# Give rounding error to this employee: | |
pay = revenue if pay < 1 else pay | |
pop[0][eid] += pay | |
pop[1][cid] -= pay | |
revenue -= pay | |
self.G.add_edge(cid, eid, weight=pay) | |
return pop | |
@staticmethod | |
def gini(y): | |
"""Calculate the Gini coefficient of a numpy array. | |
based on bottom eq: http://www.statsdirect.com/help/content/image/stat0206_wmf.gif | |
from: http://www.statsdirect.com/help/default.htm#nonparametric_methods/gini.htm | |
:param y: 1-dimensional numpy array, input series | |
:return: int, Gini coefficient | |
""" | |
y = sorted(y) | |
n = len(y) | |
numer = 2 * sum((i + 1) * y[i] for i in range(n)) | |
denom = n * sum(y) | |
return (numer / denom) - (n + 1) / n | |
@staticmethod | |
def auto_bin(y, m=5): | |
""" Modified Freedman-Diaconis rule to optimize # of histogram bins | |
:param narray: numpy array | |
:param m: int, optional; minimum number of bins to return | |
:return: int, rounded number of optimal bins for histogram of sample | |
""" | |
narray = np.array(y) | |
iqr = np.subtract(*np.percentile(narray, [75, 25])) | |
return max(m, int(2 * iqr * narray.size ** (-1/3))) | |
def simulate(self, generations, events): | |
print(" ".join(self.cols)) | |
print(" ".join(["-" * len(x) for x in self.cols])) | |
print(self.stat_row(0, 0)) | |
for g in range(1, generations + 1): | |
pop = [self.population.copy(), self.corps.copy()] | |
for _ in range(events): | |
a, c = random.sample(pop[0].keys(), 1)[0], random.sample(pop[1].keys(), 1)[0] | |
pop[0][a], pop[1][c] = self.transaction(a=a, c=c) | |
for cid, revenue in pop[1].items(): | |
pop = self.distribute(cid, revenue, pop) | |
self.stages.append(pop) | |
x = self.stat_row(g, events * g) | |
if g % (generations * 0.05) == 0: | |
print(x) | |
self.plot(generations, events) | |
def plot(self, g, e): | |
v_1 = [*self.stages[0][0].values()] | |
v_n = [*self.stages[-1][0].values()] | |
# Starting population | |
self.axes[0][0].hist(v_1, edgecolor=(0, 0, .14), | |
bins=self.auto_bin(v_1), | |
label="Starting population (n={:,.0f})".format(self.n), | |
range=(0, max(v_n))) | |
self.axes[0][0].set_title("Population Wealth") | |
self.axes[0][0].set_xlabel("Wealth") | |
# Ending population | |
self.axes[0][0].hist(v_n, color=(.7, 0, 0, 0.5), edgecolor=(.14, 0, 0), | |
bins=self.auto_bin(v_n), | |
label="Ending population (generation={:,.0f}; transactions={:,.0f})".format(g, g * e), | |
range=(0, max(v_n))) | |
# Gini index | |
self.summary()[self.cols[3]].plot(ax=self.axes[0][1]) | |
self.axes[0][1].set_title("Wealth Inequality", fontsize=10) | |
self.axes[0][1].set_ylabel("Gini Coefficient") | |
self.axes[0][1].set_xlabel("Generation") | |
plt.show() | |
def summary(self): | |
return pd.DataFrame(data=self.stats, columns=self.cols) | |
cols = [x + " " * max(0, COLW - len(x)) for x in \ | |
["generation", "cum events", "n", "G", "median", "stdev", "min", "25%", "50%", "90%", "99%", "max"]] | |
def stat_row(self, i, j): | |
""" Store and return summary statistics for most recent generation """ | |
p = [*self.stages[-1][0].values()] | |
self.stats.append([i, j, len(p), self.gini(p), np.median(p), np.std(p), | |
min(p), np.percentile(p, 25), np.percentile(p, 50), | |
np.percentile(p, 90), np.percentile(p, 99), max(p)]) | |
row = ["{:.2f}".format(x) for x in self.stats[-1]] | |
return " ".join([" " * max(0, COLW - len(x)) + x for x in row]) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment