-
-
Save jchris/0fcb5c65031a8f75085e4287a5d28e12 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import type { CID } from 'multiformats' | |
import { encode, decode } from 'multiformats/block' | |
import { sha256 as hasher } from 'multiformats/hashes/sha2' | |
import * as codec from '@ipld/dag-cbor' | |
import { put, get, entries, EventData, root } from '@alanshaw/pail/crdt' | |
import { EventFetcher, vis } from '@alanshaw/pail/clock' | |
import { LoggingFetcher, Transaction } from './transaction' | |
import type { TransactionBlockstore } from './transaction' | |
import type { DocUpdate, ClockHead, AnyLink, DocValue, BulkResult, ChangesOptions, Doc, DocFileMeta, FileResult, DocFiles } from './types' | |
import { decodeFile, encodeFile } from './files' | |
import { DbLoader } from './loaders' | |
/**
 * Applies a list of document updates to the CRDT one after another,
 * threading the clock head from each `put` into the next.
 *
 * For every update the document content is first written as its own block,
 * then `put` is called against the current head. The blocks `put` reports
 * (additions, removals and the event) are stored in `tblocks`.
 *
 * If the new root is neither among the additions nor retrievable through
 * `tblocks`, an error is logged and the head is not advanced for that update
 * (the reported blocks are still stored).
 *
 * @param tblocks - transaction that receives every block produced here
 * @param head - clock head to start from
 * @param updates - updates to apply, in order
 * @param options - forwarded to `put` unchanged
 * @returns the head after the last update; the input head when `updates` is empty
 */
export async function applyBulkUpdateToCrdt(
  tblocks: Transaction,
  head: ClockHead,
  updates: DocUpdate[],
  options?: object
): Promise<BulkResult> {
  let currentHead = head
  for (const update of updates) {
    const contentLink = await writeDocContent(tblocks, update)
    const putResult = await put(tblocks, currentHead, update.key, contentLink, options)

    const rootId = putResult.root.toString()
    const rootWasAdded = putResult.additions.some(added => added.cid.toString() === rootId)
    // the fallback lookup is a db-wide get
    if (!rootWasAdded && !(await tblocks.get(putResult.root))) {
      console.error(`missing root in additions: ${putResult.additions.length} ${rootId} keys: ${updates.map(u => u.key).toString()}`)
      // make sure https://github.com/alanshaw/pail/pull/20 is applied
      putResult.head = currentHead
    }

    const produced = [...putResult.additions, ...putResult.removals, putResult.event]
    for (const { cid, bytes } of produced) {
      tblocks.putSync(cid, bytes)
    }
    currentHead = putResult.head
  }
  return { head: currentHead }
}
// this whole thing can get pulled outside of the write queue
/**
 * Encodes the content of one update as a block, stores it in `blocks`
 * and returns the block's CID.
 *
 * A deletion is encoded as `{ del: true }`. Otherwise the document's file
 * attachments are processed first and the value is wrapped as `{ doc }`.
 */
async function writeDocContent(blocks: Transaction, update: DocUpdate): Promise<AnyLink> {
  if (!update.del) {
    await processFiles(blocks, update.value as Doc)
  }
  const value: DocValue = update.del ? { del: true } : { doc: update.value }
  const { cid, bytes } = await encode({ value, hasher, codec })
  blocks.putSync(cid, bytes)
  return cid
}
/**
 * Runs `processFileset` over the document's `_files` (non-public) and then
 * its `_publicFiles` (public), skipping whichever is absent.
 */
async function processFiles(blocks: Transaction, doc: Doc) {
  const filesets = [['_files', false], ['_publicFiles', true]] as const
  for (const [field, isPublic] of filesets) {
    const fileset = doc[field]
    if (fileset) {
      await processFileset(blocks, fileset, isPublic)
    }
  }
}
/**
 * Encodes every raw `File` in `files`, stores the resulting blocks in a
 * dedicated transaction on the parent blockstore, commits that transaction
 * through the loader, and replaces each `File` entry in place with its
 * metadata (`cid`, `type`, `size`, plus `car` when the commit returns one).
 *
 * Entries that are not `File` instances are left untouched.
 *
 * The size limit is validated before any encoding or mutation happens, so a
 * rejected update leaves `files` exactly as the caller passed it, and no
 * transaction is registered when there is nothing to encode.
 *
 * @param blocks - current write transaction; its `parent` blockstore is used
 * @param files - fileset to process; mutated in place
 * @param publicFiles - forwarded to the loader commit as `{ public }`
 * @throws Error when the combined size of the `File` entries exceeds 1MB
 */
async function processFileset(blocks: Transaction, files: DocFiles, publicFiles = false) {
  // First pass: find the raw files and total their size without side effects.
  const pending: Array<[string, File]> = []
  let totalSize = 0
  for (const filename in files) {
    const entry = files[filename]
    if (File === entry.constructor) {
      const file = entry as File
      pending.push([filename, file])
      totalSize += file.size
    }
  }
  if (!pending.length) return
  // todo option to bypass this limit
  if (totalSize > 1024 * 1024 * 1) throw new Error('Sync limit for files in a single update is 1MB')

  // Second pass: encode into a transaction of their own on the parent store.
  const dbBlockstore = blocks.parent as TransactionBlockstore
  const t = new Transaction(dbBlockstore)
  dbBlockstore.transactions.add(t)
  for (const [filename, file] of pending) {
    const { cid, blocks: fileBlocks } = await encodeFile(file)
    for (const block of fileBlocks) {
      t.putSync(block.cid, block.bytes)
    }
    files[filename] = { cid, type: file.type, size: file.size } as DocFileMeta
  }

  const car = await dbBlockstore.loader?.commit(t, { files } as FileResult, { public: publicFiles })
  if (car) {
    // Record which car holds each newly written file.
    for (const [filename] of pending) {
      files[filename] = { car, ...files[filename] } as DocFileMeta
    }
  }
}
/**
 * Looks up `key` at the given clock head and returns the decoded value it links to.
 *
 * @throws Error when `head` is empty or when `key` has no link at that head
 */
export async function getValueFromCrdt(blocks: TransactionBlockstore, head: ClockHead, key: string): Promise<DocValue> {
  if (!head.length) {
    throw new Error('Getting from an empty database')
  }
  const link = await get(blocks, head, key)
  if (link) {
    return await getValueFromLink(blocks, link)
  }
  throw new Error(`Missing key ${key}`)
}
/**
 * Decorates the file metadata of a decoded value in place: `_files` is
 * handled as non-public, `_publicFiles` as public. Does nothing when the
 * value carries no `doc`.
 */
export function readFiles(blocks: TransactionBlockstore | LoggingFetcher, value: DocValue) {
  const doc = value.doc
  if (!doc) return
  if (doc._files) readFileset(blocks, doc._files)
  if (doc._publicFiles) readFileset(blocks, doc._publicFiles, true)
}
/**
 * Walks a fileset and, for each entry that has a `cid`:
 *  - sets `url` to the w3s.link gateway address when `isPublic` is true;
 *  - attaches a lazy `file()` accessor when the entry has a `car` and the
 *    blockstore has a loader. The accessor decodes the file, fetching each
 *    block from the file car via the loader.
 * Every entry is written back to `files` under its own name.
 */
function readFileset(blocks: TransactionBlockstore | LoggingFetcher, files: DocFiles, isPublic = false) {
  for (const filename in files) {
    const meta = files[filename] as DocFileMeta
    if (meta.cid) {
      // if (!blocks.loader) throw new Error('Missing loader')
      if (isPublic) {
        meta.url = `https://${meta.cid.toString()}.ipfs.w3s.link/`
      }
      if (meta.car && blocks.loader) {
        const loader = blocks.loader as DbLoader
        const fetchBytes = async (cid: AnyLink) => {
          const reader = await loader.loadFileCar(meta.car!, isPublic)
          const found = await reader.get(cid as CID)
          if (!found) throw new Error(`Missing block ${cid.toString()}`)
          return found.bytes
        }
        meta.file = async () => await decodeFile({ get: fetchBytes }, meta.cid, meta)
      }
    }
    files[filename] = meta
  }
}
/**
 * Fetches the block behind `link`, decodes it as a `DocValue`, decorates its
 * file metadata via `readFiles`, and returns it.
 *
 * @throws Error when the block cannot be fetched
 */
async function getValueFromLink(blocks: TransactionBlockstore | LoggingFetcher, link: AnyLink): Promise<DocValue> {
  const raw = await blocks.get(link)
  if (!raw) {
    throw new Error(`Missing linked block ${link.toString()}`)
  }
  const decoded = (await decode({ bytes: raw.bytes, hasher, codec })) as { value: DocValue }
  const docValue = decoded.value
  readFiles(blocks, docValue)
  return docValue
}
/**
 * `EventFetcher` variant that tolerates missing events: when the underlying
 * `get` throws, the failure is logged and `{ value: null }` is returned
 * instead of propagating the error.
 */
class DirtyEventFetcher<T> extends EventFetcher<T> {
  // @ts-ignore
  async get(link) {
    try {
      // eslint-disable-next-line @typescript-eslint/no-unsafe-argument
      return await super.get(link)
    } catch (err) {
      // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-call
      console.error('missing event', link.toString(), err)
      const placeholder = { value: null }
      return placeholder
    }
  }
}
/**
 * Collects the document updates reachable from `head`, stopping at `since`.
 *
 * @param blocks - block source used for events and document values
 * @param head - clock head to walk back from
 * @param since - clock head at which the walk stops
 * @param opts - `dirty` selects the fetcher that tolerates missing events;
 *   `limit` is passed to the walk (a falsy limit is treated as `Infinity`)
 * @returns the gathered updates in reverse of the order they were gathered,
 *   together with the `head` that was passed in
 */
export async function clockChangesSince(
  blocks: TransactionBlockstore | LoggingFetcher,
  head: ClockHead,
  since: ClockHead,
  opts: ChangesOptions
): Promise<{ result: DocUpdate[], head: ClockHead }> {
  let eventsFetcher: EventFetcher<EventData>
  if (opts.dirty) {
    eventsFetcher = new DirtyEventFetcher<EventData>(blocks) as EventFetcher<EventData>
  } else {
    eventsFetcher = new EventFetcher<EventData>(blocks) as EventFetcher<EventData>
  }
  const seenKeys = new Set<string>()
  const seenLinks = new Set<string>()
  const maxUpdates = opts.limit || Infinity
  const gathered = await gatherUpdates(blocks, eventsFetcher, head, since, [], seenKeys, seenLinks, maxUpdates)
  return { result: gathered.reverse(), head }
}
/**
 * Recursively walks the clock from `head` towards its parents, recording the
 * first update seen for each key.
 *
 * The walk stops at this level when any link in `since` is part of `head`,
 * and stops everywhere once `limit` updates have been recorded. `limit` is
 * the number of updates that may still be added: updates recorded by a
 * recursive call are charged to the caller, and the limit is re-checked for
 * every link in `head`, so the total never exceeds the limit passed in.
 *
 * @param blocks - block source for document values
 * @param eventsFetcher - resolves clock event links to events
 * @param head - event links to visit at this level
 * @param since - links at which the walk stops
 * @param updates - accumulator; appended to in place and returned
 * @param keys - keys already recorded (shared across the whole walk)
 * @param didLinks - event links already visited (shared across the whole walk)
 * @param limit - how many more updates may be recorded
 * @returns the `updates` accumulator
 */
async function gatherUpdates(
  blocks: TransactionBlockstore | LoggingFetcher,
  eventsFetcher: EventFetcher<EventData>,
  head: ClockHead,
  since: ClockHead,
  updates: DocUpdate[] = [],
  keys: Set<string>, didLinks: Set<string>,
  limit: number
): Promise<DocUpdate[]> {
  if (limit <= 0) return updates
  const headIds = new Set(head.map(l => l.toString()))
  if (since.some(l => headIds.has(l.toString()))) {
    return updates
  }
  for (const link of head) {
    // A sibling or a deeper level may have used up the budget already.
    if (limit <= 0) break
    const linkId = link.toString()
    if (didLinks.has(linkId)) continue
    didLinks.add(linkId)
    const { value: event } = await eventsFetcher.get(link)
    if (!event) continue
    const { key, value } = event.data
    if (!keys.has(key)) {
      keys.add(key)
      const docValue = await getValueFromLink(blocks, value)
      updates.push({ key, value: docValue.doc, del: docValue.del })
      limit--
    }
    if (event.parents) {
      const countBefore = updates.length
      updates = await gatherUpdates(blocks, eventsFetcher, event.parents, since, updates, keys, didLinks, limit)
      // Charge whatever the recursion recorded against this level's budget.
      limit -= updates.length - countBefore
    }
  }
  return updates
}
/**
 * Async generator over every entry at `head`: for each key/link pair the
 * linked value is decoded and yielded as a `DocUpdate`-shaped record
 * (`key`, `value`, `del`).
 */
export async function * getAllEntries(blocks: TransactionBlockstore, head: ClockHead) {
  // return entries(blocks, head)
  for await (const [key, link] of entries(blocks, head)) {
    const { doc, del } = await getValueFromLink(blocks, link)
    yield { key, value: doc, del } as DocUpdate
  }
}
/**
 * Async generator that re-yields, one by one, each line produced by the
 * clock's `vis` for the given head.
 */
export async function * clockVis(blocks: TransactionBlockstore, head: ClockHead) {
  const lines = vis(blocks, head)
  for await (const visLine of lines) {
    yield visLine
  }
}
/**
 * Compacts the blockstore at `head`.
 *
 * Reads everything reachable from `head` through a `LoggingFetcher`
 * (entries, the clock visualisation, the root, and the full change history),
 * copies the root's additions/removals and every CID in the fetcher's `cids`
 * into a fresh transaction, and hands that transaction to
 * `blocks.commitCompaction`.
 *
 * NOTE(review): this relies on `LoggingFetcher` collecting in `cids` the
 * blocks read through it — confirm against its implementation.
 *
 * @throws Error when a required block cannot be fetched
 */
export async function doCompact(blocks: TransactionBlockstore, head: ClockHead) {
  const blockLog = new LoggingFetcher(blocks)
  const newBlocks = new Transaction(blocks)

  // Read every entry's value block through the logging fetcher.
  for await (const [, link] of entries(blockLog, head)) {
    const entryBlock = await blockLog.get(link)
    if (!entryBlock) throw new Error(`Missing block: ${link.toString()}`)
    // await newBlocks.put(link, bl.bytes)
  }

  // Iterated only for the reads it performs; the produced lines are discarded.
  // eslint-disable-next-line @typescript-eslint/no-unused-vars
  for await (const _line of vis(blockLog, head)) {
    void 1
  }

  const rootResult = await root(blockLog, head)
  const rootBlocks = [...rootResult.additions, ...rootResult.removals]
  for (const { cid, bytes } of rootBlocks) {
    newBlocks.putSync(cid, bytes)
  }

  // Walk the whole history (empty `since`); the result itself is not used.
  await clockChangesSince(blockLog, head, [], {})

  for (const cid of blockLog.cids) {
    const loggedBlock = await blocks.get(cid)
    if (!loggedBlock) throw new Error(`Missing block: ${cid.toString()}`)
    await newBlocks.put(cid, loggedBlock.bytes)
  }

  await blocks.commitCompaction(newBlocks, head)
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import type { Link } from 'multiformats' | |
import type { EventLink } from '@alanshaw/pail/clock' | |
import type { EventData } from '@alanshaw/pail/crdt' | |
import { ClockHead, Doc } from './types' | |
/** Option bag with three optional settings: `public`, `meta` and `persistIndexes`. */
export type FireproofOptions = {
  public?: boolean;
  meta?: DbMeta;
  persistIndexes?: boolean;
}

/** Link to a clock event carrying pail CRDT `EventData`. */
// ts-unused-exports:disable-next-line
export type ClockLink = EventLink<EventData>

/** A clock head: a list of clock event links. */
export type ClockHead = Array<ClockLink>
/** Recursive value type: bytes, primitives, null, arrays of fragments, or string-keyed maps of fragments. */
export type DocFragment =
  | Uint8Array
  | string
  | number
  | boolean
  | null
  | DocFragment[]
  | { [key: string]: DocFragment }

/** A document: free-form body combined with the reserved base fields. */
export type Doc = DocBody & DocBase

/** Reserved document fields: optional `_id` and the two optional filesets. */
export type DocBase = {
  _id?: string;
  _files?: DocFiles;
  _publicFiles?: DocFiles;
}

/**
 * Metadata describing a file held in a fileset. `type`, `size` and `cid` are
 * required; `car`, `url` and the lazy `file` accessor are optional.
 */
export type DocFileMeta = {
  type: string;
  size: number;
  cid: AnyLink;
  car?: AnyLink;
  url?: string;
  file?: () => Promise<File>;
}
/**
 * A fileset: maps a filename to either a raw `File` or its `DocFileMeta`.
 *
 * Exported because the CRDT helper code imports `DocFiles` from './types',
 * and because the exported `DocBase` type refers to it.
 */
export type DocFiles = {
  [key: string]: File | DocFileMeta
}
/** Car header shape that carries a list of `files` links. */
export type FileCarHeader = {
  files: Array<AnyLink>;
}

/** Free-form document body: optional filesets plus arbitrary string-keyed fragments. */
type DocBody = {
  _files?: DocFiles;
  _publicFiles?: DocFiles;
  [key: string]: DocFragment;
}

/** Optional `proof` and `clock` fields. */
type DocMeta = {
  proof?: DocFragment;
  clock?: ClockHead;
}

/** One document update: a `head`, the `key`, and either a `value` or a `del` flag. */
export type DocUpdate = {
  head: ClockHead;
  key: string;
  value?: Record<string, any>;
  del?: boolean;
}

/** Stored value for a key: either the `doc` body or a `del` marker. */
export type DocValue = {
  doc?: DocBody;
  del?: boolean;
}
/** String-keyed map of links. */
type IndexCars = Record<string, AnyLink>

/** An index key: a pair of strings or a single string. */
export type IndexKey = [string, string] | string

/** One index update: the `key` with an optional `value` or `del` flag. */
export type IndexUpdate = {
  key: IndexKey;
  value?: DocFragment;
  del?: boolean;
}

/** One index row: `id` and `key`, optionally with `doc`, `value` and `del`. */
export type IndexRow = {
  id: string;
  key: IndexKey;
  doc?: Doc | null;
  value?: DocFragment;
  del?: boolean;
}

/** Holds an optional `car` link. */
type CarCommit = {
  car?: AnyLink;
}

/** Result of a bulk update: the resulting clock `head`. */
export type BulkResult = {
  head: ClockHead;
}
/** String-keyed map of file metadata, under `files`. */
export type FileResult = {
  files: Record<string, DocFileMeta>;
}

/** Metadata for one index: `byId` and `byKey` links, `map`, `name` and clock `head`. */
export type IdxMeta = {
  byId: AnyLink;
  byKey: AnyLink;
  map: string;
  name: string;
  head: ClockHead;
}

/** Map from string to `IdxMeta`, under `indexes`. */
export type IdxMetaMap = {
  indexes: Map<string, IdxMeta>;
}

/** Fields shared by the car headers below: `cars` and `compact` link lists. */
type CarHeader = {
  cars: Array<AnyLink>;
  compact: Array<AnyLink>;
}

/** Car header that additionally carries the index metadata map. */
export type IdxCarHeader = CarHeader & IdxMetaMap

/** Car header that additionally carries a clock `head`. */
export type DbCarHeader = CarHeader & {
  head: ClockHead;
}

/** Any of the three car header shapes. */
export type AnyCarHeader = DbCarHeader | IdxCarHeader | FileCarHeader

/** The two car header shapes that include `cars` and `compact`. */
export type CarLoaderHeader = DbCarHeader | IdxCarHeader
/** Query options; every field is optional. */
export type QueryOpts = {
  descending?: boolean;
  limit?: number;
  includeDocs?: boolean;
  range?: [IndexKey, IndexKey];
  key?: DocFragment;
  keys?: DocFragment[];
  prefix?: DocFragment | [DocFragment];
}

/** A multiformats `Link` to unknown data, with CID version 1 or 0. */
export type AnyLink = Link<unknown, number, number, 1 | 0>

/** A block: its `cid` and raw `bytes`. */
export type AnyBlock = {
  cid: AnyLink;
  bytes: Uint8Array;
}

/** A block together with its decoded `value`. */
export type AnyDecodedBlock = {
  cid: AnyLink;
  bytes: Uint8Array;
  value: any;
}

/** Anything that can fetch a block by link; `get` resolves to `undefined` when there is none. */
export type BlockFetcher = {
  get: (link: AnyLink) => Promise<AnyBlock | undefined>;
}

/** Callback taking a fragment `k` and an optional fragment `v`. */
type CallbackFn = (k: DocFragment, v?: DocFragment) => void

/** Function over a document that receives a `CallbackFn` and may return a fragment. */
export type MapFn = (doc: Doc, map: CallbackFn) => DocFragment | void

/** A `car` link paired with a `key` that may be null. */
export type DbMeta = {
  car: AnyLink;
  key: string | null;
}

/** Commit options: `noLoader`, `compact` and `public`, all optional. */
export type CommitOpts = {
  noLoader?: boolean;
  compact?: boolean;
  public?: boolean;
}

/** Something whose blocks can be enumerated (`entries`) and fetched by CID (`get`). */
export interface CarMakeable {
  entries(): Iterable<AnyBlock>
  get(cid: AnyLink): Promise<AnyBlock | undefined>
}
/** Parameters for a metadata upload: `name` and `branch`. */
export type UploadMetaFnParams = {
  name: string;
  branch: string;
}

/** Parameters for a data upload: `type`, `name`, `car` and `size` (a string). */
export type UploadDataFnParams = {
  type: 'data' | 'file';
  name: string;
  car: string;
  size: string;
}

/** The kinds of download: `'data'` or `'file'`. */
export type DownloadFnParamTypes = 'data' | 'file'

/** Parameters for a data download: `type`, `name` and `car`. */
export type DownloadDataFnParams = {
  type: DownloadFnParamTypes;
  name: string;
  car: string;
}

/** Parameters for a metadata download: `name` and `branch`. */
export type DownloadMetaFnParams = {
  name: string;
  branch: string;
}

/** Async handler that receives a list of `DbMeta`. */
export type LoadHandler = (dbMetas: DbMeta[]) => Promise<void>

/** Options for reading changes: optional `dirty` flag and `limit`. */
export type ChangesOptions = {
  dirty?: boolean;
  limit?: number;
}

/** Changes response: the `clock` head and the `rows` of key/value pairs. */
export type ChangesResponse = {
  clock: ClockHead;
  rows: Array<{ key: string; value: Doc }>;
}

/** Response carrying an `id` and the `clock` head. */
export type DbResponse = {
  id: string;
  clock: ClockHead;
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment