Last active
September 3, 2020 09:49
-
-
Save homm/b8caf60c11997da69b1e to your computer and use it in GitHub Desktop.
Memory-efficient task runner for threaded execution. Requires Python 3.2+, or on Python 2 the `futures` backport (https://pypi.python.org/pypi/futures).
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from django.db import connections, DEFAULT_DB_ALIAS | |
def real_queryset_iterator(qs, pk='pk', chunk_size=5000):
    """Yield every object of ``qs`` while holding at most one chunk in memory.

    Rows are fetched with keyset pagination. ``qs`` is ordered by ``pk``, and
    each follow-up query filters on the last value already seen. Each query
    therefore returns at most ``chunk_size`` rows.

    :param qs: the queryset to iterate.
    :param pk: name of the field to order and paginate by. Prefix it with
        ``'-'`` to iterate in descending order. The field must be unique:
        the strict ``__gt``/``__lt`` filter skips rows that share a value
        with the last row of the previous chunk.
    :param chunk_size: maximum number of rows fetched per query.
    """
    qs = qs.order_by(pk)
    if pk.startswith('-'):
        pk = pk[1:]
        lookup = pk + '__lt'
    else:
        lookup = pk + '__gt'

    chunk = list(qs[:chunk_size])
    while chunk:
        for item in chunk:
            yield item
        if len(chunk) < chunk_size:
            # A short chunk means the queryset is exhausted. Skip the extra
            # round-trip, which could only return an empty list.
            break
        last_pk = getattr(chunk[-1], pk)
        chunk = list(qs.filter(**{lookup: last_pk})[:chunk_size])
def close_connection_iterator(iterator, db=DEFAULT_DB_ALIAS):
    """Yield items from ``iterator``, closing the ``db`` connection after each fetch.

    ``connections[db].close()`` runs right after every ``next()`` call on
    the source, before the item is handed to the consumer. This includes the
    final call, which exhausts the source or raises. The original loop skipped
    that final call, so a connection opened by it stayed open.

    This is presumably meant for sources that query the database on
    ``next()`` (such as ``real_queryset_iterator``) while being advanced from
    worker threads. Django keeps one connection per thread, so without this
    each thread would be left holding an open connection.

    :param iterator: any iterable. Fetching an item may open a connection.
    :param db: alias of the connection to close after each fetch.
    """
    iterator = iter(iterator)
    while True:
        try:
            item = next(iterator)
        except StopIteration:
            return
        finally:
            # Runs on success, on exhaustion and on error alike.
            connections[db].close()
        yield item
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def task_queue(task, iterator, concurrency=10, on_fail=lambda *_: None):
    """Run ``task`` over the items of ``iterator`` with bounded concurrency.

    Items are pulled lazily. ``concurrency`` items are scheduled up front,
    and each finished task schedules exactly one replacement. At most
    ``concurrency`` items are therefore in flight at any time, and the
    source is never materialized.

    :param task: callable invoked as ``task(obj)`` in a worker thread.
    :param iterator: any iterable of work items. It is advanced only while
        ``io_lock`` is held, so it does not need to be thread-safe.
    :param concurrency: number of worker threads and the maximum number of
        in-flight tasks.
    :param on_fail: called as ``on_fail(exception, obj)`` for every task that
        raised. The default ignores failures.
    :returns: a ``concurrent.futures.Future`` with these outcomes:
        - It resolves to the ``stats`` dict (``{'done': ..., 'delayed': ...}``)
          once every item has been processed. ``'done'`` counts successful
          and failed tasks alike.
        - If fetching from ``iterator`` raises, it resolves to that exception.
        - ``stats`` is also exposed immediately as ``result.stats``.
        - Cancelling the future stops fetching new items. Tasks already
          submitted still run.
    """
    from threading import RLock
    from concurrent.futures import ThreadPoolExecutor, Future

    iterator = iter(iterator)  # accept any iterable, not only iterators

    def resolve(setter, value):
        # The caller may already have cancelled ``result``, and setting a
        # finished future raises on Python 3.8+. A cancel() landing exactly
        # between this check and the setter is not covered.
        if not result.done():
            setter(value)

    def submit():
        # Schedule the next item, if any. Must be called with io_lock held.
        if result.done():
            # Cancelled or failed: stop consuming the source.
            return
        try:
            obj = next(iterator)
        except StopIteration:
            return
        except Exception as exc:
            # Surface a failing source to the caller instead of hanging.
            resolve(result.set_exception, exc)
            return
        stats['delayed'] += 1
        future = executor.submit(task, obj)
        future.obj = obj
        # Runs inline right away if the task has already finished. io_lock is
        # an RLock so this re-entrant call does not deadlock.
        future.add_done_callback(upload_done)

    def upload_done(future):
        try:
            error = future.exception()
            if error is not None:
                on_fail(error, future.obj)
        finally:
            # Account for this task even if on_fail raised. Otherwise
            # ``result`` would never resolve.
            with io_lock:
                # Submit the replacement before decrementing. 'delayed' then
                # reaches 0 only once the source is exhausted and every task
                # has finished.
                submit()
                stats['delayed'] -= 1
                stats['done'] += 1
                # Decided under the lock, so exactly one thread sees True.
                finished = stats['delayed'] == 0
            if finished:
                resolve(result.set_result, stats)

    def cleanup(_):
        # Holding io_lock guarantees no submit() is between its done() check
        # and executor.submit() while the executor shuts down.
        with io_lock:
            executor.shutdown(wait=False)

    io_lock = RLock()
    executor = ThreadPoolExecutor(concurrency)
    result = Future()
    result.stats = stats = {'done': 0, 'delayed': 0}
    result.add_done_callback(cleanup)
    with io_lock:
        for _ in range(concurrency):
            submit()
        if stats['delayed'] == 0:
            # Nothing is in flight (for example, an empty source), so no
            # completion callback would ever resolve the result.
            resolve(result.set_result, stats)
    return result
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment