Created
May 18, 2024 05:19
-
-
Save navix/4511cc9e01dfb4f89ded960d0fb43e67 to your computer and use it in GitHub Desktop.
Very basic Bluesky firehose subscription
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import {AtUriUtil} from '@@api/at/at-uri'; | |
import {cborToLexRecord, readCar} from '@atproto/repo'; | |
import {Subscription} from '@atproto/xrpc-server'; | |
import {bskyAgent} from '~/at/bsky-agent'; | |
import {db} from '~/core'; | |
import {CID} from 'multiformats/cid'; | |
// WebSocket URL of the relay: passed as `service` to the Subscription below and
// also used as the `service` key of this consumer's row in the `AtSubState` table.
const endpoint = 'wss://bsky.network';
/**
 * Shape of a `com.atproto.sync.subscribeRepos#commit` message as consumed by
 * `handleEvent`. Link fields are typed as `CID` (not `string`) and `rebase` as
 * `boolean`, following the published lexicon for this message.
 */
interface RepoEvent {
  /** Record-level operations contained in this commit. */
  ops: {
    /** Link to the record block inside `blocks`; `null` for deletions. */
    cid: CID | null;
    /** `<collection>/<rkey>` path of the affected record. */
    path: string;
    action: 'create' | 'delete' | 'update';
  }[];
  /** Sequence number of this message on the stream; used as the resume cursor. */
  seq: number;
  /** Link to the previous commit, when present. */
  prev: CID | null;
  /** DID of the repository the commit belongs to. */
  repo: string;
  /** Timestamp string attached to the message. */
  time: string;
  /** Links to blobs referenced by the commit. */
  blobs: CID[];
  /** CAR-encoded blocks holding the commit's records. */
  blocks: Uint8Array;
  /** Link to the commit object itself. */
  commit: CID;
  /** Deprecated flag in the lexicon; kept so existing reads still type-check. */
  rebase: boolean;
  /** Set when the commit was too large for its blocks to be included. */
  tooBig: boolean;
  /** Repo revision of this commit (optional: absent on older messages). */
  rev?: string;
  /** Revision of the preceding commit, when known. */
  since?: string | null;
  /** Fully-qualified message type, e.g. `com.atproto.sync.subscribeRepos#commit`. */
  $type: string;
}
/**
 * Firehose subscription to `com.atproto.sync.subscribeRepos` on `endpoint`.
 *
 * Connection parameters are read from the stored cursor (`getCursor`) each time
 * the subscription asks for them.
 */
const sub = new Subscription({
  service: endpoint,
  method: 'com.atproto.sync.subscribeRepos',
  getParams: () => getCursor(),
  // No lexicon validation is done here; messages are only checked to be
  // objects and then handed to consumers untyped. The previous try/catch
  // wrapped a bare type assertion, which can never throw, so its error log
  // was unreachable.
  // NOTE(review): returning `undefined` is expected to make Subscription skip
  // the message — confirm against @atproto/xrpc-server.
  validate: (value: unknown) => {
    if (typeof value !== 'object' || value === null) {
      console.error('repo subscription skipped invalid message', value);
      return undefined;
    }
    // eslint-disable-next-line @typescript-eslint/no-explicit-any -- consumers narrow by `$type`
    return value as any;
  },
});
/**
 * Starts consuming the repo firehose in the background.
 *
 * Ensures a cursor row exists, then launches a consumer loop that reschedules
 * itself after any error. The returned promise resolves once the loop has been
 * started; it does not wait for the stream to end.
 *
 * @throws If the initial `ensureCursor()` call fails. Errors inside the
 *   consumer loop are logged and trigger a delayed reconnect instead.
 */
export async function firehoseSub(): Promise<void> {
  console.info('>> firehoseSub');
  // Fail fast on startup: a database error here still rejects to the caller.
  await ensureCursor();

  const run = async (subscriptionReconnectDelay: number): Promise<void> => {
    console.info('>> firehoseSub run');
    try {
      // dropCursor() deletes the AtSubState row on OutdatedCursor. Recreate it
      // before every (re)connect; otherwise updateCursor() would match no row
      // and the cursor would never be persisted again.
      await ensureCursor();

      for await (const evt of sub) {
        if (evt.$type === 'com.atproto.sync.subscribeRepos#info' && evt.name === 'OutdatedCursor') {
          await dropCursor();
          // Abort this connection; the catch below schedules a fresh one.
          throw new Error('OutdatedCursor');
        }
        if (!evt.blocks) {
          continue;
        }
        try {
          await handleEvent(evt);
        } catch (err) {
          // A single bad message must not tear down the whole subscription.
          console.error('repo subscription could not handle message', err);
        }
        // Persist the cursor only for commits whose seq is a multiple of 100,
        // i.e. roughly once every 100 stream messages.
        if (isCommit(evt) && evt.seq % 100 === 0) {
          await updateCursor(evt.seq);
        }
      }
    } catch (err) {
      console.error('repo subscription errored', err);
      setTimeout(() => void run(subscriptionReconnectDelay), subscriptionReconnectDelay);
    }
  };

  // Fire-and-forget: run() handles its own errors and never rejects.
  void run(1000);
}
/**
 * Makes sure an `AtSubState` row exists for `endpoint`, inserting one with
 * cursor `0` when none is found. Does nothing if a row is already present.
 */
async function ensureCursor() {
  const existing = await db
    .selectFrom('AtSubState')
    .selectAll()
    .where('service', '=', endpoint)
    .executeTakeFirst();
  if (existing) {
    return;
  }
  const initialState = {service: endpoint, cursor: 0};
  await db.insertInto('AtSubState').values(initialState).execute();
}
/**
 * Stores `cursor` on the `AtSubState` row belonging to `endpoint`.
 *
 * @param cursor Sequence number to persist.
 */
async function updateCursor(cursor: number) {
  const update = db
    .updateTable('AtSubState')
    .set({cursor: cursor})
    .where('service', '=', endpoint);
  await update.execute();
}
/**
 * Reads the stored cursor for `endpoint`.
 *
 * @returns `{cursor}` when a truthy cursor is stored; an empty object when the
 *   row is missing or its cursor is falsy (including the initial `0`).
 */
async function getCursor(): Promise<{cursor?: number}> {
  const row = await db
    .selectFrom('AtSubState')
    .selectAll()
    .where('service', '=', endpoint)
    .executeTakeFirst();
  const stored = row?.cursor;
  if (!stored) {
    return {};
  }
  return {cursor: stored};
}
/** Deletes the `AtSubState` row(s) stored for `endpoint`. */
async function dropCursor() {
  const removal = db.deleteFrom('AtSubState').where('service', '=', endpoint);
  await removal.execute();
}
/**
 * Applies the post-related operations of one repo commit to the database.
 *
 * - `create` of an `app.bsky.feed.post` record: makes sure an `AtProfile` row
 *   exists for the author, triggers `updateProfileMeta` in the background and,
 *   when the post is not a reply, inserts an `AtPost` row.
 * - `delete` of an `app.bsky.feed.post` record: removes the matching `AtPost`
 *   row for that op only.
 *
 * Ops on other collections, and `update` ops, are ignored. Database errors are
 * logged per operation so one failure does not drop the remaining ops.
 *
 * @param evt Commit event; `evt.blocks` is read as a CAR file holding the records.
 * @throws Whatever `readCar`, `cborToLexRecord` or `AtUriUtil.parse` throw.
 */
async function handleEvent(evt: RepoEvent): Promise<void> {
  const car = await readCar(evt.blocks);

  for (const op of evt.ops) {
    // Record paths have the form `<collection>/<rkey>`.
    const [collection, rkey] = op.path.split('/');
    if (collection !== 'app.bsky.feed.post') continue;

    if (op.action === 'create') {
      if (!op.cid) continue;
      const recordBytes = car.blocks.get(op.cid);
      if (!recordBytes) continue;

      // eslint-disable-next-line @typescript-eslint/no-explicit-any -- record is not lexicon-validated here
      const record: any = cborToLexRecord(recordBytes);
      const uri = `at://${evt.repo}/${op.path}`;

      // Add profile
      try {
        await db
          .insertInto('AtProfile')
          .values({
            did: evt.repo,
          })
          .onConflict(oc => oc.doNothing())
          .execute();
      } catch (e) {
        console.error('Firehose: add profile error', e);
      }

      // Background refresh; attach a rejection handler so a failure cannot
      // surface as an unhandled promise rejection.
      void updateProfileMeta(evt.repo).then(undefined, (e: unknown) => {
        console.error('Firehose: update profile meta error', e);
      });

      // Add post (replies are not stored)
      const atUri = AtUriUtil.parse(uri);
      if (record.reply) continue;
      try {
        await db
          .insertInto('AtPost')
          .values({
            createdAt: record.createdAt,
            did: evt.repo,
            rkey: atUri.rkey,
          })
          .onConflict(oc => oc.doNothing())
          .execute();
      } catch (e) {
        console.error('Firehose: add post error', e);
      }
    } else if (op.action === 'delete') {
      // Delete only the post this op refers to. Looping over every op of the
      // commit here would also remove posts created in the same commit.
      if (!rkey) continue;
      try {
        await db.deleteFrom('AtPost').where('did', '=', evt.repo).where('rkey', '=', rkey).execute();
      } catch (e) {
        console.error('Firehose: del post error', e);
      }
    }
  }
}
/**
 * Tells whether `v` is a firehose commit message, i.e. an object whose `$type`
 * equals `com.atproto.sync.subscribeRepos#commit`.
 *
 * The predicate narrows to `any`, so callers get no static field checking.
 */
export function isCommit(v: unknown): v is any {
  if (!isObj(v) || !hasProp(v, '$type')) {
    return false;
  }
  return v.$type === 'com.atproto.sync.subscribeRepos#commit';
}
/**
 * Type guard for non-null values whose `typeof` is `'object'`
 * (plain objects, arrays, class instances — not functions or primitives).
 */
export function isObj(v: unknown): v is Record<string, unknown> {
  if (v === null) {
    return false;
  }
  return typeof v === 'object';
}
/**
 * Type guard reporting whether `prop` is present on `data`, own or inherited
 * (same semantics as the `in` operator).
 *
 * @param data Object to inspect.
 * @param prop Property key to look for.
 */
export function hasProp<K extends PropertyKey>(data: object, prop: K): data is Record<K, unknown> {
  const present = Reflect.has(data, prop);
  return present;
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment