Skip to content

Instantly share code, notes, and snippets.

@njsmith
Created October 7, 2019 08:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save njsmith/23b12089fa01a23d8c6209ef197c6c38 to your computer and use it in GitHub Desktop.
Save njsmith/23b12089fa01a23d8c6209ef197c6c38 to your computer and use it in GitHub Desktop.
import socket
import errno
import select
def main():
    """Probe whether select/kqueue agree with send() on a datagram socket.

    Creates a non-blocking AF_UNIX/SOCK_DGRAM socket pair, sends on one end
    until send() raises OSError, then asks select (and kqueue, where the
    platform provides it) whether that end is writable, and finally attempts
    one more send.

    Raises:
        OSError: if the final send fails. It is not caught here, so the
            failure surfaces as a traceback.
    """
    a, b = socket.socketpair(family=socket.AF_UNIX, type=socket.SOCK_DGRAM)
    # Nothing ever reads from `b`; it only has to stay open so `a` keeps a
    # peer. The `with` closes both ends even when the final send raises.
    with a, b:
        a.setblocking(False)

        try:
            while True:
                a.send(b"hi")
        except OSError as exc:
            # .get() avoids a KeyError masking the real failure when the
            # errno is missing from the errorcode table (or is None).
            print("send failed with errno {} ({})".format(
                exc.errno, errno.errorcode.get(exc.errno, "unknown")))

        print("checking for writability with select")
        # Timeout 0 makes this a poll: without it the call would block
        # forever whenever the socket is (correctly) reported not writable.
        _, writable, _ = select.select([], [a], [], 0)
        if writable:
            print("select says socket is writable")
        else:
            print("select says socket is not writable")

        print("checking for writability with kqueue")
        if hasattr(select, "kqueue"):
            kq = select.kqueue()
            try:
                ke = select.kevent(
                    a, select.KQ_FILTER_WRITE, select.KQ_EV_ADD)
                # Register the filter and poll for at most one event without
                # waiting (third argument is the timeout in seconds).
                events = kq.control([ke], 1, 0)
            finally:
                kq.close()
            if events:
                print("kqueue says socket is writable ({!r})".format(events))
            else:
                print("kqueue says socket is not writable")
        else:
            # select.kqueue is only provided on platforms with kqueue support.
            print("kqueue is not available on this platform")

        print("trying to send again")
        a.send(b"asdf")


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment