An Evolutionary Model for the Emergence of Scale-Free Biological Networks
#!/usr/env/bin python3 | |
# USAGE: LIB=work_space python3 interaction_topology_evolution.py | |
import random | |
import sys | |
import itertools | |
import math | |
import networkx as nx | |
import numpy as np | |
import os | |
from ctypes import cdll | |
from matplotlib import pyplot as plt | |
from itertools import combinations | |
LIB = os.getenv('LIB') | |
assert LIB and os.path.exists(LIB), 'environment variable LIB not properly set!' | |
sys.path.insert(0, LIB) | |
from lib import util, init, solver, reducer | |
plt.rc('text', usetex=True) | |
# An "individual" is identified by a set of genes. | |
class Individual(object): | |
def __init__(self, G, genes): | |
self.G = G | |
self.genes = set(genes) | |
self.benefits, self.damages = benefit_and_damage(G.subgraph(self.genes)) | |
self.total_benefits = sum(self.benefits.values()) | |
self.total_damages = sum(self.damages.values()) | |
assert(self.total_damages >= 0 and self.total_benefits >= 0) | |
self.fitness = self.total_benefits | |
@property | |
def subgraph(self): | |
return self.G.subgraph(self.genes) | |
def __str__(self): | |
return '(benefits: %d, damages: %d, no. genes: %d) ' % \ | |
(self.total_benefits, self.total_damages, len(self.genes)) | |
# Produces a new Individual as follows: | |
# 1. each gene is deleted with probability d_prob | |
# 2. each gene is duplicated-and-diversified with probability i_prob. | |
# | |
# If mut_probs is not given; insertions pick a random gene uniformly | |
# distributed over the graph. If mut_probs is given, it is assumed to be a | |
# dict keyed by gene id such that mut_probs[g][g_] is the probability that | |
# a duplicate of g diversifies to g_. | |
def reproduce(self, d_prob, i_prob, mut_probs=None): | |
offspring_genes = set() | |
if not mut_probs: | |
pool = set(self.G.nodes()) - set(self.genes) | |
for g in self.genes: | |
if bernoulli(d_prob): | |
continue | |
offspring_genes.add(g) | |
if not bernoulli(i_prob): | |
continue | |
if mut_probs: | |
cands = list(mut_probs[g].keys()) | |
probs = list(mut_probs[g].values()) | |
new_gene = np.random.choice(cands, 1, p=probs)[0] | |
offspring_genes.add(new_gene) | |
else: | |
if not pool: | |
continue | |
new_gene = np.random.choice(list(pool), 1)[0] | |
offspring_genes.add(new_gene) | |
pool.remove(new_gene) | |
return Individual(self.G, offspring_genes) | |
# Plots the estimated probability mass function as a continuous curve | |
def plot_hist(ax, values, bins=None, **kw): | |
if not max(values): | |
return | |
all_hist, all_edges = np.histogram(values, bins, density=True) | |
all_ctrs = (all_edges[1:] + all_edges[:-1]) / 2. | |
hist, ctrs = [], [] | |
assert(len(all_ctrs) == len(all_hist)) | |
for idx in range(len(all_ctrs)): | |
if all_hist[idx] > 0: | |
hist.append(all_hist[idx]) | |
ctrs.append(all_ctrs[idx]) | |
ax.plot(ctrs, hist, **kw) | |
ax.set_ylim(0, 1) | |
# Assigns random weights to nodes of a graph. If mode is 'binary' weights are | |
# either of {-1, 0, 1} with Pr(weight != 0) = pressure and +1 and -1 equally | |
# likely. If mode is 'powerlaw', pressure is ignored and all weights are | |
# assigned from a power law distribution with exponent powerlaw_exp. | |
def assign_random_node_signs(G, pressure, mode='binary', powerlaw_exp=None): | |
assert mode in ['binary', 'powerlaw'] | |
if mode == 'binary': | |
p = [pressure / 2., 1 - pressure, pressure / 2.] | |
node_signs = np.random.choice([-1, 0, 1], len(G), p=p) | |
elif mode == 'powerlaw': | |
assert powerlaw_exp is not None | |
# NOTE np.random.power draws from P(x) = ax^{a-1} for 0 < x < 1 | |
# cf. http://stackoverflow.com/a/31117560 | |
# We report the inverse of drawn samples (i.e x > 1) so if we want to | |
# draw from P(x) ~ x^{-k}; we should pass 1 + k as the "exponent" | |
# argument to np.random.power | |
assert powerlaw_exp > 0, 'Power law exponent (k in x^-k) should be given as a positive number' | |
node_signs = 1. / np.random.power(1 + powerlaw_exp, len(G)) | |
node_signs = [int(s) if bernoulli() else -int(s) for s in node_signs] | |
node_signs = dict(zip(G.nodes(), node_signs)) | |
nx.set_node_attributes(G, 'sign', node_signs) | |
# Assigns random edge weights from {-1, +1} to all edges of a given graph such | |
# that Pr(weight > 0) = edge_pos_prob. | |
def assign_random_edge_signs(G, edge_pos_prob): | |
n_edges = G.number_of_edges() | |
p = [1 - edge_pos_prob, edge_pos_prob] | |
edge_signs = dict(zip(G.edges(), np.random.choice([-1, 1], n_edges, p=p))) | |
nx.set_edge_attributes(G, 'sign', edge_signs) | |
# Converts all -1 edges to 0. This is used for a variation on the model in | |
# which the effect of a suppressed gene is 0 rather than the opposite of the | |
# weight the node. | |
def convert_to_binary_edge_signs(G): | |
signs = {(src, tgt): 0 if data['sign'] == -1 else data['sign'] | |
for src, tgt, data in G.edges(data=True)} | |
nx.set_edge_attributes(G, 'sign', signs) | |
# Generates a random Erdos-Renyi graph with N nodes and edge probability given | |
# by edge_prob. Nodes and edges are assigned weights using | |
# assign_random_node_signs() and assign_random_edge_signs(), respectively. | |
def random_graph(N, edge_prob, pressure, edge_pos_prob, **node_kw): | |
G = nx.fast_gnp_random_graph(N, edge_prob, directed=True) | |
assign_random_node_signs(G, pressure, **node_kw) | |
assign_random_edge_signs(G, edge_pos_prob) | |
return G | |
# cf. Individual.reproduce | |
def mutation_probabilities_based_on_similarity(G): | |
es = G.edges() | |
neigh = lambda es, n: set([(set(e) - {n}).pop() for e in es if n in e]) | |
i = [] | |
ns = G.nodes() | |
neighs = {n: neigh(es, n) for n in ns} | |
dists = {n: {} for n in ns} | |
for n, _n in combinations(ns, 2): | |
common_neigh = len(neighs[n].intersection(neighs[_n])) | |
if common_neigh: | |
dists[n][_n] = common_neigh | |
dists[_n][n] = dists[n][_n] | |
mut_probs = {n: {} for n in ns} | |
for n in dists: | |
sum_dists = sum(dists[n].values()) | |
for _n in dists[n]: | |
mut_probs[n][_n] = dists[n][_n] / sum_dists | |
return mut_probs | |
# Loads an interaction network from the given path; the file structure is | |
# expected to be as follows: | |
# - first line is considered to be headers and is ignored | |
# - every subsequent line "src tgt ±" describes an edge where src and tgt are | |
# strings representing the nodes and the sign is the sign of interaction. | |
# nodes are assigned signs randomly according to pressure ∈ [0,1]. | |
# | |
# If N is provided the nodes in the graph are subsampled such that there are N | |
# nodes in the graph. | |
# | |
# If edge_pos_prob is given the signs of edges are overriden by random | |
# assignment as per assign_random_edge_signs(). | |
def bio_graph(path, pressure, N=None, edge_pos_prob=None): | |
assert os.path.exists(path), 'Biological graph not found!' | |
G = nx.DiGraph() | |
with open(path) as f: | |
line = f.readline() | |
while True: | |
line = f.readline().strip() | |
if not line: | |
break | |
src, tgt, sign = line.split() | |
sign = 1 if sign == '+' else -1 | |
G.add_edge(src, tgt, sign=sign) | |
assign_random_node_signs(G, pressure) | |
if edge_pos_prob is not None: | |
assign_random_edge_signs(G, edge_pos_prob) | |
if N is not None: | |
assert(N < len(G)) | |
G = G.subgraph(random.sample(G.nodes(), N)) | |
return G | |
# Calculates benefits and damages (keyed by node) for a given graph | |
def benefit_and_damage(G): | |
nodes = G.nodes() | |
benefit = dict(zip(nodes, [0] * len(nodes))) | |
damage = dict(zip(nodes, [0] * len(nodes))) | |
for src, tgt, edge_data in G.edges(data=True): | |
edge_effect = G.node[tgt]['sign'] * edge_data['sign'] | |
if edge_effect > 0: | |
benefit[src] += edge_effect | |
benefit[tgt] += edge_effect | |
elif edge_effect < 0: | |
damage[src] += -edge_effect | |
damage[tgt] += -edge_effect | |
return benefit, damage | |
def random_gene_set(G, size): | |
return set(np.random.choice(G.nodes(), size)) | |
def bernoulli(p_true=.5): | |
return bool(np.random.choice([0, 1], 1, p=[1 - p_true, p_true])[0]) | |
# Solves the optimization problem using the knapsack solver. | |
def knapsack_solution(G, max_damage): | |
kp_solver = cdll.LoadLibrary(os.path.join(LIB, 'lib/kp_solvers/minknap.so')) | |
b, d = benefit_and_damage(G) | |
result = solver.solve_knapsack([(b, d, int(max_damage))], kp_solver) | |
B_in, D_in, B_out, D_out, genes_in, genes_out, \ | |
green_genes, red_genes, grey_genes, coresize, execution_time = result | |
genes_in = [g[0] for g in genes_in] # each is a tuple of (gene, benefit, damage) | |
all_genes_indiv = Individual(G, G.nodes()) | |
assert(B_in == sum(all_genes_indiv.benefits[node] for node in genes_in)) | |
opt = Individual(G, genes_in) | |
sys.stderr.write('Knapsack Solution ** benefits: %d (reported: %d) ** damages: %d (reported: %d) ** no. genes: %d\n' % \ | |
(opt.total_benefits, B_in, opt.total_damages, D_in, len(genes_in))) | |
return opt, B_in, D_in | |
# Given a population of individuals; simulates a reproductive cycle. | |
# Individuals are first filtered for exceeding max. admissible damage. Then the | |
# fittest individuals (as per survivorship) are copied as-is to next generation | |
# and the rest of the next generation (as per capacity) is populated by mutated | |
# descendents of the fittest individuals with fecundity ∝ fitness. | |
def next_generation(population, capacity, survivorship, max_damage, | |
d_prob, i_prob, mut_probs=None): | |
survivors = [i for i in sorted(population, key=lambda i: i.fitness) | |
if i.total_damages < max_damage] | |
if not survivors: | |
raise Exception('Population went instinct!') | |
survivors = survivors[int(survivorship * len(survivors)):] | |
next_gen = [] | |
# keep the fittest as-is in next generation | |
next_gen += survivors | |
capacity -= len(next_gen) | |
# populate the rest with mutated offsprings according to fecundity ∝ fitness | |
total_fitness = sum(i.fitness for i in survivors) | |
for i in survivors: | |
if total_fitness: | |
fecundity = int(round(capacity * i.fitness / total_fitness)) | |
else: | |
fecundity = int(round(capacity / len(survivors))) | |
next_gen += [i.reproduce(d_prob, i_prob, mut_probs=mut_probs) | |
for _ in range(fecundity)] | |
return next_gen | |
def experiment(plot_path, **kw): | |
sys.stdout.write('generating: %s\n' % plot_path) | |
sys.stdout.write('params:\n\t%s\n' % ('\n\t'.join('%s: %s' % (str(k), str(v)) for k,v in kw.items()))) | |
# Generations for which to plot degree distributions | |
plot_on_gens = [100, 200, 300, 400] | |
# colors for each of the above generations | |
colors = 'rgbm' | |
pressure = kw['pressure'] | |
edge_pos_prob = kw['edge_pos_prob'] | |
tolerance = kw['tolerance'] | |
d_prob, i_prob = kw['d_prob'], kw['i_prob'] | |
capacity = kw['capacity'] | |
survivorship = kw['survivorship'] | |
gene_count = kw['gene_count'] | |
assert kw['graph_type'] in ['ER', 'BIO'] | |
if kw['graph_type'] == 'BIO': | |
G = bio_graph(kw['graph_path'], pressure, N=gene_count) | |
elif kw['graph_type'] == 'ER': | |
node_kw = { | |
'mode': kw['node_mode'], | |
'powerlaw_exp': kw.get('node_powerlaw_exp'), | |
} | |
G = random_graph(gene_count, kw['edge_prob'], pressure, edge_pos_prob, **node_kw) | |
assert kw['edge_sign_mode'] in ['0;1', '-1;+1'] | |
if kw['edge_sign_mode'] == '0;1': | |
convert_to_binary_edge_signs(G) | |
assert kw['insertion_mode'] in ['uniform', 'similarity'] | |
if kw['insertion_mode'] == 'similarity': | |
mut_probs = mutation_probabilities_based_on_similarity(G) | |
elif kw['insertion_mode'] == 'uniform': | |
mut_probs = None | |
# ================================================ | |
signs = list(data['sign'] for src, tgt, data in G.edges(data=True)) | |
edge_pos_prob_emp = sum(1 for sign in signs if sign > 0) / len(signs) | |
sys.stderr.write('Graph: %d nodes, %d edges, %.2f edge + probability\n' % | |
(len(G), len(G.edges()), edge_pos_prob_emp)) | |
max_damage = tolerance * sum(benefit_and_damage(G)[1].values()) | |
#nx.draw(G, pos=nx.circular_layout(G), ax=ax, node_size=1, node_color='k', alpha=.4, lw=.1) | |
ax_ideg = plt.subplot2grid((6, 2), (0, 0), rowspan=3) # in degrees | |
ax_odeg = plt.subplot2grid((6, 2), (3, 0), rowspan=3, sharex=ax_ideg) # out degrees | |
ax_ben = plt.subplot2grid((6, 2), (0, 1), rowspan=2) # benefits | |
ax_dam = plt.subplot2grid((6, 2), (2, 1), rowspan=2, sharex=ax_ben) # damages | |
ax_cnt = plt.subplot2grid((6, 2), (4, 1), rowspan=2, sharex=ax_ben) # gene count | |
for ax in [ax_ideg, ax_odeg, ax_ben, ax_dam, ax_cnt]: | |
ax.grid(True) | |
ax_ideg.set_xlabel('In degree') | |
ax_ideg.set_ylabel('Probability Mass') | |
ax_odeg.set_xlabel('Out degree') | |
ax_odeg.set_ylabel('Probability Mass') | |
ax_ideg.set_xscale('log', basex=2) | |
ax_ideg.set_yscale('log', basey=2) | |
ax_odeg.set_yscale('log', basey=2) | |
assert len(plot_on_gens) <= len(colors), \ | |
'At most %d generations can be ploted, %d given!' % \ | |
(len(colors), len(plot_on_gens)) | |
kop_opt_indiv, kop_benefits, kop_damages = knapsack_solution(G, max_damage) | |
population = [Individual(G, random_gene_set(G, 1)) for _ in range(capacity)] | |
min_ds, max_ds, min_bs, max_bs = [], [], [], [] | |
min_no_opt_genes, max_no_opt_genes, min_no_genes, max_no_genes = [], [], [], [] | |
assert all(plot_on_gens[i] <= plot_on_gens[i+1] for i in range(len(plot_on_gens) - 1)) | |
for color, (idx, max_gen) in zip(colors, enumerate(plot_on_gens)): | |
min_gen = plot_on_gens[idx - 1] if idx else 0 | |
for i in range(min_gen, max_gen): | |
sim = [len(kop_opt_indiv.genes.intersection(indiv.genes)) for indiv in population] | |
b = [indiv.total_benefits for indiv in population] | |
d = [indiv.total_damages for indiv in population] | |
n = [len(indiv.genes) for indiv in population] | |
sys.stderr.write('#%s ** pop = %s ** no. genes = %s ** %s out of %s KOP opt. genes ** benefit = %s / %d ** damage = %s / %d\n' % \ | |
(str(i + 1).ljust(4), str(len(population)).ljust(5), \ | |
str((min(n), max(n))).ljust(10), | |
str((min(sim), max(sim))).ljust(10), str(len(kop_opt_indiv.genes)).ljust(4), \ | |
str((min(b), max(b))).ljust(10), kop_opt_indiv.total_benefits, \ | |
str((min(d), max(d))).ljust(10), max_damage | |
)) | |
min_no_genes.append(min(n)) | |
max_no_genes.append(max(n)) | |
min_no_opt_genes.append(min(sim)) | |
max_no_opt_genes.append(max(sim)) | |
min_bs.append(min(b)) | |
max_bs.append(max(b)) | |
min_ds.append(min(d)) | |
max_ds.append(max(d)) | |
#population = next_generation(population, capacity, survivorship, max_damage, d_prob, i_prob) | |
population = next_generation(population, capacity, survivorship, max_damage, d_prob, i_prob, mut_probs=mut_probs) | |
sys.stderr.write('plotting ...\n') | |
scatter_kw = { | |
'color': color, | |
'lw': .5, | |
'alpha': .1, | |
'label': 'Gen. %d' % max_gen | |
} | |
first = True | |
for indiv in population: | |
if not indiv.genes: | |
continue | |
graph = indiv.subgraph | |
i_deg = list(graph.in_degree().values()) | |
o_deg = list(graph.out_degree().values()) | |
plot_hist(ax_ideg, i_deg, bins=range(1, max(i_deg) + 1), **scatter_kw) | |
plot_hist(ax_odeg, o_deg, bins=range(1, max(o_deg) + 1), **scatter_kw) | |
if first: | |
scatter_kw.pop('label') | |
first = False | |
opt_graph = kop_opt_indiv.subgraph | |
i_deg = list(opt_graph.in_degree().values()) | |
o_deg = list(opt_graph.out_degree().values()) | |
plot_hist(ax_ideg, i_deg, bins=range(1, max(i_deg) + 1), color='k', lw=3, alpha=.6, label='Knapsack solution') | |
plot_hist(ax_odeg, o_deg, bins=range(1, max(o_deg) + 1), color='k', lw=3, alpha=.6, label='Knapsack solution') | |
i_deg = list(G.in_degree().values()) | |
o_deg = list(G.out_degree().values()) | |
plot_hist(ax_ideg, i_deg, bins=range(1, max(i_deg) + 1), color='y', lw=3, alpha=.6, label='whole graph') | |
plot_hist(ax_odeg, o_deg, bins=range(1, max(o_deg) + 1), color='y', lw=3, alpha=.6, label='whole graph') | |
ax_ben.plot(range(plot_on_gens[-1]), max_bs, c='k') | |
ax_ben.plot(range(plot_on_gens[-1]), min_bs, c='k', linestyle='--') | |
ax_ben.axhline(kop_opt_indiv.total_benefits, *ax_ben.get_xlim(), c='g', lw=3, alpha=.4, label='Knapsack solution') | |
ax_ben.axhline(kop_benefits, *ax_ben.get_xlim(), c='m', lw=3, alpha=.4, label='Knapsack solution (reported)') | |
ax_ben.set_ylabel('Benefit') | |
ax_ben.set_xlabel('Generation') | |
ax_dam.plot(range(plot_on_gens[-1]), max_ds, c='k') | |
ax_dam.plot(range(plot_on_gens[-1]), min_ds, c='k', linestyle='--') | |
ax_dam.axhline(max_damage, *ax_dam.get_xlim(), c='m', lw=3, alpha=.4, label='Max. admissible total damage') | |
ax_dam.axhline(kop_opt_indiv.total_damages, *ax_dam.get_xlim(), c='r', lw=3, alpha=.4, label='Knapsack solution') | |
ax_dam.set_ylabel('Damage') | |
ax_dam.set_xlabel('Generation') | |
ax_cnt.plot(range(plot_on_gens[-1]), max_no_genes, c='k', label='No. of genes') | |
ax_cnt.plot(range(plot_on_gens[-1]), min_no_genes, c='k', linestyle='--') | |
ax_cnt.plot(range(plot_on_gens[-1]), max_no_opt_genes, c='b', label='No. of genes shared with knapsack solution') | |
ax_cnt.plot(range(plot_on_gens[-1]), min_no_opt_genes, c='b', linestyle='--') | |
ax_cnt.axhline(len(kop_opt_indiv.genes), *ax_dam.get_xlim(), c='g', lw=3, alpha=.4, label='Knapsack solution') | |
ax_cnt.set_ylim(None, ax_cnt.get_ylim()[1] * 1.1) | |
ax_cnt.set_xlabel('Generation') | |
ax_cnt.set_ylabel('Number of Genes') | |
for ax in [ax_ben, ax_dam, ax_cnt]: | |
leg = ax.legend(fontsize=12, loc=2) | |
for l in leg.get_lines(): | |
l.set_alpha(1) | |
l.set_linewidth(2) | |
for ax in [ax_ideg, ax_odeg]: | |
leg = ax.legend(fontsize=12) | |
for l in leg.get_lines(): | |
l.set_alpha(1) | |
l.set_linewidth(2) | |
fig = plt.gcf() | |
fig.set_figwidth(20) | |
fig.set_figheight(12) | |
plt.tight_layout() | |
fig.savefig(plot_path, dpi=500) | |
plt.clf() | |
def plot_binomials(path): | |
ax = plt.gca() | |
colors = 'rgbm' | |
ps = [.01, .005, .001, .0005] | |
for p, color in zip(ps, colors): | |
x = np.random.binomial(1000, p, size=1e6) | |
x = [max(_x, 1) for _x in x] | |
plot_hist(ax, x, bins=range(min(x), max(x)), color=color, lw=3, alpha=.6, label='$\sim \mathcal{B}(1000, %.4f)$' % p) | |
ax.set_xscale('log', basex=2) | |
ax.set_yscale('log', basey=2) | |
ax.set_ylim(0, 1) | |
ax.grid(True) | |
ax.legend(fontsize=12) | |
plt.tight_layout() | |
plt.savefig(path, dpi=300) | |
plt.clf() | |
if __name__ == '__main__': | |
plot_binomials('binomials.png') | |
kw = { | |
'pressure': .9, | |
'edge_pos_prob': .7, | |
'tolerance': .1, | |
'd_prob': .1, | |
'i_prob': .1, | |
'capacity': 200, | |
'survivorship': .5, | |
'gene_count': 1000, | |
'graph_type': 'ER', | |
'edge_prob': .01, | |
#'graph_type': 'BIO', | |
#'graph_path': LIB + '/data/input/Example_Network.txt', | |
'node_mode': 'binary', | |
#'node_mode': 'powerlaw', | |
#'node_powerlaw_exp': 2, | |
#'edge_sign_mode': '-1;+1', | |
'edge_sign_mode': '0;1', | |
#'insertion_mode': 'uniform', | |
'insertion_mode': 'similarity', | |
} | |
f = ','.join(['%s:%s' % (str(k), str(v)) | |
for k, v in sorted(kw.items()) | |
if k != 'graph_path']) + '.png' | |
experiment(f, **kw) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment