Skip to content

Instantly share code, notes, and snippets.

@mfa

mfa/00readme.md Secret

Created December 16, 2023 17:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mfa/1977381ef7f57c7884098c7dee233295 to your computer and use it in GitHub Desktop.
Save mfa/1977381ef7f57c7884098c7dee233295 to your computer and use it in GitHub Desktop.
FastApi + psycopg3 + PGMQ

FastApi + psycopg3 + PGMQ

This code is a proof of concept and has no guardrails!

Idea: use postgresql instance with pgmq installed on fly.io.

Requires:

  • running pgmq instance (mine is ams.empty-dream-2474.internal)
  • postgresql password

Deployment (needs changes to fly.toml and main.py file if you are not @mfa)

fly secrets set POSTGRES_PASSWORD=FIXME
fly deploy
# fly.toml app configuration file generated for floral-snow-9544 on 2023-12-16T14:13:21+01:00
#
# See https://fly.io/docs/reference/configuration/ for information about how to use this file.
#
app = "floral-snow-9544"
primary_region = "ams"
[build]
builtin = "python"
[http_service]
internal_port = 8000
force_https = true
auto_stop_machines = true
auto_start_machines = true
min_machines_running = 0
processes = ["app"]
import json
import os
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request
from psycopg import sql
from psycopg.types.json import Jsonb
from psycopg_pool import AsyncConnectionPool
def get_conn_str():
return f"""
dbname=postgres
user=postgres
password={os.getenv('POSTGRES_PASSWORD')}
host=ams.empty-dream-2474.internal
port=5432
"""
@asynccontextmanager
async def lifespan(app: FastAPI):
app.async_pool = AsyncConnectionPool(conninfo=get_conn_str())
yield
await app.async_pool.close()
app = FastAPI(lifespan=lifespan)
@app.get("/init")
async def init(request: Request):
async with request.app.async_pool.connection() as conn:
async with conn.cursor() as cur:
# this needs to be run once to create the queue
await cur.execute(
"""
CREATE EXTENSION IF NOT EXISTS pgmq;
SELECT pgmq.create('queue0');
"""
)
return True
@app.get("/send")
async def send(request: Request, op: str, msg: str):
async with request.app.async_pool.connection() as conn:
async with conn.cursor() as cur:
await cur.execute(
sql.Composed(
[
sql.SQL("SELECT * from "),
sql.SQL("pgmq.send('queue0', '"),
sql.SQL(json.dumps({op: msg})), # there has to be a better way!
sql.SQL("')"),
]
)
)
results = await cur.fetchall()
return results
@app.get("/pop")
async def pop(request: Request):
async with request.app.async_pool.connection() as conn:
async with conn.cursor() as cur:
# pop one message from the queue
await cur.execute("SELECT pgmq.pop('queue0')")
results = await cur.fetchall()
return results
@app.get("/")
def index():
return {}
web: uvicorn main:app --host 0.0.0.0
fastapi[uvicorn]
psycopg[binary,pool]
uvicorn
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment