Last active
March 8, 2024 17:21
-
-
Save JamesTheAwesomeDude/3828775d68ea04a3e2906b7197d83778 to your computer and use it in GitHub Desktop.
Python socket.recv() work with KeyboardInterrupt example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from contextlib import contextmanager | |
import select | |
import signal | |
import socket | |
try:
    # Preferred source of the page size when the `resource` module exists.
    from resource import getpagesize
except ImportError:
    # `resource` could not be imported (e.g. on platforms that do not ship
    # it); provide a drop-in replacement backed by mmap's constant.
    import mmap

    def getpagesize():
        """Return ``mmap.PAGESIZE`` as a stand-in for ``resource.getpagesize()``."""
        page_size = mmap.PAGESIZE
        return page_size
def _udp_listen(address, bufsize=getpagesize(), family=socket.AF_INET, flags=0, sockopts=frozenset()):
    """Yield datagrams received on a UDP socket while staying signal-interruptible.

    A datagram socket is created, configured, and bound to ``address``.
    Instead of blocking directly in ``recv()``, the generator waits in
    ``select.select()`` on both the UDP socket and one end of a socket pair
    whose other end is registered as the process's signal wakeup fd.  When a
    signal arrives, the wakeup byte makes ``select()`` return so that Python
    can run the signal handler (raising ``KeyboardInterrupt`` for Ctrl+C).

    Args:
        address: Address passed to ``sock.bind()``.
        bufsize: Maximum number of bytes passed to ``sock.recv()`` per
            datagram.  Defaults to the page size computed at definition time.
        family: Address family of the UDP socket.
        flags: Flags forwarded to ``sock.recv()``.
        sockopts: Iterable of argument tuples, each unpacked into
            ``sock.setsockopt(*sockopt)`` before binding.

    Yields:
        The ``bytes`` returned by each ``sock.recv(bufsize, flags)`` call.

    Raises:
        RuntimeError: If a signal wakeup fd is already installed (raised by
            ``_wakeup_fd_ctx`` because ``strict=True`` is passed).
    """
    with socket.socket(family, socket.SOCK_DGRAM) as sock:
        for sockopt in sockopts:
            sock.setsockopt(*sockopt)
        sock.bind(address)
        _coalmine, _canary = socket.socketpair()
        with _canary, _coalmine:
            # signal.set_wakeup_fd() requires a non-blocking fd (on POSIX it
            # raises ValueError otherwise).  The reading end is made
            # non-blocking too so it can be drained without risk of stalling.
            _coalmine.setblocking(False)
            _canary.setblocking(False)
            with _wakeup_fd_ctx(_coalmine.fileno(), strict=True, warn_on_full_buffer=False):
                while True:
                    # FIXME: consider poll() instead of select() on Linux
                    # (probably not epoll, for 2 reasons: it's not
                    # cross-platform, and edge polling isn't what we want
                    # here anyway)
                    # https://stackoverflow.com/q/78124574/1874170
                    ready = select.select((sock, _canary), (), ())[0]
                    if _canary in ready:
                        # There's no need to raise any error ourselves,
                        # since Python itself will raise KeyboardInterrupt
                        # if needed.  We do have to consume the wakeup
                        # bytes, though: if the signal's handler returned
                        # normally, leftover bytes would keep _canary
                        # readable and turn this loop into a busy spin.
                        try:
                            while _canary.recv(4096):
                                pass
                        except BlockingIOError:
                            # Nothing left to read.
                            pass
                    if sock in ready:
                        yield sock.recv(bufsize, flags)
@contextmanager
def _wakeup_fd_ctx(fd, strict=True, **k):
    """Temporarily install ``fd`` as the signal wakeup file descriptor.

    ``fd`` is installed with ``signal.set_wakeup_fd(fd, **k)``.  What happens
    next depends on whether a wakeup fd was already installed:

    * None installed (previous value ``-1``): ``fd`` is yielded and the
      previous value is put back when the context exits.
    * One installed and ``strict`` is true: ``RuntimeError`` is raised and
      the previous fd is put back before the error propagates.
    * One installed and ``strict`` is false: the previous fd is reinstalled
      immediately and yielded instead of ``fd``; nothing is changed on exit.

    Args:
        fd: File descriptor to install.
        strict: Whether an already-installed wakeup fd is an error.
        **k: Extra keyword arguments forwarded to ``signal.set_wakeup_fd``.

    Yields:
        The wakeup fd in effect inside the ``with`` body.

    Raises:
        RuntimeError: In strict mode when a wakeup fd was already installed.
    """
    previous_fd = signal.set_wakeup_fd(fd, **k)
    restore_on_exit = True
    try:
        if previous_fd == -1:
            # Nothing was displaced; hand out our own fd.
            yield fd
            return

        # We overwrote an existing wakeup fd.
        if strict:
            raise RuntimeError(f'wakeup fd already occupied ({previous_fd}). Not sure what to do about that.')

        # Lenient mode: back off right away and share the existing fd.
        signal.set_wakeup_fd(previous_fd)
        restore_on_exit = False
        yield previous_fd
    finally:
        if restore_on_exit:
            signal.set_wakeup_fd(previous_fd)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment