Created
April 17, 2015 05:37
-
-
Save mazieres/8391e0e40918185afad5 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This script implements the methodology described in Chap. 7 of | |
# Mateos, Pablo. "Names, Ethnicity and Populations". Springer, 2014. | |
# For bipartite (fornames, surnames) graph projection for unsupervised | |
# learning of names ethnicity. | |
# It uses Louvain instead of FastCommunity however. | |
# Works fine with pypy. | |
# | |
# by Antoine Mazières (http://mazier.es ; {github|twitter}@mazieres) | |
# Cortext Lab -- http://www.cortext.net/ | |
import networkx as nx | |
import unittest | |
import community | |
from collections import defaultdict | |
from itertools import combinations | |
import json | |
import gzip | |
class TestMkNameGraph(unittest.TestCase): | |
def setUp(self): | |
# Dupont Martin | |
# Pierre 6 0 | |
# Paul 5 4 | |
# Jacques 3 7 | |
self.names = [('Pierre', 'Dupont')] * 6 + [('Paul', 'Dupont')] * 5 + [('Paul', 'Martin')] * 4 + [('Jacques', 'Dupont')] * 3 + [('Jacques', 'Martin')] * 7 | |
def test_mk_raw_dict(self): | |
expected = { | |
'Dupont': {'Pierre': 6, 'Paul': 5, 'Jacques': 3}, | |
'Martin': {'Paul': 4, 'Jacques': 7}, | |
} | |
tested = mk_raw_dict(self.names) | |
msg = '\nExpected:\n{}\nGot:\n{}'.format(expected, tested) | |
self.assertEqual(expected, tested, msg=msg) | |
def test_mk_sums(self): | |
fn_sum = {'Pierre': 6, 'Paul': 9, 'Jacques': 10} | |
sn_sum = {'Dupont': 14, 'Martin': 11} | |
N = 25 | |
expected = fn_sum, sn_sum, N | |
raw_dict = mk_raw_dict(self.names) | |
tested = mk_sums(raw_dict) | |
msg = '\nExpected:\n{}\nGot:\n{}'.format(expected, tested) | |
self.assertEqual(expected, tested, msg=msg) | |
def test_apply_thres(self): | |
expected = { | |
'Dupont': {'Pierre': 6}, | |
'Martin': {'Paul': 4, 'Jacques': 7}, | |
} | |
raw_dict = mk_raw_dict(self.names) | |
tested = apply_thres(raw_dict, 1) | |
msg = '\nExpected:\n{}\nGot:\n{}'.format(expected, tested) | |
self.assertEqual(expected, tested, msg=msg) | |
def test_bi_proj(self): | |
thres_dict = { | |
'Dupont': {'Pierre': 6, 'Paul': 1, 'Jacques': 3}, | |
'Martin': {'Paul': 4, 'Jacques': 7}, | |
'Lopez': {'Pierre': 4, 'Jacques': 9} | |
} | |
dupont_martin = (2*1*4)/float(5*(5-1)) + (2*3*7)/float(19*(19-1)) | |
dupont_lopez = (2*6*4)/float(10*(10-1)) + (2*3*9)/float(19*(19-1)) | |
martin_lopez = (2*7*9)/float(19*(19-1)) | |
expected = { | |
'Dupont': { | |
'Martin': {'weight': dupont_martin}, | |
'Lopez': {'weight': dupont_lopez} | |
}, | |
'Martin': { | |
'Dupont': {'weight': dupont_martin}, | |
'Lopez': {'weight': martin_lopez} | |
}, | |
'Lopez': { | |
'Dupont': {'weight': dupont_lopez}, | |
'Martin': {'weight': martin_lopez} | |
} | |
} | |
tested = nx.to_dict_of_dicts(bi_proj(thres_dict)) | |
msg = '\nExpected:\n{}\nGot:\n{}'.format(expected, tested) | |
self.assertEqual(expected, tested, msg=msg) | |
def mk_raw_dict(names): | |
res = defaultdict(lambda: defaultdict(int)) | |
for n in names: | |
fn = n[0] | |
sn = n[1] | |
res[sn][fn] += 1 | |
return res | |
def mk_sums(raw_dict): | |
N = 0 | |
fn_sum = defaultdict(int) | |
sn_sum = defaultdict(int) | |
for sn, fns in raw_dict.iteritems(): | |
for fn, v in fns.iteritems(): | |
sn_sum[sn] += v | |
fn_sum[fn] += v | |
N += v | |
return fn_sum, sn_sum, N | |
def apply_thres(raw_dict, k): | |
res = defaultdict(lambda: defaultdict(int)) | |
fn_sum, sn_sum, N = mk_sums(raw_dict) | |
for sn, fns in raw_dict.iteritems(): | |
for fn, v in fns.iteritems(): | |
thres = (k * fn_sum[fn] * sn_sum[sn]) / float(N) | |
if v > thres: | |
res[sn][fn] = raw_dict[sn][fn] | |
return res | |
def bi_proj(thres_dict, np_thres=0): | |
G = nx.Graph() | |
fn_sum = defaultdict(int) | |
for fns in thres_dict.values(): | |
for fn, v in fns.iteritems(): | |
fn_sum[fn] += v | |
for fn_pair in combinations(thres_dict.keys(), 2): | |
x, y = fn_pair | |
np = 0 | |
for fn, v in thres_dict[x].iteritems(): | |
if fn in thres_dict[y]: | |
np += (2 * v * thres_dict[y][fn]) / float(fn_sum[fn] * (fn_sum[fn] - 1)) | |
if np > np_thres: | |
G.add_edge(*fn_pair, weight=np) | |
return G | |
def clusterize(G): | |
# partition = community.best_partition(G) | |
dendo = community.generate_dendrogram(G) | |
partition = community.partition_at_level(dendo, len(dendo) - 1) | |
clusters = defaultdict(list) | |
for k, v in partition.iteritems(): | |
clusters[v].append(k) | |
for k, v in clusters.iteritems(): | |
f = open('clusters/{}.txt'.format(k), 'wb') | |
for name in v: | |
f.write(name+'\n') | |
f.close() | |
return clusters | |
def save_graph(G): | |
f = gzip.open('graph.json.gz', 'wb') | |
f.write(json.dumps(nx.to_dict_of_dicts(G))) | |
f.close() | |
def load_graph(path): | |
f = gzip.open('graph.json.gz', 'r') | |
dat = json.load(f) | |
f.close() | |
return nx.from_dict_of_dicts(dat) | |
def mk_graph(k=100, np_thres=0): | |
names = [(x.split(';')[1].split(' ')[0].strip(), x.split(';')[0].strip()) for x in open('sample_names.txt')] | |
raw_dict = mk_raw_dict(names) | |
thres_dict = apply_thres(raw_dict, k) | |
G = bi_proj(thres_dict, np_thres=np_thres) | |
print "number of nodes:", len(G.nodes()) | |
print "number of edges:", len(G.edges()) | |
return G | |
def get_biggest(G): | |
cc = [len(x) for x in list(nx.connected_components(G))] | |
idx_max = cc.index(max(cc)) | |
bigg = list(nx.connected_component_subgraphs(G))[idx_max] | |
return bigg | |
if __name__ == '__main__': | |
G = mk_graph() | |
GG = get_biggest(G) | |
_ = clusterize(GG) | |
print "Done." |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment