Skip to content

Instantly share code, notes, and snippets.

@scottstanie
Last active December 1, 2022 18:24
Show Gist options
  • Save scottstanie/867d4d8d5bba508bbc8d528d1d8b52b6 to your computer and use it in GitHub Desktop.
Save scottstanie/867d4d8d5bba508bbc8d528d1d8b52b6 to your computer and use it in GitHub Desktop.
Dummy implementation of a background loading thread that eagerly loads chunks of data into a queue for the main thread to process.
import time
from queue import Empty, Queue
from random import random
from threading import Event, Semaphore, Thread, current_thread
def _get_data(j, load_time=0.5):
"""Simulate data loading taking some amount of time."""
time.sleep(load_time)
# adding j just to differentiate the data
return [j + random(), j + random()]
class EagerLoader:
    """Load chunks of data in a separate thread, ahead of when they are needed.

    Slice requests are pushed with ``request_slice``. The loader thread loads
    each one (via ``_get_data``) and puts a ``(slice, data)`` tuple on an
    output queue, which the caller drains with ``get_data``. A ``None``
    request tells the loader there is no more work.

    Parameters
    ----------
    max_queue_size : int
        Maximum number of loaded-but-not-yet-retrieved chunks held at once.
        Must be at least 1.

    Raises
    ------
    ValueError
        If ``max_queue_size`` is less than 1.
    """

    def __init__(self, max_queue_size=1):
        if max_queue_size < 1:
            # With no room in the output queue the loader could never load
            # anything, and any consumer calling get_data() would block forever.
            raise ValueError(f"max_queue_size must be >= 1, got {max_queue_size}")
        # Input queue of requests to load slices
        self._slice_queue = Queue()
        # Output queue of loaded slices
        self._data_queue = Queue()
        self._max_data_queue_size = max_queue_size
        # One permit per free spot in the output queue. The loader takes a
        # permit before loading a chunk and get_data() gives it back, so the
        # loader blocks (instead of busy-spinning) while the output is full.
        self._free_slots = Semaphore(max_queue_size)
        self.finished_event = Event()
        self.thread = Thread(target=self._consume_queue)
        self.thread.start()

    def _consume_queue(self):
        """Load data as slice requests come in (runs on the loader thread)."""
        while not self.finished_event.is_set():
            # If we've loaded all the data but it hasn't been popped, wait for
            # a free slot before loading the next one. The timeout lets us
            # re-check finished_event so a shutdown is noticed promptly.
            if not self._free_slots.acquire(timeout=0.1):
                continue
            print(f"{self._data_queue.qsize() = }, {self._max_data_queue_size = }")
            try:
                # Get a slice request
                j = self._slice_queue.get(timeout=1)
            except Empty:
                # Nothing was loaded, so hand back the slot we reserved.
                self._free_slots.release()
                continue
            if j is None:
                # If we get a None, we're done
                self._free_slots.release()
                print("done!")
                self.notify_finished()
                break
            # Load the data for the slice
            print("loader: Loading data for slice %d" % j)
            data = _get_data(j)
            # Put the data in the output queue, along with the slice requested
            # in case we need to write to a specific part of the output array
            self._data_queue.put((j, data))
        print("loader thread: Finished loading data, shutting down")

    def request_slice(self, s):
        """Request a slice to be loaded; pass ``None`` to signal no more requests."""
        self._slice_queue.put(s)

    def get_data(self):
        """Block until a loaded chunk is available and return ``(slice, data)``."""
        item = self._data_queue.get()
        # A spot in the output queue is free again; let the loader continue.
        self._free_slots.release()
        return item

    def has_data_ready(self):
        """Return True if at least one loaded chunk is waiting to be retrieved."""
        return self._data_queue.qsize() > 0

    def notify_finished(self):
        """Mark the loader as finished; the loader thread exits at its next check."""
        self.finished_event.set()

    def is_finished(self):
        """Return True once the loader has been marked finished."""
        return self.finished_event.is_set()

    def shutdown(self):
        """Stop the loader thread and wait for it to exit."""
        self.notify_finished()
        # Wake the loader if it is blocked waiting for a slice request, so the
        # join below doesn't have to wait out the 1 s get() timeout.
        self._slice_queue.put(None)
        self.thread.join()

    def __del__(self):
        # __init__ may have raised before the thread was created.
        thread = getattr(self, "thread", None)
        if thread is None:
            return
        self.notify_finished()
        # The thread target is a bound method, so the last reference can be
        # dropped on the loader thread itself; joining the current thread
        # raises RuntimeError, so only join from other threads.
        if thread is not current_thread():
            thread.join()
def main():
    """Demo: feed slice requests to an EagerLoader and process results as they arrive."""
    # Dummy requests
    slice_requests = [1, 2, 3, 4, 5]
    max_queue_size = 4

    # Create the loader
    # Will really have the filename and other args here
    loader = EagerLoader(max_queue_size=max_queue_size)
    print(f"main: starting data loader {loader}")
    try:
        for s in slice_requests:
            # Put the slice request in the input queue
            loader.request_slice(s)
        # A None request tells the loader there is no more work.
        loader.request_slice(None)

        print("Simulating data processing")
        # Keep going until the loader is finished AND everything it loaded
        # has been consumed.
        while not loader.is_finished() or loader.has_data_ready():
            if not loader.has_data_ready():
                print("Waiting on data to load...")
                time.sleep(0.05)
                continue
            s, data = loader.get_data()
            print(f"main: processing data for window {s}, data {data}")
            # Simulate data processing that takes a second
            time.sleep(1)
    finally:
        # Stop the background thread even if processing raised; otherwise a
        # loader waiting on a full output queue would keep the process alive.
        loader.shutdown()
# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment