@saaj
Last active August 29, 2015 14:02
Debugging Python SQLite FTS4 tokenizers with sqlite-fts-python 0.1
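The patch below names the two SQLite result codes it uses (SQLITE_OK, SQLITE_DONE) and changes the tokenizer contract: tokenize() yields (normalized_token, input_begin, input_end) tuples with UTF-8 byte offsets instead of bare strings, and xnext() skips tokens that normalize to an empty string rather than reporting zero-length tokens. The sketch below is a minimal, illustrative tokenizer written against that patched interface; SimpleTokenizer, the 'simple' tokenizer name and the docs table are my own examples, not part of the gist or the library.

# -*- coding: utf-8 -*-
import re
import sqlite3

import sqlitefts.sqlite_tokenizer as fts


class SimpleTokenizer(fts.Tokenizer):
    # Illustrative only: yields (normalized token, byte begin, byte end) per word,
    # as expected by the patched xnext().
    _word = re.compile(r'\w+', re.UNICODE)

    def tokenize(self, text):
        for m in self._word.finditer(text):
            # Offsets are byte positions into the UTF-8 encoded input text.
            begin = len(text[:m.start()].encode('utf-8'))
            end = begin + len(m.group().encode('utf-8'))
            yield m.group().lower(), begin, end


conn = sqlite3.connect(':memory:')
fts.register_tokenizer(conn, 'simple', fts.make_tokenizer_module(SimpleTokenizer()))
conn.execute('CREATE VIRTUAL TABLE docs USING FTS4(tokenize=simple)')
conn.execute('INSERT INTO docs VALUES (?)', (u'Make things I',))
print(conn.execute("SELECT * FROM docs WHERE docs MATCH 'make'").fetchall())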
--- original/sqlite_tokenizer.py
+++ patched/sqlite_tokenizer.py
@@ -8,6 +8,10 @@
import ctypes
from ctypes import POINTER, CFUNCTYPE
import struct
+
+
+SQLITE_OK = 0
+SQLITE_DONE = 101
class sqlite3_tokenizer_module(ctypes.Structure):
@@ -64,11 +68,11 @@
tkn.t = tokenizer
tokenizers[ctypes.addressof(tkn)] = tkn
ppTokenizer[0] = ctypes.pointer(tkn)
- return 0
+ return SQLITE_OK
def xdestroy(pTokenizer):
del(tokenizers[ctypes.addressof(pTokenizer[0])])
- return 0
+ return SQLITE_OK
def xopen(pTokenizer, pInput, nInput, ppCursor):
cur = sqlite3_tokenizer_cursor()
@@ -78,28 +82,33 @@
cur.offset = 0
cursors[ctypes.addressof(cur)] = cur
ppCursor[0] = ctypes.pointer(cur)
- return 0
+ return SQLITE_OK
def xnext(pCursor, ppToken, pnBytes,
piStartOffset, piEndOffset, piPosition):
try:
cur = pCursor[0]
- token = next(cur.tokens).encode('utf-8')
- tokenlen = len(token)
- ppToken[0] = token
- pnBytes[0] = tokenlen
- piStartOffset[0] = cur.offset
- cur.offset += tokenlen
- piEndOffset[0] = cur.offset
+
+ while True:
+ normalized, inputBegin, inputEnd = next(cur.tokens)
+ normalized = normalized.encode('utf-8')
+ if normalized:
+ break
+
+ ppToken[0] = normalized
+ pnBytes[0] = len(normalized)
+ piStartOffset[0] = inputBegin
+ piEndOffset[0] = inputEnd
+ cur.offset = inputEnd
piPosition[0] = cur.pos
cur.pos += 1
except StopIteration:
- return 101
- return 0
+ return SQLITE_DONE
+ return SQLITE_OK
def xclose(pCursor):
del(cursors[ctypes.addressof(pCursor[0])])
- return 0
+ return SQLITE_OK
tokenizer_module = sqlite3_tokenizer_module(
0,
# -*- coding: utf-8 -*-
import unittest
import sqlite3
import re
import sqlitefts.sqlite_tokenizer as fts
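# Splits the input on whitespace and yields (normalized token, UTF-8 byte begin,
# UTF-8 byte end) tuples, the shape the patched xnext() expects from tokenize().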
class BaseTokenizer(fts.Tokenizer):
_spliter = re.compile(r'\s+|\S+')
_nonws = re.compile(r'\S+')
def _normalize(self, token):
return token
def _tokenize(self, text):
pos = 0
for t in self._spliter.findall(text):
byteLen = len(t.encode('utf-8'))
if self._nonws.match(t):
yield self._normalize(t), pos, pos + byteLen
pos += byteLen
def tokenize(self, text):
return self._tokenize(text)
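# Chops the last character off every token, so single-character tokens such as
# 'I' or u'σ' normalize to empty strings (the zero-length-token case). _limit
# raises RuntimeError once normalization has been called more times than
# expected, turning runaway tokenization into a test failure instead of a hang.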
class DebugTokenizer(BaseTokenizer):
_limit = 16
def _normalize(self, token):
if not self._limit:
raise RuntimeError()
self._limit -= 1
print token, token[0:-1]
return token[0:-1]
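# The same debugging tokenizer written against the original (unpatched)
# interface: tokenize() returns bare strings with no offset information.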
class OriginalDebugTokenizer(fts.Tokenizer):
_limit = 16
def tokenize(self, text):
if not self._limit:
raise RuntimeError()
self._limit -= 1
print text, [w[0:-1] for w in text.split(' ')]
return (w[0:-1] for w in text.split(' '))
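# Registers DebugTokenizer as the FTS4 tokenizer 'test' on an in-memory
# database; the tests insert rows whose tokens exercise the zero-length and
# infinite-recursion cases and check that inserts and MATCH still work.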
class TestCase(unittest.TestCase):
def setUp(self):
name = 'test'
conn = sqlite3.connect(':memory:')
fts.register_tokenizer(conn, name, fts.make_tokenizer_module(DebugTokenizer()))
conn.execute('CREATE VIRTUAL TABLE fts USING FTS4(tokenize={})'.format(name))
self.testee = conn
def testZeroLengthToken(self):
result = self.testee.executemany('INSERT INTO fts VALUES(?)', [('Make things I',), (u'Some σ φχικλψ',)])
self.assertEqual(2, result.rowcount)
def testInfiniteRecursion(self):
contents = [('abc def',), ('abc xyz',)]
result = self.testee.executemany('INSERT INTO fts VALUES(?)', contents)
self.assertEqual(2, result.rowcount)
result = self.testee.execute("SELECT * FROM fts WHERE fts MATCH 'abc'").fetchall()
self.assertEqual(2, len(result))
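
if __name__ == '__main__':
    unittest.main()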