-
-
Save njsmith/ba7a7c1033d46895ea6e552cbec59141 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import threading | |
import time | |
print("Creating new anonymous pipe") | |
r1, w = os.pipe() | |
print("Reopening read end") | |
r2 = os.open("/proc/self/fd/{}".format(r1), os.O_RDONLY) | |
print("r1 is {}, r2 is {}".format(r1, r2)) | |
print("checking they both can receive from w...") | |
os.write(w, b"a") | |
assert os.read(r1, 1) == b"a" | |
os.write(w, b"b") | |
assert os.read(r2, 1) == b"b" | |
print("...ok") | |
print("setting r2 to non-blocking") | |
os.set_blocking(r2, False) | |
print("os.get_blocking(r1) ==", os.get_blocking(r1)) | |
print("os.get_blocking(r2) ==", os.get_blocking(r2)) | |
# Check r2 is really truly non-blocking | |
try: | |
os.read(r2, 1) | |
except BlockingIOError: | |
print("r2 definitely seems to be in non-blocking mode") | |
# Check that r1 is really truly still in blocking mode | |
def sleep_then_write(): | |
time.sleep(1) | |
os.write(w, b"c") | |
threading.Thread(target=sleep_then_write, daemon=True).start() | |
assert os.read(r1, 1) == b"c" | |
print("r1 definitely seems to be in blocking mode") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment