Skip to content

Instantly share code, notes, and snippets.

@njsmith
Created July 12, 2017 20:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save njsmith/ba7a7c1033d46895ea6e552cbec59141 to your computer and use it in GitHub Desktop.
Save njsmith/ba7a7c1033d46895ea6e552cbec59141 to your computer and use it in GitHub Desktop.
import os
import threading
import time
# Create an anonymous pipe, then obtain a second read descriptor (r2) by
# re-opening the read end through its /proc/self/fd entry.  The rest of the
# script checks whether r1 and r2 have independent blocking modes.
# NOTE(review): the /proc/self/fd path presumably limits this to Linux-like
# systems -- confirm before running elsewhere.
print("Creating new anonymous pipe")
r1, w = os.pipe()
print("Reopening read end")
r2 = os.open("/proc/self/fd/{}".format(r1), os.O_RDONLY)
print("r1 is {}, r2 is {}".format(r1, r2))

# Both read descriptors should receive bytes written to w.  The reads are done
# outside of any `assert` statement on purpose: `python -O` strips asserts,
# and a stripped read would leave the byte sitting in the pipe, so the
# non-blocking check further down would read data instead of raising.
print("checking they both can receive from w...")
os.write(w, b"a")
received = os.read(r1, 1)
if received != b"a":
    raise AssertionError("expected b'a' from r1, got {!r}".format(received))
os.write(w, b"b")
received = os.read(r2, 1)
if received != b"b":
    raise AssertionError("expected b'b' from r2, got {!r}".format(received))
print("...ok")

# Switch only r2 to non-blocking mode and report what the OS says about each
# descriptor.
print("setting r2 to non-blocking")
os.set_blocking(r2, False)
print("os.get_blocking(r1) ==", os.get_blocking(r1))
print("os.get_blocking(r2) ==", os.get_blocking(r2))

# Check r2 is really truly non-blocking.  Both bytes written above have
# already been consumed, so the pipe is empty here and a non-blocking read is
# expected to raise BlockingIOError rather than wait for data.
try:
    os.read(r2, 1)
except BlockingIOError:
    print("r2 definitely seems to be in non-blocking mode")
# Check that r1 is really truly still in blocking mode
def sleep_then_write():
    """Sleep for one second, then write the single byte b"c" to ``w``.

    ``w`` is read from module scope rather than passed in as an argument.
    """
    # Sleep first so that the byte only arrives after a delay.
    time.sleep(1)
    os.write(w, b"c")
# Run the delayed writer on a daemon thread, then read from r1 on this thread.
# The writer sleeps before writing, so this read only succeeds if r1 waits for
# data; a non-blocking r1 would be expected to raise BlockingIOError instead.
threading.Thread(target=sleep_then_write, daemon=True).start()
# The read is kept outside of `assert` so it still runs under `python -O`;
# otherwise the success message below would be printed without any check.
received = os.read(r1, 1)
if received != b"c":
    raise AssertionError("expected b'c' from r1, got {!r}".format(received))
print("r1 definitely seems to be in blocking mode")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment