Last active
August 4, 2021 06:56
-
-
Save khyurri/62518cd97ba81db1f6eb6b8d519758d2 to your computer and use it in GitHub Desktop.
Search engine from Piter Python Summer Meetup
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import selectors | |
import socket | |
import time | |
from collections import defaultdict, deque | |
from typing import Callable, TextIO, Generator | |
# Inverted index shared by the indexer and the search function:
# maps a token to the set of document names added for it.
index = defaultdict(set)
# Selector that the listening socket is registered with and that
# read_socket() polls for ready events.
sel = selectors.DefaultSelector()
def tokenize(text: str, sep: set) -> Generator:
    """Lazily split *text* into lower-cased tokens.

    Every character contained in *sep* acts as a delimiter. Runs of
    consecutive delimiters (and leading/trailing ones) produce no tokens,
    so callers never receive empty strings.

    Args:
        text: The text to split.
        sep: Set of single-character delimiters.

    Yields:
        Each non-empty token, lower-cased, in order of appearance.
    """
    acc = []
    for char in text:
        if char not in sep:
            acc.append(char)
        elif acc:
            # Only emit when something was accumulated; this is what
            # suppresses empty tokens between adjacent delimiters.
            yield "".join(acc).lower()
            acc = []
    if acc:
        # Flush the final token when the text does not end with a delimiter.
        yield "".join(acc).lower()
def inverted_index() -> Generator:
    """Coroutine that feeds documents into the module-level ``index``.

    After priming with ``next()``, send ``(name, text)`` tuples into it.
    Each text is tokenized and every token that is not a stop word gets
    ``name`` added to its entry in ``index``. The coroutine runs until it
    is closed.
    """
    delimiters = {" ", ".", ",", "\n", "!", "?"}
    ignored = {"the", "a", "to", "be", ""}
    while True:
        doc_name, body = yield
        for word in tokenize(body, delimiters):
            if word in ignored:
                continue
            index[word].add(doc_name)
# AND NOT OR | |
# a and b | |
def search(query: str) -> set:
    """Evaluate a simple boolean query against the module-level ``index``.

    The query is a space-separated sequence of terms and the operators
    ``and``, ``or`` and ``not``, evaluated strictly left to right. The
    first term seeds the result; every later term is combined with the
    running result using the most recently seen operator (``not`` means
    set difference). An operator stays in effect until another operator
    appears; a term seen before any operator has been given is ignored.

    Args:
        query: The query string, e.g. ``"book not frodo"``.

    Returns:
        A new set of matching document names. It is never the index's own
        set, so callers may mutate it freely.
    """
    resultset = set()
    grammar = {"and", "or", "not"}
    first = True
    prev_keyword = ""
    for keyword in tokenize(query, {" "}):
        if not keyword:
            # Repeated spaces can produce empty tokens; treating one as a
            # term would e.g. wipe the result under "and".
            continue
        if keyword in grammar:
            prev_keyword = keyword
            continue
        docs = index.get(keyword, set())
        if first:
            # Copy so the caller never receives the index's internal set.
            resultset = set(docs)
            first = False
        elif prev_keyword == "and":
            resultset = resultset.intersection(docs)
        elif prev_keyword == "or":
            resultset = resultset.union(docs)
        elif prev_keyword == "not":
            resultset = resultset.difference(docs)
    return resultset
def read_file(fd: TextIO) -> Generator:
    """Yield ``(fd.name, line)`` for each line read from *fd*.

    Lines are fetched one at a time with ``readline()`` and keep their
    trailing newline; the generator finishes when ``readline()`` returns
    an empty string.
    """
    while True:
        line = fd.readline()
        if line == "":
            return
        yield fd.name, line
def read_socket() -> Generator:
    """Endless job that services ready selector events between yields.

    On every step the module-level selector is polled without blocking
    (timeout 0). For each ready key, the callback stored in ``key.data``
    is invoked with the key's file object. The generator then yields the
    marker tuple ``("socket", None)``.
    """
    while True:
        # select() returns (key, event mask) pairs; the mask is not needed.
        for key, _mask in sel.select(0):
            handler = key.data
            handler(key.fileobj)
        yield "socket", None
def reader(files: deque, inv_coro: Generator, delay: float = 0.1) -> None:
    """Round-robin over job generators, forwarding chunks to the indexer.

    Each iteration takes the job at the left of *files* and advances it by
    one step. A job that produced an item is re-queued at the right; a job
    that is exhausted is dropped and "File indexed" is printed. Items
    whose name is ``"socket"`` are not forwarded to *inv_coro*.

    Args:
        files: Queue of generators yielding ``(name, chunk)`` tuples.
        inv_coro: Primed coroutine that accepts ``(name, chunk)`` via
            ``send()``.
        delay: Seconds to sleep after each step (defaults to the
            previously hard-coded 0.1).

    Raises:
        StopIteration: If *inv_coro* has finished and can no longer accept
            data; this is no longer mistaken for an exhausted job.
    """
    while files:
        source = files.popleft()
        try:
            # Keep the try body minimal: only exhaustion of the job itself
            # should be reported as "File indexed".
            name, chunk = next(source)
        except StopIteration:
            print("File indexed")
        else:
            if name != "socket":
                inv_coro.send((name, chunk))
            files.append(source)
        time.sleep(delay)
def accept(sock: socket.socket) -> None:
    """Accept one client, answer a single query, and close the connection.

    Reads up to 1024 bytes as a UTF-8 query, runs ``search()`` on it, and
    replies with one matching document name per line.

    Args:
        sock: Listening socket that has a pending connection.
    """
    conn, addr = sock.accept()
    print(f"Connected from {addr}")
    try:
        # Whether the accepted socket inherits non-blocking mode from the
        # listener is OS-dependent. Force blocking mode so recv() waits for
        # the query instead of spinning on BlockingIOError.
        conn.setblocking(True)
        # Undecodable bytes from a client must not take the server down.
        query = conn.recv(1024).decode("utf-8", errors="replace").strip()
        reply = "".join(f"{doc}\n" for doc in search(query))
        # sendall() retries until the whole reply is written; send() may
        # transmit only part of it.
        conn.sendall(reply.encode("utf-8"))
    finally:
        conn.close()
if __name__ == "__main__":
    # Script entry point: serve queries on 127.0.0.1:5555 while indexing
    # every file found under the "files" directory.
    sock = socket.socket()
    try:
        sock.bind(("127.0.0.1", 5555))
        sock.setblocking(False)
        sock.listen()
        # accept() is stored as the callback to run when the listening
        # socket becomes readable.
        sel.register(sock, selectors.EVENT_READ, accept)

        inv_coro = inverted_index()
        next(inv_coro)  # prime the coroutine so it can receive send()

        job_queue = deque()
        job_queue.append(read_socket())
        for root, _dirs, names in os.walk("files"):
            for name in names:
                # Join with the directory os.walk is currently in, so that
                # files in subdirectories are opened at their real path.
                job_queue.append(read_file(open(f"{root}/{name}")))

        # With the endless read_socket() job queued, reader() is expected
        # to run until interrupted; the lines below only run if it returns.
        reader(job_queue, inv_coro)
        inv_coro.close()
        res = search("book not frodo")
        print(res)
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the server; exit quietly.
        pass
    finally:
        # Close the listening socket on every exit path. Other errors now
        # propagate instead of being silently swallowed.
        sock.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment