Skip to content

Instantly share code, notes, and snippets.

@minrk
Created October 30, 2017 13:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save minrk/bff185028b6063271ccad2b83faef63b to your computer and use it in GitHub Desktop.
Save minrk/bff185028b6063271ccad2b83faef63b to your computer and use it in GitHub Desktop.
"""background module, first imported
Defines a `schedule` function for calling in a background thread.
"""
from __future__ import print_function
import os
import struct
import select
import sys
from threading import Thread
# pipe for signaling events
r, w = os.pipe()
r = os.fdopen(r, 'rb')
w = os.fdopen(w, 'wb')
evts = {}
n = 0
def schedule(f):
"""Schedule f to be called in the background thread"""
global n
evt_id = n
n += 1
evts[evt_id] = f
w.write(struct.pack('H', evt_id))
w.flush()
print('sent', evt_id)
def background():
"""Eventloop for the background thread"""
while True:
rlist, _, _ = select.select([r], [], [], timeout=0.1)
if not rlist:
print('.', end='')
sys.stdout.flush()
continue
evt_b = r.read(2)
evt_id = struct.unpack('H', evt_b)[0]
print('handling %i' % evt_id)
f = evts.pop(evt_id)
f()
t = Thread(target=background)
t.daemon = True
t.start()
"""
Second module that tries to synchronize with a's background thread
by passing `evt.set` to be called in the background and waiting with `evt.wait`.
"""
from __future__ import print_function
from a import schedule
from threading import Event
print("scheduling one, two, three")
schedule(lambda: print('one'))
schedule(lambda: print('two'))
schedule(lambda: print('three'))
evt = Event()
schedule(evt.set)
print('waiting')
if evt.wait(timeout=2):
print('evt.set called (import)')
else:
print("evt.set never called (import)")
"""test script
1. `import a` to set up
2. verify that scheduling works
3. `import b` and verify that scheduling *doesn't* work during import
4. verify that scheduling still works now that we are back in the main thread
"""
from __future__ import print_function
from threading import Event
import time
print("importing a")
import a
time.sleep(1)
print("Thread ready?")
a.schedule(lambda: print("ready"))
time.sleep(1)
print("importing b")
import b
print("imported b")
time.sleep(1)
# wait again, not during import
evt = Event()
a.schedule(evt.set)
print('waiting again')
if evt.wait(timeout=2):
print('evt.set called (main)')
else:
print("evt.set never called (main)")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment