Skip to content

Instantly share code, notes, and snippets.

@zed
Created April 15, 2018 12:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zed/f5dbe136adff2e1a315031d703e370c6 to your computer and use it in GitHub Desktop.
Save zed/f5dbe136adff2e1a315031d703e370c6 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""Entry -> queue -> (client request) -> (server response) -> Label"""
import os
import socket
from threading import Thread
from tkinter import BOTH, Tk, ttk
from queue import Queue, Full
def echo_server(host, port):
# from stdlib docs /library/socket.html#example
s = None
for res in socket.getaddrinfo(host, port, socket.AF_UNSPEC,
socket.SOCK_STREAM, 0, socket.AI_PASSIVE):
af, socktype, proto, canonname, sa = res
try:
s = socket.socket(af, socktype, proto)
except OSError as msg:
s = None
continue
try:
s.bind(sa)
s.listen(1)
except OSError as msg:
s.close()
s = None
continue
break
if s is None:
print('could not open server socket', flush=True)
os._exit(1)
print('Serving on {}:{}'.format(*sa))
while True:
conn, addr = s.accept()
with conn:
print('Connected by {}:{}'.format(*addr))
while True: # one client at a time
data = conn.recv(1024)
if not data:
break
conn.sendall(data)
def echo_client(input_queue, label, host, port):
with socket.create_connection((host, port)) as s, \
s.makefile('r', encoding='utf-8', newline='\n') as file:
for request in iter(input_queue.get, None):
# send request (a single line)
s.sendall(request.encode('utf-8') + b'\n')
# receive response (a single line)
label['text'] = file.readline().rstrip('\n')
def on_key(q, value):
try:
q.put_nowait(value)
except Full:
return 0
else:
return 1
def main():
host, port = 'localhost', int.from_bytes(b'ec', 'big')
input_queue = Queue()
root = Tk()
vcmd = (root.register(lambda P: on_key(input_queue, P)), '%P')
entry = ttk.Entry(validate="key", validatecommand=vcmd)
entry.pack(fill=BOTH)
entry.focus()
label = ttk.Label(text='nothing yet')
label.pack()
Thread(target=echo_server, args=[host, port], daemon=True).start()
Thread(target=echo_client,
args=[input_queue, label, host, port], daemon=True).start()
root.mainloop()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment