Skip to content

Instantly share code, notes, and snippets.

@1328
Created April 19, 2015 02:38
Show Gist options
  • Save 1328/80ff3e97ad0f54d2e43f to your computer and use it in GitHub Desktop.
Save 1328/80ff3e97ad0f54d2e43f to your computer and use it in GitHub Desktop.
test
import shelve
import time
from collections import defaultdict, deque
from functools import lru_cache
from pprint import pprint
class FastaIndex(object):
    '''
    A simple fasta (or really any long string) searcher.

    Uses dbm (via shelve) to save the index to disk, allowing later fast
    lookups.  Works by building an index mapping fixed-size slices back to
    the larger text blocks; a search matches the first ``fragment_size``
    characters of the query against that index, then filters the candidates.

    This original model uses shelve so the index persists across loads.
    It is probably not worth doing because it is dog-slow, both in saving
    and in querying later, even with caching.
    '''

    def __init__(self, filename, fragment_size=2):
        '''
        Open (or create) an index stored at *filename*.

        params:
            filename : path handed straight to shelve.open (mode 'c').
            fragment_size : length of the indexed fragments; must match the
                value used when the index was built.
        '''
        self.db = shelve.open(filename, 'c', writeback=True)
        self.fragment_size = fragment_size
        # Per-instance memo for get().  The original used @lru_cache on the
        # method itself, which keys the cache on ``self`` and therefore keeps
        # every instance (and its open shelve handle) alive for the lifetime
        # of the cache.  A plain dict avoids that leak.
        self._cache = {}

    @classmethod
    def build_table(cls, data, fragment_size):
        '''
        Build a lookup table by taking a fragment_size rolling window over
        each indexed item, adding each fragment to a dictionary that maps it
        back to the original item.

        params:
            data : a dictionary of string -> other stuff (only keys are used)
            fragment_size : size of each indexed piece; larger takes longer
                to index, but can speed up lookups.
        result:
            a defaultdict(set) that, for an item like 'hello world', holds:
                'hel' -> {'hello world'}
                'ell' -> {'hello world'}
                etc.
        '''
        print('building index')
        start = time.time()
        index = defaultdict(set)
        for key in data:
            for seq in cls.window(key, fragment_size):
                index[seq].add(key)
        print('building index took {}'.format(time.time() - start))
        return index

    @classmethod
    def create(cls, data, filename, fragment_size=2):
        '''
        Alternative constructor: first builds the index from the keys of
        *data*, persists it at *filename*, and returns a FastaIndex linked
        to that index.
        '''
        index = cls.build_table(data, fragment_size)
        print('saving index')
        start = time.time()
        obj = cls(filename, fragment_size)
        for key, value in index.items():
            obj.db[key] = value
        obj.db.sync()
        # BUG FIX: message said 'too'; now consistent with build_table.
        print('saving index took {}'.format(time.time() - start))
        return obj

    @staticmethod
    def window(string, n=2):
        '''
        Sliding window generator: takes a string and yields windows of n
        characters, e.g.

            window('hello', 2) -> 'he', 'el', 'll', 'lo'

        Strings shorter than n yield nothing.  (The original deque-based
        version padded with None via next(it, None) and crashed in
        ''.join for short input.)
        '''
        for i in range(len(string) - n + 1):
            yield string[i:i + n]

    def get(self, key):
        '''
        Return a set of potential matches.  Since only the first
        fragment-sized piece of *key* is checked, false positives are
        expected; e.g. with a fragment_size of 3:

            get('help') -> 'help', 'hello', 'hell', etc.

        Results are memoized per instance (unbounded — the fragment key
        space is at most the number of distinct fragments in the index).
        '''
        key = key[:self.fragment_size]
        try:
            return self._cache[key]
        except KeyError:
            result = self.db[key] if key in self.db else set()
            self._cache[key] = result
            return result

    def __getitem__(self, target):
        '''
        Return the set of indexed strings that actually contain *target*.

        Works by using get() to pull candidates, then testing each candidate
        to drop the false positives.  Cheap enough that no caching is needed
        on top of get()'s memo.
        '''
        return {candidate for candidate in self.get(target)
                if target in candidate}

    def close(self):
        '''Close the underlying shelve database.'''
        self.db.close()
class MemIndex(FastaIndex):
    '''
    Like FastaIndex, except nothing is saved to disk.

    Runs faster, but the index must be rebuilt every time.
    '''

    def __init__(self, data, fragment_size):
        # The "db" here is simply the in-memory lookup table built from the
        # keys of *data* — no shelve file is ever opened.
        self.db = self.build_table(data, fragment_size)
        self.fragment_size = fragment_size

    def get(self, key):
        '''Return the candidate set for the leading fragment of *key*.'''
        fragment = key[:self.fragment_size]
        # Membership is tested first so the defaultdict is never grown by
        # a miss; an unknown fragment yields an empty set.
        return self.db[fragment] if fragment in self.db else set()

    def close(self):
        '''No resources to release for the in-memory variant.'''
        pass
def test():
    '''Smoke test: build a tiny on-disk index, run a few lookups, close.'''
    data = {name: name for name in ('abcdef', 'abcdee', 'abcder')}
    index = FastaIndex.create(data, 'test', 3)
    for query in ('def', 'nop', 'abcd'):
        print(index[query])
    index.close()


if __name__ == '__main__':
    test()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment