Decision Tree Name / Country classifier
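A small command line tool around a scikit-learn decision tree. It reads a JSON encoded list of samples of the form [ { "name": "John Doe", "country": "England" } ] on standard input and provides four subcommands: clean (normalize names and deduplicate them per country), split (partition the samples into development, testing and training sets), vocabulary (report the class distribution) and run (train on character bigram features and report results). Requires Python 2 with scikit-learn, unidecode and PIL (the latter only for the confusion heatmap).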
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: Lior Galanti
# email: lior.galanti@gmail.com
# Name / Country classifier
# Uses Decision Tree classifier to predict country from person name
# Decision Tree implementation: http://scikit-learn.org/stable/
import os
import re
import sys
import json
import copy
import math
import random
import logging
from datetime import datetime
from datetime import timedelta
from unidecode import unidecode
from StringIO import StringIO
from argparse import ArgumentParser
log_levels = {
'debug': logging.DEBUG,
'info': logging.INFO,
'warning': logging.WARNING,
'error': logging.ERROR,
'critical': logging.CRITICAL
}
expression = {
'whitespace':re.compile(ur'\s+',re.IGNORECASE|re.UNICODE),
'apostrophe':re.compile(ur'\'',re.IGNORECASE|re.UNICODE),
'space':ur' ',
'empty':ur'',
}
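# Experiment profiles. 'exhaustive' keeps every country as its own class;
# 'clustered' merges closely related countries (Arabic speaking countries,
# the United Kingdom nations, Russia/Ukraine/Belarus, etc.) into single
# classes; 'colonial' additionally drops the United States of America,
# United Kingdom and Australia classes; 'narrow' drops a longer list of
# overlapping classes. The 'training threshold' and 'testing threshold'
# values are the minimum number of samples a class must have in the
# respective partition to be included in an experiment.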
config = {
'exhaustive':{
'name':'exhaustive',
'training threshold':150,
'testing threshold':50,
'drop':[],
'cluster':[]
},
'clustered':{
'name':'clustered',
'training threshold':180,
'testing threshold':50,
'drop':[],
'cluster':[
{
'name':'Arabian',
'member':[
'Afghanistan',
'Bahrain',
'Egypt',
'Iran',
'Iraq',
'Jordan',
'Lebanon',
'Saudi Arabia',
'Syria',
'Tunisia',
'United Arab Emirates',
'Mauritania',
]
},
{
'name':'United Kingdom',
'member':[
'United Kingdom',
'England',
'Ireland',
'Scotland',
'Wales',
]
},
{
'name':'Russian',
'member':[
'Russia',
'Ukraine',
'Belarus',
]
},
{
'name':'India',
'member':[
'India',
'Sri Lankan',
]
},
{
'name':'Spanish',
'member':[
'Mexico',
'Spain',
'Argentina',
]
},
{
'name':'German',
'member':[
'Germany',
'Austria',
]
},
{
'name':'Australia',
'member':[
'Australia',
'New Zealand',
]
},
]
},
'colonial':{
'name':'colonial',
'training threshold':180,
'testing threshold':50,
'drop':[
'United States of America',
'United Kingdom',
'Australia',
],
'cluster':[
{
'name':'Arabian',
'member':[
'Afghanistan',
'Bahrain',
'Egypt',
'Iran',
'Iraq',
'Jordan',
'Lebanon',
'Saudi Arabia',
'Syria',
'Tunisia',
'United Arab Emirates',
'Mauritania',
]
},
{
'name':'United Kingdom',
'member':[
'United Kingdom',
'England',
'Ireland',
'Scotland',
'Wales',
]
},
{
'name':'Russian',
'member':[
'Russia',
'Ukraine',
'Belarus',
]
},
{
'name':'India',
'member':[
'India',
'Sri Lankan',
]
},
{
'name':'Spanish',
'member':[
'Mexico',
'Spain',
'Argentina',
]
},
{
'name':'German',
'member':[
'Germany',
'Austria',
]
},
{
'name':'Australia',
'member':[
'Australia',
'New Zealand',
]
},
]
},
'narrow':{
'name':'narrow',
'training threshold':120,
'testing threshold':40,
'drop':[
'United States of America',
'United Kingdom',
'Australia',
'Arabian',
'India',
'Belgium',
'Netherlands',
'Denmark',
'Sweden',
'Norway',
'Argentina',
'Brazil',
],
'cluster':[
{
'name':'Arabian',
'member':[
'Afghanistan',
'Bahrain',
'Egypt',
'Iran',
'Iraq',
'Jordan',
'Lebanon',
'Saudi Arabia',
'Syria',
'Tunisia',
'United Arab Emirates',
'Mauritania',
]
},
{
'name':'United Kingdom',
'member':[
'United Kingdom',
'England',
'Ireland',
'Scotland',
'Wales',
]
},
{
'name':'Russian',
'member':[
'Russia',
'Ukraine',
'Belarus',
]
},
{
'name':'India',
'member':[
'India',
'Sri Lankan',
]
},
{
'name':'Spanish',
'member':[
'Mexico',
'Spain',
'Argentina',
]
},
{
'name':'German',
'member':[
'Germany',
'Austria',
]
},
{
'name':'Australia',
'member':[
'Australia',
'New Zealand',
]
},
]
}
}
def strip_accents(name):
    # lowercase, transliterate to plain ascii and collapse whitespace
    value = name.lower()
    value = unidecode(value)
    value = expression['whitespace'].sub(expression['space'], value).strip()
    return value
def simplify(name):
    # lowercase, drop apostrophes and collapse whitespace
    value = name.lower()
    value = expression['apostrophe'].sub(expression['empty'], value)
    value = expression['whitespace'].sub(expression['space'], value).strip()
    return value
def normalize(name):
    # canonical form of a name: simplified, then accent stripped
    return strip_accents(simplify(name))
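# Illustrative behaviour (assuming unidecode's usual transliteration):
#   simplify(u"O'Brien")        -> u'obrien'
#   strip_accents(u'José')      -> u'jose'
#   normalize(u"José  O'Brien") -> u'jose obrien'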
def clean(samples):
result = []
aggregate = { 'country':{} }
for record in samples:
record['normal'] = normalize(record['name'])
if record['country'] not in aggregate['country']:
aggregate['country'][record['country']] = { 'count': 0, 'record':{} }
if record['normal'] not in aggregate['country'][record['country']]['record']:
aggregate['country'][record['country']]['count'] += 1
aggregate['country'][record['country']]['record'][record['normal']] = record
elif simplify(record['name']) != record['normal']:
# This means it is a version of the name with accents
aggregate['country'][record['country']]['record'][record['normal']] = record
for country in aggregate['country'].values():
result.extend(country['record'].values())
return result
class Experiment(object):
def __init__(self, dataset, node, mode, configuration):
self.log = logging.getLogger('Experiment')
self.dataset = dataset
self.node = node
self.mode = mode
self.configuration = configuration
self._lookup = None
self._model = None
self._result = None
self._sample = None
@property
def name(self):
return self.node['name']
@property
def vocabulary(self):
return self.sample['vocabulary']
@property
def partition(self):
return self.sample['partition']
@property
def lookup(self):
        if self._lookup is None:
            self._lookup = {'name':{}, 'index':{}}
            for country in self.vocabulary.values():
                self._lookup['name'][country['name']] = country['index']
                self._lookup['index'][country['index']] = country['name']
        return self._lookup
def class_index_by_name(self, name):
result = None
if name in self.lookup['name']:
result = self.lookup['name'][name]
return result
def class_name_by_index(self, index):
result = None
if index in self.lookup['index']:
result = self.lookup['index'][index]
return result
@property
def sample(self):
if self._sample is None:
self._sample = {
'vocabulary':{},
'vocabulary size':0,
'partition':{
self.configuration:{
'feature count':0,
'feature labels':[],
'features':None,
},
'training':{
'feature count':0,
'feature labels':[],
'features':None,
}
},
}
# build the inverse transform map for the clustering instruction
transform = {}
if 'cluster' in self.node:
for cluster in self.node['cluster']:
for member in cluster['member']:
transform[member] = cluster['name']
for p in (self.configuration, 'training'):
self._load_partition(transform, p)
# determine which countries are in the experiment
for country in self._sample['partition']['training']['vocabulary'].values():
if country['name'] in self._sample['partition'][self.configuration]['vocabulary']:
other = self._sample['partition'][self.configuration]['vocabulary'][country['name']]
if country['count'] >= self.node['training threshold'] and \
other['count'] >= self.node['testing threshold']:
self._sample['vocabulary'][country['name']] = {
'name': country['name'],
'count':country['count'] + other['count'],
}
self._sample['vocabulary size'] = len(self._sample['vocabulary'])
self.log.debug(u'Vocabulary size is %s', self._sample['vocabulary size'])
for p in (self.configuration, 'training'):
self._filter_partition(p)
            countries = sorted(self._sample['vocabulary'].keys())
for index, country in enumerate(countries):
self._sample['vocabulary'][country]['index'] = index
for p in (self.configuration, 'training'):
self._sample['partition'][p]['vocabulary'][country]['index'] = index
return self._sample
def _filter_partition(self, name):
partition = self._sample['partition'][name]
for c in partition['vocabulary'].keys():
if c not in self._sample['vocabulary']:
del partition['vocabulary'][c]
self.log.debug(u'Dropping country %s from %s', c, name)
partition['vocabulary size'] = len(partition['vocabulary'])
partition['samples'] = [sample for sample in partition['samples'] if sample['country'] in partition['vocabulary'] ]
partition['sample count'] = len(partition['samples'])
self.log.debug(u'Total number of samples in %s is %s', name, partition['sample count'])
for country in partition['vocabulary'].values():
country['correct'] = 0
country['wrong'] = {}
country['portion'] = float(country['count']) / float(partition['sample count'])
for index, sample in enumerate(partition['samples']):
sample['index'] = index
def _load_partition(self, transform, name):
samples = self.dataset.partition[name]['samples']
partition = self._sample['partition'][name]
partition['samples'] = []
partition['vocabulary'] = {}
for s in samples:
sample = self.dataset.sample_by_index(s)
            # cluster the sample if necessary
if sample['country'] in transform:
sample['country'] = transform[sample['country']]
# if the country should not be dropped
if not ('drop' in self.node and sample['country'] in self.node['drop']) and \
('include' not in self.node or sample['country'] in self.node['include']):
# add the sample to the sample collection
partition['samples'].append(sample)
# initialize a vocabulary entry the first time we encounter it
if sample['country'] not in partition['vocabulary']:
partition['vocabulary'][sample['country']] = {
'name': sample['country'],
'count':0,
}
# update the vocabulary counters
partition['vocabulary'][sample['country']]['count'] += 1
partition['vocabulary size'] = len(partition['vocabulary'])
@property
def encoded(self):
return json.dumps(self.node, ensure_ascii=False, sort_keys=True, indent=4).encode('utf-8')
def draw_heatmap(self):
# build the correlation matrix
self.log.debug(u'Painting heatmap...')
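        # contamination is a row normalized confusion matrix: rows are true
        # countries, columns are predicted countries, and the diagonal holds
        # per country accuracy (log scaled and max normalized further below)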
# initialize an empty matrix
contamination = []
labels = []
for i in range(self.sample['vocabulary size']):
row = []
labels.append(None)
for j in range(self.sample['vocabulary size']):
row.append(0.0)
contamination.append(row)
for country in self.partition[self.configuration]['vocabulary'].values():
index = self.class_index_by_name(country['name'])
row = contamination[index]
labels[index] = country['name']
row[index] = float(country['correct']) / float(country['count'])
for k,v in country['wrong'].iteritems():
i = self.class_index_by_name(k)
row[i] = float(v) / float(country['count'])
# switch to a logarithmic scale
vmax = 0
for i in range(len(contamination)):
for j in range(len(contamination)):
if contamination[i][j] > 0:
contamination[i][j] = math.log(1.0 + contamination[i][j])
vmax = max(vmax, contamination[i][j])
# normalize
for i in range(len(contamination)):
for j in range(len(contamination)):
contamination[i][j] = contamination[i][j] / vmax
h = Heatmap(contamination, labels)
h.save('{}.{}.png'.format(self.name, self.mode))
def run(self, classifier):
start = datetime.now()
self.log.info(u'Running classifier with configuration:\n\tconfiguration %s\n\tmethodology %s\n\tprofile: %s\n\tmode: %s\n\ttraining threshold: %s\n\ttesting threshold: %s\n\tcountries: %s',
self.configuration,
classifier,
self.name,
self.mode,
self.node['training threshold'],
self.node['testing threshold'],
len(self.vocabulary))
if classifier == 'binary': self.run_binary_classifier()
elif classifier == 'nclass': self.run_nclass_classifier()
end = datetime.now()
self.log.info(u'Experiment duration was %s', unicode(end - start))
def run_binary_classifier(self):
from sklearn.feature_extraction.text import CountVectorizer
from sklearn import tree
model = {
            'feature labels':None,
'classifier':{},
'partition':{
'training':{
'features':None,
'country':{},
},
self.configuration:{
'features':None,
'country':{},
}
}
}
vectorizer = CountVectorizer(
analyzer='char',
ngram_range=(2, 2),
min_df=1,
lowercase=True)
model['partition']['training']['features'] = vectorizer.fit_transform(self.name_vector('training'))
# we will only use the features produced by the training set vectorizer
model['feature labels'] = list(vectorizer.get_feature_names())
model['feature vocabulary'] = vectorizer.vocabulary_
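        # reusing the training vocabulary pins the test matrix to the same
        # column space; bigrams never seen during training are simply ignored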
vectorizer = CountVectorizer(
analyzer='char',
ngram_range=(2, 2),
min_df=1,
lowercase=True,
vocabulary=model['feature vocabulary'])
self.partition[self.configuration]['features'] = vectorizer.fit_transform(self.name_vector(self.configuration))
        self.log.info(u'Inferred %s features', len(model['feature labels']))
results = {
'average':{
'accuracy':0.0,
'precision':0.0,
'recall':0.0,
'f1score':0.0,
'training':self.partition['training']['sample count'],
'testing':self.partition[self.configuration]['sample count'],
},
'country':[]
}
for country in self.partition[self.configuration]['vocabulary'].values():
model['classifier'][country['name']] = tree.DecisionTreeClassifier()
classifier = model['classifier'][country['name']]
            self.log.debug(u'Training a model for classifying %s', country['name'])
classifier.fit(model['partition']['training']['features'].toarray(), self.country_vector('training', country['name']))
self.log.debug(u'Predicting %s set classification for %s', self.configuration, country['name'])
prediction = classifier.predict(self.partition[self.configuration]['features'].toarray())
answer = self.country_vector(self.configuration, country['name'])
# calculate f-measure...
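            # precision = TP / (TP + FP), recall = TP / (TP + FN),
            # f1 = 2 * precision * recall / (precision + recall)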
result = {
'country':country['name'],
'accuracy':0.0,
'precision':0.0,
'recall':0.0,
'f1score':0.0,
'training':self.partition['training']['vocabulary'][country['name']]['count'],
'testing':country['count'],
}
positive = 0
true_positive = 0
for i in range(len(prediction)):
if prediction[i]:
positive += 1
if answer[i]:
true_positive += 1
if prediction[i] == answer[i]:
result['accuracy'] += 1
result['accuracy'] = float(result['accuracy']) / float(len(prediction))
            result['precision'] = float(true_positive) / float(positive) if positive > 0 else 0.0
result['recall'] = float(true_positive) / float(country['count'])
if result['precision'] + result['recall'] > 0:
result['f1score'] = 2.0 * (result['precision'] * result['recall']) / (result['precision'] + result['recall'])
else:
result['f1score'] = 0.0
results['average']['accuracy'] += result['accuracy']
results['average']['precision'] += result['precision']
results['average']['recall'] += result['recall']
results['average']['f1score'] += result['f1score']
results['country'].append(result)
results['average']['accuracy'] /= len(self.vocabulary)
results['average']['precision'] /= len(self.vocabulary)
results['average']['recall'] /= len(self.vocabulary)
results['average']['f1score'] /= len(self.vocabulary)
for result in sorted(results['country'], key=lambda i: i['f1score'], reverse=True):
self.log.info(
u'Results for %s: %s',
result['country'],
u'training: {}, testing: {}, accuracy: {}, precision: {}, recall: {}, f1-score: {}'.format(
result['training'],
result['testing'],
result['accuracy'],
result['precision'],
result['recall'],
result['f1score'],
))
self.log.info(
u'Run averages: %s',
u'training: {}, testing: {}, accuracy: {}, precision: {}, recall: {}, f1-score: {}'.format(
results['average']['training'],
results['average']['testing'],
results['average']['accuracy'],
results['average']['precision'],
results['average']['recall'],
results['average']['f1score'],
))
def run_nclass_classifier(self):
from sklearn.feature_extraction.text import CountVectorizer
from sklearn import tree
        # Calculate features for the training set.
        # Going beyond 2-grams seems to make the memory footprint very high without much benefit;
        # 4-grams actually decreased performance and took all night...
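        # e.g. the char analyzer with ngram_range=(2, 2) turns 'banana' into
        # the character bigram counts {'ba': 1, 'an': 2, 'na': 2}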
train_cv = CountVectorizer(analyzer='char', ngram_range=(2, 2), min_df=1, lowercase=True)
self.partition['training']['features'] = train_cv.fit_transform(self.train_names)
self.partition['training']['feature labels'] = list(train_cv.get_feature_names())
self.partition['training']['feature count'] = len(self.partition['training']['feature labels'])
# calculate features for the test set
test_cv = CountVectorizer(analyzer='char', ngram_range=(2, 2), min_df=1, lowercase=True, vocabulary=train_cv.vocabulary_)
self.partition[self.configuration]['features'] = test_cv.fit_transform(self.test_names)
self.partition[self.configuration]['feature labels'] = list(test_cv.get_feature_names())
self.partition[self.configuration]['feature count'] = len(self.partition[self.configuration]['feature labels'])
# train the model on the training set
model = tree.DecisionTreeClassifier()
self.log.debug(u'Training model...')
model.fit(self.partition['training']['features'].toarray(), self.train_answers)
# predict the answers for the test set
self.log.debug(u'Predicting test set...')
result = list(model.predict(self.partition[self.configuration]['features'].toarray()))
answer = self.test_answers
self.partition[self.configuration]['correct'] = 0
for i in range(len(result)):
sample = self.partition[self.configuration]['samples'][i]
country = self.partition[self.configuration]['vocabulary'][sample['country']]
if result[i] == answer[i]:
# if the result is correct simply count it
country['correct'] += 1
self.partition[self.configuration]['correct'] += 1
else:
# if the result is wrong
wrong = self.class_name_by_index(result[i])
if wrong not in country['wrong']:
country['wrong'][wrong] = 0
country['wrong'][wrong] += 1
self.partition[self.configuration]['accuracy'] = float(self.partition[self.configuration]['correct']) / float(self.partition[self.configuration]['sample count'])
for country in self.partition[self.configuration]['vocabulary'].values():
country['accuracy'] = float(country['correct']) / float(country['count'])
self.draw_heatmap()
# print json.dumps(self.partition[self.configuration]['vocabulary'], ensure_ascii=False, sort_keys=True, indent=4).encode('utf-8')
for country in sorted(self.partition[self.configuration]['vocabulary'].values(), key=lambda i: i['accuracy'], reverse=True):
self.log.info(u'country: %s, training: %s, testing: %s, accuracy: %s',
country['name'],
self.partition['training']['vocabulary'][country['name']]['count'],
country['count'],
country['accuracy']
)
self.log.info(
u'Run accuracy: %s, training: %s, testing: %s',
self.partition[self.configuration]['accuracy'],
self.partition['training']['sample count'],
self.partition[self.configuration]['sample count']
)
def country_vector(self, partition, country):
result = None
if partition in self.partition:
if 'vector' not in self.partition[partition]:
self.partition[partition]['vector'] = {}
if 'country' not in self.partition[partition]['vector']:
self.partition[partition]['vector']['country'] = {}
if country not in self.partition[partition]['vector']['country']:
self.partition[partition]['vector']['country'][country] = \
[ sample['country'] == country for sample in self.partition[partition]['samples'] ]
result = self.partition[partition]['vector']['country'][country]
return result
def name_vector(self, partition):
if self.mode == 'normal': return self.normal_name_vector(partition)
elif self.mode == 'full': return self.full_name_vector(partition)
def full_name_vector(self, partition):
result = None
if partition in self.partition:
if 'vector' not in self.partition[partition]: self.partition[partition]['vector'] = {}
if 'full name' not in self.partition[partition]['vector']:
self.partition[partition]['vector']['full name'] = [ sample['name'] for sample in self.partition[partition]['samples'] ]
result = self.partition[partition]['vector']['full name']
return result
def normal_name_vector(self, partition):
result = None
if partition in self.partition:
if 'vector' not in self.partition[partition]: self.partition[partition]['vector'] = {}
if 'normal name' not in self.partition[partition]['vector']:
self.partition[partition]['vector']['normal name'] = [ sample['normal'] for sample in self.partition[partition]['samples'] ]
result = self.partition[partition]['vector']['normal name']
return result
@property
def train_names(self):
if self.mode == 'normal': return self.train_normal_names
elif self.mode == 'full': return self.train_full_names
@property
def train_normal_names(self):
return [ sample['normal'] for sample in self.partition['training']['samples'] ]
@property
def train_full_names(self):
return [ sample['name'] for sample in self.partition['training']['samples'] ]
@property
def test_names(self):
if self.mode == 'normal': return self.test_normal_names
elif self.mode == 'full': return self.test_full_names
@property
def test_normal_names(self):
return [ n['normal'] for n in self.partition[self.configuration]['samples'] ]
@property
def test_full_names(self):
return [ n['name'] for n in self.partition[self.configuration]['samples'] ]
@property
def train_answers(self):
return [ self.class_index_by_name(sample['country']) for sample in self.partition['training']['samples'] ]
@property
def test_answers(self):
return [ self.class_index_by_name(sample['country']) for sample in self.partition[self.configuration]['samples'] ]
class DataSet(object):
def __init__(self, node=None):
self.log = logging.getLogger('DataSet')
self.node = node
self._lookup = None
@property
def encoded(self):
return json.dumps(self.node, ensure_ascii=False, sort_keys=True, indent=4).encode('utf-8')
@property
def encoded_vocabulary(self):
node = copy.deepcopy(self.node)
del node['samples']
del node['partition']['development']['samples']
del node['partition']['testing']['samples']
del node['partition']['training']['samples']
return json.dumps(node, ensure_ascii=False, sort_keys=True, indent=4).encode('utf-8')
@property
def lookup(self):
if self._lookup is None:
self.log.debug(u'loading sample lookup')
self._lookup = {
'index':{},
'name':{},
'normal':{}
}
for sample in self.node['samples']:
self._lookup['index'][sample['index']] = sample
self._lookup['name'][sample['name']] = sample
self._lookup['normal'][sample['normal']] = sample
return self._lookup
@property
def vocabulary(self):
return self.node['vocabulary']
@property
def partition(self):
return self.node['partition']
def _load_samples(self, samples):
self.node['samples'] = samples
self.node['sample count'] = len(self.node['samples'])
self.node['vocabulary'] = {}
# figure out the distribution of the samples in the entire set
for sample in samples:
# if the country has not been seen before, start a vocabulary record
if sample['country'] not in self.vocabulary:
self.vocabulary[sample['country']] = {
'name':sample['country'],
'count':0,
}
# update the per country sample counter
self.vocabulary[sample['country']]['count'] += 1
self.node['vocabulary size'] = len(self.vocabulary)
# enumerate the sample
for index,sample in enumerate(self.node['samples']):
sample['index'] = index
for country in self.vocabulary.values():
country['portion'] = float(country['count']) / float(self.node['sample count'])
def _load_partition_vocabulary(self, partition):
# figure out the distribution of the samples in the entire set
partition['vocabulary'] = {}
for s in partition['samples']:
sample = self.sample_by_index(s)
# if the country has not been seen before, start a vocabulary record
if sample['country'] not in partition['vocabulary']:
partition['vocabulary'][sample['country']] = {
'name':sample['country'],
'count':0,
}
# update the per country sample counter
partition['vocabulary'][sample['country']]['count'] += 1
partition['vocabulary size'] = len(partition['vocabulary'])
        partition['standard deviation'] = 0
        for country in partition['vocabulary'].values():
            country['portion'] = float(country['count']) / float(partition['sample count'])
            country['bias'] = float(country['portion']) - float(self.vocabulary[country['name']]['portion'])
            partition['standard deviation'] += pow(country['bias'], 2)
        partition['standard deviation'] /= (partition['vocabulary size'] - 1)
        partition['standard deviation'] = pow(partition['standard deviation'], 0.5)
def split(self, samples, development, testing):
training = 1.0 - development - testing
# Reset the lookup upon load
self._lookup = None
# allocate an empty node
self.node = {
'partition':{
'development':{
'samples':[],
},
'testing':{
'samples':[],
},
'training':{
'samples':[],
}
}
}
self._load_samples(samples)
# Determine the size, in samples, of each of the 3 sample groups
self.partition['development']['sample count'] = int(round(self.node['sample count'] * development))
        self.log.info(u'Development partition set to %s%% of %s, which is %s samples.',
development * 100.0,
self.node['sample count'],
self.partition['development']['sample count'])
self.partition['testing']['sample count'] = int(round(self.node['sample count'] * testing))
        self.log.info(u'Testing partition set to %s%% of %s, which is %s samples.',
testing * 100.0,
self.node['sample count'],
self.partition['testing']['sample count'])
self.partition['training']['sample count'] = \
self.node['sample count'] - \
self.partition['development']['sample count'] - \
self.partition['testing']['sample count']
        self.log.info(u'Training partition set to %s%% of %s, which is %s samples.',
training * 100.0,
self.node['sample count'],
self.partition['training']['sample count'])
        # Split the samples into the 3 groups:
        # first extract an array of the sample indexes,
        # then shuffle the array,
        # then sample the required amounts out of that index group.
        # This should preserve the natural distribution of the samples in each group.
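        # NOTE: random.sample already picks uniformly, so the shuffle is
        # strictly redundant but harmless; the list membership filters below
        # are O(n^2) and could use sets for large datasets.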
shuffled = range(self.node['sample count'])
# shuffle the indexes
random.shuffle(shuffled)
# pick the samples that go in the development set
self.partition['development']['samples'] = \
random.sample(shuffled, self.partition['development']['sample count'])
# remove the development set from the complete set
shuffled = [ i for i in shuffled if i not in self.partition['development']['samples'] ]
# pick a testing set
self.partition['testing']['samples'] = \
random.sample(shuffled, self.partition['testing']['sample count'])
# what ever remains will be used for training
self.partition['training']['samples'] = \
[ i for i in shuffled if i not in self.partition['testing']['samples'] ]
for partition in self.partition.values():
self._load_partition_vocabulary(partition)
def sample_by_name(self, name):
result = None
if name in self.lookup['name']:
result = copy.deepcopy(self.lookup['name'][name])
return result
def sample_by_index(self, index):
result = None
if index in self.lookup['index']:
result = copy.deepcopy(self.lookup['index'][index])
return result
def sample_by_normal(self, normal):
result = None
if normal in self.lookup['normal']:
result = copy.deepcopy(self.lookup['normal'][normal])
return result
    def country_vector(self, partition, country):
        result = None
        if partition in self.node['partition']:
            # partition samples are stored as indexes, so resolve them first
            result = [ self.sample_by_index(s)['country'] == country for s in self.node['partition'][partition]['samples'] ]
        return result
    def name_vector(self, partition):
        result = None
        if partition in self.node['partition']:
            result = [ self.sample_by_index(s)['name'] for s in self.node['partition'][partition]['samples'] ]
        return result
    def normal_name_vector(self, partition):
        result = None
        if partition in self.node['partition']:
            result = [ self.sample_by_index(s)['normal'] for s in self.node['partition'][partition]['samples'] ]
        return result
class Heatmap(object):
def __init__(self, matrix, labels):
        # classic PIL style imports; with Pillow use: from PIL import Image, ImageDraw, ImageFont
        import ImageDraw
        import ImageFont
        import Image
self.scale = {
'color': [
'#00007F', '#000083', '#000087', '#00008B', '#00008F', '#000093', '#000097', '#00009B',
'#00009F', '#0000A3', '#0000A7', '#0000AB', '#0000AF', '#0000B3', '#0000B7', '#0000BB',
'#0000BF', '#0000C3', '#0000C7', '#0000CB', '#0000CF', '#0000D3', '#0000D7', '#0000DB',
'#0000DF', '#0000E3', '#0000E7', '#0000EB', '#0000EF', '#0000F3', '#0000F7', '#0000FB',
'#0000FF', '#0004FF', '#0008FF', '#000CFF', '#0010FF', '#0014FF', '#0018FF', '#001CFF',
'#0020FF', '#0024FF', '#0028FF', '#002CFF', '#0030FF', '#0034FF', '#0038FF', '#003CFF',
'#0040FF', '#0044FF', '#0048FF', '#004CFF', '#0050FF', '#0054FF', '#0058FF', '#005CFF',
'#0060FF', '#0064FF', '#0068FF', '#006CFF', '#0070FF', '#0074FF', '#0078FF', '#007CFF',
'#0080FF', '#0084FF', '#0088FF', '#008CFF', '#0090FF', '#0094FF', '#0098FF', '#009CFF',
'#00A0FF', '#00A4FF', '#00A8FF', '#00ACFF', '#00B0FF', '#00B4FF', '#00B8FF', '#00BCFF',
'#00C0FF', '#00C4FF', '#00C8FF', '#00CCFF', '#00D0FF', '#00D4FF', '#00D8FF', '#00DCFF',
'#00E0FF', '#00E4FF', '#00E8FF', '#00ECFF', '#00F0FF', '#00F4FF', '#00F8FF', '#00FCFF',
'#01FFFD', '#05FFF9', '#09FFF5', '#0DFFF1', '#11FFED', '#15FFE9', '#19FFE5', '#1DFFE1',
'#21FFDD', '#25FFD9', '#29FFD5', '#2DFFD1', '#31FFCD', '#35FFC9', '#39FFC5', '#3DFFC1',
'#41FFBD', '#45FFB9', '#49FFB5', '#4DFFB1', '#51FFAD', '#55FFA9', '#59FFA5', '#5DFFA1',
'#61FF9D', '#65FF99', '#69FF95', '#6DFF91', '#71FF8D', '#75FF89', '#79FF85', '#7DFF81',
'#81FF7D', '#85FF79', '#89FF75', '#8DFF71', '#91FF6D', '#95FF69', '#99FF65', '#9DFF61',
'#A1FF5D', '#A5FF59', '#A9FF55', '#ADFF51', '#B1FF4D', '#B5FF49', '#B9FF45', '#BDFF41',
'#C1FF3D', '#C5FF39', '#C9FF35', '#CDFF31', '#D1FF2D', '#D5FF29', '#D9FF25', '#DDFF21',
'#E1FF1D', '#E5FF19', '#E9FF15', '#EDFF11', '#F1FF0D', '#F5FF09', '#F9FF05', '#FDFF01',
'#FFFC00', '#FFF800', '#FFF400', '#FFF000', '#FFEC00', '#FFE800', '#FFE400', '#FFE000',
'#FFDC00', '#FFD800', '#FFD400', '#FFD000', '#FFCC00', '#FFC800', '#FFC400', '#FFC000',
'#FFBC00', '#FFB800', '#FFB400', '#FFB000', '#FFAC00', '#FFA800', '#FFA400', '#FFA000',
'#FF9C00', '#FF9800', '#FF9400', '#FF9000', '#FF8C00', '#FF8800', '#FF8400', '#FF8000',
'#FF7C00', '#FF7800', '#FF7400', '#FF7000', '#FF6C00', '#FF6800', '#FF6400', '#FF6000',
'#FF5C00', '#FF5800', '#FF5400', '#FF5000', '#FF4C00', '#FF4800', '#FF4400', '#FF4000',
'#FF3C00', '#FF3800', '#FF3400', '#FF3000', '#FF2C00', '#FF2800', '#FF2400', '#FF2000',
'#FF1C00', '#FF1800', '#FF1400', '#FF1000', '#FF0C00', '#FF0800', '#FF0400', '#FF0000',
'#FB0000', '#F70000', '#F30000', '#EF0000', '#EB0000', '#E70000', '#E30000', '#DF0000',
'#DB0000', '#D70000', '#D30000', '#CF0000', '#CB0000', '#C70000', '#C30000', '#BF0000',
'#BB0000', '#B70000', '#B30000', '#AF0000', '#AB0000', '#A70000', '#A30000', '#9F0000',
'#9B0000', '#970000', '#930000', '#8F0000', '#8B0000', '#870000', '#830000', '#7F0000',
]
}
self.matrix = matrix
self.origin = (0,0)
self.cell_size = 15
self.size = self.cell_size * len(matrix)
self.image = Image.new('RGB', (self.size + 160, self.size), '#222222')
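        # NOTE: the font path below is Debian/Ubuntu specific; adjust it for other systems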
self.font = ImageFont.truetype('/usr/share/fonts/truetype/ttf-dejavu/DejaVuSans.ttf', 10)
self.title = ImageDraw.Draw(self.image)
self.draw = ImageDraw.Draw(self.image)
for line_idx, line in enumerate(self.matrix):
self.title.text((self.size + 10 , self.cell_size * line_idx), labels[line_idx], font=self.font, fill='#F0F0F0')
for cell_idx, cell in enumerate(line):
heat = self.fraction_to_color(cell)
block = (
self.origin[0] + (cell_idx * self.cell_size),
self.origin[1] + (line_idx * self.cell_size),
self.origin[0] + ((cell_idx + 1) * self.cell_size - 1),
self.origin[1] + ((line_idx + 1) * self.cell_size - 1)
)
self.draw.rectangle(block,fill=heat)
def fraction_to_color(self, value):
result = None
if value is None: result = '#DDDDDD'
else:
if value > 1.0: result = '#FFFFFF'
elif value < 0.0: result = '#000000'
else:
position = int(round(float((len(self.scale['color']) - 1)) * value))
result = self.scale['color'][position]
return result
def save(self, path):
self.image.save(path)
class Queue(object):
def __init__(self, env):
self.log = logging.getLogger('Queue')
self.env = env
def load_json(self):
result = None
try:
stream = StringIO(sys.stdin.read())
except IOError as ioerr:
            self.log.warning(u'Failed to read dataset from standard input')
self.log.debug(ioerr)
else:
try:
result = json.load(stream)
            except ValueError as e:
                self.log.warning(u'Failed to decode JSON document')
self.log.debug(u'Exception raised %s', unicode(e))
return result
def clean(self):
samples = clean(self.load_json())
print json.dumps(samples, ensure_ascii=False, sort_keys=True, indent=4).encode('utf-8')
def run(self):
experiment = Experiment(
DataSet(self.load_json()),
config[self.env['profile']],
self.env['mode'],
self.env['configuration']
)
experiment.run(self.env['classifier'])
def split(self):
dataset = DataSet()
        dataset.split(self.load_json(), self.env['development'], self.env['testing'])
print dataset.encoded
def vocabulary(self):
dataset = DataSet(self.load_json())
print dataset.encoded_vocabulary
def decode_cli():
p = ArgumentParser()
p.add_argument('--version', action='version', version='%(prog)s 0.2')
p.add_argument(
'-v',
'--verbosity',
dest='verbosity',
metavar='LEVEL',
choices=log_levels.keys(),
default='info',
help='logging verbosity level [default: %(default)s]'
)
s = p.add_subparsers(dest='action')
c = s.add_parser(
'clean',
help='Clean a dataset',
        description='Takes a JSON encoded list of sample dictionaries of the form [ { "name": "John Doe", "country": "England" } ] on standard input. \
        Appends to every sample a "normal" entry, the canonical version of the name with accents stripped out, and makes sure the canonical values are unique per class. \
        Returns a similar JSON encoded list on standard output.',
)
c = s.add_parser(
'split',
        help='Create a uniformly partitioned dataset',
        description='Takes a JSON encoded list of sample dictionaries of the form [ { "name": "John Doe", "country": "England" } ] on standard input. \
        Splits the samples uniformly into 3 partitions: "development", "testing" and "training". \
        The --development and --testing arguments take values in the [0, 1] range and determine the fraction allocated to those partitions. \
        The remaining samples are allocated to the "training" partition. \
        Returns a JSON encoded dataset.'
)
c.add_argument('-d', '--development', dest='development', type=float, default=0.1,
help='portion of samples to use for development [%(default)s]')
c.add_argument('-t', '--testing', dest='testing', type=float, default=0.1,
help='portion of each class to use for testing [%(default)s]')
c = s.add_parser(
'vocabulary',
help='Display the vocabulary of a dataset',
description='Display the vocabulary of a dataset.',
)
c = s.add_parser(
'run',
        help='Perform the experiment and report results',
        description='Perform the experiment and report results. \
        Two classifiers are implemented: an n-class classifier that picks a country for a name out of a collection of countries, \
        and a binary classifier that trains a model per country to determine whether a name belongs to that country or not. \
        For the binary classifier we can report precision, recall, f1-score and accuracy.',
)
    c.add_argument('-m', '--mode', dest='mode', choices=('normal', 'full'), default='full', help='full unicode names or stripped down ascii [ %(default)s ]')
    c.add_argument('-p', '--profile', dest='profile', choices=config.keys(), default='exhaustive', help='experiment profile to run [ %(default)s ]')
c.add_argument('-c', '--configuration', dest='configuration', choices=('development', 'testing'), default='development', help='experiment configuration to run [ %(default)s ]')
c.add_argument('-C', '--classifier', dest='classifier', choices=('nclass', 'binary'), default='binary', help='n-class or binary classifier [ %(default)s ]')
    return dict(vars(p.parse_args()))
def main():
logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)
env = decode_cli()
logging.getLogger().setLevel(log_levels[env['verbosity']])
queue = Queue(env)
if env['action'] == 'clean':
queue.clean()
if env['action'] == 'split':
queue.split()
if env['action'] == 'vocabulary':
queue.vocabulary()
if env['action'] == 'run':
queue.run()
if __name__ == '__main__':
main()
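Example usage (a sketch; the script and data file names are hypothetical):

    python classifier.py clean < raw.json > samples.json
    python classifier.py split --development 0.1 --testing 0.1 < samples.json > dataset.json
    python classifier.py vocabulary < dataset.json
    python classifier.py run --profile clustered --classifier nclass --mode normal < dataset.json

The classes can also be driven programmatically, along these lines:

    samples = clean(json.load(open('raw.json')))
    dataset = DataSet()
    dataset.split(samples, 0.1, 0.1)
    experiment = Experiment(dataset, config['clustered'], 'normal', 'development')
    experiment.run('nclass')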