Skip to content

Instantly share code, notes, and snippets.

@yuribossa
Created January 6, 2010 11:24
Show Gist options
  • Save yuribossa/270211 to your computer and use it in GitHub Desktop.
Save yuribossa/270211 to your computer and use it in GitHub Desktop.
Fulltext search on Google App Engine
# -*- coding: utf8 -*-
# Fulltext search module on Google App Engine.
from google.appengine.ext import db
class Text(db.Model):
    """Datastore entity holding one stored piece of text."""

    # The text itself; multiline=True permits embedded line breaks.
    content = db.StringProperty(multiline=True)

    # Filled in automatically when the entity is first written
    # (auto_now_add); used later to select old texts for deletion.
    created = db.DateTimeProperty(auto_now_add=True)
class InvertedIndex(db.Model):
    """Posting list for one two-character substring (bigram).

    Entities are stored under a key name equal to the bigram itself.
    ``text_list`` and ``position_list`` are parallel lists: element ``k``
    records that the bigram occurs in the Text whose id is
    ``text_list[k]`` at character offset ``position_list[k]``.
    """

    # Ids of the Text entities containing this bigram.
    text_list = db.ListProperty(int)

    # Offset of the bigram inside the corresponding text.
    position_list = db.ListProperty(int)
# Build the inverted index: store a text and register each of its bigrams.
class Indexer():
    """Store a text and register each of its bigrams in the inverted index."""

    # Upper bound on write attempts for a single bigram. The previous
    # implementation retried forever, which hangs the request when the
    # datastore failure is not transient.
    MAX_PUT_ATTEMPTS = 5

    def make_index(self, text=''):
        """Persist ``text`` and index every two-character substring of it.

        Args:
            text: the string to store; accepted lengths are 2..100.

        Returns:
            True when ``text`` is empty (nothing to do) or was stored and
            fully indexed. False when ``text`` has length 1 or more than
            100, when the Text entity could not be stored, or when a
            bigram could not be written after ``MAX_PUT_ATTEMPTS`` tries.
            In that last case the Text entity and the bigrams written so
            far remain in the datastore.
        """
        if len(text) == 0:
            return True
        if len(text) <= 1 or len(text) > 100:
            return False
        try:
            tx = Text(content=text)
            tx.put()
            text_id = tx.key().id()
        except Exception:
            # "except Exception" rather than a bare except, so interpreter
            # exits (KeyboardInterrupt, SystemExit) are not swallowed.
            return False
        for offset in range(len(text) - 1):
            if not self._put_with_retry(text, text_id, offset):
                return False
        return True

    def _put_with_retry(self, text, id, num):
        """Call ``put`` until it succeeds, at most MAX_PUT_ATTEMPTS times."""
        for _ in range(self.MAX_PUT_ATTEMPTS):
            if self.put(text, id, num):
                return True
        return False

    def put(self, text, id, num):
        """Append one (text id, offset) posting to the bigram at ``num``.

        Args:
            text: the full text being indexed.
            id: numeric id of the stored Text entity.
            num: offset of the bigram ``text[num:num+2]`` within ``text``.

        Returns:
            True if the index entity was written, False if any step raised.
        """
        try:
            bigram = text[num:num + 2]
            entry = InvertedIndex.get_by_key_name(bigram)
            if entry is None:
                # First occurrence of this bigram: start a new posting list.
                entry = InvertedIndex(key_name=bigram)
            entry.text_list.append(id)
            entry.position_list.append(num)
            entry.put()
        except Exception:
            return False
        return True
# Search the stored texts for a query string using the bigram index.
class SearchClient():
    """Find stored texts that contain a query string, via the bigram index."""

    def search(self, query):
        """Locate every occurrence of ``query`` in the indexed texts.

        Each bigram of the query is looked up in the index, and its
        postings are shifted back by the bigram's offset inside the query.
        An occurrence of the whole query is a (text id, start offset) pair
        present in the shifted postings of every bigram.

        Args:
            query: the string to search for.

        Returns:
            None when ``query`` is empty or shorter than two characters,
            or when one of its bigrams has no index entity. Otherwise a
            list of ``(text_id, start_offset)`` tuples, possibly empty,
            in the order of the first bigram's posting list.
        """
        if not query or len(query) < 2:
            return None
        index_lists = []
        for i in range(len(query) - 1):
            q = InvertedIndex.get_by_key_name(query[i:i + 2])
            if q is None:
                return None
            # Built as a real list so it can be scanned repeatedly no
            # matter whether zip() yields a list or an iterator.
            index_lists.append(
                [(text_id, pos - i)
                 for text_id, pos in zip(q.text_list, q.position_list)])
        return self._intersect(index_lists)

    @staticmethod
    def _intersect(index_lists):
        """Return the entries of the first list that occur in all the others.

        Order and duplicates of the first list are preserved. Set lookups
        replace the nested linear scans of the earlier implementation.
        """
        candidates = index_lists[0]
        if len(index_lists) == 1:
            return list(candidates)
        others = [set(postings) for postings in index_lists[1:]]
        return [head for head in candidates
                if all(head in postings for postings in others)]
# Delete old texts together with their inverted-index entries.
class RefreshClient():
    """Purge old texts together with their inverted-index entries."""

    def refresh(self, delete_time):
        """Delete every Text created before ``delete_time``.

        For each such text, all postings that reference it are removed
        from the index entities of its bigrams. An index entity left with
        no postings is deleted. Finally the Text entity itself is deleted.

        Args:
            delete_time: datetime cutoff; texts whose ``created`` value is
                earlier than this are removed.
        """
        for text in Text.all().filter('created <', delete_time):
            text_id = text.key().id()
            content = text.content or ''
            # Each distinct bigram is visited once; all postings of this
            # text are stripped from it in a single pass.
            bigrams = set(content[i:i + 2] for i in range(len(content) - 1))
            for bigram in bigrams:
                entry = InvertedIndex.get_by_key_name(bigram)
                if entry is None:
                    continue
                before = len(entry.text_list)
                text_ids, positions = self._without_text(
                    entry.text_list, entry.position_list, text_id)
                if not text_ids:
                    entry.delete()
                elif len(text_ids) != before:
                    entry.text_list = text_ids
                    entry.position_list = positions
                    entry.put()
            text.delete()

    @staticmethod
    def _without_text(text_list, position_list, text_id):
        """Drop the postings of ``text_id`` from two parallel lists.

        The lists are filtered as pairs so they stay aligned. The earlier
        code removed a position by value (``list.remove``), which could
        delete a different text's entry holding the same offset.

        Returns:
            A ``(text_ids, positions)`` tuple of new lists.
        """
        kept = [(tid, pos) for tid, pos in zip(text_list, position_list)
                if tid != text_id]
        return [tid for tid, _ in kept], [pos for _, pos in kept]
# -*- coding: utf8 -*-
# Example of fulltext search on Google App Engine
from google.appengine.ext import webapp
from google.appengine.ext.webapp.util import run_wsgi_app
from datetime import datetime, timedelta
import cgi
import re
from zenbun import *
class IndexHandler(webapp.RequestHandler):
    """Serve the landing page containing the import and search forms."""

    def get(self):
        """Write the static index page to the response."""
        # Form that posts a new text to the import handler.
        import_form = (
            "\n"
            "<form action='/zenbun/import' method='post'>\n"
            "<input type='text' name='query' />\n"
            "<input type='submit' value='Import' />\n"
            "</form>\n"
        )
        # Form that sends a query to the search handler.
        search_form = (
            "\n"
            "<form action='/zenbun/search' method='get'>\n"
            "<input type='text' name='query' value='' />\n"
            "<input type='submit' value='Search' />\n"
            "</form>\n"
        )
        fragments = (
            '<html><head><meta http-equiv="Content-Type" content="text/html; charset=utf-8"><title>Full-Text Search on Google App Engine</title></head><body>',
            "<h3>Full-Text Search on Google App Engine</h3>",
            import_form,
            search_form,
            '</body></html>',
        )
        for fragment in fragments:
            self.response.out.write(fragment)
class ErrorHandler(webapp.RequestHandler):
    """Serve a minimal error page that links back to the index."""

    def get(self):
        """Write the static error page to the response."""
        fragments = (
            '<html><head><meta http-equiv="Content-Type" content="text/html; charset=utf-8"><title>Error</title></head><body>',
            u'<p>Error</p>',
            '<p><a href="/zenbun/index">index</a></p>',
            '</body></html>',
        )
        for fragment in fragments:
            self.response.out.write(fragment)
class ImportHandler(webapp.RequestHandler):
    """Accept a posted text and add it to the full-text index."""

    def post(self):
        """Index the ``query`` form field, then redirect.

        Redirects to the index page when indexing succeeds and to the
        error page when ``Indexer.make_index`` reports failure.
        """
        submitted = self.request.get('query')
        indexed = Indexer().make_index(submitted)
        destination = '/zenbun/index' if indexed else '/zenbun/error'
        self.redirect(destination)
class SearchHandler(webapp.RequestHandler):
    """Run a full-text query and render the matching texts."""

    def get(self):
        """Search for the ``query`` parameter and write an HTML result page.

        An empty query redirects to the index page. Otherwise each
        matching text is shown once, with every occurrence of the query
        wrapped in ``<b>``, followed by the time the search took.
        """
        query = self.request.get('query')
        if not query:
            self.redirect('/zenbun/index')
            return
        sc = SearchClient()
        start_time = datetime.utcnow()
        results = sc.search(query)
        process_time = datetime.utcnow() - start_time

        paragraphs = []
        if results:
            highlighted = '<b>' + cgi.escape(query) + '</b>'
            seen_ids = set()
            for result in results:
                text_id = result[0]
                if text_id in seen_ids:
                    continue
                seen_ids.add(text_id)
                text = Text.get_by_id(text_id)
                if text is None or not text.content:
                    # The index can still reference a text that has
                    # since been purged; skip it rather than crash.
                    continue
                # Split the RAW content on the RAW query and escape each
                # piece separately. The query is never interpreted as a
                # regular expression, and a highlight can never land
                # inside an HTML entity produced by escaping.
                pieces = [cgi.escape(piece)
                          for piece in text.content.split(query)]
                paragraphs.append('<p>' + highlighted.join(pieces) + '</p>')

        w = self.response.out.write
        w('<html><head><meta http-equiv="Content-Type" content="text/html; charset=utf-8"><title>Search Result</title></head><body>')
        if paragraphs:
            for paragraph in paragraphs:
                w(paragraph)
        else:
            w("'" + cgi.escape(query) + "' is not found.")
        w('<p>time: ' + str(process_time) + '</p>')
        w('<p><a href="/zenbun/index">index</a></p>')
        w('</body></html>')
class RefreshHandler(webapp.RequestHandler):
    """Trigger a purge of texts older than one hour."""

    def get(self):
        """Delete texts created more than 60 minutes ago, with their index data."""
        cutoff = datetime.utcnow() - timedelta(minutes=60)
        RefreshClient().refresh(cutoff)
# URL routing table: maps each /zenbun/* path to its request handler.
application = webapp.WSGIApplication(
    [
        ('/zenbun/index', IndexHandler),
        ('/zenbun/refresh', RefreshHandler),
        ('/zenbun/import', ImportHandler),
        ('/zenbun/error', ErrorHandler),
        ('/zenbun/search', SearchHandler),
    ],
    debug=False)
def main():
    """Hand the module-level WSGI application to ``run_wsgi_app``."""
    run_wsgi_app(application)


# Script entry point.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment