Skip to content

Instantly share code, notes, and snippets.

@openfly
Created December 20, 2016 23:23
Show Gist options
  • Save openfly/0d839765519990d276c2c85b6f9648a1 to your computer and use it in GitHub Desktop.
Save openfly/0d839765519990d276c2c85b6f9648a1 to your computer and use it in GitHub Desktop.
testing an idea out
import requests
import multiprocessing
import sys
class EventHook(object):
    """Minimal multicast event: an ordered list of callables fired together.

    Subscribe with ``hook += handler``, unsubscribe with ``hook -= handler``
    and invoke every subscriber, in subscription order, with ``hook.fire(...)``.
    """

    def __init__(self):
        self.__handlers = []

    def __iadd__(self, handler):
        """Subscribe *handler*; returns the hook so ``hook += handler`` works."""
        self.__handlers.append(handler)
        return self

    def __isub__(self, handler):
        """Unsubscribe the first occurrence of *handler*.

        Raises:
            ValueError: if *handler* is not currently subscribed.
        """
        self.__handlers.remove(handler)
        return self

    def fire(self, *args, **keywargs):
        """Call every subscribed handler with the given arguments.

        Handlers subscribed or unsubscribed while the event is firing take
        effect from the next call to ``fire``.
        """
        # Iterate over a snapshot: a handler that unsubscribes itself would
        # otherwise shift the list and cause the following handler to be
        # skipped for this round.
        for handler in list(self.__handlers):
            handler(*args, **keywargs)

    def clearObjectHandlers(self, inObject):
        """Unsubscribe every bound-method handler whose instance equals *inObject*.

        Handlers that are not bound to an instance (plain functions, lambdas)
        are left subscribed.
        """
        # Build the surviving list in one pass instead of removing entries
        # from the list being iterated, which skipped adjacent matches.
        # ``__self__`` is the bound instance on both Python 2.6+ and 3; plain
        # functions do not have it, so guard the lookup instead of crashing.
        self.__handlers[:] = [
            handler for handler in self.__handlers
            if not (hasattr(handler, '__self__')
                    and handler.__self__ == inObject)
        ]
class Listener():
    """Object exposing a single ``onChange`` event that callers subscribe to."""

    def __init__(self):
        # A fresh hook is created for each instance, so subscriptions made on
        # one listener are not visible on another.
        self.onChange = EventHook()
def test_print():
    """Demo event handler: announce on stdout that it was invoked."""
    # Single-argument call form prints the same text on Python 2 and 3.
    print("test_print called.")
def poll_api(lister, url='http://localhost:8844/date', timeout=10):
    """Poll *url* forever, printing each response and firing ``lister.onChange``.

    Intended to run as the target of a background process: it prints the
    current process name and pid once, then loops without returning.

    Args:
        lister: object whose ``onChange.fire()`` is called after every
            successful request.
        url: endpoint to fetch on each iteration.
        timeout: seconds passed to ``requests.get`` as its ``timeout``; a
            stalled server then raises instead of blocking the loop forever.

    Raises:
        requests.exceptions.RequestException: propagated from ``requests.get``
            (connection failure, timeout, ...); this ends the loop.
    """
    proc = multiprocessing.current_process()
    print(proc.name)
    print(proc.pid)
    while True:
        response = requests.get(url, timeout=timeout)
        print(response.text)
        lister.onChange.fire()
        # Flush on every iteration so output written from this process shows
        # up promptly instead of sitting in the stdout buffer.
        sys.stdout.flush()
# Guard the process creation: with the "spawn" start method (e.g. Windows) the
# child re-imports this module, and unguarded top-level code would try to
# start another poller from inside the child.
if __name__ == '__main__':
    lister = Listener()
    lister.onChange += test_print

    # Run the poller in a daemon process so it is terminated with the parent.
    bg = multiprocessing.Process(name='daemon', target=poll_api, args=(lister,))
    bg.daemon = True
    bg.start()

    print('testing beyond thread')

    # poll_api loops until it fails, so this blocks for as long as it runs.
    bg.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment