Skip to content

Instantly share code, notes, and snippets.

@garth
Last active February 29, 2024 01:43
Show Gist options
  • Save garth/9aeac433e83044b0e17d2637d15de179 to your computer and use it in GitHub Desktop.
Save garth/9aeac433e83044b0e17d2637d15de179 to your computer and use it in GitHub Desktop.
Connect emitter interface for Fireproof
import {
type DownloadMetaFnParams,
type DownloadDataFnParams,
type UploadMetaFnParams,
type UploadDataFnParams,
type ChannelEmitter,
} from './types'
import { Connection } from '@fireproof/connect'
export type ConnectEmitterParams = {
emitter: ChannelEmitter
}
export class ConnectEmitter extends Connection {
emitter: ChannelEmitter
messagePromise: Promise<Uint8Array[]>
messageResolve?: (value: Uint8Array[] | PromiseLike<Uint8Array[]>) => void
constructor(params: ConnectEmitterParams) {
super()
this.emitter = params.emitter
this.ready = Promise.resolve()
this.messagePromise = new Promise<Uint8Array[]>((resolve, reject) => {
this.messageResolve = resolve
})
}
async onConnect() {
if (!this.loader || !this.taskManager) {
throw new Error('loader and taskManager must be set')
}
this.emitter.on('update', (message) => {
const afn = async () => {
const eventBlock = await this.decodeEventBlock(message.data)
await this.taskManager!.handleEvent(eventBlock)
this.messageResolve?.([eventBlock.value.data.dbMeta])
// add the cid to our parents so we delete it when we send the update
this.parents.push(eventBlock.cid)
setTimeout(() => {
this.messagePromise = new Promise<Uint8Array[]>((resolve, reject) => {
this.messageResolve = resolve
})
}, 0)
}
void afn()
})
void this.emitter.emit('requestHead')
}
async dataUpload(data: Uint8Array, params: UploadDataFnParams) {
void this.emitter.emit('store', {
id: `${params.name}-car-${params.car}`,
data,
})
}
async dataDownload(params: DownloadDataFnParams) {
return new Promise<Uint8Array>((resolve) => {
const id = `${params.name}-car-${params.car}`
const messageHandler = (message: { id: string; data: Uint8Array | null }) => {
if (message.id === id) {
this.emitter.off('response', messageHandler)
if (message.data == null) {
throw new Error('Failure in downloading data!')
}
resolve(message.data)
}
}
this.emitter.on('response', messageHandler)
void this.emitter.emit('request', { id })
})
}
async metaUpload(bytes: Uint8Array, _params: UploadMetaFnParams) {
const event = await this.createEventBlock(bytes)
void this.emitter.emit('storeHead', {
id: event.cid.toString(),
data: event.bytes,
parents: this.parents.map((parent) => parent.toString()),
})
this.parents = [event.cid]
return null
}
async metaDownload(_params: DownloadMetaFnParams) {
return this.messagePromise
}
}
import { type Connectable } from '@fireproof/connect'
import { ConnectEmitter } from './connect-emitter'
import type { ChannelEmitter } from './types'
const webSocketCxs = new Map<string, ConnectEmitter>()
export const connect = {
emitter: ({ name, blockstore }: Connectable, emitter: ChannelEmitter) => {
if (!name) {
throw new Error('database name is required')
}
if (webSocketCxs.has(name)) {
return webSocketCxs.get(name)!
}
const connection = new ConnectEmitter({ emitter })
connection.connect(blockstore)
webSocketCxs.set(name, connection)
return connection
},
}
import type Emittery from 'emittery'
export type ChannelEmitter = Emittery<{
/**
* Store a message for future retrieval
*
* These messages are not broadcast to subscribers
*/
store: {
id: string
data: Uint8Array
}
/**
* This event is emitted when a message is received
*/
update: {
id?: string
data: Uint8Array
}
/**
* Request a stored message from the channel
*/
request: {
id: string
}
/**
* Response to a request
*/
response: {
id: string
/** Null if the message was not found */
data: Uint8Array | null
}
/**
* Special message to store fireproof database updates
*/
storeHead: {
id: string
data: Uint8Array
parents: string[]
}
/**
* Special message to request fireproof database updates
*
* The response will be sent as a series of `update` events
*/
requestHead: undefined
}>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment