Skip to content

Instantly share code, notes, and snippets.

@s3rius
Last active April 26, 2024 10:58
Show Gist options
  • Save s3rius/f54301d28fe34159f24b7bb9c0d51939 to your computer and use it in GitHub Desktop.
Save s3rius/f54301d28fe34159f24b7bb9c0d51939 to your computer and use it in GitHub Desktop.
Dynamic schedule source for taskiq.

Here's an example of a dynamic scheduler for taskiq. It uses PostgreSQL to store all scheduled tasks.

We created a custom schedule source that is capable of storing and retrieving scheduled tasks.

Start workers by running:

taskiq worker tkq:broker --fs-discover

To run scheduler:

taskiq scheduler tkq:sched

Then you can add a task by running:

python main.py
import asyncio
from datetime import datetime, timedelta
from tasks import my_lovely_task
from tkq import db_source
async def main() -> None:
    """Store a single schedule for ``my_lovely_task`` five seconds from now.

    The schedule source is started, one entry is added through
    ``db_source.add_task`` and the source is shut down again, even when
    adding the entry fails.
    """
    await db_source.startup()
    try:
        # Read the clock once so the run time and the task argument are
        # derived from the same instant.  The value stays a naive UTC
        # datetime, as in the original example.
        now = datetime.utcnow()
        await db_source.add_task(
            my_lovely_task.kicker().with_labels(a=22, b=33),
            now + timedelta(seconds=5),
            a=int(now.timestamp()),
        )
    finally:
        # Always release the source, otherwise a failing add_task would
        # leave it open.
        await db_source.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
...
[tool.poetry.dependencies]
python = "^3.11"
taskiq = "^0.8.6"
taskiq-redis = "^0.4.0"
psycopg = {version = "^3.1.10", extras = ["pool"]}
...
import json
from datetime import datetime
from typing import Any, Coroutine, Dict, List
from psycopg.rows import class_row
from psycopg_pool import AsyncConnectionPool
from pydantic import BaseModel
from taskiq import ScheduleSource
from taskiq.kicker import AsyncKicker
from taskiq.scheduler.scheduler import ScheduledTask
from typing_extensions import ParamSpec
_PAR = ParamSpec("_PAR")
class DbSchedule(BaseModel):
    """One row of the ``taskiq_schedules`` table.

    Field names mirror the table's column names, so a row can be turned
    into an instance by passing its columns as keyword arguments.
    """

    # Primary key of the row.
    id: int
    # Name of the task to send.
    task_name: str
    # Positional arguments for the task (stored as JSONB).
    args: list[Any]
    # Keyword arguments for the task (stored as JSONB).
    kwargs: dict[str, Any]
    # Labels attached to the task (stored as JSONB).
    labels: dict[str, Any]
    # Moment the task is scheduled for.
    time: datetime
class DbScheduleSource(ScheduleSource):
    """Schedule source that keeps one-off schedules in a PostgreSQL table.

    Each row of ``taskiq_schedules`` describes one task invocation (task
    name, arguments, labels) together with the time it is scheduled for.
    The row id is attached to every returned task as the ``_sched_id``
    label, which :meth:`post_send` uses to delete the row again.
    """

    def __init__(self, db_url: str) -> None:
        """Create the source.

        :param db_url: connection string handed to the connection pool.
        """
        # The pool is created closed; startup() opens it.
        self.pool = AsyncConnectionPool(db_url, open=False)

    async def startup(self) -> None:
        """Open the pool and create the schedules table if it is missing."""
        await self.pool.open()
        async with self.pool.connection() as conn:
            await conn.execute(
                """
                CREATE TABLE IF NOT EXISTS
                taskiq_schedules(
                    id SERIAL PRIMARY KEY,
                    task_name TEXT NOT NULL,
                    args JSONB NOT NULL,
                    kwargs JSONB NOT NULL,
                    labels JSONB NOT NULL,
                    time TIMESTAMP NOT NULL
                );
                """
            )

    async def shutdown(self) -> None:
        """Close the connection pool."""
        await self.pool.close()

    async def get_schedules(self) -> List[ScheduledTask]:
        """Load every stored schedule.

        :return: one ``ScheduledTask`` per table row, each labelled with
            ``_sched_id`` set to the id of the row it was built from.
        """
        async with self.pool.connection() as conn:
            async with conn.cursor(
                binary=True,
                row_factory=class_row(DbSchedule),
            ) as cur:
                await cur.execute("SELECT * FROM taskiq_schedules;")
                rows = await cur.fetchall()
        return [
            ScheduledTask(
                source=self,
                task_name=row.task_name,
                args=row.args,
                kwargs=row.kwargs,
                # ``_sched_id`` goes last so a stored label with the same
                # key can never replace the real row id; post_send()
                # relies on it to delete the right row.
                labels={**row.labels, "_sched_id": row.id},
                time=row.time,
            )
            for row in rows
        ]

    async def add_task(
        self,
        task: AsyncKicker[_PAR, Any],
        time: datetime,
        *args: _PAR.args,
        **kwargs: _PAR.kwargs,
    ) -> None:
        """Store a new schedule.

        :param task: kicker whose ``task_name`` and ``labels`` are stored.
        :param time: moment the task is scheduled for.
        :param args: positional arguments for the task; serialised with
            ``json.dumps``, so they must be JSON-serialisable.
        :param kwargs: keyword arguments for the task; same restriction.
        """
        async with self.pool.connection() as conn:
            await conn.execute(
                """
                INSERT INTO
                taskiq_schedules(
                    task_name,
                    args,
                    kwargs,
                    labels,
                    time
                )
                VALUES (
                    %(name)s,
                    %(args)s,
                    %(kwargs)s,
                    %(labels)s,
                    %(time)s
                );""",
                {
                    "name": task.task_name,
                    "args": json.dumps(list(args)),
                    "kwargs": json.dumps(kwargs),
                    "labels": json.dumps(task.labels),
                    "time": time,
                },
            )

    async def remove_schedule(self, schedule_id: int) -> None:
        """Delete the schedule row with the given id.

        :param schedule_id: value of the row's ``id`` column.
        """
        async with self.pool.connection() as conn:
            await conn.execute(
                "DELETE FROM taskiq_schedules WHERE id=%(id)s",
                {
                    "id": schedule_id,
                },
            )

    async def post_send(self, task: ScheduledTask) -> None:
        """Delete the stored schedule that ``task`` was built from.

        Tasks without a ``_sched_id`` label are ignored.
        NOTE(review): presumably invoked by the taskiq scheduler after a
        task has been sent; confirm against the ScheduleSource API.

        :param task: scheduled task carrying the ``_sched_id`` label.
        """
        schedule_id = task.labels.get("_sched_id")
        if schedule_id is None:
            return
        await self.remove_schedule(int(schedule_id))
from tkq import broker
@broker.task
async def my_lovely_task(a: int) -> None:
    """Print the marker ``AAA`` followed by ``a``.

    :param a: value to print.
    """
    print("AAA", a)
from taskiq import TaskiqScheduler
from taskiq_redis import ListQueueBroker
from scheduler import DbScheduleSource
# Connection settings of the demo services.
_REDIS_URL = "redis://localhost"
_POSTGRES_DSN = "postgresql://test:test@localhost:5432/test"

# Broker referenced by ``taskiq worker tkq:broker``.
broker = ListQueueBroker(_REDIS_URL)

# Source that stores the dynamically added schedules.
db_source = DbScheduleSource(_POSTGRES_DSN)

# Scheduler referenced by ``taskiq scheduler tkq:sched``.
sched = TaskiqScheduler(broker, sources=[db_source], refresh_delay=2)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment