Skip to content

Instantly share code, notes, and snippets.

@sulrich
Created May 31, 2026 02:53
Show Gist options
  • Select an option

  • Save sulrich/48ea1b2d2d1ccaa3d46f27e2e2088a9b to your computer and use it in GitHub Desktop.

Select an option

Save sulrich/48ea1b2d2d1ccaa3d46f27e2e2088a9b to your computer and use it in GitHub Desktop.
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "atproto",
# "grapheme",
# "Mastodon.py",
# "python-frontmatter",
# "requests",
# ]
# ///
"""
posse: syndicate new botwerks posts to Mastodon and Bluesky.
reads content/{posts,til,links}/ as the source of truth, checks the live
feed.json to confirm the deploy has propagated, then posts new entries with
a configurable delay between items. state is tracked in data/syndicated.json
so already-posted entries are never reposted.
modes:
--init seed state with all currently eligible items; no posting
--dry-run print what would be posted; no api calls, no state writes
(default) post to both platforms, update state, sleep between items
"""
import argparse
import json
import os
import sys
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
import frontmatter
import grapheme
import requests
from atproto import Client as BlueskyClient
from atproto import client_utils
from mastodon import Mastodon
# repo root is the parent of the directory that contains this script.
REPO_ROOT = Path(__file__).resolve().parent.parent
# markdown sources; the posts/, til/ and links/ subdirectories are scanned.
CONTENT_DIR = REPO_ROOT / "content"
# JSON record of already-syndicated entries, keyed by permalink.
STATE_FILE = REPO_ROOT / "data" / "syndicated.json"
# canonical base URL (matches hugo.toml baseURL, served via cloudflare).
# https because we want clean links in social posts; cloudflare upgrades anyway.
BASE_URL = "https://botwerks.net"
# published JSON feed; polled to confirm a deploy has propagated before posting.
LIVE_FEED_URL = "https://botwerks.net/feed.json"
# posts/til syndicate by default; links require this tag.
LINKS_SYNDICATE_TAG = "syndicate"
# bluesky's hard limit is 300 graphemes.
BLUESKY_LIMIT = 300
DEFAULT_DELAY_SECONDS = 180 # 3 min between items
# seconds to sleep between successive polls of the live feed.
FEED_POLL_INTERVAL = 30
# total seconds to keep polling for one item before giving up on it.
FEED_POLL_TIMEOUT = 600 # 10 min
@dataclass
class Candidate:
    """One content file that is eligible for syndication."""

    # permalink of the entry; doubles as the primary key in the state file
    guid: str
    # front matter title (callers fall back to the file stem)
    title: str
    # front matter description; may be the empty string
    description: str
    # timezone-aware publication date
    date: datetime
    # content section the file was found in ("posts", "til" or "links")
    section: str
    # path of the markdown file this entry was built from
    source_path: Path
    # lower-cased front matter tags
    tags: list[str] = field(default_factory=list)
def load_state() -> dict:
    """Return the persisted syndication state.

    When the state file does not exist yet, a fresh empty state
    (version 1, no entries) is returned instead.
    """
    if not STATE_FILE.exists():
        return {"version": 1, "entries": {}}
    raw = STATE_FILE.read_text()
    return json.loads(raw)
def write_state(state: dict) -> None:
    """Persist *state* to the state file.

    The JSON is written to a sibling ``.json.tmp`` file first and then moved
    over the real file, so a crash mid-write cannot leave a truncated state.
    """
    STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
    scratch = STATE_FILE.with_suffix(".json.tmp")
    payload = json.dumps(state, indent=2, sort_keys=True) + "\n"
    scratch.write_text(payload)
    scratch.replace(STATE_FILE)
def coerce_datetime(value) -> datetime | None:
if value is None:
return None
if isinstance(value, datetime):
return value if value.tzinfo else value.replace(tzinfo=timezone.utc)
if isinstance(value, str):
try:
dt = datetime.fromisoformat(value.replace("Z", "+00:00"))
return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc)
except ValueError:
return None
return None
def build_permalink(source_path: Path, dt: datetime) -> str:
    """Build the public URL for a content file.

    Mirrors the hugo.toml permalink pattern for posts/links/til,
    ":year/:month/:contentbasename", where contentbasename is the
    filename without its extension.
    """
    year_month = f"{dt.year}/{dt.month:02d}"
    slug = source_path.stem
    return "/".join((BASE_URL, year_month, slug)) + "/"
def discover_candidates() -> list[Candidate]:
    """Scan the content sections and return every entry eligible to syndicate.

    A markdown file under content/{posts,til,links}/ is skipped when:
      * it is ``_index.md`` or its front matter cannot be parsed,
      * ``draft`` is true or ``syndicate`` is false,
      * it lives in ``links`` and lacks the LINKS_SYNDICATE_TAG tag,
      * its ``date`` is missing/unparseable or lies in the future.

    Returns the candidates sorted oldest first.
    """
    # one timestamp for the whole scan so every file is judged against the
    # same "now".
    now = datetime.now(timezone.utc)
    candidates: list[Candidate] = []
    for section in ("posts", "til", "links"):
        section_dir = CONTENT_DIR / section
        if not section_dir.is_dir():
            continue
        for md_path in sorted(section_dir.glob("*.md")):
            if md_path.name == "_index.md":
                continue
            try:
                post = frontmatter.load(md_path)
            except Exception as exc:
                # deliberate best-effort: one broken file must not stop the run
                print(
                    f"warn: skipping {md_path.name}: parse error: {exc}",
                    file=sys.stderr,
                )
                continue
            fm = post.metadata
            if fm.get("draft") is True:
                continue
            if fm.get("syndicate") is False:
                continue
            raw_tags = fm.get("tags") or []
            # a scalar such as `tags: syndicate` must count as one tag;
            # iterating a bare string would yield its characters instead.
            if not isinstance(raw_tags, (list, tuple, set)):
                raw_tags = [raw_tags]
            tags = [str(t).lower() for t in raw_tags]
            if section == "links" and LINKS_SYNDICATE_TAG not in tags:
                continue
            dt = coerce_datetime(fm.get("date"))
            if dt is None:
                print(
                    f"warn: skipping {md_path.name}: no parseable date", file=sys.stderr
                )
                continue
            if dt > now:
                # future-dated entries are not eligible yet
                continue
            title = str(fm.get("title") or md_path.stem).strip()
            description = str(fm.get("description") or "").strip()
            candidates.append(
                Candidate(
                    guid=build_permalink(md_path, dt),
                    title=title,
                    description=description,
                    date=dt,
                    section=section,
                    source_path=md_path,
                    tags=tags,
                )
            )
    candidates.sort(key=lambda c: c.date)
    return candidates
@dataclass
class FormattedPost:
    """The pieces of a social post, ready to be rendered as plain text."""

    title: str
    description: str  # possibly truncated; "" if dropped
    url: str

    @property
    def text(self) -> str:
        """Title, optional description and URL separated by blank lines."""
        parts = [self.title, self.url]
        if self.description:
            parts.insert(1, self.description)
        return "\n\n".join(parts)
def format_post(c: Candidate, limit: int = BLUESKY_LIMIT) -> FormattedPost:
    """Render *c* as a post of at most *limit* graphemes where possible.

    The description is the only part that is ever shortened: it is cut and
    suffixed with an ellipsis, or dropped when no room is left for it. A
    post without a description is returned unchanged even if it overflows.
    """
    post = FormattedPost(title=c.title, description=c.description, url=c.guid)
    fits = grapheme.length(post.text) <= limit
    if fits or not post.description:
        # either nothing to do, or already minimal; let it overflow
        # (rare; mastodon accepts it)
        return post

    # layout is "title\n\nDESC\n\nurl" -- 4 newlines of overhead around DESC.
    fixed_cost = grapheme.length(c.title) + grapheme.length(c.guid) + 4
    budget = limit - fixed_cost - 1  # reserve 1 grapheme for the ellipsis
    if budget > 0:
        kept = list(grapheme.graphemes(c.description))[:budget]
        post.description = "".join(kept).rstrip() + "…"
    else:
        post.description = ""
    return post
def fetch_live_urls() -> set[str]:
    """Return the normalized item URLs currently present in the live feed.

    Each URL is forced to https and given exactly one trailing slash so it
    can be compared against candidate permalinks. This is best-effort: a
    failed request, invalid JSON or an unexpectedly shaped payload yields an
    empty set (with a warning on stderr) rather than an exception.
    """
    try:
        r = requests.get(LIVE_FEED_URL, timeout=30)
        r.raise_for_status()
        data = r.json()
    except Exception as exc:
        # deliberate catch-all: callers poll in a loop and simply retry
        print(f"warn: live feed fetch failed: {exc}", file=sys.stderr)
        return set()

    items = data.get("items", []) if isinstance(data, dict) else None
    if not isinstance(items, list):
        print("warn: live feed fetch failed: unexpected feed structure", file=sys.stderr)
        return set()

    urls = set()
    for item in items:
        if not isinstance(item, dict):
            continue
        u = item.get("url") or item.get("id")
        if not u or not isinstance(u, str):
            continue
        # feed emits http:// links -- normalize to https for matching.
        # NOTE(review): the original comment named http://botwerks.org/ while
        # BASE_URL is botwerks.net; only the scheme is rewritten here, so if
        # the feed really uses a different host no permalink will ever match.
        # confirm against the deployed feed.
        if u.startswith("http://"):
            u = "https://" + u.removeprefix("http://")
        urls.add(u.rstrip("/") + "/")
    return urls
def wait_for_live(guid: str) -> bool:
    """Poll the live feed until *guid* shows up or the timeout elapses.

    Returns True as soon as the URL is listed, False once
    FEED_POLL_TIMEOUT seconds have passed without seeing it.
    """
    wanted = guid.rstrip("/") + "/"
    give_up_at = time.monotonic() + FEED_POLL_TIMEOUT
    while True:
        if time.monotonic() >= give_up_at:
            return False
        if wanted in fetch_live_urls():
            return True
        time.sleep(FEED_POLL_INTERVAL)
def post_to_mastodon(text: str) -> str:
    """Publish *text* as a public status and return the new status id.

    Reads MASTODON_INSTANCE_URL and MASTODON_ACCESS_TOKEN from the
    environment; a missing variable raises KeyError.
    """
    client = Mastodon(
        api_base_url=os.environ["MASTODON_INSTANCE_URL"],
        access_token=os.environ["MASTODON_ACCESS_TOKEN"],
    )
    status = client.status_post(text, visibility="public")
    return str(status["id"])
def post_to_bluesky(fp: FormattedPost) -> str:
    """Publish *fp* to Bluesky and return the URI of the created post.

    Reads BLUESKY_HANDLE and BLUESKY_APP_PASSWORD from the environment; a
    missing variable raises KeyError.
    """
    credentials = (os.environ["BLUESKY_HANDLE"], os.environ["BLUESKY_APP_PASSWORD"])
    session = BlueskyClient()
    session.login(*credentials)

    # everything before the URL is plain text; the description is optional
    if fp.description:
        lead = f"{fp.title}\n\n{fp.description}\n\n"
    else:
        lead = f"{fp.title}\n\n"

    # build with a URL facet so the link is clickable
    builder = client_utils.TextBuilder()
    builder.text(lead)
    builder.link(fp.url, fp.url)
    created = session.send_post(builder)
    return created.uri
# state-entry flag marking an item that reached some platforms but not all.
PENDING_KEY = "pending"


class PartialSyndicationError(RuntimeError):
    """At least one platform accepted the post before another one failed.

    ``entry`` carries the ids collected so far so the caller can persist them
    and retry only the missing platform(s) on a later run.
    """

    def __init__(self, entry: dict, cause: Exception) -> None:
        super().__init__(str(cause))
        self.entry = entry


def _is_pending(entry) -> bool:
    """Return True if a state entry records an incomplete syndication."""
    return isinstance(entry, dict) and bool(entry.get(PENDING_KEY))


def syndicate_one(c: Candidate, dry_run: bool, prior: dict | None = None) -> dict | None:
    """Post one candidate to Mastodon and Bluesky.

    Args:
        c: the entry to syndicate.
        dry_run: when true, only print the formatted post and return None.
        prior: optional partial state entry from an earlier failed run;
            platforms that already have an id in it are not posted to again.

    Returns:
        The state entry (``posted_at``, ``mastodon_id``, ``bluesky_uri``), or
        None for a dry run.

    Raises:
        PartialSyndicationError: a platform failed after another had already
            accepted the post; ``.entry`` holds the ids obtained so far.
        Exception: whatever the platform client raised, when nothing has been
            posted anywhere yet.
    """
    fp = format_post(c)
    print(f"\n--- {c.guid}")
    print(fp.text)
    if dry_run:
        return None

    # start from earlier progress (minus the bookkeeping flag) so a platform
    # that already succeeded is never posted to twice.
    entry = {k: v for k, v in (prior or {}).items() if k != PENDING_KEY}
    try:
        if not entry.get("mastodon_id"):
            print(" posting to mastodon...", end=" ", flush=True)
            entry["mastodon_id"] = post_to_mastodon(fp.text)
            print(f"ok ({entry['mastodon_id']})")
        if not entry.get("bluesky_uri"):
            print(" posting to bluesky...", end=" ", flush=True)
            entry["bluesky_uri"] = post_to_bluesky(fp)
            print(f"ok ({entry['bluesky_uri']})")
    except Exception as exc:
        if entry.get("mastodon_id") or entry.get("bluesky_uri"):
            raise PartialSyndicationError(entry, exc) from exc
        raise
    entry["posted_at"] = datetime.now(timezone.utc).isoformat(timespec="seconds")
    return entry


def main() -> int:
    """Command-line entry point.

    Returns the process exit status: 0 on success (including "nothing to do"
    and items skipped because they are not live yet), 1 when a platform post
    failed. Progress made before a failure is always written to the state
    file, including ids from a partially syndicated item.
    """
    p = argparse.ArgumentParser(description=__doc__)
    p.add_argument(
        "--init",
        action="store_true",
        help="seed state with all eligible items; do not post",
    )
    p.add_argument(
        "--dry-run",
        action="store_true",
        help="print intended posts; no API calls or state writes",
    )
    p.add_argument(
        "--delay",
        type=int,
        default=int(os.environ.get("POSSE_DELAY_SECONDS", DEFAULT_DELAY_SECONDS)),
        help="seconds to sleep between items (default %(default)s)",
    )
    args = p.parse_args()

    candidates = discover_candidates()
    state = load_state()
    # tolerate a state file that lacks the "entries" mapping
    already = state.setdefault("entries", {})
    # an item is "new" if it was never recorded, or only partially syndicated
    new_items = [
        c
        for c in candidates
        if c.guid not in already or _is_pending(already[c.guid])
    ]
    completed = sum(1 for e in already.values() if not _is_pending(e))
    print(f"candidates total: {len(candidates)}")
    print(f"already syndicated: {completed}")
    print(f"new this run: {len(new_items)}")
    if not new_items:
        return 0

    if args.init:
        now = datetime.now(timezone.utc).isoformat(timespec="seconds")
        for c in new_items:
            # keep any ids already recorded for a partial entry; just mark
            # the item as done.
            entry = already.setdefault(c.guid, {"posted_at": now, "init": True})
            if isinstance(entry, dict):
                entry.pop(PENDING_KEY, None)
        write_state(state)
        print(f"\nseeded state with {len(new_items)} entries -> {STATE_FILE}")
        return 0

    if args.dry_run:
        for c in new_items:
            syndicate_one(c, dry_run=True)
        return 0

    for idx, c in enumerate(new_items):
        print(
            f"\n[{idx + 1}/{len(new_items)}] waiting for {c.guid} to appear in live feed..."
        )
        if not wait_for_live(c.guid):
            # warnings go to stderr, like every other warning in this script
            print(
                f" warn: {c.guid} not visible after {FEED_POLL_TIMEOUT}s; skipping (will retry next run)",
                file=sys.stderr,
            )
            continue
        try:
            entry = syndicate_one(c, dry_run=False, prior=already.get(c.guid))
        except PartialSyndicationError as exc:
            print(f" error: syndication failed for {c.guid}: {exc}", file=sys.stderr)
            # remember which platforms already have the post so the next run
            # only retries the missing one(s) instead of double-posting.
            already[c.guid] = {**exc.entry, PENDING_KEY: True}
            write_state(state)
            return 1
        except Exception as exc:
            print(f" error: syndication failed for {c.guid}: {exc}", file=sys.stderr)
            # write whatever progress we have so far
            write_state(state)
            return 1
        if entry is not None:
            already[c.guid] = entry
            write_state(state)
        if idx + 1 < len(new_items):
            print(f" sleeping {args.delay}s before next item...")
            time.sleep(args.delay)
    return 0
# run only when executed as a script; main()'s return value becomes the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment