Skip to content

Instantly share code, notes, and snippets.

@raggesilver
Created May 27, 2018 01:57
Show Gist options
  • Save raggesilver/aa63fe46ea6b736a036f90ff36aa3c71 to your computer and use it in GitHub Desktop.
Save raggesilver/aa63fe46ea6b736a036f90ff36aa3c71 to your computer and use it in GitHub Desktop.
import os
import sys
import time
import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk
from gi.repository import GObject
from gi.repository import GLib
GObject.threads_init()
import threading
import time
import random
import re
from signal import signal, SIGWINCH, SIGKILL, SIGTERM
from IPython.core.debugger import Tracer
from IPython.core import ultratb
sys.excepthook = ultratb.FormattedTB(mode='Verbose',
color_scheme='Linux',
call_pdb=True,
ostream=sys.__stdout__)
from colorlog import ColoredFormatter
import logging
from gettext import gettext as _
import traceback
from functools import wraps
def setup_logger():
    """Return the module logger with a colored stream handler attached.

    The logger is looked up by ``__name__`` and set to ``DEBUG``.  A single
    ``logging.StreamHandler`` formatted by ``ColoredFormatter`` is attached.
    Calling this function again returns the same logger without stacking a
    second handler on it, so each record is still printed only once.

    :returns: the configured logger
    :rtype: logging.Logger
    """
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)

    # logging.getLogger() returns the same object for the same name, so an
    # unconditional addHandler() here would duplicate output on every
    # additional call.
    already_colored = any(
        isinstance(existing.formatter, ColoredFormatter)
        for existing in logger.handlers
    )
    if already_colored:
        return logger

    formatter = ColoredFormatter(
        "(%(threadName)-9s) %(log_color)s%(levelname)-8s%(reset)s %(message_log_color)s%(message)s",
        datefmt=None,
        reset=True,
        log_colors={
            'DEBUG': 'cyan',
            'INFO': 'green',
            'WARNING': 'yellow',
            'ERROR': 'red',
            'CRITICAL': 'red',
        },
        secondary_log_colors={
            'message': {
                'ERROR': 'red',
                'CRITICAL': 'red',
                'DEBUG': 'yellow'
            }
        },
        style='%'
    )
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    return logger
def trace(func):
    """Decorate *func* so that entering and leaving it is logged.

    A debug record is written to the module-level ``logger`` before the
    call and another one after it returns.  If *func* raises, the second
    record is not written and the exception propagates unchanged.

    :param func: Function to wrap
    :type func: callable
    :returns: wrapper carrying the metadata of *func*
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        entering = 'Start {!r}'.format(func.__name__)
        logger.debug(entering)
        outcome = func(*args, **kwargs)
        leaving = 'End {!r}'.format(func.__name__)
        logger.debug(leaving)
        return outcome

    return wrapper
# Module-level logger; the wrapper built by ``trace`` logs through this name.
logger = setup_logger()
# Based on https://gist.github.com/bossjones/e21b53c6dff04e8fdb3d
class _IdleObject(GObject.GObject):
    """GObject whose signals are emitted from a GLib idle handler.

    ``emit`` does not fire the signal immediately.  It queues the real
    ``GObject.GObject.emit`` call with ``GLib.idle_add``, so the handlers
    run wherever the GLib main loop dispatches idle callbacks (presumably
    the GUI main thread -- confirm against the application's main loop).
    """

    @trace
    def __init__(self):
        GObject.GObject.__init__(self)

    @trace
    def emit(self, *args):
        """Queue the emission of a signal on an idle handler.

        :param args: signal name followed by the signal's arguments, passed
            on unchanged to ``GObject.GObject.emit``
        :returns: ``None``; the result of the deferred emission is not
            available to the caller
        """
        def _emit_once():
            GObject.GObject.emit(self, *args)
            # An idle callback that returns a true value is scheduled again.
            # Always return False so a truthy signal result cannot cause the
            # signal to be emitted repeatedly.
            return False

        GLib.idle_add(_emit_once)
class _IdeThread(threading.Thread, _IdleObject):
    """Thread that reports back through GObject signals.

    Signals are emitted through ``_IdleObject.emit``, i.e. deferred to a
    GLib idle handler.  Two signals are declared:

    * ``completed`` -- no arguments, emitted when ``run`` finishes
    * ``progress`` -- one float argument (declared here, not emitted by
      this class)

    ``cancel`` only sets the ``cancelled`` flag; nothing in this class reads
    it, so the wrapped function has to cooperate for a cancel to take effect.
    """

    __gsignals__ = {
        "completed": (
            GObject.SignalFlags.RUN_LAST, None, []
        ),
        "progress": (
            GObject.SignalFlags.RUN_LAST, None, [GObject.TYPE_FLOAT]
        ),
    }

    @trace
    def __init__(self, *args):
        """Create the thread.

        :param args: ``args[0]`` is the callable to run in the thread,
            ``args[1]`` is used as the thread name
        """
        threading.Thread.__init__(self)
        _IdleObject.__init__(self)
        self.cancelled = False
        self.fn = args[0]  # Function to run in thread
        # Assigning the ``name`` property already stores str(value); the
        # former extra setName() call was redundant and is deprecated.
        self.name = args[1]

    @trace
    def cancel(self):
        """Flag the thread as cancelled.

        Python threads cannot be interrupted from outside, so this only
        records the request in ``self.cancelled``.
        """
        self.cancelled = True

    @trace
    def run(self):
        """Run the wrapped function, then announce completion.

        ``completed`` is emitted even when the function raises, so that
        listeners doing bookkeeping on that signal are still notified.  The
        exception itself is not swallowed.
        """
        print("Running %s" % str(self))
        try:
            self.fn()  # Call the function
        finally:
            a = self.emit("completed")  # Emit completed
            print("Emitted %s" % a)
        return False
class IdeThreadManager:
    """Start ``_IdeThread`` workers while capping how many run at once.

    Threads are stored in ``self.threads`` keyed by the tuple of positional
    arguments they were created from (function, name).  Keys of threads that
    have been created but not started yet are kept in
    ``self.pendingThreadArgs``.
    """

    @trace
    def __init__(self, maxThreads=os.cpu_count()):
        """Create the manager.

        :param maxThreads: maximum number of threads running at the same
            time; ``None`` is treated as 1
        """
        # os.cpu_count() may return None, which cannot be compared with the
        # integer count of running threads.
        self.maxThreads = 1 if maxThreads is None else maxThreads
        self.threads = {}
        self.pendingThreadArgs = []

    @trace
    def _register_thread_completed(self, thread, *args):
        """Handle a thread's ``completed`` signal.

        Removes the finished thread from the registry and, if there is room,
        starts one pending thread.

        :param thread: the thread that emitted the signal
        :param args: the key the thread was registered under
        """
        del self.threads[args]
        # self.threads also holds the not-yet-started threads, so subtract
        # the pending ones to get the number actually running.
        running = len(self.threads) - len(self.pendingThreadArgs)

        print("%s completed. %s running, %s pending" % (
            thread,
            running,
            len(self.pendingThreadArgs)
        ))

        if running < self.maxThreads and self.pendingThreadArgs:
            # pop() takes the most recently queued entry.
            args = self.pendingThreadArgs.pop()
            print("Starting pending %s" % self.threads[args])
            self.threads[args].start()

    @trace
    def make_thread(self, completeCallback, userData, *args):
        """Create a thread for ``args`` and start or queue it.

        Nothing happens when a thread with the same ``args`` is already
        registered.

        :param completeCallback: handler connected to the thread's
            ``completed`` signal
        :param userData: extra argument handed to ``completeCallback``
        :param args: function to run followed by the thread name; also used
            as the registry key
        """
        running = len(self.threads) - len(self.pendingThreadArgs)

        if args not in self.threads:
            thread = _IdeThread(*args)
            # Handlers run in connection order: the user's callback is
            # connected first, the manager's own clean-up second.
            thread.connect("completed", completeCallback, userData)
            thread.connect("completed",
                           self._register_thread_completed,
                           *args)
            self.threads[args] = thread

            if running < self.maxThreads:
                print("Starting %s" % thread)
                self.threads[args].start()
            else:
                print("Queing %s" % thread)
                self.pendingThreadArgs.append(args)

    @trace
    def stop_all_threads(self, block=False):
        """Cancel every registered thread.

        :param block: when true, also wait for each live thread to finish
            (may block the UI)
        """
        for thread in self.threads.values():
            thread.cancel()
            # Thread.isAlive() was removed in Python 3.9; is_alive() is the
            # supported spelling.  Threads that were never started are not
            # alive and are therefore not joined.
            if block and thread.is_alive():
                thread.join()  # wait until the thread actually finishes

    @trace
    def stop_thread(self, *args):
        """Cancel the thread registered under ``args``, if there is one."""
        if args in self.threads:
            thread = self.threads[args]
            print("Cancelling %s" % thread)
            thread.cancel()
def thread_fn(*args):
    """Demo workload: print a marker, wait two seconds, print another.

    :param args: accepted and ignored
    """
    pause_seconds = 2
    print("A")
    time.sleep(pause_seconds)
    print("AA")
@trace
def thread_callback(*args):
    """Demo ``completed`` handler: report that it was invoked.

    :param args: whatever the signal passes along; ignored
    """
    notice = "Callback called"
    print(notice)
# Demo: run ``thread_fn`` in one managed thread and ask to be told through
# ``thread_callback`` when it completes.
# NOTE(review): "completed" is queued with GLib.idle_add and no GLib main
# loop is started in the visible part of this file, so the callback
# presumably never fires when this runs as a plain script -- confirm.
mg = IdeThreadManager()
mg.make_thread(thread_callback, None, thread_fn, "first")

# Additional demo jobs, currently disabled:
# mg.make_thread(thread_callback, None, thread_fn, "second")
# mg.make_thread(thread_callback, None, thread_fn, "third")
# mg.make_thread(thread_callback, None, thread_fn, "fourth")
# mg.make_thread(thread_callback, None, thread_fn, "fifth")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment