Skip to content

Instantly share code, notes, and snippets.

@navix
Created May 18, 2024 05:19
Show Gist options
  • Save navix/4511cc9e01dfb4f89ded960d0fb43e67 to your computer and use it in GitHub Desktop.
Save navix/4511cc9e01dfb4f89ded960d0fb43e67 to your computer and use it in GitHub Desktop.
Very basic bluesky firehost subscription
import {AtUriUtil} from '@@api/at/at-uri';
import {cborToLexRecord, readCar} from '@atproto/repo';
import {Subscription} from '@atproto/xrpc-server';
import {bskyAgent} from '~/at/bsky-agent';
import {db} from '~/core';
import {CID} from 'multiformats/cid';
const endpoint = 'wss://bsky.network';
interface RepoEvent {
ops: {
cid: CID | null;
path: string;
action: 'create' | 'delete' | 'update';
}[];
seq: number;
prev: string;
repo: string;
time: string;
blobs: any[];
blocks: Uint8Array;
commit: string;
rebase: string;
tooBig: boolean;
$type: string;
}
const sub = new Subscription({
service: endpoint,
method: 'com.atproto.sync.subscribeRepos',
getParams: () => getCursor(),
validate: (value: unknown) => {
try {
return value as any;
} catch (err) {
console.error('repo subscription skipped invalid message', err);
}
},
});
export async function firehoseSub() {
console.info('>> firehoseSub');
await ensureCursor();
const run = async (subscriptionReconnectDelay: number) => {
console.info('>> firehoseSub run');
try {
for await (const evt of sub) {
if (evt.$type === 'com.atproto.sync.subscribeRepos#info' && evt.name === 'OutdatedCursor') {
await dropCursor();
throw new Error('OutdatedCursor');
}
if (!evt.blocks) {
continue;
}
try {
await handleEvent(evt);
} catch (err) {
console.error('repo subscription could not handle message', err);
}
// update stored cursor every 20 events or so
if (isCommit(evt) && evt.seq % 100 === 0) {
await updateCursor(evt.seq);
}
}
} catch (err) {
console.error('repo subscription errored', err);
setTimeout(() => run(subscriptionReconnectDelay), subscriptionReconnectDelay);
}
};
run(1000).then();
}
async function ensureCursor() {
const res = await db.selectFrom('AtSubState').selectAll().where('service', '=', endpoint).executeTakeFirst();
if (!res) {
await db.insertInto('AtSubState').values({service: endpoint, cursor: 0}).execute();
}
}
async function updateCursor(cursor: number) {
await db.updateTable('AtSubState').set({cursor}).where('service', '=', endpoint).execute();
}
async function getCursor(): Promise<{cursor?: number}> {
const res = await db.selectFrom('AtSubState').selectAll().where('service', '=', endpoint).executeTakeFirst();
return res?.cursor ? {cursor: res.cursor} : {};
}
async function dropCursor() {
await db.deleteFrom('AtSubState').where('service', '=', endpoint).execute();
}
async function handleEvent(evt: RepoEvent) {
const car = await readCar(evt.blocks);
for (const op of evt.ops) {
if (op.action === 'create') {
if (!op.cid) continue;
const recordBytes = car.blocks.get(op.cid);
if (!recordBytes) continue;
const [collection, repoId] = op.path.split('/');
if (collection === 'app.bsky.feed.post') {
const record: any = cborToLexRecord(recordBytes);
const uri = `at://${evt.repo}/${op.path}`;
// Add profile
try {
await db
.insertInto('AtProfile')
.values({
did: evt.repo,
})
.onConflict(oc => oc.doNothing())
.execute();
} catch (e) {
console.error('Firehose: add profile error', e);
}
updateProfileMeta(evt.repo).then();
// Add post
const atUri = AtUriUtil.parse(uri);
if (!record.reply) {
await db
.insertInto('AtPost')
.values({
createdAt: record.createdAt,
did: evt.repo,
rkey: atUri.rkey,
})
.onConflict(oc => oc.doNothing())
.execute()
.then();
}
}
} else if (op.action === 'delete') {
const [collection, repoId] = op.path.split('/');
if (collection === 'app.bsky.feed.post') {
for (const op of evt.ops) {
const rkey = op.path.split('/')[1];
try {
await db.deleteFrom('AtPost').where('did', '=', evt.repo).where('rkey', '=', rkey).execute();
} catch (e) {
console.error('Firehose: del post error', e);
}
}
}
}
}
}
export function isCommit(v: unknown): v is any {
return isObj(v) && hasProp(v, '$type') && v.$type === 'com.atproto.sync.subscribeRepos#commit';
}
export function isObj(v: unknown): v is Record<string, unknown> {
return typeof v === 'object' && v !== null;
}
export function hasProp<K extends PropertyKey>(data: object, prop: K): data is Record<K, unknown> {
return prop in data;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment