@jchris
Created September 21, 2023 18:55
import type { CID } from 'multiformats'
import { encode, decode } from 'multiformats/block'
import { sha256 as hasher } from 'multiformats/hashes/sha2'
import * as codec from '@ipld/dag-cbor'
import { put, get, entries, EventData, root } from '@alanshaw/pail/crdt'
import { EventFetcher, vis } from '@alanshaw/pail/clock'
import { LoggingFetcher, Transaction } from './transaction'
import type { TransactionBlockstore } from './transaction'
import type { DocUpdate, ClockHead, AnyLink, DocValue, BulkResult, ChangesOptions, Doc, DocFileMeta, FileResult, DocFiles } from './types'
import { decodeFile, encodeFile } from './files'
import { DbLoader } from './loaders'
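// Apply a batch of document updates to the CRDT. Each update's content is
// written as its own dag-cbor block, then pail's `put` advances the clock head.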
export async function applyBulkUpdateToCrdt(
  tblocks: Transaction,
  head: ClockHead,
  updates: DocUpdate[],
  options?: object
): Promise<BulkResult> {
  let result
  for (const update of updates) {
    const link = await writeDocContent(tblocks, update)
    result = await put(tblocks, head, update.key, link, options)
    const resRoot = result.root.toString()
    const isReturned = result.additions.some(a => a.cid.toString() === resRoot)
    if (!isReturned) {
      const hasRoot = await tblocks.get(result.root) // is a db-wide get
      if (!hasRoot) {
        console.error(`missing root in additions: ${result.additions.length} ${resRoot} keys: ${updates.map(u => u.key).toString()}`)
        // make sure https://github.com/alanshaw/pail/pull/20 is applied
        result.head = head
      }
    }
    for (const { cid, bytes } of [...result.additions, ...result.removals, result.event]) {
      tblocks.putSync(cid, bytes)
    }
    head = result.head
  }
  return { head }
}
// this whole thing can get pulled outside of the write queue
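// Encode one update as a DocValue block (a `{ del: true }` tombstone for
// deletes), stage it in the transaction, and return its link.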
async function writeDocContent(blocks: Transaction, update: DocUpdate): Promise<AnyLink> {
  let value: DocValue
  if (update.del) {
    value = { del: true }
  } else {
    await processFiles(blocks, update.value as Doc)
    value = { doc: update.value }
  }
  const block = await encode({ value, hasher, codec })
  blocks.putSync(block.cid, block.bytes)
  return block.cid
}
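// Encode any attached File objects (_files / _publicFiles) before the
// document value itself is written.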
async function processFiles(blocks: Transaction, doc: Doc) {
  if (doc._files) {
    await processFileset(blocks, doc._files)
  }
  if (doc._publicFiles) {
    await processFileset(blocks, doc._publicFiles, true)
  }
}
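// Encode each raw File in the set into blocks inside a dedicated transaction,
// replace the File entry with its DocFileMeta, and commit the blocks as a
// separate file CAR via the loader.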
async function processFileset(blocks: Transaction, files: DocFiles, publicFiles = false) {
  const dbBlockstore = blocks.parent as TransactionBlockstore
  const t = new Transaction(dbBlockstore)
  dbBlockstore.transactions.add(t)
  const didPut = []
  let totalSize = 0
  for (const filename in files) {
    if (File === files[filename].constructor) {
      const file = files[filename] as File
      totalSize += file.size
      const { cid, blocks: fileBlocks } = await encodeFile(file)
      didPut.push(filename)
      for (const block of fileBlocks) {
        t.putSync(block.cid, block.bytes)
      }
      files[filename] = { cid, type: file.type, size: file.size } as DocFileMeta
    }
  }
  // todo option to bypass this limit
  if (totalSize > 1024 * 1024 * 1) throw new Error('Sync limit for files in a single update is 1MB')
  if (didPut.length) {
    const car = await dbBlockstore.loader?.commit(t, { files } as FileResult, { public: publicFiles })
    if (car) {
      for (const name of didPut) {
        files[name] = { car, ...files[name] } as DocFileMeta
      }
    }
  }
}
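// Resolve a key against the current clock head and load its document value.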
export async function getValueFromCrdt(blocks: TransactionBlockstore, head: ClockHead, key: string): Promise<DocValue> {
  if (!head.length) throw new Error('Getting from an empty database')
  const link = await get(blocks, head, key)
  if (!link) throw new Error(`Missing key ${key}`)
  return await getValueFromLink(blocks, link)
}
export function readFiles(blocks: TransactionBlockstore | LoggingFetcher, { doc }: DocValue) {
  if (!doc) return
  if (doc._files) {
    readFileset(blocks, doc._files)
  }
  if (doc._publicFiles) {
    readFileset(blocks, doc._publicFiles, true)
  }
}
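// Rehydrate file metadata on read: attach a public gateway URL for public
// files, and a lazy `file()` accessor that loads the file's CAR on demand.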
function readFileset(blocks: TransactionBlockstore | LoggingFetcher, files: DocFiles, isPublic = false) {
  for (const filename in files) {
    const fileMeta = files[filename] as DocFileMeta
    if (fileMeta.cid) {
      // if (!blocks.loader) throw new Error('Missing loader')
      if (isPublic) { fileMeta.url = `https://${fileMeta.cid.toString()}.ipfs.w3s.link/` }
      if (fileMeta.car && blocks.loader) {
        const ld = blocks.loader as DbLoader
        fileMeta.file = async () => await decodeFile({
          get: async (cid: AnyLink) => {
            const reader = await ld.loadFileCar(fileMeta.car!, isPublic)
            const block = await reader.get(cid as CID)
            if (!block) throw new Error(`Missing block ${cid.toString()}`)
            return block.bytes
          }
        }, fileMeta.cid, fileMeta)
      }
    }
    files[filename] = fileMeta
  }
}
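// Load and decode a DocValue block by link, wiring up file accessors as a side effect.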
async function getValueFromLink(blocks: TransactionBlockstore | LoggingFetcher, link: AnyLink): Promise<DocValue> {
  const block = await blocks.get(link)
  if (!block) throw new Error(`Missing linked block ${link.toString()}`)
  const { value } = (await decode({ bytes: block.bytes, hasher, codec })) as { value: DocValue }
  readFiles(blocks, value)
  return value
}
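// Event fetcher that tolerates missing events: instead of throwing it logs the
// error and returns a null value, so dirty reads can skip unreachable history.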
class DirtyEventFetcher<T> extends EventFetcher<T> {
  // @ts-ignore
  async get(link) {
    try {
      // eslint-disable-next-line @typescript-eslint/no-unsafe-argument
      return await super.get(link)
    } catch (e) {
      // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-call
      console.error('missing event', link.toString(), e)
      return ({ value: null })
    }
  }
}
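// Collect the document updates between `since` and `head`; traversal visits
// newest events first and the result is reversed to oldest-first.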
export async function clockChangesSince(
  blocks: TransactionBlockstore | LoggingFetcher,
  head: ClockHead,
  since: ClockHead,
  opts: ChangesOptions
): Promise<{ result: DocUpdate[], head: ClockHead }> {
  const eventsFetcher = (opts.dirty ? new DirtyEventFetcher<EventData>(blocks) : new EventFetcher<EventData>(blocks)) as EventFetcher<EventData>
  const keys: Set<string> = new Set()
  const updates = await gatherUpdates(blocks, eventsFetcher, head, since, [], keys, new Set<string>(), opts.limit || Infinity)
  return { result: updates.reverse(), head }
}
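// Walk the event DAG from `head` toward `since`, keeping only the newest
// update per key; recursion stops once the limit is exhausted or a `since`
// link is reached.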
async function gatherUpdates(
  blocks: TransactionBlockstore | LoggingFetcher,
  eventsFetcher: EventFetcher<EventData>,
  head: ClockHead,
  since: ClockHead,
  updates: DocUpdate[] = [],
  keys: Set<string>,
  didLinks: Set<string>,
  limit: number
): Promise<DocUpdate[]> {
  if (limit <= 0) return updates
  const sHead = head.map(l => l.toString())
  for (const link of since) {
    if (sHead.includes(link.toString())) {
      return updates
    }
  }
  for (const link of head) {
    if (didLinks.has(link.toString())) continue
    didLinks.add(link.toString())
    const { value: event } = await eventsFetcher.get(link)
    if (!event) continue
    const { key, value } = event.data
    if (keys.has(key)) {
      if (event.parents) {
        updates = await gatherUpdates(blocks, eventsFetcher, event.parents, since, updates, keys, didLinks, limit)
      }
    } else {
      keys.add(key)
      const docValue = await getValueFromLink(blocks, value)
      updates.push({ key, value: docValue.doc, del: docValue.del })
      limit--
      if (event.parents) {
        updates = await gatherUpdates(blocks, eventsFetcher, event.parents, since, updates, keys, didLinks, limit)
      }
    }
  }
  return updates
}
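// Iterate every key in the CRDT at `head`, yielding each as a DocUpdate.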
export async function * getAllEntries(blocks: TransactionBlockstore, head: ClockHead) {
  // return entries(blocks, head)
  for await (const [key, link] of entries(blocks, head)) {
    const docValue = await getValueFromLink(blocks, link)
    yield { key, value: docValue.doc, del: docValue.del } as DocUpdate
  }
}
export async function * clockVis(blocks: TransactionBlockstore, head: ClockHead) {
  for await (const line of vis(blocks, head)) {
    yield line
  }
}
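// Compaction: replay entries, clock visualization, root, and changes through a
// LoggingFetcher so its cid log covers every reachable block, then copy those
// blocks into a fresh Transaction and commit it as the compacted CAR.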
export async function doCompact(blocks: TransactionBlockstore, head: ClockHead) {
  const blockLog = new LoggingFetcher(blocks)
  const newBlocks = new Transaction(blocks)
  for await (const [, link] of entries(blockLog, head)) {
    const bl = await blockLog.get(link)
    if (!bl) throw new Error('Missing block: ' + link.toString())
    // await newBlocks.put(link, bl.bytes)
  }
  // eslint-disable-next-line @typescript-eslint/no-unused-vars
  for await (const _line of vis(blockLog, head)) {
    void 1
  }
  const result = await root(blockLog, head)
  for (const { cid, bytes } of [...result.additions, ...result.removals]) {
    newBlocks.putSync(cid, bytes)
  }
  await clockChangesSince(blockLog, head, [], {})
  for (const cid of blockLog.cids) {
    const bl = await blocks.get(cid)
    if (!bl) throw new Error('Missing block: ' + cid.toString())
    await newBlocks.put(cid, bl.bytes)
  }
  await blocks.commitCompaction(newBlocks, head)
}
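// ---- Second file in this gist: shared type definitions (presumably types.ts) ----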
import type { Link } from 'multiformats'
import type { EventLink } from '@alanshaw/pail/clock'
import type { EventData } from '@alanshaw/pail/crdt'
export type FireproofOptions = {
  public?: boolean
  meta?: DbMeta
  persistIndexes?: boolean
}
// ts-unused-exports:disable-next-line
export type ClockLink = EventLink<EventData>
export type ClockHead = ClockLink[]
export type DocFragment = Uint8Array | string | number | boolean | null | DocFragment[] | { [key: string]: DocFragment }
export type Doc = DocBody & DocBase
export type DocBase = {
  _id?: string
  _files?: DocFiles
  _publicFiles?: DocFiles
}
export type DocFileMeta = {
  type: string;
  size: number;
  cid: AnyLink;
  car?: AnyLink;
  url?: string;
  file?: () => Promise<File>;
}
export type DocFiles = {
  [key: string]: File | DocFileMeta
}
export type FileCarHeader = {
  files: AnyLink[]
}
type DocBody = {
  _files?: DocFiles
  _publicFiles?: DocFiles
  [key: string]: DocFragment
}
type DocMeta = {
  proof?: DocFragment
  clock?: ClockHead
}
export type DocUpdate = {
  head?: ClockHead // optional: reads (gatherUpdates, getAllEntries) build updates without a head
  key: string
  value?: { [key: string]: any }
  del?: boolean
}
export type DocValue = {
  doc?: DocBody
  del?: boolean
}
type IndexCars = {
  [key: string]: AnyLink
}
export type IndexKey = [string, string] | string
export type IndexUpdate = {
  key: IndexKey
  value?: DocFragment
  del?: boolean
}
export type IndexRow = {
  id: string
  key: IndexKey
  doc?: Doc | null
  value?: DocFragment
  del?: boolean
}
type CarCommit = {
  car?: AnyLink
}
export type BulkResult = {
  head: ClockHead
}
export type FileResult = {
  files: { [key: string]: DocFileMeta }
}
export type IdxMeta = {
  byId: AnyLink
  byKey: AnyLink
  map: string
  name: string
  head: ClockHead
}
export type IdxMetaMap = {
  indexes: Map<string, IdxMeta>
}
type CarHeader = {
  cars: AnyLink[]
  compact: AnyLink[]
}
export type IdxCarHeader = CarHeader & IdxMetaMap
export type DbCarHeader = CarHeader & {
  head: ClockHead
}
export type AnyCarHeader = DbCarHeader | IdxCarHeader | FileCarHeader
export type CarLoaderHeader = DbCarHeader | IdxCarHeader
export type QueryOpts = {
  descending?: boolean
  limit?: number
  includeDocs?: boolean
  range?: [IndexKey, IndexKey]
  key?: DocFragment
  keys?: DocFragment[]
  prefix?: DocFragment | [DocFragment]
}
export type AnyLink = Link<unknown, number, number, 1 | 0>
export type AnyBlock = { cid: AnyLink; bytes: Uint8Array }
export type AnyDecodedBlock = { cid: AnyLink; bytes: Uint8Array, value: any }
export type BlockFetcher = { get: (link: AnyLink) => Promise<AnyBlock | undefined> }
type CallbackFn = (k: DocFragment, v?: DocFragment) => void
export type MapFn = (doc: Doc, map: CallbackFn) => DocFragment | void
export type DbMeta = { car: AnyLink, key: string | null }
export type CommitOpts = { noLoader?: boolean, compact?: boolean, public?: boolean }
export interface CarMakeable {
  entries(): Iterable<AnyBlock>
  get(cid: AnyLink): Promise<AnyBlock | undefined>
}
export type UploadMetaFnParams = {
  name: string,
  branch: string,
}
export type UploadDataFnParams = {
  type: 'data' | 'file',
  name: string,
  car: string,
  size: string
}
export type DownloadFnParamTypes = 'data' | 'file'
export type DownloadDataFnParams = {
  type: DownloadFnParamTypes,
  name: string,
  car: string,
}
export type DownloadMetaFnParams = {
  name: string,
  branch: string,
}
export type LoadHandler = (dbMetas: DbMeta[]) => Promise<void>
export type ChangesOptions = {
  dirty?: boolean
  limit?: number
}
export type ChangesResponse = {
  clock: ClockHead
  rows: { key: string; value: Doc }[]
}
export type DbResponse = {
  id: string
  clock: ClockHead
}