Skip to content

Instantly share code, notes, and snippets.

@bbelderbos
Last active June 1, 2026 12:49
Show Gist options
  • Select an option

  • Save bbelderbos/a9e3e438a201b28935bc9d2d174495ca to your computer and use it in GitHub Desktop.

Select an option

Save bbelderbos/a9e3e438a201b28935bc9d2d174495ca to your computer and use it in GitHub Desktop.
# /// script
# requires-python = ">=3.12"
# dependencies = [
# "python-telegram-bot>=21",
# "openai>=1.40",
# "httpx",
# "python-decouple",
# "pydantic",
# ]
# ///
import json
import logging
from pathlib import Path
from typing import Literal, Protocol
import httpx
from decouple import config
from openai import AsyncOpenAI
from pydantic import BaseModel, HttpUrl
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Message, Update
from telegram.ext import (
Application,
CallbackQueryHandler,
CommandHandler,
ContextTypes,
)
logger = logging.getLogger(__name__)
TAGS = ["read", "lib", "tool", "skip"]
DEFAULT_TOPIC = "rust"
READING_LIST = Path("reading_list.jsonl")
LOBSTERS_FEED = "https://lobste.rs/t/{tag}.json"
SYSTEM = (
"Tag this software/tech headline with one of: "
"read (an article, post, or tutorial), "
"lib (a library, framework, or package you import), "
"tool (a CLI, app, or utility you run). "
"Use 'skip' only if it is off-topic or clickbait."
)
class Story(BaseModel):
title: str
url: HttpUrl
class TagChoice(BaseModel):
tag: Literal["read", "lib", "tool", "skip"]
class Classifier(Protocol):
async def tag(self, story: Story) -> str: ...
class OpenAIClassifier:
def __init__(self, api_key: str, model: str = "gpt-4o-mini") -> None:
self._client = AsyncOpenAI(api_key=api_key)
self._model = model
async def tag(self, story: Story) -> str:
completion = await self._client.beta.chat.completions.parse(
model=self._model,
messages=[
{"role": "system", "content": SYSTEM},
{"role": "user", "content": story.title},
],
response_format=TagChoice,
)
choice = completion.choices[0].message.parsed
return choice.tag if choice else "skip"
def _build_classifier() -> Classifier:
return OpenAIClassifier(config("OPENAI_API_KEY"))
async def fetch_stories(tag: str, *, limit: int = 5) -> list[Story]:
async with httpx.AsyncClient(
timeout=10, headers={"User-Agent": "trend-triage-bot"}
) as http:
response = await http.get(LOBSTERS_FEED.format(tag=tag))
response.raise_for_status()
return [
Story(
title=story["title"],
url=story["url"] or f"https://lobste.rs/s/{story['short_id']}",
)
for story in response.json()[:limit]
if story.get("title")
]
def save_to_reading_list(story: Story, tag: str) -> None:
with READING_LIST.open("a") as f:
f.write(json.dumps({"tag": tag, **story.model_dump(mode="json")}) + "\n")
def triage_keyboard(suggested: str) -> InlineKeyboardMarkup:
buttons = [
InlineKeyboardButton(
f">> {tag}" if tag == suggested else tag,
callback_data=f"tag:{tag}",
)
for tag in TAGS
]
rows = [buttons[i : i + 3] for i in range(0, len(buttons), 3)]
return InlineKeyboardMarkup(rows)
async def start_digest(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
if update.message is None:
return
topic = context.args[0].lower() if context.args else DEFAULT_TOPIC
await update.message.reply_text(f"Fetching today's {topic} stories...")
try:
context.user_data["queue"] = await fetch_stories(topic)
except httpx.HTTPStatusError:
await update.message.reply_text(
f"No feed for '{topic}'. Try a Lobsters tag like rust, python, or go."
)
return
except httpx.RequestError:
await update.message.reply_text(
"Couldn't reach Lobsters right now — try again in a bit."
)
return
await show_next(update.message, context)
async def show_next(message: Message, context: ContextTypes.DEFAULT_TYPE) -> None:
queue: list[Story] = context.user_data.get("queue", [])
if not queue:
await message.reply_text("Inbox zero. That's all the trends today.")
return
story = queue[0]
suggested = await context.bot_data["classifier"].tag(story)
await message.reply_text(
f"{story.title}\n{story.url}",
reply_markup=triage_keyboard(suggested),
)
async def on_tag(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
query = update.callback_query
if query is None or query.data is None:
return
await query.answer() # 1. stop the spinner, first thing
_, tag = query.data.split(":", 1) # 2. "tag:read" -> "read"
queue = context.user_data.get("queue", [])
if not queue: # stale button after a restart
await query.edit_message_text("Session expired, send /digest again.")
return
story = queue.pop(0)
if tag != "skip":
save_to_reading_list(story, tag) # the human's final say
await query.edit_message_text( # 3. edit, don't reply
f"Filed under {tag}: {story.title}"
if tag != "skip"
else f"Skipped: {story.title}"
)
await show_next(query.message, context)
async def on_error(update: object, context: ContextTypes.DEFAULT_TYPE) -> None:
logger.exception("Handler failed", exc_info=context.error)
def main() -> None:
logging.basicConfig(
format="%(asctime)s %(name)s %(levelname)s %(message)s",
level=logging.INFO,
)
logger.info("Starting trend triage bot, polling for updates")
app = Application.builder().token(config("TELEGRAM_BOT_TOKEN")).build()
app.bot_data["classifier"] = _build_classifier()
app.add_handler(CommandHandler("digest", start_digest))
app.add_handler(CallbackQueryHandler(on_tag, pattern="^tag:"))
app.add_error_handler(on_error)
app.run_polling()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment