Skip to content

Instantly share code, notes, and snippets.

@zed zed/ Secret
Created Apr 19, 2018

What would you like to do?
#!/usr/bin/env python3
"""Show number of bytes received."""
import socket
import sys
from concurrent.futures import ThreadPoolExecutor
from threading import Thread
from PyQt5.QtCore import pyqtSignal
from PyQt5.QtWidgets import QApplication, QLabel
def server(signal_obj, handler, *, host='localhost', port=29044,
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# reuse sockets in TIME_WAIT state
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((host, port))
signal_obj.server_started.emit(host, port)
with ThreadPoolExecutor(nconcurrent) as pool:
while True:
conn, addr = sock.accept()
pool.submit(handler, conn).add_done_callback(
lambda f: signal_obj.request_finished.emit(*addr, f.result()))
def count_nbytes(conn):
"""Count the number of received byte"""
with conn:
return sum(map(len, iter(lambda: conn.recv(8096), b'')))
class Widget(QLabel):
server_started = pyqtSignal(str, int) # host, port
request_finished = pyqtSignal(str, int, object) # *addr, result
def display(self, *args):
self.setText('Got: ' + ' '.join(map(str, args)))
app = QApplication(sys.argv)
w = Widget()
w.setWindowTitle('Count received bytes')
# run server in a background thread
Thread(target=server, args=[w, count_nbytes], daemon=True).start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.