Instantly share code, notes, and snippets.

@rgkimball /norvig.py
Last active Feb 26, 2018

Embed
What would you like to do?
Norvig Economy Simulation
import random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import networkx as nx
from string import ascii_lowercase, digits
COLW = 10
chars = ascii_lowercase + digits
class Economy(object):
plt.style.use('bmh')
def __init__(self, population, corporations):
self.ids = list()
# Population starting wealth
self.population = {self.id_gen(): w for w in population}
# Corporations start with 0 units
self.corps = {self.id_gen(): 0 for _ in range(corporations)}
# Set up multi-directional network
self.G = nx.MultiDiGraph()
# Add population nodes
self.G.add_nodes_from(self.population.keys(), bipartite=0)
self.G.add_nodes_from(self.population.keys(), bipartite=1)
# Randomly assign actors to work for each company
for cid in self.corps.keys():
# Equal number of people work for each company
eids = random.sample(self.population.keys(), int(len(self.population)/len(self.corps)))
for eid in eids:
self.G.add_edge(cid, eid)
self.n = len(self.population)
self.stages = [[self.population.copy(), self.corps.copy()]]
self.fig, self.axes = plt.subplots(nrows=2, ncols=2, figsize=(9, 4))
self.stats = list()
def id_gen(self):
eid = ''
# Guarantee unique
while len(eid) == 0:
eid = ''.join(random.sample(chars, 5))
if eid in self.ids:
eid = ''
self.ids.append(eid)
return eid
def transaction(self, a, c):
# Actor spends a random quantity of their wealth with the company
w = self.population[a]
r = np.random.uniform(0, self.population[a])
# Add edge to network between participants, weighted by transaction size
self.G.add_edge(a, c, weight=r)
return w - r, self.corps[c] + r
def distribute(self, cid, revenue, pop):
while revenue > 1:
for eid in self.G[cid].keys():
# pay = revenue / employees
pay = np.random.uniform(0, revenue)
# Give rounding error to this employee:
pay = revenue if pay < 1 else pay
pop[0][eid] += pay
pop[1][cid] -= pay
revenue -= pay
self.G.add_edge(cid, eid, weight=pay)
return pop
@staticmethod
def gini(y):
"""Calculate the Gini coefficient of a numpy array.
based on bottom eq: http://www.statsdirect.com/help/content/image/stat0206_wmf.gif
from: http://www.statsdirect.com/help/default.htm#nonparametric_methods/gini.htm
:param y: 1-dimensional numpy array, input series
:return: int, Gini coefficient
"""
y = sorted(y)
n = len(y)
numer = 2 * sum((i + 1) * y[i] for i in range(n))
denom = n * sum(y)
return (numer / denom) - (n + 1) / n
@staticmethod
def auto_bin(y, m=5):
""" Modified Freedman-Diaconis rule to optimize # of histogram bins
:param narray: numpy array
:param m: int, optional; minimum number of bins to return
:return: int, rounded number of optimal bins for histogram of sample
"""
narray = np.array(y)
iqr = np.subtract(*np.percentile(narray, [75, 25]))
return max(m, int(2 * iqr * narray.size ** (-1/3)))
def simulate(self, generations, events):
print(" ".join(self.cols))
print(" ".join(["-" * len(x) for x in self.cols]))
print(self.stat_row(0, 0))
for g in range(1, generations + 1):
pop = [self.population.copy(), self.corps.copy()]
for _ in range(events):
a, c = random.sample(pop[0].keys(), 1)[0], random.sample(pop[1].keys(), 1)[0]
pop[0][a], pop[1][c] = self.transaction(a=a, c=c)
for cid, revenue in pop[1].items():
pop = self.distribute(cid, revenue, pop)
self.stages.append(pop)
x = self.stat_row(g, events * g)
if g % (generations * 0.05) == 0:
print(x)
self.plot(generations, events)
def plot(self, g, e):
v_1 = [*self.stages[0][0].values()]
v_n = [*self.stages[-1][0].values()]
# Starting population
self.axes[0][0].hist(v_1, edgecolor=(0, 0, .14),
bins=self.auto_bin(v_1),
label="Starting population (n={:,.0f})".format(self.n),
range=(0, max(v_n)))
self.axes[0][0].set_title("Population Wealth")
self.axes[0][0].set_xlabel("Wealth")
# Ending population
self.axes[0][0].hist(v_n, color=(.7, 0, 0, 0.5), edgecolor=(.14, 0, 0),
bins=self.auto_bin(v_n),
label="Ending population (generation={:,.0f}; transactions={:,.0f})".format(g, g * e),
range=(0, max(v_n)))
# Gini index
self.summary()[self.cols[3]].plot(ax=self.axes[0][1])
self.axes[0][1].set_title("Wealth Inequality", fontsize=10)
self.axes[0][1].set_ylabel("Gini Coefficient")
self.axes[0][1].set_xlabel("Generation")
plt.show()
def summary(self):
return pd.DataFrame(data=self.stats, columns=self.cols)
cols = [x + " " * max(0, COLW - len(x)) for x in \
["generation", "cum events", "n", "G", "median", "stdev", "min", "25%", "50%", "90%", "99%", "max"]]
def stat_row(self, i, j):
""" Store and return summary statistics for most recent generation """
p = [*self.stages[-1][0].values()]
self.stats.append([i, j, len(p), self.gini(p), np.median(p), np.std(p),
min(p), np.percentile(p, 25), np.percentile(p, 50),
np.percentile(p, 90), np.percentile(p, 99), max(p)])
row = ["{:.2f}".format(x) for x in self.stats[-1]]
return " ".join([" " * max(0, COLW - len(x)) + x for x in row])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment