Last active
June 1, 2026 12:49
-
-
Save bbelderbos/a9e3e438a201b28935bc9d2d174495ca to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # /// script | |
| # requires-python = ">=3.12" | |
| # dependencies = [ | |
| # "python-telegram-bot>=21", | |
| # "openai>=1.40", | |
| # "httpx", | |
| # "python-decouple", | |
| # "pydantic", | |
| # ] | |
| # /// | |
| import json | |
| import logging | |
| from pathlib import Path | |
| from typing import Literal, Protocol | |
| import httpx | |
| from decouple import config | |
| from openai import AsyncOpenAI | |
| from pydantic import BaseModel, HttpUrl | |
| from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Message, Update | |
| from telegram.ext import ( | |
| Application, | |
| CallbackQueryHandler, | |
| CommandHandler, | |
| ContextTypes, | |
| ) | |
| logger = logging.getLogger(__name__) | |
| TAGS = ["read", "lib", "tool", "skip"] | |
| DEFAULT_TOPIC = "rust" | |
| READING_LIST = Path("reading_list.jsonl") | |
| LOBSTERS_FEED = "https://lobste.rs/t/{tag}.json" | |
| SYSTEM = ( | |
| "Tag this software/tech headline with one of: " | |
| "read (an article, post, or tutorial), " | |
| "lib (a library, framework, or package you import), " | |
| "tool (a CLI, app, or utility you run). " | |
| "Use 'skip' only if it is off-topic or clickbait." | |
| ) | |
| class Story(BaseModel): | |
| title: str | |
| url: HttpUrl | |
| class TagChoice(BaseModel): | |
| tag: Literal["read", "lib", "tool", "skip"] | |
| class Classifier(Protocol): | |
| async def tag(self, story: Story) -> str: ... | |
| class OpenAIClassifier: | |
| def __init__(self, api_key: str, model: str = "gpt-4o-mini") -> None: | |
| self._client = AsyncOpenAI(api_key=api_key) | |
| self._model = model | |
| async def tag(self, story: Story) -> str: | |
| completion = await self._client.beta.chat.completions.parse( | |
| model=self._model, | |
| messages=[ | |
| {"role": "system", "content": SYSTEM}, | |
| {"role": "user", "content": story.title}, | |
| ], | |
| response_format=TagChoice, | |
| ) | |
| choice = completion.choices[0].message.parsed | |
| return choice.tag if choice else "skip" | |
| def _build_classifier() -> Classifier: | |
| return OpenAIClassifier(config("OPENAI_API_KEY")) | |
| async def fetch_stories(tag: str, *, limit: int = 5) -> list[Story]: | |
| async with httpx.AsyncClient( | |
| timeout=10, headers={"User-Agent": "trend-triage-bot"} | |
| ) as http: | |
| response = await http.get(LOBSTERS_FEED.format(tag=tag)) | |
| response.raise_for_status() | |
| return [ | |
| Story( | |
| title=story["title"], | |
| url=story["url"] or f"https://lobste.rs/s/{story['short_id']}", | |
| ) | |
| for story in response.json()[:limit] | |
| if story.get("title") | |
| ] | |
| def save_to_reading_list(story: Story, tag: str) -> None: | |
| with READING_LIST.open("a") as f: | |
| f.write(json.dumps({"tag": tag, **story.model_dump(mode="json")}) + "\n") | |
| def triage_keyboard(suggested: str) -> InlineKeyboardMarkup: | |
| buttons = [ | |
| InlineKeyboardButton( | |
| f">> {tag}" if tag == suggested else tag, | |
| callback_data=f"tag:{tag}", | |
| ) | |
| for tag in TAGS | |
| ] | |
| rows = [buttons[i : i + 3] for i in range(0, len(buttons), 3)] | |
| return InlineKeyboardMarkup(rows) | |
| async def start_digest(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: | |
| if update.message is None: | |
| return | |
| topic = context.args[0].lower() if context.args else DEFAULT_TOPIC | |
| await update.message.reply_text(f"Fetching today's {topic} stories...") | |
| try: | |
| context.user_data["queue"] = await fetch_stories(topic) | |
| except httpx.HTTPStatusError: | |
| await update.message.reply_text( | |
| f"No feed for '{topic}'. Try a Lobsters tag like rust, python, or go." | |
| ) | |
| return | |
| except httpx.RequestError: | |
| await update.message.reply_text( | |
| "Couldn't reach Lobsters right now — try again in a bit." | |
| ) | |
| return | |
| await show_next(update.message, context) | |
| async def show_next(message: Message, context: ContextTypes.DEFAULT_TYPE) -> None: | |
| queue: list[Story] = context.user_data.get("queue", []) | |
| if not queue: | |
| await message.reply_text("Inbox zero. That's all the trends today.") | |
| return | |
| story = queue[0] | |
| suggested = await context.bot_data["classifier"].tag(story) | |
| await message.reply_text( | |
| f"{story.title}\n{story.url}", | |
| reply_markup=triage_keyboard(suggested), | |
| ) | |
| async def on_tag(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: | |
| query = update.callback_query | |
| if query is None or query.data is None: | |
| return | |
| await query.answer() # 1. stop the spinner, first thing | |
| _, tag = query.data.split(":", 1) # 2. "tag:read" -> "read" | |
| queue = context.user_data.get("queue", []) | |
| if not queue: # stale button after a restart | |
| await query.edit_message_text("Session expired, send /digest again.") | |
| return | |
| story = queue.pop(0) | |
| if tag != "skip": | |
| save_to_reading_list(story, tag) # the human's final say | |
| await query.edit_message_text( # 3. edit, don't reply | |
| f"Filed under {tag}: {story.title}" | |
| if tag != "skip" | |
| else f"Skipped: {story.title}" | |
| ) | |
| await show_next(query.message, context) | |
| async def on_error(update: object, context: ContextTypes.DEFAULT_TYPE) -> None: | |
| logger.exception("Handler failed", exc_info=context.error) | |
| def main() -> None: | |
| logging.basicConfig( | |
| format="%(asctime)s %(name)s %(levelname)s %(message)s", | |
| level=logging.INFO, | |
| ) | |
| logger.info("Starting trend triage bot, polling for updates") | |
| app = Application.builder().token(config("TELEGRAM_BOT_TOKEN")).build() | |
| app.bot_data["classifier"] = _build_classifier() | |
| app.add_handler(CommandHandler("digest", start_digest)) | |
| app.add_handler(CallbackQueryHandler(on_tag, pattern="^tag:")) | |
| app.add_error_handler(on_error) | |
| app.run_polling() | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment