Skip to content

Instantly share code, notes, and snippets.

from PyQt4 import Qt
def Signal(name, *args):
def get_signal(self):
if not hasattr(self, '_signallers'):
self._signallers = {}
if (name, args) not in self._signallers:
signal = Qt.pyqtSignal(*args, name=name)
signaller_type = type('Signaller', (Qt.QObject,), {name: signal})
self._signallers[name, args] = signaller_type()
@vxgmichel
vxgmichel / devicetest_with_events.py
Created May 11, 2016 08:53
Test file for using events with devicetest
from time import sleep
from devicetest import TangoTestContext
from PyTango import utils, EventType, DevState
from PyTango.server import Device, DeviceMeta, command, run
class Dummy(Device):
__metaclass__ = DeviceMeta
def init_device(self):
self.set_change_event('State', True, True)
@vxgmichel
vxgmichel / test_pr305.py
Created August 1, 2016 16:17
Non-interruptible asyncio program after PR #305
import asyncio
async def background():
while running:
await asyncio.sleep(0)
[x**2 for x in range(10**5)]
async def main():
return await asyncio.sleep(20, result='hello')
@vxgmichel
vxgmichel / tango-db.service
Created September 5, 2016 13:03
tango-db systemd service
# It's not recommended to modify this file in-place, because it will be
# overwritten during package upgrades. If you want to customize, the
# best way is to create a file "/etc/systemd/system/tango-db.service",
# containing
# .include /lib/systemd/system/tango-db.service
# ...make your changes here...
# or create a file "/etc/systemd/system/tango-db.service.d/foo.conf",
# which doesn't need to include ".include" call and which will be parsed
# after the file tango-db.service itself is parsed.
#
@vxgmichel
vxgmichel / multitasking.py
Last active November 2, 2016 16:42
Multitasking example using coroutines
import itertools
import collections
def run(tasks):
# Prepare
results = {}
count = itertools.count()
queue = collections.deque()
for task in tasks:
@vxgmichel
vxgmichel / primes.py
Created November 22, 2016 08:48
Fast eratosthenes prime generator with a cached wheel
"""Fast eratosthenes prime generator with a cached wheel"""
import sys
import time
import operator
from itertools import cycle, chain, accumulate, islice, count
from functools import lru_cache, reduce, partial
from contextlib import contextmanager
CACHE_LEVEL = 5
@vxgmichel
vxgmichel / aiogo.py
Last active July 5, 2017 14:57
Go-style generators in asyncio
import asyncio
from functools import wraps
def gogenerator(aiterable=None, buffering=0):
def decorator(aiterable):
@wraps(aiterable)
def wrapper(*args, **kwargs):
return go(aiterable(*args, **kwargs), buffering)
return wrapper
@vxgmichel
vxgmichel / protect.py
Created September 29, 2017 16:30
Protecting against SIGINT using ThreadPoolExecutor
# Common imports
from time import sleep
# Synchronous imports
from concurrent.futures import ThreadPoolExecutor
# Asynchronous imports
# from gevent.threadpool import ThreadPoolExecutor
@vxgmichel
vxgmichel / qgeventdispatcher.py
Created February 13, 2018 11:38
Qt-gevent dispatcher
import sys
import gevent.event
from collections import defaultdict, deque
from PyQt5.QtWidgets import QApplication, QPushButton
from PyQt5.QtCore import QAbstractEventDispatcher, QTimer, QTimerEvent
class QGeventDispatcher(QAbstractEventDispatcher):
@vxgmichel
vxgmichel / binary_search.py
Last active April 18, 2018 13:24
A binary search based on bisect module
import bisect
import itertools
def binary_search(f, lower=None, upper=None, precision=1):
# Switch lower and upper
if lower is None and upper is not None:
return precision - binary_search(
lambda x: not f(-x), -upper, None, precision)