Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
An Evolutionary Model for the Emergence of Scale-Free Biological Networks
#!/usr/env/bin python3
# USAGE: LIB=work_space python3 interaction_topology_evolution.py
import random
import sys
import itertools
import math
import networkx as nx
import numpy as np
import os
from ctypes import cdll
from matplotlib import pyplot as plt
from itertools import combinations
LIB = os.getenv('LIB')
assert LIB and os.path.exists(LIB), 'environment variable LIB not properly set!'
sys.path.insert(0, LIB)
from lib import util, init, solver, reducer
plt.rc('text', usetex=True)
# An "individual" is identified by a set of genes.
class Individual(object):
def __init__(self, G, genes):
self.G = G
self.genes = set(genes)
self.benefits, self.damages = benefit_and_damage(G.subgraph(self.genes))
self.total_benefits = sum(self.benefits.values())
self.total_damages = sum(self.damages.values())
assert(self.total_damages >= 0 and self.total_benefits >= 0)
self.fitness = self.total_benefits
@property
def subgraph(self):
return self.G.subgraph(self.genes)
def __str__(self):
return '(benefits: %d, damages: %d, no. genes: %d) ' % \
(self.total_benefits, self.total_damages, len(self.genes))
# Produces a new Individual as follows:
# 1. each gene is deleted with probability d_prob
# 2. each gene is duplicated-and-diversified with probability i_prob.
#
# If mut_probs is not given; insertions pick a random gene uniformly
# distributed over the graph. If mut_probs is given, it is assumed to be a
# dict keyed by gene id such that mut_probs[g][g_] is the probability that
# a duplicate of g diversifies to g_.
def reproduce(self, d_prob, i_prob, mut_probs=None):
offspring_genes = set()
if not mut_probs:
pool = set(self.G.nodes()) - set(self.genes)
for g in self.genes:
if bernoulli(d_prob):
continue
offspring_genes.add(g)
if not bernoulli(i_prob):
continue
if mut_probs:
cands = list(mut_probs[g].keys())
probs = list(mut_probs[g].values())
new_gene = np.random.choice(cands, 1, p=probs)[0]
offspring_genes.add(new_gene)
else:
if not pool:
continue
new_gene = np.random.choice(list(pool), 1)[0]
offspring_genes.add(new_gene)
pool.remove(new_gene)
return Individual(self.G, offspring_genes)
# Plots the estimated probability mass function as a continuous curve
def plot_hist(ax, values, bins=None, **kw):
if not max(values):
return
all_hist, all_edges = np.histogram(values, bins, density=True)
all_ctrs = (all_edges[1:] + all_edges[:-1]) / 2.
hist, ctrs = [], []
assert(len(all_ctrs) == len(all_hist))
for idx in range(len(all_ctrs)):
if all_hist[idx] > 0:
hist.append(all_hist[idx])
ctrs.append(all_ctrs[idx])
ax.plot(ctrs, hist, **kw)
ax.set_ylim(0, 1)
# Assigns random weights to nodes of a graph. If mode is 'binary' weights are
# either of {-1, 0, 1} with Pr(weight != 0) = pressure and +1 and -1 equally
# likely. If mode is 'powerlaw', pressure is ignored and all weights are
# assigned from a power law distribution with exponent powerlaw_exp.
def assign_random_node_signs(G, pressure, mode='binary', powerlaw_exp=None):
assert mode in ['binary', 'powerlaw']
if mode == 'binary':
p = [pressure / 2., 1 - pressure, pressure / 2.]
node_signs = np.random.choice([-1, 0, 1], len(G), p=p)
elif mode == 'powerlaw':
assert powerlaw_exp is not None
# NOTE np.random.power draws from P(x) = ax^{a-1} for 0 < x < 1
# cf. http://stackoverflow.com/a/31117560
# We report the inverse of drawn samples (i.e x > 1) so if we want to
# draw from P(x) ~ x^{-k}; we should pass 1 + k as the "exponent"
# argument to np.random.power
assert powerlaw_exp > 0, 'Power law exponent (k in x^-k) should be given as a positive number'
node_signs = 1. / np.random.power(1 + powerlaw_exp, len(G))
node_signs = [int(s) if bernoulli() else -int(s) for s in node_signs]
node_signs = dict(zip(G.nodes(), node_signs))
nx.set_node_attributes(G, 'sign', node_signs)
# Assigns random edge weights from {-1, +1} to all edges of a given graph such
# that Pr(weight > 0) = edge_pos_prob.
def assign_random_edge_signs(G, edge_pos_prob):
n_edges = G.number_of_edges()
p = [1 - edge_pos_prob, edge_pos_prob]
edge_signs = dict(zip(G.edges(), np.random.choice([-1, 1], n_edges, p=p)))
nx.set_edge_attributes(G, 'sign', edge_signs)
# Converts all -1 edges to 0. This is used for a variation on the model in
# which the effect of a suppressed gene is 0 rather than the opposite of the
# weight the node.
def convert_to_binary_edge_signs(G):
signs = {(src, tgt): 0 if data['sign'] == -1 else data['sign']
for src, tgt, data in G.edges(data=True)}
nx.set_edge_attributes(G, 'sign', signs)
# Generates a random Erdos-Renyi graph with N nodes and edge probability given
# by edge_prob. Nodes and edges are assigned weights using
# assign_random_node_signs() and assign_random_edge_signs(), respectively.
def random_graph(N, edge_prob, pressure, edge_pos_prob, **node_kw):
G = nx.fast_gnp_random_graph(N, edge_prob, directed=True)
assign_random_node_signs(G, pressure, **node_kw)
assign_random_edge_signs(G, edge_pos_prob)
return G
# cf. Individual.reproduce
def mutation_probabilities_based_on_similarity(G):
es = G.edges()
neigh = lambda es, n: set([(set(e) - {n}).pop() for e in es if n in e])
i = []
ns = G.nodes()
neighs = {n: neigh(es, n) for n in ns}
dists = {n: {} for n in ns}
for n, _n in combinations(ns, 2):
common_neigh = len(neighs[n].intersection(neighs[_n]))
if common_neigh:
dists[n][_n] = common_neigh
dists[_n][n] = dists[n][_n]
mut_probs = {n: {} for n in ns}
for n in dists:
sum_dists = sum(dists[n].values())
for _n in dists[n]:
mut_probs[n][_n] = dists[n][_n] / sum_dists
return mut_probs
# Loads an interaction network from the given path; the file structure is
# expected to be as follows:
# - first line is considered to be headers and is ignored
# - every subsequent line "src tgt ±" describes an edge where src and tgt are
# strings representing the nodes and the sign is the sign of interaction.
# nodes are assigned signs randomly according to pressure ∈ [0,1].
#
# If N is provided the nodes in the graph are subsampled such that there are N
# nodes in the graph.
#
# If edge_pos_prob is given the signs of edges are overriden by random
# assignment as per assign_random_edge_signs().
def bio_graph(path, pressure, N=None, edge_pos_prob=None):
assert os.path.exists(path), 'Biological graph not found!'
G = nx.DiGraph()
with open(path) as f:
line = f.readline()
while True:
line = f.readline().strip()
if not line:
break
src, tgt, sign = line.split()
sign = 1 if sign == '+' else -1
G.add_edge(src, tgt, sign=sign)
assign_random_node_signs(G, pressure)
if edge_pos_prob is not None:
assign_random_edge_signs(G, edge_pos_prob)
if N is not None:
assert(N < len(G))
G = G.subgraph(random.sample(G.nodes(), N))
return G
# Calculates benefits and damages (keyed by node) for a given graph
def benefit_and_damage(G):
nodes = G.nodes()
benefit = dict(zip(nodes, [0] * len(nodes)))
damage = dict(zip(nodes, [0] * len(nodes)))
for src, tgt, edge_data in G.edges(data=True):
edge_effect = G.node[tgt]['sign'] * edge_data['sign']
if edge_effect > 0:
benefit[src] += edge_effect
benefit[tgt] += edge_effect
elif edge_effect < 0:
damage[src] += -edge_effect
damage[tgt] += -edge_effect
return benefit, damage
def random_gene_set(G, size):
return set(np.random.choice(G.nodes(), size))
def bernoulli(p_true=.5):
return bool(np.random.choice([0, 1], 1, p=[1 - p_true, p_true])[0])
# Solves the optimization problem using the knapsack solver.
def knapsack_solution(G, max_damage):
kp_solver = cdll.LoadLibrary(os.path.join(LIB, 'lib/kp_solvers/minknap.so'))
b, d = benefit_and_damage(G)
result = solver.solve_knapsack([(b, d, int(max_damage))], kp_solver)
B_in, D_in, B_out, D_out, genes_in, genes_out, \
green_genes, red_genes, grey_genes, coresize, execution_time = result
genes_in = [g[0] for g in genes_in] # each is a tuple of (gene, benefit, damage)
all_genes_indiv = Individual(G, G.nodes())
assert(B_in == sum(all_genes_indiv.benefits[node] for node in genes_in))
opt = Individual(G, genes_in)
sys.stderr.write('Knapsack Solution ** benefits: %d (reported: %d) ** damages: %d (reported: %d) ** no. genes: %d\n' % \
(opt.total_benefits, B_in, opt.total_damages, D_in, len(genes_in)))
return opt, B_in, D_in
# Given a population of individuals; simulates a reproductive cycle.
# Individuals are first filtered for exceeding max. admissible damage. Then the
# fittest individuals (as per survivorship) are copied as-is to next generation
# and the rest of the next generation (as per capacity) is populated by mutated
# descendents of the fittest individuals with fecundity ∝ fitness.
def next_generation(population, capacity, survivorship, max_damage,
d_prob, i_prob, mut_probs=None):
survivors = [i for i in sorted(population, key=lambda i: i.fitness)
if i.total_damages < max_damage]
if not survivors:
raise Exception('Population went instinct!')
survivors = survivors[int(survivorship * len(survivors)):]
next_gen = []
# keep the fittest as-is in next generation
next_gen += survivors
capacity -= len(next_gen)
# populate the rest with mutated offsprings according to fecundity ∝ fitness
total_fitness = sum(i.fitness for i in survivors)
for i in survivors:
if total_fitness:
fecundity = int(round(capacity * i.fitness / total_fitness))
else:
fecundity = int(round(capacity / len(survivors)))
next_gen += [i.reproduce(d_prob, i_prob, mut_probs=mut_probs)
for _ in range(fecundity)]
return next_gen
def experiment(plot_path, **kw):
sys.stdout.write('generating: %s\n' % plot_path)
sys.stdout.write('params:\n\t%s\n' % ('\n\t'.join('%s: %s' % (str(k), str(v)) for k,v in kw.items())))
# Generations for which to plot degree distributions
plot_on_gens = [100, 200, 300, 400]
# colors for each of the above generations
colors = 'rgbm'
pressure = kw['pressure']
edge_pos_prob = kw['edge_pos_prob']
tolerance = kw['tolerance']
d_prob, i_prob = kw['d_prob'], kw['i_prob']
capacity = kw['capacity']
survivorship = kw['survivorship']
gene_count = kw['gene_count']
assert kw['graph_type'] in ['ER', 'BIO']
if kw['graph_type'] == 'BIO':
G = bio_graph(kw['graph_path'], pressure, N=gene_count)
elif kw['graph_type'] == 'ER':
node_kw = {
'mode': kw['node_mode'],
'powerlaw_exp': kw.get('node_powerlaw_exp'),
}
G = random_graph(gene_count, kw['edge_prob'], pressure, edge_pos_prob, **node_kw)
assert kw['edge_sign_mode'] in ['0;1', '-1;+1']
if kw['edge_sign_mode'] == '0;1':
convert_to_binary_edge_signs(G)
assert kw['insertion_mode'] in ['uniform', 'similarity']
if kw['insertion_mode'] == 'similarity':
mut_probs = mutation_probabilities_based_on_similarity(G)
elif kw['insertion_mode'] == 'uniform':
mut_probs = None
# ================================================
signs = list(data['sign'] for src, tgt, data in G.edges(data=True))
edge_pos_prob_emp = sum(1 for sign in signs if sign > 0) / len(signs)
sys.stderr.write('Graph: %d nodes, %d edges, %.2f edge + probability\n' %
(len(G), len(G.edges()), edge_pos_prob_emp))
max_damage = tolerance * sum(benefit_and_damage(G)[1].values())
#nx.draw(G, pos=nx.circular_layout(G), ax=ax, node_size=1, node_color='k', alpha=.4, lw=.1)
ax_ideg = plt.subplot2grid((6, 2), (0, 0), rowspan=3) # in degrees
ax_odeg = plt.subplot2grid((6, 2), (3, 0), rowspan=3, sharex=ax_ideg) # out degrees
ax_ben = plt.subplot2grid((6, 2), (0, 1), rowspan=2) # benefits
ax_dam = plt.subplot2grid((6, 2), (2, 1), rowspan=2, sharex=ax_ben) # damages
ax_cnt = plt.subplot2grid((6, 2), (4, 1), rowspan=2, sharex=ax_ben) # gene count
for ax in [ax_ideg, ax_odeg, ax_ben, ax_dam, ax_cnt]:
ax.grid(True)
ax_ideg.set_xlabel('In degree')
ax_ideg.set_ylabel('Probability Mass')
ax_odeg.set_xlabel('Out degree')
ax_odeg.set_ylabel('Probability Mass')
ax_ideg.set_xscale('log', basex=2)
ax_ideg.set_yscale('log', basey=2)
ax_odeg.set_yscale('log', basey=2)
assert len(plot_on_gens) <= len(colors), \
'At most %d generations can be ploted, %d given!' % \
(len(colors), len(plot_on_gens))
kop_opt_indiv, kop_benefits, kop_damages = knapsack_solution(G, max_damage)
population = [Individual(G, random_gene_set(G, 1)) for _ in range(capacity)]
min_ds, max_ds, min_bs, max_bs = [], [], [], []
min_no_opt_genes, max_no_opt_genes, min_no_genes, max_no_genes = [], [], [], []
assert all(plot_on_gens[i] <= plot_on_gens[i+1] for i in range(len(plot_on_gens) - 1))
for color, (idx, max_gen) in zip(colors, enumerate(plot_on_gens)):
min_gen = plot_on_gens[idx - 1] if idx else 0
for i in range(min_gen, max_gen):
sim = [len(kop_opt_indiv.genes.intersection(indiv.genes)) for indiv in population]
b = [indiv.total_benefits for indiv in population]
d = [indiv.total_damages for indiv in population]
n = [len(indiv.genes) for indiv in population]
sys.stderr.write('#%s ** pop = %s ** no. genes = %s ** %s out of %s KOP opt. genes ** benefit = %s / %d ** damage = %s / %d\n' % \
(str(i + 1).ljust(4), str(len(population)).ljust(5), \
str((min(n), max(n))).ljust(10),
str((min(sim), max(sim))).ljust(10), str(len(kop_opt_indiv.genes)).ljust(4), \
str((min(b), max(b))).ljust(10), kop_opt_indiv.total_benefits, \
str((min(d), max(d))).ljust(10), max_damage
))
min_no_genes.append(min(n))
max_no_genes.append(max(n))
min_no_opt_genes.append(min(sim))
max_no_opt_genes.append(max(sim))
min_bs.append(min(b))
max_bs.append(max(b))
min_ds.append(min(d))
max_ds.append(max(d))
#population = next_generation(population, capacity, survivorship, max_damage, d_prob, i_prob)
population = next_generation(population, capacity, survivorship, max_damage, d_prob, i_prob, mut_probs=mut_probs)
sys.stderr.write('plotting ...\n')
scatter_kw = {
'color': color,
'lw': .5,
'alpha': .1,
'label': 'Gen. %d' % max_gen
}
first = True
for indiv in population:
if not indiv.genes:
continue
graph = indiv.subgraph
i_deg = list(graph.in_degree().values())
o_deg = list(graph.out_degree().values())
plot_hist(ax_ideg, i_deg, bins=range(1, max(i_deg) + 1), **scatter_kw)
plot_hist(ax_odeg, o_deg, bins=range(1, max(o_deg) + 1), **scatter_kw)
if first:
scatter_kw.pop('label')
first = False
opt_graph = kop_opt_indiv.subgraph
i_deg = list(opt_graph.in_degree().values())
o_deg = list(opt_graph.out_degree().values())
plot_hist(ax_ideg, i_deg, bins=range(1, max(i_deg) + 1), color='k', lw=3, alpha=.6, label='Knapsack solution')
plot_hist(ax_odeg, o_deg, bins=range(1, max(o_deg) + 1), color='k', lw=3, alpha=.6, label='Knapsack solution')
i_deg = list(G.in_degree().values())
o_deg = list(G.out_degree().values())
plot_hist(ax_ideg, i_deg, bins=range(1, max(i_deg) + 1), color='y', lw=3, alpha=.6, label='whole graph')
plot_hist(ax_odeg, o_deg, bins=range(1, max(o_deg) + 1), color='y', lw=3, alpha=.6, label='whole graph')
ax_ben.plot(range(plot_on_gens[-1]), max_bs, c='k')
ax_ben.plot(range(plot_on_gens[-1]), min_bs, c='k', linestyle='--')
ax_ben.axhline(kop_opt_indiv.total_benefits, *ax_ben.get_xlim(), c='g', lw=3, alpha=.4, label='Knapsack solution')
ax_ben.axhline(kop_benefits, *ax_ben.get_xlim(), c='m', lw=3, alpha=.4, label='Knapsack solution (reported)')
ax_ben.set_ylabel('Benefit')
ax_ben.set_xlabel('Generation')
ax_dam.plot(range(plot_on_gens[-1]), max_ds, c='k')
ax_dam.plot(range(plot_on_gens[-1]), min_ds, c='k', linestyle='--')
ax_dam.axhline(max_damage, *ax_dam.get_xlim(), c='m', lw=3, alpha=.4, label='Max. admissible total damage')
ax_dam.axhline(kop_opt_indiv.total_damages, *ax_dam.get_xlim(), c='r', lw=3, alpha=.4, label='Knapsack solution')
ax_dam.set_ylabel('Damage')
ax_dam.set_xlabel('Generation')
ax_cnt.plot(range(plot_on_gens[-1]), max_no_genes, c='k', label='No. of genes')
ax_cnt.plot(range(plot_on_gens[-1]), min_no_genes, c='k', linestyle='--')
ax_cnt.plot(range(plot_on_gens[-1]), max_no_opt_genes, c='b', label='No. of genes shared with knapsack solution')
ax_cnt.plot(range(plot_on_gens[-1]), min_no_opt_genes, c='b', linestyle='--')
ax_cnt.axhline(len(kop_opt_indiv.genes), *ax_dam.get_xlim(), c='g', lw=3, alpha=.4, label='Knapsack solution')
ax_cnt.set_ylim(None, ax_cnt.get_ylim()[1] * 1.1)
ax_cnt.set_xlabel('Generation')
ax_cnt.set_ylabel('Number of Genes')
for ax in [ax_ben, ax_dam, ax_cnt]:
leg = ax.legend(fontsize=12, loc=2)
for l in leg.get_lines():
l.set_alpha(1)
l.set_linewidth(2)
for ax in [ax_ideg, ax_odeg]:
leg = ax.legend(fontsize=12)
for l in leg.get_lines():
l.set_alpha(1)
l.set_linewidth(2)
fig = plt.gcf()
fig.set_figwidth(20)
fig.set_figheight(12)
plt.tight_layout()
fig.savefig(plot_path, dpi=500)
plt.clf()
def plot_binomials(path):
ax = plt.gca()
colors = 'rgbm'
ps = [.01, .005, .001, .0005]
for p, color in zip(ps, colors):
x = np.random.binomial(1000, p, size=1e6)
x = [max(_x, 1) for _x in x]
plot_hist(ax, x, bins=range(min(x), max(x)), color=color, lw=3, alpha=.6, label='$\sim \mathcal{B}(1000, %.4f)$' % p)
ax.set_xscale('log', basex=2)
ax.set_yscale('log', basey=2)
ax.set_ylim(0, 1)
ax.grid(True)
ax.legend(fontsize=12)
plt.tight_layout()
plt.savefig(path, dpi=300)
plt.clf()
if __name__ == '__main__':
plot_binomials('binomials.png')
kw = {
'pressure': .9,
'edge_pos_prob': .7,
'tolerance': .1,
'd_prob': .1,
'i_prob': .1,
'capacity': 200,
'survivorship': .5,
'gene_count': 1000,
'graph_type': 'ER',
'edge_prob': .01,
#'graph_type': 'BIO',
#'graph_path': LIB + '/data/input/Example_Network.txt',
'node_mode': 'binary',
#'node_mode': 'powerlaw',
#'node_powerlaw_exp': 2,
#'edge_sign_mode': '-1;+1',
'edge_sign_mode': '0;1',
#'insertion_mode': 'uniform',
'insertion_mode': 'similarity',
}
f = ','.join(['%s:%s' % (str(k), str(v))
for k, v in sorted(kw.items())
if k != 'graph_path']) + '.png'
experiment(f, **kw)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.