# This script implements the methodology described in Chap. 7 of
# Mateos, Pablo. "Names, Ethnicity and Populations". Springer, 2014.
# It projects the bipartite (forenames, surnames) graph for unsupervised
# learning of name ethnicity.
# It uses Louvain instead of FastCommunity, however.
# Works fine with pypy.
#
# by Antoine Mazières (http://mazier.es ; {github|twitter}@mazieres)
# Cortext Lab -- http://www.cortext.net/
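#
# Pipeline overview (as implemented below):
#   1. mk_raw_dict  -- count (forename, surname) pairs as {surname: {forename: count}}
#   2. apply_thres  -- keep pairs whose count exceeds k * fn_sum * sn_sum / N
#   3. bi_proj      -- project onto surnames, weighting shared forenames
#   4. get_biggest  -- keep the largest connected component
#   5. clusterize   -- Louvain partition, one 'clusters/<id>.txt' file per cluster
#
# Input format (inferred from mk_graph below): 'sample_names.txt' with one
# 'Surname;Forename(s)' record per line.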
import networkx as nx
import unittest
import community  # python-louvain package
from collections import defaultdict
from itertools import combinations
import json
import gzip


class TestMkNameGraph(unittest.TestCase):

    def setUp(self):
        #          Dupont  Martin
        # Pierre      6       0
        # Paul        5       4
        # Jacques     3       7
        self.names = ([('Pierre', 'Dupont')] * 6 + [('Paul', 'Dupont')] * 5 +
                      [('Paul', 'Martin')] * 4 + [('Jacques', 'Dupont')] * 3 +
                      [('Jacques', 'Martin')] * 7)

    def test_mk_raw_dict(self):
        expected = {
            'Dupont': {'Pierre': 6, 'Paul': 5, 'Jacques': 3},
            'Martin': {'Paul': 4, 'Jacques': 7},
        }
        tested = mk_raw_dict(self.names)
        msg = '\nExpected:\n{}\nGot:\n{}'.format(expected, tested)
        self.assertEqual(expected, tested, msg=msg)

    def test_mk_sums(self):
        fn_sum = {'Pierre': 6, 'Paul': 9, 'Jacques': 10}
        sn_sum = {'Dupont': 14, 'Martin': 11}
        N = 25
        expected = fn_sum, sn_sum, N
        raw_dict = mk_raw_dict(self.names)
        tested = mk_sums(raw_dict)
        msg = '\nExpected:\n{}\nGot:\n{}'.format(expected, tested)
        self.assertEqual(expected, tested, msg=msg)

    def test_apply_thres(self):
        expected = {
            'Dupont': {'Pierre': 6},
            'Martin': {'Paul': 4, 'Jacques': 7},
        }
        raw_dict = mk_raw_dict(self.names)
        tested = apply_thres(raw_dict, 1)
        msg = '\nExpected:\n{}\nGot:\n{}'.format(expected, tested)
        self.assertEqual(expected, tested, msg=msg)

    def test_bi_proj(self):
        thres_dict = {
            'Dupont': {'Pierre': 6, 'Paul': 1, 'Jacques': 3},
            'Martin': {'Paul': 4, 'Jacques': 7},
            'Lopez': {'Pierre': 4, 'Jacques': 9}
        }
        dupont_martin = (2*1*4)/float(5*(5-1)) + (2*3*7)/float(19*(19-1))
        dupont_lopez = (2*6*4)/float(10*(10-1)) + (2*3*9)/float(19*(19-1))
        martin_lopez = (2*7*9)/float(19*(19-1))
        expected = {
            'Dupont': {
                'Martin': {'weight': dupont_martin},
                'Lopez': {'weight': dupont_lopez}
            },
            'Martin': {
                'Dupont': {'weight': dupont_martin},
                'Lopez': {'weight': martin_lopez}
            },
            'Lopez': {
                'Dupont': {'weight': dupont_lopez},
                'Martin': {'weight': martin_lopez}
            }
        }
        tested = nx.to_dict_of_dicts(bi_proj(thres_dict))
        msg = '\nExpected:\n{}\nGot:\n{}'.format(expected, tested)
        self.assertEqual(expected, tested, msg=msg)


def mk_raw_dict(names):
    # Count (forename, surname) co-occurrences as {surname: {forename: count}}.
    res = defaultdict(lambda: defaultdict(int))
    for fn, sn in names:
        res[sn][fn] += 1
    return res


def mk_sums(raw_dict):
    # Marginal totals per forename (fn_sum), per surname (sn_sum),
    # and the grand total N.
    N = 0
    fn_sum = defaultdict(int)
    sn_sum = defaultdict(int)
    for sn, fns in raw_dict.iteritems():
        for fn, v in fns.iteritems():
            sn_sum[sn] += v
            fn_sum[fn] += v
            N += v
    return fn_sum, sn_sum, N


def apply_thres(raw_dict, k):
    # Keep a (forename, surname) pair only if its observed count exceeds
    # k times its expected count under independence, k * fn_sum * sn_sum / N.
    res = defaultdict(lambda: defaultdict(int))
    fn_sum, sn_sum, N = mk_sums(raw_dict)
    for sn, fns in raw_dict.iteritems():
        for fn, v in fns.iteritems():
            thres = (k * fn_sum[fn] * sn_sum[sn]) / float(N)
            if v > thres:
                res[sn][fn] = raw_dict[sn][fn]
    return res
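
# Worked example with the test data above and k=1 (cf. test_apply_thres):
# N = 25, fn_sum = {'Pierre': 6, 'Paul': 9, 'Jacques': 10},
# sn_sum = {'Dupont': 14, 'Martin': 11}.
#   ('Pierre', 'Dupont'): count 6 > 1 * 6 * 14 / 25 = 3.36  -> kept
#   ('Paul', 'Dupont'):   count 5 <= 1 * 9 * 14 / 25 = 5.04 -> dropped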


def bi_proj(thres_dict, np_thres=0):
    # Project the bipartite (forename, surname) graph onto surnames: two
    # surnames x and y are linked with weight
    #   sum over shared forenames of 2 * v_x * v_y / (fn_sum * (fn_sum - 1)),
    # and the edge is kept only if that weight exceeds np_thres.
    G = nx.Graph()
    fn_sum = defaultdict(int)
    for fns in thres_dict.values():
        for fn, v in fns.iteritems():
            fn_sum[fn] += v
    for x, y in combinations(thres_dict.keys(), 2):
        np = 0
        for fn, v in thres_dict[x].iteritems():
            if fn in thres_dict[y]:
                np += (2 * v * thres_dict[y][fn]) / float(fn_sum[fn] * (fn_sum[fn] - 1))
        if np > np_thres:
            G.add_edge(x, y, weight=np)
    return G
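
# Worked example (cf. test_bi_proj): with fn_sum = {'Pierre': 10, 'Paul': 5,
# 'Jacques': 19}, Dupont and Martin share Paul and Jacques, so their weight is
#   2*1*4 / (5*4) + 2*3*7 / (19*18).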


def clusterize(G):
    # Louvain community detection, keeping the top (coarsest) level
    # of the dendrogram.
    # partition = community.best_partition(G)
    dendo = community.generate_dendrogram(G)
    partition = community.partition_at_level(dendo, len(dendo) - 1)
    clusters = defaultdict(list)
    for k, v in partition.iteritems():
        clusters[v].append(k)
    # Write one file per cluster into the 'clusters/' directory (must exist).
    for k, v in clusters.iteritems():
        f = open('clusters/{}.txt'.format(k), 'wb')
        for name in v:
            f.write(name + '\n')
        f.close()
    return clusters


def save_graph(G):
    f = gzip.open('graph.json.gz', 'wb')
    f.write(json.dumps(nx.to_dict_of_dicts(G)))
    f.close()


def load_graph(path):
    f = gzip.open(path, 'r')
    dat = json.load(f)
    f.close()
    return nx.from_dict_of_dicts(dat)


def mk_graph(k=100, np_thres=0):
    # 'sample_names.txt' is expected to hold one 'Surname;Forename(s)' record
    # per line; only the first forename is kept.
    names = [(line.split(';')[1].split(' ')[0].strip(), line.split(';')[0].strip())
             for line in open('sample_names.txt')]
    raw_dict = mk_raw_dict(names)
    thres_dict = apply_thres(raw_dict, k)
    G = bi_proj(thres_dict, np_thres=np_thres)
    print "number of nodes:", len(G.nodes())
    print "number of edges:", len(G.edges())
    return G


def get_biggest(G):
    # Return the largest connected component of G as a subgraph.
    cc = [len(x) for x in list(nx.connected_components(G))]
    idx_max = cc.index(max(cc))
    bigg = list(nx.connected_component_subgraphs(G))[idx_max]
    return bigg
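
# Example session (a sketch; assumes 'sample_names.txt' and a 'clusters/'
# directory exist alongside this script):
#   G = mk_graph(k=100, np_thres=0)
#   save_graph(G)                    # writes 'graph.json.gz'
#   G = load_graph('graph.json.gz')  # reload later without rebuilding
#   clusterize(get_biggest(G))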


if __name__ == '__main__':
    G = mk_graph()
    GG = get_biggest(G)
    _ = clusterize(GG)
    print "Done."