Skip to content

Instantly share code, notes, and snippets.

@codecats
Created March 24, 2017 23:04
Show Gist options
  • Save codecats/a05418370c132c401bbcea82c4412902 to your computer and use it in GitHub Desktop.
Save codecats/a05418370c132c401bbcea82c4412902 to your computer and use it in GitHub Desktop.
#this is an example daemon thread we use for sending events out to firebase. We dont care to much about the consistency of data on firebase.
class FirebaseEventSender(threading.Thread):
@ThreadAutoRestartCachedClassProperty
def _instance(cls) -> 'FirebaseEventSender':
lc = FirebaseEventSender()
lc.start()
return lc
@classmethod
def schedule(cls, tasks: List[Callable], firebase: 'Firebase'):
try:
for task in tasks:
cls._instance.events_to_send.put((task, firebase), block=False)
except Full:
pass
def __init__(self):
super().__init__(daemon=True)
self.tasks = []
self.events_to_send = Queue()
self.terminate_flag = threading.Event()
@terminating_method
def wait_on_task(self):
event = self.events_to_send.get()
self.tasks.append(event)
@terminating_method
def get_tasks(self):
try:
while True:
self.tasks.append(self.events_to_send.get_nowait())
except Empty:
pass
@terminating_method
def poll(self):
if not self.tasks:
self.wait_on_task()
self.get_tasks()
self.send()
self.tasks = []
@terminating_method
def send(self):
for task in self.tasks:
func, firebase = task
func(firebase)
def start(self):
self.terminate_flag = threading.Event()
super().start()
def terminate(self):
if self.is_alive():
self.terminate_flag.set()
self.join()
def run(self):
from django.conf import settings
while True:
self.poll()
if not self.tasks:
if settings.IS_UNIT_TESTING:
self.terminate_flag.set()
time.sleep(0.1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment