Skip to content

Instantly share code, notes, and snippets.

@antont
Last active October 6, 2023 10:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save antont/237c23b3edf35f1a9861727c52cfb3fe to your computer and use it in GitHub Desktop.
Save antont/237c23b3edf35f1a9861727c52cfb3fe to your computer and use it in GitHub Desktop.
Python thread to run async tasks in the background, in a single eventloop & thread
import threading
import asyncio
from queue import Queue
import time
from typing import Awaitable
import flask
from firebase_functions.private import serving
# Not referenced anywhere else in the visible code.
# NOTE(review): presumably a counter for concurrent request handlers - confirm or remove.
concurrent_handler_id = 0
# The currently registered TaskRunnerThread, or None when there is none.
# Assigned by init() (and by the __main__ demo) and reset to None by stop().
runner = None #set in init()
# The bare string below is a note, not the module docstring (it is not the
# first statement); it links to a discussion of Python threading on Google Functions.
"""
NOTE: see this for hints maybe:
https://www.googlecloudcommunity.com/gc/Serverless/The-issue-with-pythons-s-threading-on-Google-Function/m-p/610388
"""
def run_once(loop: asyncio.AbstractEventLoop, duration: float = 0.1):
    """Drive *loop* for one short slice, then return to the caller.

    A ``loop.stop`` call is scheduled *duration* seconds ahead and the loop
    is run until it stops.  Running for a short slice (instead of a single
    iteration via ``loop.call_soon(loop.stop)``) is more efficient for the
    tasks, yet still responsive enough for the caller to pick up new jobs
    between slices.

    Args:
        loop: The event loop to drive; must not already be running.
        duration: Length of the slice in seconds (default 0.1).
    """
    stop_handle = loop.call_later(duration, loop.stop)
    try:
        loop.run_forever()
    finally:
        # If something else stopped the loop early, make sure our timer does
        # not fire during a later slice and cut it short.
        stop_handle.cancel()
class TaskRunnerThread(threading.Thread):
    """Background thread that runs queued coroutines on one private event loop.

    Jobs (coroutine objects) are handed over with :meth:`add_job`.  The
    thread polls the queue, wraps every queued job in a task on
    ``self.loop`` and drives the loop in short slices via ``run_once``.
    It exits once at least one job has been started and no started or
    queued work remains; on exit it calls the module-level ``stop`` to
    unregister itself and closes its event loop.
    """

    def __init__(self, in_flask=True):
        """Create the event loop and job queue; does not start the thread.

        Args:
            in_flask: Stored on the instance.  Not read by the runner yet;
                kept for the planned Flask teardown hook (see ``run``).
        """
        super().__init__()
        print("[TaskRunnerThread] init")
        self.loop = asyncio.new_event_loop()
        self.queue = Queue()
        self.tasks: set[asyncio.Task] = set()
        self.in_flask = in_flask
        self.did_jobs = False

    def run(self):
        """Thread body: start queued jobs and drive the loop until idle."""
        print("[TaskRunnerThread] run")
        # TODO: when self.in_flask is set, register a teardown hook, e.g.
        #   flask.current_app.teardown_request_funcs.setdefault(None, []).append(self.finish)
        try:
            # Keep going until a first job has been seen, and afterwards for
            # as long as tasks are running OR jobs are still waiting in the
            # queue (a job enqueued while the last task finishes must not be
            # dropped).
            while not self.did_jobs or self.tasks or not self.queue.empty():
                self._start_queued_jobs()
                run_once(self.loop)
        finally:
            print("[TaskRunnerThread] run Is DONE!")
            try:
                # Unregister even when the loop above raised, so a dead
                # runner is never left installed.
                stop(self)
            finally:
                # Release the loop's selector / self-pipe resources.
                self.loop.close()

    def _start_queued_jobs(self):
        """Create a task on the loop for every job currently in the queue."""
        # The runner is the only consumer of the queue, so empty() followed
        # by get_nowait() cannot race with another reader.
        while not self.queue.empty():
            job = self.queue.get_nowait()
            print("[TaskRunnerThread] Creating task for", job)
            task = self.loop.create_task(job)
            self.tasks.add(task)
            # discard() never raises, unlike remove(), if the task is gone.
            task.add_done_callback(self.tasks.discard)
            self.did_jobs = True

    def add_job(self, job):
        """Queue a coroutine object to be run on the runner's event loop.

        The job is only put on the thread-safe queue here; the runner
        thread turns it into a task on its next polling slice.
        """
        print("[TaskRunnerThread] add_job", job)
        self.queue.put(job)

    # TODO: a finish() method that runs every remaining task to completion
    # (loop.run_until_complete(task) for each task) would have to be invoked
    # via the queue so that it executes in the runner thread.
def init():
    """Create the background task runner, publish it, and start it.

    The new thread is stored in this module's ``runner`` global and on
    ``serving.runner`` before it is started.
    """
    global runner
    worker = TaskRunnerThread()
    runner = serving.runner = worker
    worker.start()
def stop(done_runner):
    """Unregister *done_runner* after its thread has finished its work.

    Asserts that *done_runner* is the currently registered runner, then
    clears both this module's ``runner`` global and ``serving.runner``.
    """
    global runner
    assert runner is done_runner
    runner = serving.runner = None
async def jobtest(name, iterations=10, delay=1):
    """Demo job: print a numbered progress line, then sleep, repeatedly.

    Args:
        name: Label printed at the start of every line.
        iterations: How many lines to print (default 10).
        delay: Seconds to sleep after each line (default 1).
    """
    for i in range(iterations):
        print(f"{name} @{i}")
        await asyncio.sleep(delay)
if __name__ == '__main__':
    # Manual demo outside Flask: start the runner, feed it two jobs with a
    # pause after each, then wait for the runner thread to finish.
    runner = TaskRunnerThread(False) #testing outside Flask
    runner.start()
    for label, pause in (('A', 2), ('B', 5)):
        runner.add_job(jobtest(label))
        time.sleep(pause)
    #runner.finish()
    runner.join()
"""whoa this works :o
i functions: Loaded environment variables from .env.*****
> [TaskRunnerThread] init
> [TaskRunnerThread] run
> [TaskRunnerThread] add_job <coroutine object do_concurrent at 0x10b189640>
i functions: Beginning execution of "us-central1-test_concurrent"
> [TaskRunnerThread] Creating task for <coroutine object do_concurrent at 0x10b189640>
> [TaskRunnerThread] add_job <coroutine object do_concurrent at 0x10b18aa40>
> [TaskRunnerThread] Creating task for <coroutine object do_concurrent at 0x10b18aa40>
i functions: Finished "us-central1-test_concurrent" in 2.017917ms
i functions: Beginning execution of "us-central1-test_concurrent"
> [TaskRunnerThread] add_job <coroutine object do_concurrent at 0x10b18b040>
i functions: Finished "us-central1-test_concurrent" in 2.567125ms
> [TaskRunnerThread] Creating task for <coroutine object do_concurrent at 0x10b18b040>
> [do_concurrent] 1: sleeping at 0 in 22728
> [do_concurrent] 2: sleeping at 0 in 22728
> [do_concurrent] 3: sleeping at 0 in 22728
> [do_concurrent] 1: sleeping at 1 in 22728
> [do_concurrent] 2: sleeping at 1 in 22728
i functions: Beginning execution of "us-central1-test_concurrent"
> [TaskRunnerThread] add_job <coroutine object do_concurrent at 0x10b189f40>
i functions: Finished "us-central1-test_concurrent" in 1.852792ms
> [TaskRunnerThread] Creating task for <coroutine object do_concurrent at 0x10b189f40>
> [do_concurrent] 3: sleeping at 1 in 22728
> [do_concurrent] 1: sleeping at 2 in 22728
> [do_concurrent] 2: sleeping at 2 in 22728
> [do_concurrent] 4: sleeping at 0 in 22728
> [do_concurrent] 3: sleeping at 2 in 22728
> [do_concurrent] 1: sleeping at 3 in 22728
> [do_concurrent] 2: sleeping at 3 in 22728
> [do_concurrent] 4: sleeping at 1 in 22728
> [do_concurrent] 3: sleeping at 3 in 22728
> [do_concurrent] 1: sleeping at 4 in 22728
> [do_concurrent] 2: sleeping at 4 in 22728
> [do_concurrent] 4: sleeping at 2 in 22728
> [do_concurrent] 3: sleeping at 4 in 22728
> [do_concurrent] 1: sleeping at 5 in 22728
> [do_concurrent] 2: sleeping at 5 in 22728
> [do_concurrent] 4: sleeping at 3 in 22728
> [do_concurrent] 3: sleeping at 5 in 22728
> [do_concurrent] 1: sleeping at 6 in 22728
> [do_concurrent] 2: sleeping at 6 in 22728
> [do_concurrent] 4: sleeping at 4 in 22728
> [do_concurrent] 3: sleeping at 6 in 22728
> [do_concurrent] 1: sleeping at 7 in 22728
> [do_concurrent] 2: sleeping at 7 in 22728
> [do_concurrent] 4: sleeping at 5 in 22728
> [do_concurrent] 3: sleeping at 7 in 22728
> [do_concurrent] 1: sleeping at 8 in 22728
> [do_concurrent] 2: sleeping at 8 in 22728
> [do_concurrent] 4: sleeping at 6 in 22728
> [do_concurrent] 3: sleeping at 8 in 22728
> [do_concurrent] 1: sleeping at 9 in 22728
> [do_concurrent] 2: sleeping at 9 in 22728
> [do_concurrent] 4: sleeping at 7 in 22728
> [do_concurrent] 3: sleeping at 9 in 22728
> [do_concurrent] 4: sleeping at 8 in 22728
> [do_concurrent] 4: sleeping at 9 in 22728
> [TaskRunnerThread] run Is DONE!
"""
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment