Skip to content

Instantly share code, notes, and snippets.

@homm
Last active September 3, 2020 09:49
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save homm/b8caf60c11997da69b1e to your computer and use it in GitHub Desktop.
Save homm/b8caf60c11997da69b1e to your computer and use it in GitHub Desktop.
Memory-efficient task runner for threaded execution. Requires Python 3.2+ or, on older versions, the `futures` backport (https://pypi.python.org/pypi/futures).
from django.db import connections, DEFAULT_DB_ALIAS
def real_queryset_iterator(qs, pk='pk', chunk_size=5000):
    """Yield every row of *qs* by fetching it in pages of *chunk_size* rows.

    The queryset is ordered by *pk* and each following page is selected by
    filtering on the ordering field relative to the last row already
    yielded, so only one page is held in memory at a time.

    *pk* is the ordering field name; prefix it with ``-`` to walk the rows
    in descending order (the filter then uses ``__lt`` instead of ``__gt``).
    """
    ordered = qs.order_by(pk)

    descending = pk.startswith('-')
    field = pk[1:] if descending else pk
    lookup = field + ('__lt' if descending else '__gt')

    page = ordered[:chunk_size]
    while True:
        rows = list(page)
        if not rows:
            return
        for row in rows:
            yield row
        # Continue strictly past the last row handed out.
        boundary = getattr(rows[-1], field)
        page = ordered.filter(**{lookup: boundary})[:chunk_size]
def close_connection_iterator(iterator, db=DEFAULT_DB_ALIAS):
    """Pass items of *iterator* through, closing a DB connection per item.

    After each item is pulled from *iterator* and before it is yielded,
    ``connections[db].close()`` is called for the given database alias.
    """
    for entry in iterator:
        connection = connections[db]
        connection.close()
        yield entry
def task_queue(task, iterator, concurrency=10, on_fail=lambda *_: None):
    """Run ``task(obj)`` for every item of *iterator* on a thread pool.

    Items are pulled from *iterator* lazily: *concurrency* items are
    submitted up front and a further item is fetched only when a running
    task completes, so the source is never materialised as a whole.

    Args:
        task: Callable invoked with a single item by the thread pool.
        iterator: Any iterable (or iterator) producing the items.
        concurrency: Number of pool threads, and the number of items
            submitted initially.
        on_fail: Called as ``on_fail(exception, obj)`` for every task that
            raised. The default ignores failures.

    Returns:
        A ``concurrent.futures.Future`` that resolves to the ``stats`` dict
        (keys ``'done'`` and ``'delayed'``) once no submitted task is
        outstanding, including immediately when *iterator* is empty. The
        same dict is available while work is in progress as
        ``result.stats``. Cancelling the future stops further items from
        being submitted; the pool is shut down (without waiting) when the
        future completes or is cancelled.
    """
    from threading import RLock
    from concurrent.futures import ThreadPoolExecutor, Future

    def submit():
        # Caller must hold io_lock.
        # Check for cancellation first so no item is consumed and dropped.
        if result.cancelled():
            return
        try:
            obj = next(items)
        except StopIteration:
            return
        stats['delayed'] += 1
        future = executor.submit(task, obj)
        future.obj = obj
        future.add_done_callback(upload_done)

    def resolve_if_idle():
        # Caller must hold io_lock. The done() guard keeps the result from
        # being set twice or set after the caller cancelled it.
        if stats['delayed'] == 0 and not result.done():
            result.set_result(stats)

    def upload_done(future):
        try:
            with io_lock:
                # Refill the freed slot before this task is counted as
                # finished, so 'delayed' reaches zero only when the source
                # is exhausted.
                submit()
            error = future.exception()
            if error is not None:
                on_fail(error, future.obj)
        finally:
            # Runs even if on_fail (or the source iterator) raised, so the
            # bookkeeping stays correct and the result can still resolve.
            with io_lock:
                stats['delayed'] -= 1
                stats['done'] += 1
                resolve_if_idle()

    def cleanup(_):
        with io_lock:
            executor.shutdown(wait=False)

    # Accept plain iterables (lists, querysets, ...) as well as iterators.
    items = iter(iterator)
    io_lock = RLock()
    executor = ThreadPoolExecutor(concurrency)
    result = Future()
    result.stats = stats = {'done': 0, 'delayed': 0}
    result.add_done_callback(cleanup)

    with io_lock:
        for _ in range(concurrency):
            submit()
        # Nothing was submitted (empty source): resolve right away instead
        # of leaving the caller waiting forever.
        resolve_if_idle()

    return result
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment