Skip to content

Instantly share code, notes, and snippets.

@khyurri
Last active August 4, 2021 06:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save khyurri/62518cd97ba81db1f6eb6b8d519758d2 to your computer and use it in GitHub Desktop.
Save khyurri/62518cd97ba81db1f6eb6b8d519758d2 to your computer and use it in GitHub Desktop.
Search engine from Piter Python Summer Meetup
import os
import selectors
import socket
import time
from collections import defaultdict, deque
from typing import Callable, TextIO, Generator
# Inverted index shared by the whole module: token -> set of document names
# in which that token was seen. Filled by inverted_index(), read by search().
index = defaultdict(set)
# Selector polled by read_socket(); the listening socket is registered on it
# in the __main__ section with its handler stored as the key's data.
sel = selectors.DefaultSelector()
def tokenize(text: str, sep: set) -> Generator:
    """Lazily split *text* on any character in *sep*, yielding lowercase tokens.

    Every separator character closes the current token, so two adjacent
    separators (or a leading separator) produce an empty-string token.
    A trailing separator does not produce one: the final piece is yielded
    only when it is non-empty.

    Args:
        text: The string to split.
        sep: Characters that terminate a token.

    Yields:
        Each token, lowercased, in order of appearance.
    """
    start = 0
    for pos, symbol in enumerate(text):
        if symbol in sep:
            # Close the token that ended just before this separator.
            yield text[start:pos].lower()
            start = pos + 1
    # Whatever follows the last separator is a token only if non-empty.
    if start < len(text):
        yield text[start:].lower()
def inverted_index() -> Generator:
    """Coroutine that feeds the module-level ``index``.

    Prime it with ``next()`` and then ``send((name, text))`` repeatedly.
    Each piece of text is tokenized on whitespace/punctuation and every
    token that is not a stop word is mapped to the sending document's name.
    The coroutine never finishes on its own; ``close()`` it when done.
    """
    separators = {" ", ".", ",", "\n", "!", "?"}
    # The empty string is listed so that the empty tokens produced by
    # adjacent separators are not indexed.
    ignored = {"the", "a", "to", "be", ""}
    while True:
        doc_name, fragment = yield
        for word in tokenize(fragment, separators):
            if word in ignored:
                continue
            index[word].add(doc_name)
# Query operators recognised by search(): and, not, or.
# Example query: "a and b"
def search(query: str) -> set:
    """Evaluate a simple boolean query against the module-level ``index``.

    The query is split on spaces. The words ``and``, ``or`` and ``not`` are
    operators; every other word is a search term. The first term seeds the
    result with the documents containing it. Each later term is combined
    with the running result, strictly left to right, using the most recently
    seen operator (``not`` means set difference). A later term that has no
    operator before it is ignored.

    Args:
        query: Query text, e.g. ``"book not frodo"``. Case-insensitive.

    Returns:
        A new set of matching document names. It is never the index's own
        internal set, so callers may mutate it freely.
    """
    operators = {
        "and": set.intersection,
        "or": set.union,
        "not": set.difference,
    }
    resultset = set()
    first = True
    prev_keyword = ""
    for keyword in tokenize(query, {" "}):
        if not keyword:
            # Leading or doubled spaces produce empty tokens; treating them
            # as terms would wipe out the result (nothing is indexed under "").
            continue
        if keyword in operators:
            prev_keyword = keyword
            continue
        # .get() rather than [] so that looking up an unknown term does not
        # create an empty entry in the defaultdict.
        docs = index.get(keyword, set())
        if first:
            # Copy: handing out the index's own set would let a caller
            # corrupt the index by mutating the result.
            resultset = set(docs)
            first = False
        elif prev_keyword in operators:
            resultset = operators[prev_keyword](resultset, docs)
    return resultset
def read_file(fd: TextIO) -> Generator:
    """Yield ``(fd.name, line)`` for every line of an open text file.

    The generator takes ownership of *fd*: the file is closed as soon as it
    has been read to the end, or when the generator itself is closed or
    garbage-collected part-way through, so handles are not leaked.

    Args:
        fd: An open text file object exposing ``name`` and ``readline``.

    Yields:
        Tuples of the file's name and the next line (line ending included).
    """
    with fd:
        # readline() returns "" only at end of file, which stops the iterator.
        for chunk in iter(fd.readline, ""):
            yield fd.name, chunk
def read_socket() -> Generator:
    """Endless job that services ready sockets, then yields control.

    On every step the module-level selector is polled with a zero timeout
    (so the call does not wait), the callback stored as each ready key's
    ``data`` is invoked with that key's file object, and the marker tuple
    ``("socket", None)`` is yielded so the scheduler can tell this job
    apart from file readers.
    """
    while True:
        for ready_key, _mask in sel.select(0):
            handler = ready_key.data
            handler(ready_key.fileobj)
        yield "socket", None
def reader(files: deque, inv_coro: Generator, *, delay: float = 0.1) -> None:
    """Round-robin scheduler that pumps chunks from sources into the indexer.

    Each pass takes the source at the front of the queue and asks it for one
    ``(name, chunk)`` pair. Pairs whose name is ``"socket"`` are only a
    marker that socket work was done and are not indexed; all others are
    sent to *inv_coro*. A source that still has data goes to the back of the
    queue; an exhausted one is dropped and "File indexed" is printed.
    The loop ends when the queue is empty.

    Args:
        files: Queue of generators yielding ``(name, chunk)`` tuples.
        inv_coro: Primed coroutine accepting ``(name, chunk)`` via ``send``.
        delay: Seconds to sleep after every pass (default 0.1, as before).

    Raises:
        StopIteration: If *inv_coro* has already finished. This is no longer
            mistaken for "source exhausted".
    """
    while files:
        file = files.popleft()
        try:
            # Only the source's own exhaustion should be caught here.
            name, chunk = next(file)
        except StopIteration:
            print("File indexed")
        else:
            if name != "socket":
                inv_coro.send((name, chunk))
            files.append(file)
        time.sleep(delay)
def accept(sock: socket.socket) -> None:
    """Accept one client, answer a single query, and close the connection.

    Reads up to 1024 bytes from the client, runs them through ``search`` and
    writes back every matching document name followed by a newline. The
    connection is always closed, whether the exchange succeeds or raises.

    Args:
        sock: Listening socket that has a connection ready to accept.
    """
    conn, addr = sock.accept()
    print(f"Connected from {addr}")
    with conn:
        # Whether an accepted socket inherits the listener's non-blocking
        # flag is platform-dependent. Force blocking mode so recv() waits
        # for the query instead of spinning on BlockingIOError.
        conn.setblocking(True)
        # errors="replace": a client sending invalid UTF-8 should get an
        # (empty) answer, not bring the whole server down.
        query = conn.recv(1024).decode("utf-8", errors="replace").strip()
        payload = "".join(f"{doc}\n" for doc in search(query))
        if payload:
            # sendall() keeps writing until everything is sent; plain send()
            # may transmit only part of the data.
            conn.sendall(payload.encode("utf-8"))
if __name__ == "__main__":
    # Entry point: serve queries on 127.0.0.1:5555 while indexing every file
    # found under the "files" directory.
    sock = socket.socket()
    try:
        sock.bind(("127.0.0.1", 5555))
        sock.setblocking(False)
        sock.listen()
        # The handler is stored as the key's data; read_socket() calls it.
        sel.register(sock, selectors.EVENT_READ, accept)
        inv_coro = inverted_index()
        next(inv_coro)  # prime the coroutine so it can receive send()
        job_queue = deque()
        job_queue.append(read_socket())
        for root, _dirs, names in os.walk("files"):
            for file_name in names:
                # Join with the directory os.walk reported so that files in
                # subdirectories are opened from the right place.
                job_queue.append(read_file(open(os.path.join(root, file_name))))
        reader(job_queue, inv_coro)
        inv_coro.close()
        res = search("book not frodo")
        print(res)
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the server; exit quietly.
        pass
    finally:
        # Close the listening socket on every exit path. Real errors are no
        # longer swallowed and will surface with a traceback.
        sock.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment