Created
April 19, 2015 02:38
-
-
Save 1328/80ff3e97ad0f54d2e43f to your computer and use it in GitHub Desktop.
test
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import shelve | |
import time | |
from collections import defaultdict, deque | |
from functools import lru_cache | |
from pprint import pprint | |
class FastaIndex(object):
    '''
    A simple FASTA (or really any long string) searcher.

    Uses shelve (a dbm-backed persistent dictionary) to save the index to
    disk, allowing for later lookups without rebuilding it.

    The index maps every fragment_size-character window of each indexed
    string to the set of strings containing that window. A search takes the
    first fragment_size characters of the target, pulls the candidate strings
    for that fragment, and then keeps only the candidates that really contain
    the whole target.

    This original model uses shelve so the index persists across runs. It is
    probably not worth doing because it is dog-slow, both when saving and
    when looking things up later, even with caching.
    '''

    def __init__(self, filename, fragment_size=2):
        '''
        Open the shelve index stored at filename, creating it if needed.

        params:
            filename : path handed to shelve.open
            fragment_size : length of the indexed fragments; lookups only hit
                            when this matches the size the index was built
                            with
        '''
        self.db = shelve.open(filename, 'c', writeback=True)
        self.fragment_size = fragment_size

    @classmethod
    def build_table(cls, data, fragment_size):
        '''
        Build a lookup table by sliding a fragment_size window over each
        indexed item and recording which items every fragment came from.

        params:
            data : an iterable of strings to index (iterating a dictionary
                   indexes its keys)
            fragment_size : size of each indexed piece, larger takes longer
                            to index, but can speed up lookups.
        result:
            a defaultdict(set) that, for an item like 'hello world' and a
            fragment_size of 3, contains:
                'hel': {'hello world'}
                'ell': {'hello world'}
                etc.
            Items shorter than fragment_size produce no fragments and are
            therefore not indexed.
        '''
        print('building index')
        start = time.time()
        index = defaultdict(set)
        for key in data:
            for seq in cls.window(key, fragment_size):
                index[seq].add(key)
        print('building index took {}'.format(time.time() - start))
        return index

    @classmethod
    def create(cls, data, filename, fragment_size=2):
        '''
        Alternative constructor: build the index for data, store it in the
        shelve file at filename and return an object linked to that index.

        params:
            data : an iterable of strings (e.g. a dictionary, whose keys get
                   indexed)
            filename : path of the shelve file to write
            fragment_size : size of each indexed fragment
        '''
        index = cls.build_table(data, fragment_size)
        print('saving index')
        start = time.time()
        obj = cls(filename, fragment_size)
        try:
            for key, value in index.items():
                obj.db[key] = value
            obj.db.sync()
        except Exception:
            # Do not leave the shelf open when storing the index fails.
            obj.close()
            raise
        print('saving index took {}'.format(time.time() - start))
        return obj

    @staticmethod
    def window(string, n=2):
        '''
        Sliding window generator that takes a string and yields every run of
        n consecutive characters, e.g.

            window('hello', 2) -> 'he', 'el', 'll', 'lo'

        A string with fewer than n characters yields nothing.
        '''
        it = iter(string)
        # range() comes first in the zip so that no character beyond the
        # first n is consumed from the iterator while priming the window.
        win = deque((char for _, char in zip(range(n), it)), maxlen=n)
        if len(win) < n:
            # Too short to fill even one window.
            return
        yield ''.join(win)
        for char in it:
            # maxlen=n drops the oldest character automatically.
            win.append(char)
            yield ''.join(win)

    def _memoize(self, slot, func):
        '''
        Return func wrapped in an lru_cache stored on this instance.

        The wrapper is created lazily on first use (so it also works for
        subclasses that do not call __init__) and kept in the instance
        dictionary under slot. Keeping the cache on the instance instead of
        on the class means it goes away together with the instance.
        '''
        cached = self.__dict__.get(slot)
        if cached is None:
            cached = self.__dict__[slot] = lru_cache()(func)
        return cached

    def _lookup(self, fragment):
        '''Return the stored set for fragment, or an empty set if unknown.'''
        candidates = self.db.get(fragment)
        return set() if candidates is None else candidates

    def _search(self, target):
        '''Return a frozenset of the indexed strings that contain target.'''
        return frozenset(
            candidate for candidate in self.get(target) if target in candidate
        )

    def get(self, key):
        '''
        Return the set of potential matches for key. Only the first
        fragment_size characters of key are checked, so the result can
        contain false positives, e.g. with a fragment_size of 3:

            get('help') -> 'help', 'hello', 'hell', etc.

        The returned set is the one held by the index; treat it as
        read-only.
        '''
        # Cache on the truncated fragment so keys sharing a prefix share an
        # entry.
        return self._memoize('_get_cache', self._lookup)(
            key[:self.fragment_size]
        )

    def __getitem__(self, target):
        '''
        Return the set of indexed strings that contain target.

        Works by using get to pull candidates, then testing that each
        candidate really contains target. Every call returns a fresh set, so
        callers may modify the result without affecting later lookups.
        '''
        return set(self._memoize('_search_cache', self._search)(target))

    def close(self):
        '''Drop cached lookups and close the underlying shelf.'''
        for slot in ('_get_cache', '_search_cache'):
            self.__dict__.pop(slot, None)
        self.db.close()
class MemIndex(FastaIndex):
    '''
    Like FastaIndex, except it saves nothing to disk.

    The lookup table lives in memory only: it runs faster, but the index
    must be rebuilt each time.
    '''

    def __init__(self, data, fragment_size=2):
        '''
        Build the in-memory index straight away.

        params:
            data : an iterable of strings to index (iterating a dictionary
                   indexes its keys)
            fragment_size : size of each indexed fragment
        '''
        # The parent __init__ is deliberately not called: it would open a
        # shelve file, which this class exists to avoid.
        self.db = self.build_table(data, fragment_size)
        self.fragment_size = fragment_size

    @classmethod
    def create(cls, data, filename=None, fragment_size=2):
        '''
        Alternative constructor with the same signature as FastaIndex.create.

        filename is accepted for interchangeability and ignored, since
        nothing is written to disk. The inherited implementation cannot be
        used here because it passes filename as the first constructor
        argument and expects db to be a shelf.
        '''
        return cls(data, fragment_size)

    def get(self, key):
        '''
        Return the set of potential matches for key, based on its first
        fragment_size characters only (so false positives are possible).
        '''
        fragment = key[:self.fragment_size]
        # Membership test first: indexing a missing key on the defaultdict
        # returned by build_table would insert an empty entry.
        if fragment in self.db:
            return self.db[fragment]
        return set()

    def close(self):
        '''Nothing to release; present so both index types share an API.'''
        pass
def test():
    '''
    Smoke test: index three similar strings with a fragment size of 3 and
    print the results of a few lookups.

    Writes a shelve index named 'test' in the current working directory.
    '''
    sequences = {
        'abcdef': 'abcdef',
        'abcdee': 'abcdee',
        'abcder': 'abcder',
    }
    index = FastaIndex.create(sequences, 'test', 3)
    try:
        print(index['def'])   # only 'abcdef' contains 'def'
        print(index['nop'])   # no indexed string contains 'nop'
        print(index['abcd'])  # all three strings contain 'abcd'
    finally:
        # Close the shelf even when one of the lookups above fails.
        index.close()
# Run the smoke test only when this file is executed as a script.
if __name__ == "__main__":
    test()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment