Skip to content

Instantly share code, notes, and snippets.

@Leedehai
Created August 18, 2020 04:15
Show Gist options
  • Save Leedehai/bf24f4de497ad1bd87055cb8709e322d to your computer and use it in GitHub Desktop.
Save Leedehai/bf24f4de497ad1bd87055cb8709e322d to your computer and use it in GitHub Desktop.
Rotating log, Unix domain socket implementation
#!/usr/bin/env python3
# Python 3.7+
import multiprocessing.dummy as mp # Threading wrapped using multiprocessing API.
import os
import queue
import socket
import socketserver
import sys
import time
import threading
from pathlib import Path
# Filesystem path of the socket that the server binds and the clients send to.
SERVER_ADDR = './uds_socket' # UNIX domain socket
# Running count of datagrams handled; incremented by UdpRequestHandler.handle().
received_conn = 0
class Logger:
    """Stderr logger that keeps a small rolling window of transient lines.

    Transient lines are redrawn in place: before each update the lines
    currently on screen are erased with ANSI escape sequences and the newest
    window is written again.  A persistent line erases the transient window
    and is then written normally, so it stays in the scrollback.

    NOTE(review): the erase logic moves the cursor up once per stored entry,
    so it assumes every message occupies exactly one terminal line (ends with
    a single newline and does not wrap) -- confirm against callers.
    """

    CURSOR_UP_AND_CLEAR = "\x1b[1A\x1b[2K"  # Cursor moves up and clears line.
    instance_ = None  # Shared instance, created lazily by get_instance().

    def __init__(self, size: int):
        """Create a logger showing at most `size` transient lines (size > 0)."""
        assert size > 0
        self.arr_ = []  # Transient lines currently on screen.
        self.max_size_ = size  # Max count of transient lines.

    @staticmethod
    def get_instance():
        """Return the shared Logger, creating it with a window of 5 on first use."""
        # Identity test: None is a singleton, so `is` is the correct comparison.
        if Logger.instance_ is None:
            Logger.instance_ = Logger(size=5)
        return Logger.instance_

    def add_transient(self, s: str) -> None:
        """Append `s` to the transient window and redraw the window on stderr.

        When the window is already full the oldest entry is dropped first.
        """
        lines_on_screen = len(self.arr_)
        if lines_on_screen == self.max_size_:
            self.arr_.pop(0)
        self.arr_.append(s)
        sys.stderr.write(
            # Erase exactly the lines that were visible before this call.
            Logger.CURSOR_UP_AND_CLEAR * lines_on_screen
            + "\x1b[?25l"  # Hide cursor.
            + ''.join(self.arr_)
            + "\x1b[?25h"  # Show cursor.
        )

    def add_persistent(self, s: str) -> None:
        """Erase the transient window, forget its entries, and write `s`."""
        erase_window = Logger.CURSOR_UP_AND_CLEAR * len(self.arr_)
        self.arr_.clear()
        # One write keeps the erase sequence and the message together.
        sys.stderr.write(erase_window + s)
class UdpRequestHandler(socketserver.DatagramRequestHandler):
    """Handle one datagram: log its text and bump the received counter.

    A message whose last whitespace-separated token is an integer divisible
    by 20 is logged as a persistent line; every other message is logged as a
    transient line.
    """

    def handle(self):
        """Decode the datagram, route it to the Logger, and count it."""
        global received_conn
        # Unlike a TCP handler, here self.request is tuple (data, client socket);
        # only the payload is needed.
        data_bytes = self.request[0]
        message = data_bytes.decode()
        try:
            sequence_number = int(message.split()[-1])
        except (IndexError, ValueError):
            # Empty payload or non-numeric trailing token: still log and count
            # the datagram instead of aborting the handler.
            sequence_number = None
        logger = Logger.get_instance()
        if sequence_number is not None and sequence_number % 20 == 0:
            logger.add_persistent(message)
        else:
            logger.add_transient(message)
        received_conn += 1

    def finish(self):
        """Do nothing after handling.

        NOTE(review): presumably this overrides the base class's finish() to
        skip its reply-to-client step, since the clients here only send --
        confirm against the socketserver documentation.
        """
        pass
def send_message_to_server(uds_addr: "str | os.PathLike", data: bytes) -> None:
    """Send `data` as one datagram to the UNIX domain socket at `uds_addr`.

    Args:
        uds_addr: Filesystem path of the server socket, as a string or any
            path-like object (e.g. pathlib.Path).
        data: Payload bytes to send.

    Raises:
        AssertionError: if nothing exists at `uds_addr` (when assertions are
            enabled).
    """
    # Normalise path-like objects so connect() always receives a plain string.
    address = os.fspath(uds_addr)
    assert os.path.exists(address)
    with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as sock:
        sock.connect(address)
        sock.sendall(data)  # Better than send()
def multiply_by_2(n: int):
    """Simulated client task: pause, report `n` to the server, return its double.

    The report is the text "client: n = <n>" followed by a newline, sent as
    one datagram to SERVER_ADDR.
    """
    time.sleep(0.2)  # Simulates some blocking call.
    payload = ("client: n = %d\n" % n).encode()
    send_message_to_server(SERVER_ADDR, payload)
    doubled = n * 2
    return doubled
def run_multiple_clients_until_complete(input_list):
    """Run multiply_by_2 over `input_list` on a pool of 8 workers.

    Returns:
        The list of results, in the same order as `input_list`.
    """
    # map() blocks until every item is done, so tearing the pool down on exit
    # from the `with` block cannot cut work short; it only releases the
    # workers, which previously were never cleaned up.
    with mp.Pool(8) as pool:
        return pool.map(multiply_by_2, input_list)
def stop_server(server):
    """Stop the serve_forever() loop, close the socket, and delete its file.

    Args:
        server: A running Unix-domain socketserver instance whose
            serve_forever() loop is executing in another thread.
    """
    server.shutdown()
    server.server_close()
    # Remove the path this particular server was bound to, rather than relying
    # on a module-level constant, and tolerate the file already being gone.
    try:
        os.remove(server.server_address)
    except FileNotFoundError:
        pass
def start_server_thread():
    """Bind a datagram server at SERVER_ADDR and serve it from a daemon thread.

    Any file already present at SERVER_ADDR is deleted first.

    Returns:
        The server object, so the caller can shut it down later.
    """
    stale_socket_present = os.path.exists(SERVER_ADDR)
    if stale_socket_present:
        os.remove(SERVER_ADDR)
    udp_server = socketserver.UnixDatagramServer(SERVER_ADDR, UdpRequestHandler)
    # Daemon thread: exits the server thread when the main thread exits.
    threading.Thread(target=udp_server.serve_forever, daemon=True).start()
    return udp_server
def work(input_list):
    """Start the server, run all clients over `input_list`, then stop the server.

    Returns:
        The list of client results.
    """
    server = start_server_thread()
    try:
        return run_multiple_clients_until_complete(input_list)
    finally:
        # Always shut down and remove the socket file, even if a client fails.
        stop_server(server)
def main():
    """Entry point: run `n` clients (first CLI argument, default 20) and report.

    Prints the list of results and then the number of datagrams the server
    counted next to the expected number.  Exits with an error message when
    the argument is not an integer.
    """
    cli_args = sys.argv[1:]
    raw_count = cli_args[0] if cli_args else 20
    try:
        n = int(raw_count)
    except ValueError:
        sys.exit("expecting a number")
    result_list = work(list(range(n)))
    print(result_list)
    print("received_conn: %d, expected: %d" % (received_conn, n))
if __name__ == "__main__":
    # Script entry: exit with main()'s return value; Ctrl-C ends the run quietly.
    try:
        exit_status = main()
    except KeyboardInterrupt:
        pass
    else:
        sys.exit(exit_status)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment