Skip to content

Instantly share code, notes, and snippets.

@miholeus
Last active September 28, 2022 05:53
Show Gist options
  • Save miholeus/8b2c2ad1e58692d02824940cfadcb0a6 to your computer and use it in GitHub Desktop.
Save miholeus/8b2c2ad1e58692d02824940cfadcb0a6 to your computer and use it in GitHub Desktop.
event manager using python
# coding: utf-8
import copy
import itertools
from collections import defaultdict

try:
    import Queue as Q  # ver. < 3.0
except ImportError:
    import queue as Q
class Event(object):
    """Named event carrying a target, parameters and a stop-propagation flag.

    Parameters are normally a ``dict`` keyed by parameter name, so that
    ``get_param`` and ``set_param`` can be used with names. A ``list`` is
    still accepted for positional parameters.
    """

    def __init__(self, name=None, target=None, params=None):
        """Create an event.

        Args:
            name: event name; may also be assigned later via ``name``
            target: arbitrary value stored as the event's target
            params (dict | list): event parameters; an empty dict when omitted
        """
        if params is None:
            # A dict rather than a list: the named accessors below cannot
            # work on a list, so a default event must start with a mapping.
            params = {}
        self._name = name
        self._target = target
        self._params = params
        self._stop_propagation = False

    @property
    def name(self):
        """Event name."""
        return self._name

    @name.setter
    def name(self, value):
        self._name = value

    @property
    def target(self):
        """Value stored as the event's target."""
        return self._target

    @target.setter
    def target(self, value):
        self._target = value

    @property
    def params(self):
        """Container (dict or list) holding the event parameters."""
        return self._params

    @params.setter
    def params(self, value):
        # Kept as an assertion so callers see the same AssertionError as
        # before; dicts are now allowed in addition to lists.
        assert isinstance(value, (list, dict)), \
            "params must be a list or a dict; received {}".format(type(value))
        self._params = value

    def get_param(self, name, default=None):
        """Return one parameter, or ``default`` when it is not available.

        Args:
            name: key for dict parameters, or index for list parameters
            default: value returned when the parameter cannot be found
        """
        params = self._params
        if isinstance(params, dict):
            return params.get(name, default)
        try:
            return params[name]
        except (IndexError, KeyError, TypeError):
            # Out-of-range index, or a name used against a list.
            return default

    def set_param(self, name, value):
        """Store ``value`` under ``name`` in the parameter container.

        For list parameters ``name`` must be an existing index; the
        container's own IndexError/TypeError propagates otherwise.
        """
        self._params[name] = value

    def stop_propagation(self, flag=True):
        """Set (or clear, with ``flag=False``) the stop-propagation flag."""
        self._stop_propagation = flag

    def propagation_is_stopped(self):
        """Return the current stop-propagation flag."""
        return self._stop_propagation

    def copy(self):
        """Return a new ``Event`` with the same name, target and parameters.

        The parameter container is shallow-copied so that changing the
        copy's parameters does not change this event's. The copy always
        starts with propagation not stopped.
        """
        # ``copy`` here is the module-level ``copy`` module, not this method.
        event = Event(self.name, self.target, copy.copy(self.params))
        return event

    __copy__ = copy
class EventManager(object):
    """Registry of listeners grouped by event name and ordered by priority.

    ``events`` maps an event name to a ``PriorityQueue`` whose entries are
    ``(priority, sequence, listener)`` tuples. ``sequence`` only exists so
    that two listeners with the same priority never force the queue to
    compare the listener callables themselves.
    """

    # Monotonic tie-breaker shared by all managers; see the class docstring.
    _sequence = itertools.count()

    def __init__(self):
        self.event_prototype = Event()
        self.events = defaultdict(Q.PriorityQueue)

    def set_event_prototype(self, event):
        """
        Sets event prototype
        Args:
            event (Event): event instance copied for every ``trigger`` call
        """
        self.event_prototype = event

    def trigger(self, event_name, target=None, args=None):
        """
        Builds an event from the prototype and passes it to trigger_listeners
        Args:
            event_name (str): event's name
            target: value stored as the event's target; None keeps the
                prototype's target
            args: event parameters; a falsy value keeps the prototype's
                parameters
        Returns:
            whatever ``trigger_listeners`` returns
        """
        event = copy.copy(self.event_prototype)
        event.name = event_name
        # ``is not None`` so that falsy targets (0, "", empty containers)
        # are not silently dropped.
        if target is not None:
            event.target = target
        if args:
            event.params = args
        return self.trigger_listeners(event)

    def attach(self, event_name, listener, priority=1):
        """
        Attaches new listener to event
        Args:
            event_name (str): event's name
            listener: callback listener for event
            priority (int): priority for listener
        Raises:
            RuntimeError: if ``event_name`` is not a string
        """
        if not isinstance(event_name, str):
            raise RuntimeError("{} expects a string for event; received {}".format(
                self.attach.__name__, type(event_name)
            ))
        entry = (priority, next(self._sequence), listener)
        self.events[event_name].put(entry)

    def detach(self, listener, event_name=None, force=False):
        """
        Detaches listener
        Args:
            listener: callback listener for event
            event_name (str): event's name; when empty or '*' the listener
                is detached from every event
            force (bool): if set to True, '*' is treated as a literal event
                name instead of a wildcard
        Raises:
            RuntimeError: if ``event_name`` is given but is not a string
        """
        if not event_name or (event_name == '*' and not force):
            # Snapshot the names: _detach_from may delete keys while we loop.
            for name in list(self.events):
                self._detach_from(name, listener)
            return
        if not isinstance(event_name, str):
            raise RuntimeError("{} expects a string for event; received {}".format(
                self.detach.__name__, type(event_name)
            ))
        self._detach_from(event_name, listener)

    def _detach_from(self, event_name, listener):
        """Remove every registration of ``listener`` from one event."""
        # Membership test first: indexing the defaultdict would create an
        # empty queue for an event nobody registered.
        if event_name not in self.events:
            return
        old_queue = self.events[event_name]
        kept = Q.PriorityQueue()
        for _ in range(old_queue.qsize()):
            entry = old_queue.get_nowait()
            # The listener is always the last element of an entry.
            if entry[-1] != listener:
                kept.put(entry)
        if kept.qsize():
            # Store the rebuilt queue; the old one has been drained.
            self.events[event_name] = kept
        else:
            del self.events[event_name]

    def trigger_listeners(self, event):
        """
        Validates the event before dispatch
        Args:
            event (Event): event instance
        Returns:
            None: listener dispatch is not implemented yet
        Raises:
            RuntimeError: if the event has no name
        """
        name = event.name
        if not name:
            raise RuntimeError("Event is missing a name; cannot trigger")
        # TODO: dispatch the event to the listeners registered for ``name``

    def clear_listeners(self, event_name):
        """
        Clears all listeners for given event
        Args:
            event_name (str): event's name
        """
        # pop() with a default does not create an entry for unknown names.
        self.events.pop(event_name, None)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment