Last active
August 29, 2015 14:02
-
-
Save saaj/625fe3753a20b46d6a8f to your computer and use it in GitHub Desktop.
Debugging Python SQLite FTS4 tokenizers with sqlite-fts-python 0.1
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
--- original/sqlite_tokenizer.py | |
+++ patched/sqlite_tokenizer.py | |
@@ -8,6 +8,10 @@ | |
import ctypes | |
from ctypes import POINTER, CFUNCTYPE | |
import struct | |
+ | |
+ | |
+SQLITE_OK = 0 | |
+SQLITE_DONE = 101 | |
class sqlite3_tokenizer_module(ctypes.Structure): | |
@@ -64,11 +68,11 @@ | |
tkn.t = tokenizer | |
tokenizers[ctypes.addressof(tkn)] = tkn | |
ppTokenizer[0] = ctypes.pointer(tkn) | |
- return 0 | |
+ return SQLITE_OK | |
def xdestroy(pTokenizer): | |
del(tokenizers[ctypes.addressof(pTokenizer[0])]) | |
- return 0 | |
+ return SQLITE_OK | |
def xopen(pTokenizer, pInput, nInput, ppCursor): | |
cur = sqlite3_tokenizer_cursor() | |
@@ -78,28 +82,33 @@ | |
cur.offset = 0 | |
cursors[ctypes.addressof(cur)] = cur | |
ppCursor[0] = ctypes.pointer(cur) | |
- return 0 | |
+ return SQLITE_OK | |
def xnext(pCursor, ppToken, pnBytes, | |
piStartOffset, piEndOffset, piPosition): | |
try: | |
cur = pCursor[0] | |
- token = next(cur.tokens).encode('utf-8') | |
- tokenlen = len(token) | |
- ppToken[0] = token | |
- pnBytes[0] = tokenlen | |
- piStartOffset[0] = cur.offset | |
- cur.offset += tokenlen | |
- piEndOffset[0] = cur.offset | |
+ | |
+ while True: | |
+ normalized, inputBegin, inputEnd = next(cur.tokens) | |
+ normalized = normalized.encode('utf-8') | |
+ if normalized: | |
+ break | |
+ | |
+ ppToken[0] = normalized | |
+ pnBytes[0] = len(normalized) | |
+ piStartOffset[0] = inputBegin | |
+ piEndOffset[0] = inputEnd | |
+ cur.offset = inputEnd | |
piPosition[0] = cur.pos | |
cur.pos += 1 | |
except StopIteration: | |
- return 101 | |
- return 0 | |
+ return SQLITE_DONE | |
+ return SQLITE_OK | |
def xclose(pCursor): | |
del(cursors[ctypes.addressof(pCursor[0])]) | |
- return 0 | |
+ return SQLITE_OK | |
tokenizer_module = sqlite3_tokenizer_module( | |
0, |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import unittest | |
import sqlite3 | |
import re | |
import sqlitefts.sqlite_tokenizer as fts | |
class BaseTokenizer(fts.Tokenizer):
    """Whitespace tokenizer that reports UTF-8 byte offsets for each token.

    Subclasses customise the emitted token text by overriding ``_normalize``.
    """

    # Matches alternating runs of whitespace and non-whitespace, so that
    # consecutive matches can be walked to accumulate byte offsets.
    _spliter = re.compile(r'\s+|\S+')
    # Matches the non-whitespace runs, i.e. the pieces emitted as tokens.
    _nonws = re.compile(r'\S+')

    def _normalize(self, token):
        """Return ``token`` unchanged; hook for subclasses."""
        return token

    def _tokenize(self, text):
        """Yield ``(normalized, begin, end)`` for each non-whitespace run.

        ``begin`` and ``end`` are running offsets measured in bytes of the
        UTF-8 encoding of ``text``; whitespace runs advance the offset but
        are not yielded.
        """
        begin = 0
        for chunk in self._spliter.findall(text):
            end = begin + len(chunk.encode('utf-8'))
            if self._nonws.match(chunk) is not None:
                yield self._normalize(chunk), begin, end
            begin = end

    def tokenize(self, text):
        """Return a generator of ``(normalized, begin, end)`` for ``text``."""
        return self._tokenize(text)
class DebugTokenizer(BaseTokenizer):
    """Tokenizer that logs each token and strips its last character.

    A one-character token therefore normalizes to the empty string. Each
    instance normalizes at most ``_limit`` tokens; the next call raises
    ``RuntimeError`` (presumably a guard against runaway tokenization while
    debugging).
    """

    # Remaining number of tokens this instance may normalize. The class value
    # is the starting budget; the first decrement shadows it per instance.
    _limit = 16

    def _normalize(self, token):
        """Print ``token`` and its truncated form, then return the latter.

        Raises:
            RuntimeError: once the per-instance budget ``_limit`` is used up.
        """
        if not self._limit:
            raise RuntimeError()
        self._limit -= 1
        truncated = token[0:-1]
        # Single-argument print(...) is valid on both Python 2 and Python 3
        # and emits the same "token truncated" line as the former
        # Python-2-only print statement.
        print(u'{} {}'.format(token, truncated))
        return truncated
class OriginalDebugTokenizer(fts.Tokenizer):
    """Debug tokenizer that yields bare strings without offsets.

    The text is split on single spaces and the last character of every piece
    is dropped. Unlike ``BaseTokenizer``, the result contains plain strings
    rather than ``(token, begin, end)`` triples. Each instance accepts at most
    ``_limit`` calls to ``tokenize``; the next call raises ``RuntimeError``.
    """

    # Remaining number of tokenize() calls allowed for this instance.
    _limit = 16

    def tokenize(self, text):
        """Print ``text`` with its truncated pieces and return a generator.

        Raises:
            RuntimeError: once the per-instance budget ``_limit`` is used up.
        """
        if not self._limit:
            raise RuntimeError()
        self._limit -= 1
        # Split once and reuse the pieces for both the log line and the result.
        pieces = text.split(' ')
        # Single-argument print(...) is valid on both Python 2 and Python 3
        # and emits the same line as the former Python-2-only print statement.
        print(u'{} {}'.format(text, [w[0:-1] for w in pieces]))
        return (w[0:-1] for w in pieces)
class TestCase(unittest.TestCase):
    """Exercises an FTS4 table backed by ``DebugTokenizer``."""

    def setUp(self):
        """Create an in-memory database with an FTS4 table using the tokenizer."""
        name = 'test'
        conn = sqlite3.connect(':memory:')
        # Registered immediately so the connection is closed after every
        # test, even when a later setUp step fails, instead of being leaked.
        self.addCleanup(conn.close)
        fts.register_tokenizer(conn, name, fts.make_tokenizer_module(DebugTokenizer()))
        conn.execute('CREATE VIRTUAL TABLE fts USING FTS4(tokenize={})'.format(name))
        self.testee = conn

    def testZeroLengthToken(self):
        """Rows containing one-character words must still be inserted.

        ``DebugTokenizer`` drops the last character of every token, so words
        such as ``I`` normalize to an empty string.
        """
        result = self.testee.executemany(
            'INSERT INTO fts VALUES(?)',
            [('Make things I',), (u'Some σ φχικλψ',)])
        self.assertEqual(2, result.rowcount)

    def testInfiniteRecursion(self):
        """Insert two rows sharing a word and check MATCH finds both.

        NOTE(review): judging by the name, this guards against the tokenizer
        looping forever; the assertions only check row counts.
        """
        contents = [('abc def',), ('abc xyz',)]
        result = self.testee.executemany('INSERT INTO fts VALUES(?)', contents)
        self.assertEqual(2, result.rowcount)

        result = self.testee.execute("SELECT * FROM fts WHERE fts MATCH 'abc'").fetchall()
        self.assertEqual(2, len(result))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment