Skip to content

Instantly share code, notes, and snippets.

@mynameisfiber
Last active August 29, 2015 14:10
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mynameisfiber/1e478d40ce75d867dd42 to your computer and use it in GitHub Desktop.
Save mynameisfiber/1e478d40ce75d867dd42 to your computer and use it in GitHub Desktop.
MultigramSearch looks up instances of ngrams within a target text
#!/usr/bin/env python2.7
"""
>>> import multigram_search
>>> mgs = multigram_search.MultigramSearch([("a", "b", "c"), ("o", "c", "z"),
...                                         ('z', 'y')])
>>> list(mgs.intersection("hello world you a b c foo".split(" ")))
[['a', 'b', 'c']]

NOTE: I use a counting bloom filter right now because I already had one
implemented... this would work fine with a normal bloom filter (and use less
memory!).
"""
from fuggetaboutit import CountingBloomFilter
from itertools import ifilter
class MultigramSearch(object):
    """Find occurrences of known ngrams inside a sequence of tokens.

    One membership filter is built for every prefix length ``n`` from the
    shortest ngram length to the longest one.  The filter for length ``n``
    receives:

    * the first ``n`` tokens of every ngram with at least ``n`` tokens,
      joined with ``delimiter``; and
    * for every ngram with exactly ``n - 1`` tokens, its joined tokens
      followed by the ``stop`` marker, recording that a complete ngram ends
      at that length.

    Membership is tested with ``CountingBloomFilter`` objects created with an
    ``error`` argument, so reported matches are subject to that error rate.
    """

    def __init__(self, ngrams, delimiter='####', stop='^^^^',
                 error=0.0001, error_tightening_ratio=0.5):
        """Index ``ngrams`` so they can be searched for in token sequences.

        :param ngrams: iterable of ngrams, each a sequence of string tokens.
            Empty ngrams are ignored when the filters are filled.
        :param delimiter: string placed between tokens when an ngram (or a
            prefix of one) is turned into a single filter key.
        :param stop: suffix appended to a key to mark "a complete ngram ends
            here".
        :param error: error value passed to the first filter.
        :param error_tightening_ratio: factor the error is multiplied by for
            each subsequent (longer) prefix length.
        :raises ValueError: if ``ngrams`` contains no ngram at all.
        """
        # Materialise once: the input is traversed several times below and
        # may be a one-shot iterable.
        ngrams = list(ngrams)
        if not ngrams:
            raise ValueError("ngrams must contain at least one ngram")
        lengths = [len(gram) for gram in ngrams]

        self.blooms = []
        self.error = error
        self.error_tightening_ratio = error_tightening_ratio
        # ``or 1``: an empty ngram must not produce a zero-length prefix level.
        self.min_ngram = min(lengths) or 1
        self.max_ngram = max(lengths)
        self.delimiter = delimiter
        self.STOP = stop
        self._build_structure(ngrams)

    def _build_structure(self, ngrams):
        """Append one filled filter per prefix length to ``self.blooms``.

        ``self.blooms[k]`` corresponds to prefix length ``self.min_ngram + k``.

        :param ngrams: iterable of ngrams (sequences of string tokens).
        """
        delimiter = self.delimiter
        stop = self.STOP
        # Drop empty ngrams up front; they contribute no keys.
        grams = [gram for gram in ngrams if gram]

        for step, n in enumerate(range(self.min_ngram, self.max_ngram + 1)):
            # Count every key inserted below: prefixes of ngrams with at least
            # ``n`` tokens *and* end markers of ngrams with ``n - 1`` tokens.
            num_items = sum(1 for gram in grams if len(gram) >= n - 1)
            # we tighten the error so that the compounded error converges to
            # the desired error
            cur_error = self.error * (self.error_tightening_ratio ** step)
            bloom = CountingBloomFilter(num_items, error=cur_error)
            for gram in grams:
                if len(gram) >= n:
                    bloom.add(delimiter.join(gram[:n]))
                elif len(gram) + 1 == n:
                    bloom.add(delimiter.join(gram) + stop)
            self.blooms.append(bloom)

    def intersection(self, text):
        """Yield the indexed ngrams found in ``text``, scanning left to right.

        At each position the window is grown one token per filter for as long
        as it is a known prefix.  The longest complete ngram seen while
        growing is yielded and the scan resumes right after it, so yielded
        matches never overlap.  If no complete ngram starts at the position,
        the scan moves on by a single token.

        :param text: sliceable sequence of string tokens (e.g. a list).
        :returns: generator of slices of ``text``, one per match.
        """
        blooms = self.blooms
        if not blooms:
            # Nothing was indexed, so nothing can match.
            return
        delimiter = self.delimiter
        stop = self.STOP
        shortest = self.min_ngram
        total = len(text)

        i = 0
        while i + shortest <= total:
            longest = 0  # length of the longest complete ngram starting at i
            for level, bloom in enumerate(blooms):
                width = shortest + level
                if level > 0:
                    # The previous level accepted ``width - 1`` tokens; this
                    # filter holds the end marker if that is a whole ngram.
                    ended = delimiter.join(text[i:i + width - 1]) + stop
                    if ended in bloom:
                        longest = width - 1
                if i + width > total:
                    # Not enough tokens left to extend the window.
                    break
                if delimiter.join(text[i:i + width]) not in bloom:
                    break
            else:
                # Every filter accepted the window.  The last filter only
                # holds full-length keys of the longest ngrams, so the window
                # itself is a match.
                longest = width

            if longest:
                yield text[i:i + longest]
                i += longest
            else:
                i += 1
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment