Skip to content

Instantly share code, notes, and snippets.

@ngetahun
Forked from asottile/queueing.py
Created November 3, 2020 10:44
Show Gist options
  • Save ngetahun/1499e5adf8d49cc6798f37525eae9206 to your computer and use it in GitHub Desktop.
Save ngetahun/1499e5adf8d49cc6798f37525eae9206 to your computer and use it in GitHub Desktop.
really bad queue simulating sqs backed by sqlite
import contextlib
import fcntl
import json
import sqlite3
import time
import uuid
from typing import Any
from typing import Generator
from typing import Optional
from typing import Tuple
@contextlib.contextmanager
def lock() -> Generator[None, None, None]:
with open('queue.lock', 'a+') as f:
fcntl.flock(f.fileno(), fcntl.LOCK_EX)
try:
yield
finally:
fcntl.flock(f.fileno(), fcntl.LOCK_UN)
def _XXX_init_schema(db: sqlite3.Connection) -> None:
db.execute(
'CREATE TABLE IF NOT EXISTS queue_items ('
' queue_name TEXT NOT NULL,'
' group_key TEXT NOT NULL,'
' data_json TEXT NOT NULL,'
' visibility_time_end INT NULL,'
' receipt_handle TEXT NULL'
');',
)
@contextlib.contextmanager
def _db() -> Generator[sqlite3.Connection, None, None]:
with lock():
with contextlib.closing(sqlite3.connect('queue.sqlite')) as ctx:
with ctx as db:
_XXX_init_schema(db)
yield db
def enqueue(queue_name: str, group_key: str, data: Any) -> None:
with _db() as db:
db.execute(
'INSERT INTO queue_items VALUES (?, ?, ?, NULL, NULL)',
(queue_name, group_key, json.dumps(data)),
)
def enqueue_with_delay(
queue_name: str,
group_key: str,
data: Any,
*,
delay: int,
) -> None:
time_end = int(time.time() + delay)
with _db() as db:
db.execute(
'INSERT INTO queue_items VALUES (?, ?, ?, ?, NULL)',
(queue_name, group_key, json.dumps(data), time_end),
)
def long_poll(queue_name: str, timeout: int) -> Optional[Tuple[str, Any]]:
receipt_query = (
'UPDATE queue_items '
'SET receipt_handle = ?, visibility_time_end = ? '
'WHERE ROWID = ?'
)
end = time.time() + timeout
while time.time() < end:
with _db() as db:
queued = db.execute(
'SELECT ROWID, group_key, data_json, visibility_time_end '
'FROM queue_items '
'WHERE queue_name = ? '
'ORDER BY ROWID ASC',
(queue_name,),
)
seen_group_keys = set()
for rowid, group_key, data_json, time_end in queued:
if group_key in seen_group_keys:
continue
elif time_end is not None and time_end > time.time():
seen_group_keys.add(group_key)
continue
new_handle = uuid.uuid4().hex
# TODO: update expiration time based on queue
new_end_time = time.time() + 120
db.execute(receipt_query, (new_handle, new_end_time, rowid))
return new_handle, json.loads(data_json)
time.sleep(.1)
return None
def mark_completed(queue_name: str, receipt_handle: str) -> None:
query = (
'DELETE FROM queue_items '
'WHERE queue_name = ? AND receipt_handle = ?'
)
with _db() as db:
db.execute(query, (queue_name, receipt_handle))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment