-
-
Save Moosems/963300fffff27bdc5bbc0ee3b175a5f9 to your computer and use it in GitHub Desktop.
Simple client/server app
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from json import dumps, loads | |
from os import set_blocking | |
from random import randint | |
from subprocess import PIPE, Popen | |
from tkinter import Entry, Event, Label, Tk | |
from typing import IO | |
class ServerApp: | |
def __init__(self): | |
self.used_ids = [] | |
self.current_id = 0 | |
self.main_server: Popen | |
self.create_server() | |
self.app = Tk() | |
self.user_entry = Entry(self.app) | |
self.user_entry.pack() | |
self.user_entry.bind("<Return>", self.create_request) | |
self.user_entry.focus() | |
self.output_label = Label(self.app, text="Item at index ?: ?") | |
self.output_label.pack() | |
self.app.after_idle(self.read_server_output) | |
self.app.mainloop() | |
def create_server(self) -> None: | |
server = Popen(["python3", "simple_server.py"], stdin=PIPE, stdout=PIPE) | |
set_blocking(server.stdout.fileno(), False) # type: ignore | |
set_blocking(server.stdin.fileno(), False) # type: ignore | |
self.main_server = server | |
def check_server(self) -> None: | |
if self.main_server.poll(): | |
self.create_server() | |
def get_server_file(self, file: str) -> IO: | |
self.check_server() | |
if file == "stdout": | |
return self.main_server.stdout # type: ignore | |
return self.main_server.stdin # type: ignore | |
def parse_line(self, line: str) -> None: | |
response_json = loads(line) | |
id = response_json["id"] | |
self.used_ids.remove(id) | |
if id != self.current_id: | |
return | |
self.current_id = 0 | |
item = response_json["item"] # type: ignore | |
self.output_label.configure(text=f"Item at index requested: {item}") | |
def read_server_output(self) -> None: | |
server_stdout: IO = self.get_server_file("stdout") | |
for line in server_stdout: # type: ignore | |
self.parse_line(line) | |
self.refresh_server() | |
self.app.after(50, self.read_server_output) | |
def refresh_server(self) -> None: | |
self.create_request(Event(), "refresh") | |
def create_request(self, _: Event, type: str = "request") -> None: | |
id = randint(0, 5000) | |
while id in self.used_ids: | |
id = randint(0, 5000) | |
self.used_ids.append(id) | |
if type != "refresh": | |
self.current_id = id # We don't care for refresh request data | |
request = dumps({"id": id, "type": type, "index": self.user_entry.get()}) | |
server_stdin = self.get_server_file("stdin") | |
server_stdin.write(f"{request}\n".encode()) | |
server_stdin.flush() | |
if __name__ == "__main__": | |
ServerApp() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from json import dumps, loads | |
from os import set_blocking | |
from sys import exit, stdin, stdout | |
from time import time | |
from typing import TypedDict, NotRequired | |
from selectors import EVENT_READ, DefaultSelector | |
class ResReq(TypedDict): | |
id: int | |
type: str # "cancelled", "response", "refresh", "request" | |
index: NotRequired[str] # Only in "request" | |
item: NotRequired[str] # Only in "response" | |
class RequestHandler: | |
def __init__(self): | |
set_blocking(stdin.fileno(), False) | |
set_blocking(stdout.fileno(), False) | |
self.selector = DefaultSelector() | |
self.selector.register(stdin, EVENT_READ) | |
self.id_list = [] | |
self.newest_request = None | |
self.newest_id = 0 | |
self.old_time = time() | |
self.random_list = ["apples", "oranges", "bananas"] | |
def get_item(self, index: str) -> str: | |
try: | |
intdex = int(index) | |
except ValueError: | |
return "?" | |
try: | |
return self.random_list[intdex] | |
except IndexError: | |
return "?" | |
def write_resreq(self, response: ResReq) -> None: | |
stdout.write(dumps(response) + "\n") | |
stdout.flush() | |
def parse_line(self, line: str) -> None: | |
json_input: ResReq = loads(line) | |
id: int = json_input["id"] | |
if json_input["type"] == "refresh": | |
self.write_resreq({"id": id, "type": "response"}) | |
return | |
self.id_list.append(id) | |
self.newest_id = id | |
self.newest_request = json_input | |
def cancel_all_ids_except_newest(self) -> None: | |
for id in self.id_list: | |
if id == self.newest_id: | |
continue | |
self.write_resreq({"id": id, "type": "cancelled"}) | |
def run_tasks(self): | |
current_time = time() | |
if current_time - self.old_time > 5: | |
exit(0) | |
events = self.selector.select(0.025) | |
if not events: | |
return | |
for line in stdin: | |
self.old_time = current_time | |
self.parse_line(line) | |
self.cancel_all_ids_except_newest() | |
if not self.newest_request: # There may have only been refreshes | |
return | |
index_request: str = self.newest_request["index"] # type: ignore | |
self.write_resreq( | |
{ | |
"id": self.newest_id, | |
"type": "response", | |
"item": self.get_item(index_request), | |
} | |
) | |
self.id_list = [] | |
self.newest_request = None | |
self.newest_id = 0 | |
if __name__ == "__main__": | |
handler = RequestHandler() | |
while True: | |
handler.run_tasks() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment