Skip to content

Instantly share code, notes, and snippets.

@Ahuge
Last active November 5, 2020 05:57
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Ahuge/2175bc17354290cdd45f3e56ad279498 to your computer and use it in GitHub Desktop.
Save Ahuge/2175bc17354290cdd45f3e56ad279498 to your computer and use it in GitHub Desktop.
Basic example of a pure-Python signal/slot implementation communicating between threads. It aims to feel like Qt's signals.
# ------------------------------------------------------------------------------------------------------------ #
# --------------------------------------------- Signal Library ----------------------------------------------- #
# ------------------------------------------------------------------------------------------------------------ #
import weakref
class Signal(object):
    """Minimal Qt-style signal that dispatches to weakly referenced slots.

    A signal is declared with the types of the arguments it carries, e.g.
    ``Signal(str, float)``.  ``emit`` validates its arguments against those
    types and then calls every connected slot with them.

    Slots are held through weak references only, so connecting a slot never
    keeps it (or the object it is bound to) alive.  The caller must keep its
    own reference to a plain function or callable object for as long as the
    connection should last.
    """

    def __init__(self, *types):
        """Create a signal whose ``emit`` takes one argument per entry of *types*."""
        self._call_types = types
        # Set of ``(weakref-to-owner or None, weakref-to-callable)`` tuples.
        self._connections = set()

    @staticmethod
    def _split_bound(cbl):
        """Return ``(owner, function)`` for a bound method, else ``(None, None)``.

        ``__self__``/``__func__`` exist on bound methods in both Python 2.6+
        and Python 3, unlike the Python 2-only ``im_self``/``im_func``.
        """
        owner = getattr(cbl, "__self__", None)
        func = getattr(cbl, "__func__", None)
        if owner is None or func is None:
            return None, None
        return owner, func

    def connect(self, cbl):
        """Connect the callable *cbl* so that it is invoked on every ``emit``.

        A bound method is stored as separate weak references to its instance
        and its underlying function.  A bound-method object is created anew on
        every attribute access, so a weak reference to the method object
        itself would be dead as soon as this call returns.
        """
        owner, func = self._split_bound(cbl)
        if func is None:
            entry = (None, weakref.ref(cbl))
        else:
            entry = (weakref.ref(owner), weakref.ref(func))
        self._connections.add(entry)

    def disconnect(self, cbl):
        """Remove the connection for *cbl*.

        A bound method only matches a connection made for the same instance.
        A plain function matches any connection that uses that function.

        Raises:
            ValueError: if *cbl* is not connected.
        """
        owner, func = self._split_bound(cbl)
        for entry in self._connections:
            w_owner, w_cbl = entry
            target = w_cbl()
            if func is not None:
                matches = (
                    target is func
                    and w_owner is not None
                    and w_owner() is owner
                )
            else:
                matches = target is cbl
            if matches:
                # Safe although we are iterating: we return immediately.
                self._connections.remove(entry)
                return
        raise ValueError("Callable not found!")

    def emit(self, *args):
        """Validate *args* against the declared types and call every live slot.

        Connections whose slot or owner has been garbage collected are
        reported once and then dropped.

        Raises:
            ValueError: if the number or the types of *args* do not match the
                types the signal was declared with.
        """
        if len(args) != len(self._call_types):
            # Wrap in a 1-tuple: formatting with the bare tuple raises
            # TypeError unless it happens to hold exactly one type.
            raise ValueError("Invalid Args, must be %s" % (self._call_types,))
        for arg, expected in zip(args, self._call_types):
            if not isinstance(arg, expected):
                raise ValueError(
                    "Invalid Arg %s, must be %s" % (arg, expected)
                )
        # Iterate over a snapshot so slots may connect/disconnect while running.
        for entry in tuple(self._connections):
            w_owner, w_cbl = entry
            cbl = w_cbl()
            owner = w_owner() if w_owner is not None else None
            if cbl is None or (w_owner is not None and owner is None):
                print("Object may have been deleted")
                self._connections.discard(entry)
                continue
            if w_owner is not None:
                cbl(owner, *args)
            else:
                cbl(*args)
# ------------------------------------------------------------------------------------------------------------ #
# --------------------------------------------- HTTP Slowdown code-------------------------------------------- #
# --------------------------------------------- This is a fake Api ------------------------------------------- #
# ------------------------------------------------------------------------------------------------------------ #
import random
import time
class WebApi(object):
    """Fake web API whose only effect is a deterministic, per-name delay."""

    @staticmethod
    def _delay_seconds(filename):
        """Return the simulated latency for *filename*, in seconds.

        The value lies between 0.001 and 30.0 and depends only on *filename*.
        A private ``random.Random`` is used instead of ``random.seed`` so that
        concurrent callers neither race on, nor reseed, the process-wide
        generator.  The resulting number is the same one the module-level
        ``seed`` + ``randint`` pair would produce.
        """
        return random.Random(filename).randint(1, 30000) / 1000.0

    def request(self, filename):
        """Block the calling thread for the simulated latency of *filename*."""
        time.sleep(self._delay_seconds(filename))
def ApiConnect(username):
    """Simulate a slow login and return a fresh ``WebApi``.

    Sleeps for 1, 2 or 3 seconds; the duration depends only on *username*.
    A private ``random.Random`` is used so that calls from several threads do
    not reseed the process-wide generator underneath each other.
    """
    delay = random.Random(username).randint(1, 3)
    time.sleep(delay)
    return WebApi()
# ------------------------------------------------------------------------------------------------------------ #
# --------------------------------------------- Client Side code --------------------------------------------- #
# ------------------------------------------------------------------------------------------------------------ #
import datetime
import time
from threading import Thread

try:
    import urllib2
except ImportError:  # Python 3: urllib2's urlopen API lives in urllib.request
    import urllib.request as urllib2

try:
    from queue import Queue
except ImportError:  # Python 2 spells the module "Queue"
    from Queue import Queue
# Module-wide flag polled by the main loop at the bottom of the file.
# HttpManager.execute sets it to True; HttpManager.print_data sets it back
# to False once no URLs are left to wait for.
RUNNING = False
class WebWorker(Thread):
    """Thread that downloads URLs taken from a queue and reports each result.

    Results are announced through ``RESPONSE_READY`` as
    ``(url, html, seconds)``.  A failed download is reported as
    ``(url, "", 0.0)``.
    """

    # Declared on the class, so one Signal object is shared by all workers.
    RESPONSE_READY = Signal(str, str, float)

    def __init__(self, queue):
        """Create a worker that consumes URLs from *queue*."""
        super(WebWorker, self).__init__()
        self.queue = queue

    def do_dl(self, url):
        """Download *url* and emit ``RESPONSE_READY`` with the body and timing.

        The fake API is contacted first to simulate latency.  Any exception
        raised by the download or by a connected slot propagates to the caller.
        """
        started = time.time()
        api = ApiConnect("ahughes:" + url + str(id(self)))
        api.request(url)
        response = urllib2.urlopen(url)
        try:
            html = response.read()
        finally:
            # Release the connection even if reading fails part-way.
            response.close()
        if not isinstance(html, str):
            # Python 3 returns bytes, which would fail the signal's ``str``
            # type check.  UTF-8 is assumed; undecodable bytes are replaced.
            html = html.decode("utf-8", "replace")
        finished = time.time()
        self.RESPONSE_READY.emit(str(url), html, float(finished - started))

    def run(self):
        """Process queued URLs forever; the loop has no exit condition."""
        while True:
            url = self.queue.get()
            try:
                self.do_dl(url)
            except Exception:
                # Report the failure but let SystemExit/KeyboardInterrupt
                # through, which a bare ``except:`` would have swallowed.
                self.RESPONSE_READY.emit(str(url), "", 0.0)
            finally:
                self.queue.task_done()
class HttpManager(object):
    """Fan a list of URLs out to ``WebWorker`` threads and print each result."""

    def __init__(self, url_list):
        """Remember the URLs to fetch; nothing is started until ``execute``."""
        self.readers = []
        # Private copy: entries are removed as their responses arrive.
        self.urls = list(url_list)
        self._queue = Queue()

    def execute(self):
        """Queue every URL and start one worker thread per URL.

        Sets the module-level ``RUNNING`` flag to True while responses are
        outstanding.  With no URLs there is nothing to wait for, so the flag
        is set to False instead of leaving a poller spinning forever.
        """
        global RUNNING
        RUNNING = bool(self.urls)
        for url in self.urls:
            self._queue.put(url)
            worker = WebWorker(self._queue)
            # Setting daemon to True will let the main thread exit even though the workers are blocking
            worker.daemon = True
            worker.RESPONSE_READY.connect(self.print_data)
            self.readers.append(worker)
        for reader in self.readers:
            reader.start()

    def print_data(self, url, html, delta):
        """Slot for ``WebWorker.RESPONSE_READY``: report one finished download.

        Removes *url* from the pending list, prints a summary line and clears
        ``RUNNING`` once nothing is pending.  Invoked on the thread that
        emitted the signal.
        """
        global RUNNING
        if not self.urls:
            RUNNING = False
            return
        if url in self.urls:
            self.urls.remove(url)
        print("Response took %f from %s is %d characters long." % (delta, url, len(html)))
        # Clear the flag only after printing: once it is False the main thread
        # may exit, and a daemon worker could be killed before its last line.
        if not self.urls:
            RUNNING = False
# Demo driver: fetch a handful of pages on worker threads while the main
# thread keeps printing a heartbeat.
# NOTE(review): this runs at import time; consider an
# ``if __name__ == "__main__":`` guard if the module is ever imported.
h = HttpManager([
    "http://www.nytimes.com",
    "http://knowledge.autodesk.com/support/maya/getting-started/caas/simplecontent/content/maya-documentation.html",
    "http://google.ca",
    "http://placehold.it/350x15000",
    "http://www.iana.org/domains/reserved",
    "http://example.com",
    "http://thecatapi.com/",
    "http://placekitten.com/",
    "http://placehold.it/350x150",
    "http://www.webservicex.com/globalweather.asmx?wsdl",
    "http://example.org",
    "http://www.example.net",
    "http://developer.salesforce.com/page/How_to_Write_Good_Unit_Tests",
])

# execute() only starts the workers, so this line prints straight away.
h.execute()
print("Exitted Execute!")

# Heartbeat four times a second until print_data clears RUNNING.
while RUNNING:
    print("%s: Ping!" % (datetime.datetime.now(),))
    time.sleep(0.25)
@winwinashwin
Copy link

In the signal.emit method, the weak reference dies when class methods are used as callbacks

@Ahuge
Copy link
Author

Ahuge commented Nov 4, 2020

In the signal.emit method, the weak reference dies when class methods are used as callbacks

Interesting, when you say callbacks, do you mean the slots that you connect the signal to?

Would you be able to reply with a simple code sample of the bug?

@winwinashwin
Copy link

Would you be able to reply with a simple code sample of the bug?

from signal import Signal


class Component:
    """Minimal reproduction: connect a bound-method slot, then emit once."""

    # Declared on the class, so every instance reaches the same Signal object.
    Sig_testSignal = Signal(str)

    def __init__(self):
        """Connect ``self.slot`` to the signal and immediately emit "foobar"."""
        # ``self.slot`` is a bound method; it is what Signal.connect receives.
        self.Sig_testSignal.connect(self.slot)

        self.emitSignal("foobar")

    def emitSignal(self, data: str):
        """Emit *data* through the signal, then log that it was sent."""
        self.Sig_testSignal.emit(data)

        print(f"[ INFO ] Send: {data}")

    def slot(self, data: str):
        """Slot under test: log the received *data*."""
        print(f"[ INFO ] Recieved: {data}")


# Constructing a Component runs the whole reproduction from its __init__;
# the instance itself is not used afterwards.
if __name__ == "__main__":
    _ = Component()

The file signal.py contains the implementation of the Signal class.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment