Skip to content

Instantly share code, notes, and snippets.

@HarryR
Created June 16, 2013 19:56
Show Gist options
  • Save HarryR/5793196 to your computer and use it in GitHub Desktop.
Save HarryR/5793196 to your computer and use it in GitHub Desktop.
HyperDex text classification engine, like bayesonredis.py — but "web 3.0 scale" and "cloud synergy ready", or whatever the buzzwords are today.
#!/usr/bin/env python
import json
import math
import os.path
import re
from base64 import b32encode
from hashlib import md5
from os import urandom
from pprint import pprint

import hyperclient

# BUG FIX: __all__ must be a sequence of names.  As a bare string,
# `from module import *` iterates it character-by-character and tries to
# export 'H', 'y', 'p', ... individually.
__all__ = ['HyperBayes']
class Entry(object):
    """Accumulates per-(word, tag) training statistics for one HyperDex row.

    The row key (`id`) is a short base32 digest derived from the word+tag
    pair, so the same pair always maps to the same HyperDex object.
    """
    __slots__ = ('id', 'word', 'tag', 'word_count', 'word_score_sum',
                 'doc_count', 'doc_score_sum', 'doc_word_count')

    def __init__(self, word, tag):
        self.word = word.encode('utf-8')
        self.tag = tag.encode('utf-8')
        self.word_count = 0       # Total number of words
        self.word_score_sum = 0   # Sum of all word scores
        self.doc_count = 0        # Total number of documents
        self.doc_score_sum = 0    # Sum of all words within the document
        self.doc_word_count = 0   # Total number of words in all documents
        hasher = md5()
        # BUG FIX: hash the UTF-8 encoded byte strings, not the raw unicode
        # arguments -- md5.update() rejects non-ASCII unicode input.
        hasher.update(self.word)
        hasher.update(self.tag)
        # 10 digest bytes -> 16-character base32 key: short, and still
        # collision-resistant enough for a vocabulary-sized keyspace.
        self.id = b32encode(hasher.digest()[:10])

    def get_add_data(self):
        """Return only the numeric counters, suitable for atomic_add."""
        return {
            'word_count': self.word_count,
            'word_score_sum': self.word_score_sum,
            'doc_count': self.doc_count,
            'doc_score_sum': self.doc_score_sum,
            'doc_word_count': self.doc_word_count,
        }

    def get_put_data(self):
        """Return counters plus identity fields, for the initial put."""
        data = self.get_add_data()
        data.update({'word': self.word,
                     'tag': self.tag})
        return data

    def add(self, word_score, doc_score_sum, doc_word_count):
        """Record one occurrence of the word within one document."""
        self.word_count += 1
        self.doc_count += 1
        self.word_score_sum += word_score
        self.doc_score_sum += doc_score_sum
        self.doc_word_count += doc_word_count
class HyperBayes(object):
ONE_OR_TWO_WORDS_RE = re.compile(r"\b[^\s]{1,2}\b", re.IGNORECASE)
NON_ALPHANUMERIC_AND_NON_DOT_RE = re.compile(r"[^\w\.]", re.IGNORECASE)
def __init__(self, client):
self._client = client
self._buffer = {}
def entry(self, word, tag):
key = '%s|%s' % (word, tag)
if key not in self._buffer:
self._buffer[key] = Entry(word, tag)
return self._buffer[key]
def _calculate_weight(self, entry_id):
while True:
entry = self._client.get('tags', entry_id)
if entry is None:
return False
weight = math.log((entry['doc_score_sum'] / entry['doc_count']) + 1) / math.log((entry['word_score_sum'] / entry['word_count']) + 1)
weight = int(weight * 1000000)
conditions = {'doc_score_sum': entry['doc_score_sum'],
'doc_count': entry['doc_count'],
'word_score_sum': entry['word_score_sum'],
'word_count': entry['word_count']}
result = self._client.cond_put('tags', entry_id, conditions, {'weight': weight})
print ".",
if result:
return True
def commit(self):
if len(self._buffer) == 0:
return False
for entry in self._buffer.values():
try:
self._client.atomic_add('tags', entry.id, entry.get_add_data())
print '+',
except hyperclient.HyperClientException, ex:
if ex.symbol() == 'HYPERCLIENT_NOTFOUND':
self._client.put('tags', entry.id, entry.get_put_data())
print 'p',
else:
# TODO: remove keys which have already been comitted?
raise ex
self._calculate_weight(entry.id)
print len(self._buffer)
self._buffer = {}
return True
def train(self, tags, text, train=True):
if isinstance(tags, basestring):
tags = [tags]
tags = [tag.lower() for tag in tags]
doc_score_sum, words = self.count_occurance(text)
for word, word_score in words:
for tag in tags:
entry = self.entry(word, tag)
if train:
entry.add(word_score, doc_score_sum, len(words))
else:
entry.add(0 - word_score, 0 - doc_score_sum, 0 - len(words))
def untrain(self, category, text):
self.train(category, text, False)
def score(self, text, max_results=5):
words = set(self.split_words(text))
results = {}
for word in words:
predicate = {'word': word}
found = self._client.sorted_search('tags', predicate, 'weight', max_results * 2, 'min')
for result in found:
tag = result['tag']
weight = result['weight']
results[tag] = results.get(tag, 0) + weight
results = sorted(results.items(), key=lambda x: x[1])
results.reverse()
return results[:max_results]
def split_words(self, text):
if not isinstance(text, basestring):
raise Exception("input must be instance of String")
text = text.lower()
separated_by_non_alphanumerics = self.NON_ALPHANUMERIC_AND_NON_DOT_RE.sub(' ', text)
without_one_or_two_words = self.ONE_OR_TWO_WORDS_RE.sub('', separated_by_non_alphanumerics)
without_dots = without_one_or_two_words.replace(".", "")
return without_dots.split()
def count_occurance(self, text):
text_chunks = self.split_words(text)
frequencies = {}
for word in text_chunks:
frequencies[word] = frequencies.get(word, 0) + 1
# Weights the words by how frequently they appear in a document
# More frequently occuring words will be scored higher than less frequent ones
weighted = {}
total = 0
for word, count in frequencies.items():
weighted[word] = int(round(math.log(len(text_chunks)) / math.log(count + 1)))
total += weighted[word]
return total, sorted(weighted.items(), key=lambda x: x[1])
def main():
client = hyperclient.Client('10.0.3.23', 10501)
bayes = HyperBayes(client)
"""
fh = open('data.json', 'r')
i = 0
while True:
i += 1
line = fh.readline()
if line is None:
break
try:
row = json.loads(line)
except ValueError:
break
bayes.train(row['tags'], row['text'])
if i % 100 == 0:
bayes.commit()
print "COMMIT!"
bayes.commit()
"""
print bayes.score("""hemp shampoo""")
print bayes.score("""vegan vitamins""")
print bayes.score("""teeth""")
if __name__ == '__main__':
main()
space tags
key id
attributes
string word,
string tag,
int weight,
int word_count,
int word_score_sum,
int doc_count,
int doc_score_sum,
int doc_word_count
subspace word, weight
subspace tag, weight
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment