Skip to content

Instantly share code, notes, and snippets.

@BenLangmead
Last active December 28, 2015 03:39
Show Gist options
  • Save BenLangmead/7437031 to your computer and use it in GitHub Desktop.
Save BenLangmead/7437031 to your computer and use it in GitHub Desktop.
{
"metadata": {
"name": ""
},
"nbformat": 3,
"nbformat_minor": 0,
"worksheets": [
{
"cells": [
{
"cell_type": "code",
"collapsed": false,
"input": [
"def suffixArray(s):\n",
" ''' Given T return suffix array SA(T). Uses \"sorted\"\n",
" function for simplicity, which is probably very slow. '''\n",
" satups = sorted([(s[i:], i) for i in xrange(0, len(s))])\n",
" return map(lambda x: x[1], satups) # extract, return just offsets\n",
"\n",
"def bwtFromSa(t, sa=None):\n",
" ''' Given T, returns BWT(T) by way of the suffix array. '''\n",
" bw = []\n",
" dollarRow = None\n",
" if sa is None:\n",
" sa = suffixArray(t)\n",
" for si in sa:\n",
" if si == 0:\n",
" dollarRow = len(bw)\n",
" bw.append('$')\n",
" else:\n",
" bw.append(t[si-1])\n",
" return (''.join(bw), dollarRow) # return string-ized version of list bw"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 1
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"class FmCheckpoints(object):\n",
" ''' Manages rank checkpoints and handles rank queries, which are\n",
" O(1) time, with the checkpoints taking O(m) space, where m is\n",
" length of text. '''\n",
" \n",
" def __init__(self, bw, cpIval=4):\n",
" ''' Scan BWT, creating periodic checkpoints as we go '''\n",
" self.cps = {} # checkpoints\n",
" self.cpIval = cpIval # spacing between checkpoints\n",
" tally = {} # tally so far\n",
" # Create an entry in tally dictionary and checkpoint map for\n",
" # each distinct character in text\n",
" for c in bw:\n",
" if c not in tally:\n",
" tally[c] = 0\n",
" self.cps[c] = []\n",
" # Now build the checkpoints\n",
" for i in xrange(0, len(bw)):\n",
" tally[bw[i]] += 1 # up to *and including*\n",
" if (i % cpIval) == 0:\n",
" for c in tally.iterkeys():\n",
" self.cps[c].append(tally[c])\n",
" \n",
" def rank(self, bw, c, row):\n",
" ''' Return # c's there are in bw up to and including row '''\n",
" if row < 0 or c not in self.cps:\n",
" return 0\n",
" i, nocc = row, 0\n",
" # Always walk to left (up) when calculating rank\n",
" while (i % self.cpIval) != 0:\n",
" if bw[i] == c:\n",
" nocc += 1\n",
" i -= 1\n",
" return self.cps[c][i // self.cpIval] + nocc"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 2
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"st = 'teststring'\n",
"# 0123456789\n",
"cps = FmCheckpoints(st)"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 3
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# should get back list of integers, where elt i gives\n",
"# # times 't' appears up to and including offset i\n",
"[ cps.rank(st, 't', i) for i in xrange(10) ]"
],
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 4,
"text": [
"[1, 1, 1, 2, 2, 3, 3, 3, 3, 3]"
]
}
],
"prompt_number": 4
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# likewise for 'g'\n",
"[ cps.rank(st, 'g', i) for i in xrange(10) ]"
],
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 5,
"text": [
"[0, 0, 0, 0, 0, 0, 0, 0, 0, 1]"
]
}
],
"prompt_number": 5
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"class FmIndex():\n",
" ''' O(m) size FM Index, where checkpoints and suffix array samples are\n",
" spaced O(1) elements apart. Queries like count() and range() are\n",
" O(n) where n is the length of the query. Finding all k\n",
" occurrences of a length-n query string takes O(n + k) time.\n",
" \n",
" Note: The spacings in the suffix array sample and checkpoints can\n",
" be chosen differently to achieve different bounds. '''\n",
" \n",
" @staticmethod\n",
" def downsampleSuffixArray(sa, n=4):\n",
" ''' Take only the suffix-array entries for every nth suffix. Keep\n",
" suffixes at offsets 0, n, 2n, etc with respect to the text.\n",
" Return map from the rows to their suffix-array values. '''\n",
" ssa = {}\n",
" for i in xrange(0, len(sa)):\n",
" # We could use i % n instead of sa[i] % n, but we lose the\n",
" # constant-time guarantee for resolutions\n",
" if sa[i] % n == 0:\n",
" ssa[i] = sa[i]\n",
" return ssa\n",
" \n",
" def __init__(self, t, cpIval=4, ssaIval=4):\n",
" if t[-1] != '$':\n",
" t += '$' # add dollar if not there already\n",
" # Get BWT string and offset of $ within it\n",
" sa = suffixArray(t)\n",
" self.bwt, self.dollarRow = bwtFromSa(t, sa)\n",
" # Get downsampled suffix array, taking every 1 out of 'ssaIval'\n",
" # elements w/r/t T\n",
" self.ssa = self.downsampleSuffixArray(sa, ssaIval)\n",
" self.slen = len(self.bwt)\n",
" # Make rank checkpoints\n",
" self.cps = FmCheckpoints(self.bwt, cpIval)\n",
" # Calculate # occurrences of each character\n",
" tots = dict()\n",
" for c in self.bwt:\n",
" tots[c] = tots.get(c, 0) + 1\n",
" # Calculate concise representation of first column\n",
" self.first = {}\n",
" totc = 0\n",
" for c, count in sorted(tots.iteritems()):\n",
" self.first[c] = totc\n",
" totc += count\n",
" \n",
" def count(self, c):\n",
" ''' Return number of occurrences of characters < c '''\n",
" if c not in self.first:\n",
" # (Unusual) case where c does not occur in text\n",
" for cc in sorted(self.first.iterkeys()):\n",
" if c < cc: return self.first[cc]\n",
" return self.first[cc]\n",
" else:\n",
" return self.first[c]\n",
" \n",
" def range(self, p):\n",
" ''' Return range of BWM rows having p as a prefix '''\n",
" l, r = 0, self.slen - 1 # closed (inclusive) interval\n",
" for i in xrange(len(p)-1, -1, -1): # from right to left\n",
" l = self.cps.rank(self.bwt, p[i], l-1) + self.count(p[i])\n",
" r = self.cps.rank(self.bwt, p[i], r) + self.count(p[i]) - 1\n",
" if r < l:\n",
" break\n",
" return l, r+1\n",
" \n",
" def resolve(self, row):\n",
" ''' Given BWM row, return its offset w/r/t T '''\n",
" def stepLeft(row):\n",
" ''' Step left according to character in given BWT row '''\n",
" c = self.bwt[row]\n",
" return self.cps.rank(self.bwt, c, row-1) + self.count(c)\n",
" nsteps = 0\n",
" while row not in self.ssa:\n",
" row = stepLeft(row)\n",
" nsteps += 1\n",
" return self.ssa[row] + nsteps\n",
" \n",
" def hasSubstring(self, p):\n",
" ''' Return true if and only if p is substring of indexed text '''\n",
" l, r = self.range(p)\n",
" return r > l\n",
" \n",
" def hasSuffix(self, p):\n",
" ''' Return true if and only if p is suffix of indexed text '''\n",
" l, r = self.range(p)\n",
" off = self.resolve(l)\n",
" return r > l and off + len(p) == self.slen-1\n",
" \n",
" def occurrences(self, p):\n",
" ''' Return offsets for all occurrences of p, in no particular order '''\n",
" l, r = self.range(p)\n",
" return [ self.resolve(x) for x in xrange(l, r) ]"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 6
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"fm = FmIndex('abaaba')"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 7
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"fm.hasSubstring('aaba')"
],
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 8,
"text": [
"True"
]
}
],
"prompt_number": 8
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"fm.hasSubstring('aabb')"
],
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 9,
"text": [
"False"
]
}
],
"prompt_number": 9
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"fm.range('a')"
],
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 10,
"text": [
"(1, 5)"
]
}
],
"prompt_number": 10
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"fm.range('baaba')"
],
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 11,
"text": [
"(6, 7)"
]
}
],
"prompt_number": 11
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"p, t = \"CAT\", \"TTGTGTGCATGTTGTTTCATCATTTAGAGATACATTGCGCTGCATCATGGTCA\"\n",
"# 01234567890123456789012345678901234567890123456789012\n",
"# Occurrences: * * * * * *\n",
"fm = FmIndex(t)\n",
"matches = sorted(fm.occurrences(p))\n",
"matches == [7, 17, 20, 32, 42, 45]"
],
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 12,
"text": [
"True"
]
}
],
"prompt_number": 12
},
{
"cell_type": "code",
"collapsed": false,
"input": [],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 12
}
],
"metadata": {}
}
]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment