Created
May 31, 2026 02:53
-
-
Save sulrich/48ea1b2d2d1ccaa3d46f27e2e2088a9b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env -S uv run --script | |
| # /// script | |
| # requires-python = ">=3.11" | |
| # dependencies = [ | |
| # "atproto", | |
| # "grapheme", | |
| # "Mastodon.py", | |
| # "python-frontmatter", | |
| # "requests", | |
| # ] | |
| # /// | |
| """ | |
| posse: syndicate new botwerks posts to Mastodon and Bluesky. | |
| reads content/{posts,til,links}/ as the source of truth, checks the live | |
| feed.json to confirm the deploy has propagated, then posts new entries with | |
| a configurable delay between items. state is tracked in data/syndicated.json | |
| so already-posted entries are never reposted. | |
| modes: | |
| --init seed state with all currently eligible items; no posting | |
| --dry-run print what would be posted; no api calls, no state writes | |
| (default) post to both platforms, update state, sleep between items | |
| """ | |
| import argparse | |
| import json | |
| import os | |
| import sys | |
| import time | |
| from dataclasses import dataclass, field | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| import frontmatter | |
| import grapheme | |
| import requests | |
| from atproto import Client as BlueskyClient | |
| from atproto import client_utils | |
| from mastodon import Mastodon | |
# --- filesystem layout -------------------------------------------------------
# the repository root is taken to be two levels up from this file's resolved path.
REPO_ROOT = Path(__file__).resolve().parent.parent
CONTENT_DIR = REPO_ROOT.joinpath("content")
STATE_FILE = REPO_ROOT.joinpath("data", "syndicated.json")

# --- site --------------------------------------------------------------------
# canonical base URL (matches hugo.toml baseURL, served via cloudflare).
# https because we want clean links in social posts; cloudflare upgrades anyway.
BASE_URL = "https://botwerks.net"
LIVE_FEED_URL = "https://botwerks.net/feed.json"

# --- syndication rules -------------------------------------------------------
# posts/til syndicate by default; links are opt-in and must carry this tag.
LINKS_SYNDICATE_TAG = "syndicate"

# bluesky's hard limit is 300 graphemes.
BLUESKY_LIMIT = 300

# --- timing (all values in seconds) ------------------------------------------
DEFAULT_DELAY_SECONDS = 3 * 60  # pause between two syndicated items
FEED_POLL_INTERVAL = 30  # pause between two live-feed checks
FEED_POLL_TIMEOUT = 10 * 60  # give up waiting for an item after this long
@dataclass
class Candidate:
    """one content item that is eligible for syndication."""

    # permalink URL of the item; doubles as the primary key in the state file
    guid: str
    # front matter title (the discovery code falls back to the file stem)
    title: str
    # front matter description; may be the empty string
    description: str
    # timezone-aware publication date
    date: datetime
    # content section the file was found in
    section: str
    # markdown file this candidate was read from
    source_path: Path
    # lower-cased front matter tags
    tags: list[str] = field(default_factory=list)
def load_state() -> dict:
    """return the persisted syndication state.

    when the state file does not exist yet, a fresh empty state
    (version 1, no entries) is returned instead.
    """
    if not STATE_FILE.exists():
        return {"version": 1, "entries": {}}
    raw = STATE_FILE.read_text()
    return json.loads(raw)
def write_state(state: dict) -> None:
    """persist ``state`` to the state file as pretty-printed, key-sorted json.

    the payload is written to a sibling ``.json.tmp`` file first and then moved
    over the real file, so a reader never sees a half-written state file.
    """
    target_dir = STATE_FILE.parent
    target_dir.mkdir(parents=True, exist_ok=True)
    scratch = STATE_FILE.with_suffix(".json.tmp")
    payload = json.dumps(state, indent=2, sort_keys=True) + "\n"
    scratch.write_text(payload)
    scratch.replace(STATE_FILE)
| def coerce_datetime(value) -> datetime | None: | |
| if value is None: | |
| return None | |
| if isinstance(value, datetime): | |
| return value if value.tzinfo else value.replace(tzinfo=timezone.utc) | |
| if isinstance(value, str): | |
| try: | |
| dt = datetime.fromisoformat(value.replace("Z", "+00:00")) | |
| return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc) | |
| except ValueError: | |
| return None | |
| return None | |
def build_permalink(source_path: Path, dt: datetime) -> str:
    """build the public URL for a content file published at ``dt``.

    mirrors hugo.toml: [Permalinks] posts/links/til = ":year/:month/:contentbasename",
    where contentbasename is the file name without its extension.
    """
    slug = source_path.stem
    year = str(dt.year)
    month = f"{dt.month:02d}"
    return "/".join((BASE_URL, year, month, slug)) + "/"
def discover_candidates() -> list[Candidate]:
    """scan the content sections and return every item eligible for syndication.

    a markdown file is skipped when any of the following holds:
      * it is a section ``_index.md``
      * its front matter cannot be parsed (a warning goes to stderr)
      * ``draft`` is true or ``syndicate`` is false
      * it lives in ``links`` and does not carry the syndicate tag
      * it has no parseable date (a warning goes to stderr)
      * its date lies in the future

    returns the candidates sorted by date, oldest first.
    """
    candidates: list[Candidate] = []
    for section in ("posts", "til", "links"):
        section_dir = CONTENT_DIR / section
        if not section_dir.is_dir():
            continue
        for md_path in sorted(section_dir.glob("*.md")):
            if md_path.name == "_index.md":
                continue
            try:
                post = frontmatter.load(md_path)
            except Exception as exc:
                # best effort: one broken file must not stop the whole run
                print(
                    f"warn: skipping {md_path.name}: parse error: {exc}",
                    file=sys.stderr,
                )
                continue
            fm = post.metadata
            if fm.get("draft") is True:
                continue
            if fm.get("syndicate") is False:
                continue

            # front matter may carry a single scalar (``tags: syndicate``)
            # instead of a list. iterating a bare string would yield single
            # characters and the tag check below could never match, so wrap
            # scalars in a list first.
            raw_tags = fm.get("tags") or []
            if not isinstance(raw_tags, (list, tuple, set)):
                raw_tags = [raw_tags]
            tags = [str(t).lower() for t in raw_tags]
            if section == "links" and LINKS_SYNDICATE_TAG not in tags:
                continue

            dt = coerce_datetime(fm.get("date"))
            if dt is None:
                print(
                    f"warn: skipping {md_path.name}: no parseable date", file=sys.stderr
                )
                continue
            if dt > datetime.now(timezone.utc):
                # scheduled for the future; a later run picks it up
                continue

            title = str(fm.get("title") or md_path.stem).strip()
            description = str(fm.get("description") or "").strip()
            candidates.append(
                Candidate(
                    guid=build_permalink(md_path, dt),
                    title=title,
                    description=description,
                    date=dt,
                    section=section,
                    source_path=md_path,
                    tags=tags,
                )
            )
    candidates.sort(key=lambda c: c.date)
    return candidates
@dataclass
class FormattedPost:
    """the pieces of a social post, kept separate so each platform can lay them out."""

    title: str
    description: str  # possibly truncated; "" if dropped
    url: str

    @property
    def text(self) -> str:
        """plain-text rendering: title, optional description and url, blank-line separated."""
        parts = [self.title, self.url]
        if self.description:
            parts.insert(1, self.description)
        return "\n\n".join(parts)
def format_post(c: Candidate, limit: int = BLUESKY_LIMIT) -> FormattedPost:
    """lay out a candidate as a post of at most ``limit`` graphemes where possible.

    the description is the only flexible part: it is truncated (with a trailing
    ellipsis) until the whole text fits, or dropped when title and url leave no
    room for it. a post without a description is returned unchanged even when
    it is over the limit.
    """
    post = FormattedPost(title=c.title, description=c.description, url=c.guid)
    fits = grapheme.length(post.text) <= limit
    if fits or not post.description:
        # either nothing to do, or already minimal; let it overflow
        # (rare; mastodon accepts it)
        return post

    # layout is "title\n\nDESC\n\nurl" — 4 newlines of overhead around DESC.
    fixed_cost = grapheme.length(c.title) + grapheme.length(c.guid) + 4
    budget = limit - fixed_cost - 1  # the 1 is reserved for the ellipsis
    if budget <= 0:
        post.description = ""
    else:
        clusters = list(grapheme.graphemes(c.description))
        post.description = "".join(clusters[:budget]).rstrip() + "…"
    return post
def fetch_live_urls() -> set[str]:
    """return the item urls currently present in the deployed json feed.

    every url is normalized to https and to exactly one trailing slash. any
    failure to fetch or decode the feed is reported on stderr and yields an
    empty set, which callers treat as "nothing is live yet".
    """
    try:
        response = requests.get(LIVE_FEED_URL, timeout=30)
        response.raise_for_status()
        feed = response.json()
    except Exception as exc:
        print(f"warn: live feed fetch failed: {exc}", file=sys.stderr)
        return set()

    live: set[str] = set()
    for entry in feed.get("items", []):
        link = entry.get("url") or entry.get("id")
        if not link:
            continue
        # the feed emits plain-http urls — upgrade the scheme to https for matching.
        # NOTE(review): only the scheme is rewritten here, never the host. if the
        # feed emits a different host than the permalinks this script builds,
        # entries can never match — confirm against the live feed.
        if link.startswith("http://"):
            link = "https://" + link.removeprefix("http://")
        live.add(link.rstrip("/") + "/")
    return live
def wait_for_live(guid: str) -> bool:
    """poll the live feed until ``guid`` shows up in it.

    returns True as soon as the url is listed, False once FEED_POLL_TIMEOUT
    seconds have passed without it appearing. sleeps FEED_POLL_INTERVAL
    seconds between polls.
    """
    wanted = guid.rstrip("/") + "/"
    give_up_at = time.monotonic() + FEED_POLL_TIMEOUT
    while True:
        if time.monotonic() >= give_up_at:
            return False
        if wanted in fetch_live_urls():
            return True
        time.sleep(FEED_POLL_INTERVAL)
def post_to_mastodon(text: str) -> str:
    """publish ``text`` as a public mastodon status and return the status id as a string.

    reads MASTODON_INSTANCE_URL and MASTODON_ACCESS_TOKEN from the environment;
    a missing variable raises KeyError.
    """
    api = Mastodon(
        api_base_url=os.environ["MASTODON_INSTANCE_URL"],
        access_token=os.environ["MASTODON_ACCESS_TOKEN"],
    )
    status = api.status_post(text, visibility="public")
    return str(status["id"])
def post_to_bluesky(fp: FormattedPost) -> str:
    """publish ``fp`` on bluesky and return the uri of the created post.

    reads BLUESKY_HANDLE and BLUESKY_APP_PASSWORD from the environment;
    a missing variable raises KeyError.
    """
    handle = os.environ["BLUESKY_HANDLE"]
    app_password = os.environ["BLUESKY_APP_PASSWORD"]
    session = BlueskyClient()
    session.login(handle, app_password)

    if fp.description:
        lead = f"{fp.title}\n\n{fp.description}\n\n"
    else:
        lead = f"{fp.title}\n\n"

    # the url goes in as a link facet so it is clickable
    builder = client_utils.TextBuilder()
    builder.text(lead)
    builder.link(fp.url, fp.url)
    created = session.send_post(builder)
    return created.uri
def _is_partial(entry) -> bool:
    """true when a state entry records a post that reached only some platforms."""
    return isinstance(entry, dict) and entry.get("partial") is True


def syndicate_one(
    c: Candidate, dry_run: bool, entry: dict | None = None
) -> dict | None:
    """format one candidate and post it to every platform it has not reached yet.

    args:
        c: the item to syndicate.
        dry_run: when true, only print the formatted text; nothing is posted.
        entry: optional mutable progress record. platform ids already present
            in it ("mastodon_id", "bluesky_uri") are treated as done and are
            not posted again; ids obtained during this call are written into
            it as soon as each platform succeeds, so the caller still sees
            them when a later platform raises.

    returns:
        the completed state entry, or None in dry-run mode.

    raises:
        whatever the platform helpers raise. ``entry`` then holds the ids of
        the platforms that succeeded before the failure.
    """
    fp = format_post(c)
    print(f"\n--- {c.guid}")
    print(fp.text)
    if dry_run:
        return None

    if entry is None:
        entry = {}

    if not entry.get("mastodon_id"):
        print(" posting to mastodon...", end=" ", flush=True)
        entry["mastodon_id"] = post_to_mastodon(fp.text)
        print(f"ok ({entry['mastodon_id']})")

    if not entry.get("bluesky_uri"):
        print(" posting to bluesky...", end=" ", flush=True)
        entry["bluesky_uri"] = post_to_bluesky(fp)
        print(f"ok ({entry['bluesky_uri']})")

    # every platform has the post now, so the entry is no longer partial
    entry.pop("partial", None)
    entry["posted_at"] = datetime.now(timezone.utc).isoformat(timespec="seconds")
    return entry


def main() -> int:
    """command line entry point; returns the process exit status.

    returns 0 on success (including "nothing to do", --init and --dry-run) and
    1 as soon as posting an item fails. on failure the state file is still
    written, including the ids of any platform that accepted the post before
    the error; such an entry is flagged ``"partial": true`` and the next run
    posts it only to the platforms that are still missing, so nothing is
    posted twice.
    """
    p = argparse.ArgumentParser(description=__doc__)
    p.add_argument(
        "--init",
        action="store_true",
        help="seed state with all eligible items; do not post",
    )
    p.add_argument(
        "--dry-run",
        action="store_true",
        help="print intended posts; no API calls or state writes",
    )
    p.add_argument(
        "--delay",
        type=int,
        default=int(os.environ.get("POSSE_DELAY_SECONDS", DEFAULT_DELAY_SECONDS)),
        help="seconds to sleep between items (default %(default)s)",
    )
    args = p.parse_args()

    candidates = discover_candidates()
    state = load_state()
    already = state["entries"]
    # pending = never seen before, or left half-posted by an earlier failed run
    new_items = [
        c
        for c in candidates
        if c.guid not in already or _is_partial(already[c.guid])
    ]
    finished = sum(1 for e in already.values() if not _is_partial(e))
    print(f"candidates total: {len(candidates)}")
    print(f"already syndicated: {finished}")
    print(f"new this run: {len(new_items)}")
    if not new_items:
        return 0

    if args.init:
        now = datetime.now(timezone.utc).isoformat(timespec="seconds")
        for c in new_items:
            prior = already.get(c.guid)
            # keep ids a half-finished post already earned, drop the flag
            seeded = (
                {k: v for k, v in prior.items() if k != "partial"}
                if _is_partial(prior)
                else {}
            )
            seeded["posted_at"] = now
            seeded["init"] = True
            already[c.guid] = seeded
        write_state(state)
        print(f"\nseeded state with {len(new_items)} entries -> {STATE_FILE}")
        return 0

    if args.dry_run:
        for c in new_items:
            syndicate_one(c, dry_run=True)
        return 0

    for idx, c in enumerate(new_items):
        print(
            f"\n[{idx + 1}/{len(new_items)}] waiting for {c.guid} to appear in live feed..."
        )
        if not wait_for_live(c.guid):
            print(
                f" warn: {c.guid} not visible after {FEED_POLL_TIMEOUT}s; skipping (will retry next run)"
            )
            continue

        # start from what an earlier run already achieved for this item, if anything
        prior = already.get(c.guid)
        progress = dict(prior) if _is_partial(prior) else {}
        try:
            entry = syndicate_one(c, dry_run=False, entry=progress)
        except Exception as exc:
            print(f" error: syndication failed for {c.guid}: {exc}", file=sys.stderr)
            if progress.get("mastodon_id") or progress.get("bluesky_uri"):
                # at least one platform took the post: remember that, so the
                # retry does not post there a second time
                progress["partial"] = True
                already[c.guid] = progress
            # write whatever progress we have so far
            write_state(state)
            return 1

        if entry is not None:
            already[c.guid] = entry
            write_state(state)
        if idx + 1 < len(new_items):
            print(f" sleeping {args.delay}s before next item...")
            time.sleep(args.delay)
    return 0
# run main() only when executed as a script; its return value becomes the exit status.
if __name__ == "__main__":
    raise SystemExit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment