Skip to content

Instantly share code, notes, and snippets.

@danopia
Created May 24, 2023 08:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save danopia/b66023ac488c4913c3747e37669c7f55 to your computer and use it in GitHub Desktop.
Save danopia/b66023ac488c4913c3747e37669c7f55 to your computer and use it in GitHub Desktop.
import * as ows from "https://deno.land/x/stream_observables@v1.3/mod.ts";
import { trace, SpanKind, propagation, context, TextMapGetter, ROOT_CONTEXT, SpanContext } from "https://deno.land/x/observability@v0.4.0/opentelemetry/api.js";
/** Fields carried by every realtime event, regardless of the kind of change. */
type KvRealtimeEventBase = {
  /** Wall-clock time at which the event object was created. */
  appliedAt: Date;
  /** Full key of the affected entry. */
  key: Deno.KvKey;
  /** Versionstamp associated with the change. */
  versionstamp: string;
};

/**
 * A change notification for a single KV entry, discriminated by `type`:
 * `insert` and `replace` carry the written `value`, `delete` carries none.
 */
type KvRealtimeEvent =
  | (KvRealtimeEventBase & { type: 'insert' | 'replace'; value: unknown })
  | (KvRealtimeEventBase & { type: 'delete' });
// Module-wide tracer, obtained under the 'kv-realtime' name; used below for every span this file starts.
const tracer = trace.getTracer('kv-realtime');
/**
 * Wraps a `Deno.Kv` store so that every successful write made through this
 * class is also announced as a {@link KvRealtimeEvent}.
 *
 * Events are delivered to observers registered in this instance and, when a
 * `BroadcastChannel` is supplied, posted on that channel together with the
 * injected trace-propagation carrier. Events received on the channel are
 * delivered to this instance's observers under the extracted trace context.
 */
export class KvRealtimeContext {
  /** Reads propagation entries out of the plain-object carrier sent over the channel. */
  private static readonly carrierGetter: TextMapGetter<Record<string, string>> = {
    keys: (carrier) => Object.keys(carrier),
    get: (carrier, key) => carrier[key],
  };

  /** Currently registered prefix observers. Iterated on every event. */
  observers = new Set<{
    prefix: Deno.KvKey,
    next: ows.NextFunc<KvRealtimeEvent>,
    spanCtx?: SpanContext,
  }>();

  /**
   * @param kv Store that all reads and writes are performed against.
   * @param broadcastChannel Optional channel used to exchange events with
   *   other listeners of the same channel.
   */
  constructor(
    private readonly kv: Deno.Kv,
    private readonly broadcastChannel?: BroadcastChannel
  ) {
    broadcastChannel?.addEventListener('message', (evt) => {
      // NOTE(review): the payload is trusted to have this shape; it is not validated.
      const payload: { event: KvRealtimeEvent; baggage: Record<string,string> } = evt.data;
      const ctx = propagation.extract(ROOT_CONTEXT, payload.baggage, KvRealtimeContext.carrierGetter);
      context
        .with(ctx, () => this.processEvent(payload.event))
        .catch((err: unknown) => {
          // Nobody awaits the listener, so surface failures instead of
          // leaving an unhandled rejection.
          console.error('kv realtime: failed to process broadcast event:', err);
        });
    });
  }

  /** Narrows an arbitrary thrown value to something `recordException` accepts. */
  private static asException(err: unknown): Error | string {
    return err instanceof Error ? err : String(err);
  }

  /** Renders one key part as a string so span attributes stay a homogeneous string array. */
  private static describeKeyPart(part: Deno.KvKey[number]): string {
    if (typeof part === 'string') return part;
    if (part instanceof Uint8Array) {
      return '0x' + Array.from(part, (byte) => byte.toString(16).padStart(2, '0')).join('');
    }
    return String(part);
  }

  /**
   * Compares two key parts without type coercion. Byte-array parts are
   * compared by content, because a structured-cloned copy is never the same
   * object as the original.
   */
  private static keyPartsEqual(a: Deno.KvKey[number], b: Deno.KvKey[number]): boolean {
    if (a instanceof Uint8Array && b instanceof Uint8Array) {
      return a.length === b.length && a.every((byte, idx) => byte === b[idx]);
    }
    return a === b;
  }

  /** True when `key` starts with every part of `prefix`, in order. */
  private static keyHasPrefix(key: Deno.KvKey, prefix: Deno.KvKey): boolean {
    if (key.length < prefix.length) return false;
    return prefix.every((part, idx) => KvRealtimeContext.keyPartsEqual(key[idx], part));
  }

  /** Span attributes shared by the producer and consumer spans of an event. */
  private static spanAttributes(event: KvRealtimeEvent) {
    return {
      'kv.event': event.type,
      'kv.key': event.key.map((part) => KvRealtimeContext.describeKeyPart(part)),
      'kv.versionstamp': event.versionstamp,
    };
  }

  /**
   * Publishes a locally produced event inside a PRODUCER span: posts it on
   * the broadcast channel (if any) and then delivers it to local observers.
   * The span is always ended; failures are recorded on it and rethrown.
   */
  private async generateEvent(event: KvRealtimeEvent) {
    await tracer.startActiveSpan(`KvRealtime:${event.type}`, {
      kind: SpanKind.PRODUCER,
      attributes: KvRealtimeContext.spanAttributes(event),
    }, async span => {
      try {
        const baggage: Record<string,string> = {};
        propagation.inject(context.active(), baggage, {
          set: (carrier, name, value) => {
            carrier[name] = typeof value === 'string' ? value : String(value);
          },
        });
        this.broadcastChannel?.postMessage({
          event,
          baggage,
        });
        await this.processEvent(event);
      } catch (err) {
        span.recordException(KvRealtimeContext.asException(err));
        throw err;
      } finally {
        span.end();
      }
    });
  }

  /**
   * Starts event publication without making the caller wait for it or fail
   * because of it: the write has already been committed by the time this runs.
   */
  private announce(event: KvRealtimeEvent) {
    this.generateEvent(event).catch((err: unknown) => {
      console.error('kv realtime: failed to publish event:', err);
    });
  }

  /**
   * Delivers an event to every observer whose prefix matches the event key,
   * each inside its own CONSUMER span linked to the span context captured
   * when the observer was registered. A throwing observer is recorded and
   * logged, and does not stop delivery to the remaining observers.
   */
  private async processEvent(event: KvRealtimeEvent) {
    console.log('running kv event:', event.type, event.key.join('/'));
    for (const observer of this.observers) {
      if (!KvRealtimeContext.keyHasPrefix(event.key, observer.prefix)) continue;
      tracer.startActiveSpan(`KvRealtime:${event.type}`, {
        kind: SpanKind.CONSUMER,
        attributes: KvRealtimeContext.spanAttributes(event),
        links: observer.spanCtx ? [{
          context: observer.spanCtx,
        }] : [],
      }, span => {
        try {
          observer.next(event);
        } catch (err) {
          span.recordException(KvRealtimeContext.asException(err));
          console.error('kv realtime: observer failed:', err);
        } finally {
          span.end();
        }
      });
    }
  }

  /** Reads a single entry; resolves to whatever `kv.get` returns for the key. */
  async getKey(key: Deno.KvKey) {
    const result = await this.kv.get(key);
    return result;
  }

  /** Drains `kv.list(opts)` into an array, preserving iteration order. */
  async collectList(opts: { prefix: Deno.KvKey }) {
    const entities = new Array<Deno.KvEntry<unknown>>();
    for await (const entry of this.kv.list(opts)) {
      entities.push(entry);
    }
    return entities;
  }

  /**
   * Builds a stream from three sources passed to `ows.concat`: the entries
   * currently listed under `prefix` (each mapped to an `insert` event), a
   * single `{type: 'ready'}` marker, and the live events delivered to an
   * observer registered for `prefix`.
   * NOTE(review): presumably `ows.concat` emits its sources in order, so the
   * marker separates the snapshot from live events; confirm against the library.
   *
   * @param prefix Key prefix to observe.
   * @param abortSignal Aborting removes the observer and sends `ows.EOF` to
   *   the live source. If the signal is already aborted at registration time,
   *   `throwIfAborted()` throws inside the live-source setup callback.
   */
  observePrefix(prefix: Deno.KvKey, abortSignal: AbortSignal) {
    return ows
      .concat<KvRealtimeEvent | {type: 'ready'}>(
        ows
          .fromIterable(this.kv.list({ prefix }))
          .pipeThrough(ows.map(entry => ({
            type: 'insert',
            appliedAt: new Date(),
            key: entry.key,
            value: entry.value,
            versionstamp: entry.versionstamp,
          }))),
        ows
          .just({type: 'ready'}),
        ows
          .fromNext<KvRealtimeEvent>(next => {
            const observer = {
              prefix,
              next,
              // Captured so consumer spans can link back to the subscriber.
              spanCtx: trace.getSpanContext(context.active()),
            };
            abortSignal.throwIfAborted();
            this.observers.add(observer);
            abortSignal.addEventListener('abort', () => {
              this.observers.delete(observer);
              next(ows.EOF);
            });
          })
          // No filter stage is needed here: processEvent only calls observers
          // whose prefix matches the event key.
      );
  }

  /**
   * Creates `key` only if it does not exist yet (checked with a `null`
   * versionstamp). Announces an `insert` event when the commit succeeds.
   * @returns The commit result from the atomic operation.
   */
  async createKey(key: Deno.KvKey, value: unknown) {
    const result = await this.kv.atomic()
      .check({ key, versionstamp: null })
      .set(key, value)
      .commit();
    if (result.ok) {
      this.announce({
        type: 'insert',
        appliedAt: new Date(),
        key, value,
        versionstamp: result.versionstamp,
      });
    }
    return result;
  }

  /**
   * Overwrites `key` only if its current versionstamp equals `versionstamp`.
   * Announces a `replace` event when the commit succeeds.
   * @returns The commit result from the atomic operation.
   */
  async replaceKey(key: Deno.KvKey, versionstamp: string, value: unknown) {
    const result = await this.kv.atomic()
      .check({ key, versionstamp })
      .set(key, value)
      .commit();
    if (result.ok) {
      this.announce({
        type: 'replace',
        appliedAt: new Date(),
        key, value,
        versionstamp: result.versionstamp,
      });
    }
    return result;
  }

  /**
   * Deletes `key` only if its current versionstamp equals `versionstamp`.
   * Announces a `delete` event when the commit succeeds.
   * @returns The commit result from the atomic operation.
   */
  async deleteKey(key: Deno.KvKey, versionstamp: string) {
    const result = await this.kv.atomic()
      .check({ key, versionstamp })
      .delete(key)
      .commit();
    if (result.ok) {
      this.announce({
        type: 'delete',
        appliedAt: new Date(),
        key,
        versionstamp: result.versionstamp,
      });
    }
    return result;
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment