Last active
November 5, 2020 05:57
-
-
Save Ahuge/2175bc17354290cdd45f3e56ad279498 to your computer and use it in GitHub Desktop.
Basic example of a pure-Python signal/slot implementation used to communicate between threads. It aims to feel like Qt's signals.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# ------------------------------------------------------------------------------------------------------------ # | |
# --------------------------------------------- Signal Library ----------------------------------------------- # | |
# ------------------------------------------------------------------------------------------------------------ # | |
import weakref | |
class Signal(object):
    """Minimal Qt-style signal that dispatches to weakly referenced slots.

    A signal is declared with the positional argument types it carries, e.g.
    ``Signal(str, float)``. Slots are attached with :meth:`connect` and are
    called directly by :meth:`emit`, in the thread that calls ``emit``.

    Only weak references to slots are stored, so connecting a slot does not
    keep its receiver alive. A bound method is stored as a
    (weak owner, weak function) pair: the bound-method object itself is a
    temporary, and a weak reference to it would be dead immediately.
    """

    def __init__(self, *types):
        """Create a signal whose emitted arguments must match ``types``."""
        self._call_types = types
        # List of (weakref-to-owner or None, weakref-to-function) tuples, kept
        # in connection order. Entries are compared by identity of their
        # referents, so receivers do not need to be hashable.
        self._connections = []

    @staticmethod
    def _unbind(cbl):
        """Split ``cbl`` into ``(owner, function)``.

        Bound methods expose ``__self__``/``__func__`` on both Python 2.6+
        and Python 3. Any other callable is returned as ``(None, cbl)``.
        """
        owner = getattr(cbl, "__self__", None)
        func = getattr(cbl, "__func__", None)
        if owner is None or func is None:
            return None, cbl
        return owner, func

    def _find(self, owner, func):
        """Return the stored entry for ``(owner, func)``, or None."""
        for entry in self._connections:
            w_owner, w_func = entry
            if w_func() is not func:
                continue
            if w_owner is None:
                if owner is None:
                    return entry
            elif owner is not None and w_owner() is owner:
                return entry
        return None

    def _discard(self, entry):
        """Remove ``entry`` if it is still registered."""
        try:
            self._connections.remove(entry)
        except ValueError:
            # Already removed, e.g. by a slot that disconnected during emit.
            pass

    def connect(self, cbl):
        """Register ``cbl`` (function or bound method) as a slot.

        Connecting the same callable twice has no additional effect.

        Raises:
            TypeError: if ``cbl`` (or its owner) cannot be weakly referenced.
        """
        owner, func = self._unbind(cbl)
        if self._find(owner, func) is not None:
            return
        w_owner = None if owner is None else weakref.ref(owner)
        self._connections.append((w_owner, weakref.ref(func)))

    def disconnect(self, cbl):
        """Remove a previously connected slot.

        For bound methods both the function and the owning instance must
        match, so disconnecting ``a.slot`` never removes ``b.slot``.

        Raises:
            ValueError: if ``cbl`` is not currently connected.
        """
        owner, func = self._unbind(cbl)
        entry = self._find(owner, func)
        if entry is None:
            raise ValueError("Callable not found!")
        self._connections.remove(entry)

    def emit(self, *args):
        """Validate ``args`` against the declared types and call every slot.

        Raises:
            ValueError: if the number of arguments or the type of any
                argument does not match the signal's declaration.
        """
        if len(args) != len(self._call_types):
            # Wrap the tuple so it is formatted as one value; passing it
            # bare to ``%`` raises TypeError instead of the intended error.
            raise ValueError(
                "Invalid Args, must be %s" % (self._call_types,)
            )
        for arg, expected in zip(args, self._call_types):
            if not isinstance(arg, expected):
                raise ValueError(
                    "Invalid Arg %s, must be %s" % (arg, expected)
                )
        # Iterate over a snapshot so slots may connect/disconnect while the
        # signal is being emitted.
        for entry in list(self._connections):
            w_owner, w_func = entry
            func = w_func()
            owner = None if w_owner is None else w_owner()
            if func is None or (w_owner is not None and owner is None):
                # The receiver has been garbage collected: report it once
                # and drop the stale entry.
                print("Object may have been deleted")
                self._discard(entry)
                continue
            if w_owner is None:
                func(*args)
            else:
                func(owner, *args)
# ------------------------------------------------------------------------------------------------------------ #
# --------------------------------------------- HTTP slowdown code ------------------------------------------- #
# --------------------------------------------- This is a fake API ------------------------------------------- #
# ------------------------------------------------------------------------------------------------------------ #
import random | |
import time | |
class WebApi(object):
    """Fake web API that only simulates a slow request."""

    def request(self, filename):
        """Block for a pseudo-random delay derived from ``filename``.

        The delay is between 0.001 and 30 seconds. It is drawn from a
        private generator seeded with ``filename``, so the same name always
        gets the same delay. Concurrent callers cannot disturb each other,
        and the process-wide ``random`` state is left untouched.
        """
        rng = random.Random(filename)
        delay = rng.randint(1, 30000) / 1000.0
        time.sleep(delay)
def ApiConnect(username):
    """Simulate a slow login and return a :class:`WebApi` handle.

    Sleeps for 1 to 3 whole seconds. The duration is drawn from a private
    generator seeded with ``username``, so the process-wide ``random``
    state is not reseeded as a side effect.
    """
    rng = random.Random(username)
    delay = rng.randint(1, 3)
    time.sleep(delay)
    return WebApi()
# ------------------------------------------------------------------------------------------------------------ # | |
# --------------------------------------------- Client Side code --------------------------------------------- # | |
# ------------------------------------------------------------------------------------------------------------ # | |
import datetime
import time
from threading import Thread

try:  # Python 3
    from queue import Queue
except ImportError:  # Python 2
    from Queue import Queue

try:  # Python 3
    from urllib.request import urlopen
except ImportError:  # Python 2
    import urllib2
    from urllib2 import urlopen
RUNNING = False | |
class WebWorker(Thread):
    """Worker thread that downloads URLs taken from a shared queue.

    Every processed URL is announced through ``RESPONSE_READY`` as
    ``(url, body, seconds)``. A failed download is reported as
    ``(url, "", 0.0)``. The signal is a class attribute, so it is shared by
    all worker instances.
    """

    RESPONSE_READY = Signal(str, str, float)

    def __init__(self, queue):
        """Create a worker that consumes URLs from ``queue``."""
        super(WebWorker, self).__init__()
        self.queue = queue

    def _fetch(self, url):
        """Download ``url`` and return ``(body, elapsed_seconds)``."""
        started = time.time()
        api = ApiConnect("ahughes:" + url + str(id(self)))
        api.request(url)
        response = urlopen(url)
        try:
            body = response.read()
        finally:
            # Always release the connection, even if reading fails.
            response.close()
        if not isinstance(body, str):
            # On Python 3 the payload is bytes, which the signal's ``str``
            # type check would reject. Only the text length is used
            # downstream, so a lossy UTF-8 decode is sufficient here.
            body = body.decode("utf-8", "replace")
        return body, float(time.time() - started)

    def do_dl(self, url):
        """Download ``url`` and emit the result on ``RESPONSE_READY``.

        Download errors propagate to the caller.
        """
        html, elapsed = self._fetch(url)
        self.RESPONSE_READY.emit(str(url), html, elapsed)

    def run(self):
        """Process queued URLs forever, emitting one result per URL."""
        while True:
            url = self.queue.get()
            try:
                try:
                    html, elapsed = self._fetch(url)
                except Exception:
                    # Report the failure with an empty payload.
                    html, elapsed = "", 0.0
                # Emit outside the download guard, so an error raised by a
                # slot is not mistaken for a failed download and emitted a
                # second time.
                self.RESPONSE_READY.emit(str(url), html, elapsed)
            finally:
                self.queue.task_done()
class HttpManager(object):
    """Fan a list of URLs out to one ``WebWorker`` thread per URL.

    Progress is tracked in ``self.urls``: each reported URL is removed from
    it, and the module-level ``RUNNING`` flag is cleared once it is empty.
    """

    def __init__(self, url_list):
        """Remember the URLs to fetch; nothing is started yet."""
        self.readers = []
        self.urls = list(url_list)
        self._queue = Queue()

    def execute(self):
        """Queue every pending URL and start one daemon worker per URL."""
        global RUNNING
        # With nothing to fetch no worker will ever report back, so the flag
        # must not be raised or callers polling it would wait forever.
        RUNNING = bool(self.urls)
        new_workers = []
        for url in self.urls:
            self._queue.put(url)
            worker = WebWorker(self._queue)
            # Setting daemon to True will let the main thread exit even
            # though the workers are blocking
            worker.daemon = True
            worker.RESPONSE_READY.connect(self.print_data)
            new_workers.append(worker)
        self.readers.extend(new_workers)
        # Start only the workers created by this call; a thread that was
        # started by an earlier call cannot be started again.
        for worker in new_workers:
            worker.start()

    def print_data(self, url, html, delta):
        """Slot for ``WebWorker.RESPONSE_READY``: record and print a result.

        Removes ``url`` from the pending list, prints a summary line, and
        clears ``RUNNING`` once no URLs remain.
        """
        global RUNNING
        if not self.urls:
            RUNNING = False
            return
        if url in self.urls:
            self.urls.remove(url)
        if not self.urls:
            RUNNING = False
        print("Response took %f from %s is %d characters long." % (delta, url, len(html)))
# Demo driver: fetch a handful of pages in the background while the main
# thread keeps printing a heartbeat.
h = HttpManager([
    "http://www.nytimes.com",
    "http://knowledge.autodesk.com/support/maya/getting-started/caas/simplecontent/content/maya-documentation.html",
    "http://google.ca",
    "http://placehold.it/350x15000",
    "http://www.iana.org/domains/reserved",
    "http://example.com",
    "http://thecatapi.com/",
    "http://placekitten.com/",
    "http://placehold.it/350x150",
    "http://www.webservicex.com/globalweather.asmx?wsdl",
    "http://example.org",
    "http://www.example.net",
    "http://developer.salesforce.com/page/How_to_Write_Good_Unit_Tests",
])
h.execute()
print("Exitted Execute!")

# HttpManager.print_data clears RUNNING once every URL has been reported.
while True:
    if not RUNNING:
        break
    print("%s: Ping!" % (datetime.datetime.now(),))
    time.sleep(0.25)
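In the `Signal.emit` method, the weak reference is already dead when bound (instance) methods are used as callbacks: `connect` takes a weak reference to the temporary bound-method object, which is garbage-collected as soon as `connect` returns.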
Interesting, when you say callbacks, do you mean the slots that you connect the signal to?
Would you be able to reply with a simple code sample of the bug?
Would you be able to reply with a simple code sample of the bug?
from signal import Signal
class Component:
    """Reproduction case: connect a bound method to a class-level Signal."""

    # Declared on the class, so the signal is shared by every Component
    # instance; it carries a single ``str`` argument.
    Sig_testSignal = Signal(str)

    def __init__(self) -> None:
        # Connect the bound method as a slot, then emit straight away.
        self.Sig_testSignal.connect(self.slot)
        self.emitSignal("foobar")

    def emitSignal(self, data: str) -> None:
        """Emit ``data`` on the signal, then print what was sent."""
        self.Sig_testSignal.emit(data)
        print(f"[ INFO ] Send: {data}")

    def slot(self, data: str) -> None:
        """Print the payload received from the signal."""
        print(f"[ INFO ] Recieved: {data}")


if __name__ == "__main__":
    # Constructing the component runs the whole connect/emit sequence.
    _ = Component()
The file `signal.py` contains the implementation of the `Signal` class.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
In the signal.emit method, the weak reference dies when class methods are used as callbacks