Skip to content

Instantly share code, notes, and snippets.

@aricart
Last active February 15, 2024 16:17
Show Gist options
  • Save aricart/18ae2f305f263ec915330684a3029698 to your computer and use it in GitHub Desktop.
Save aricart/18ae2f305f263ec915330684a3029698 to your computer and use it in GitHub Desktop.
Message Pipeline
This example does a couple of things that are not obvious.
If none of the message's fields are read, there is no work cost, because the pipeline never runs.
If there's an error in the pipeline, it throws. Since the pipeline doesn't run until the message is read, the error
will surface when the user accesses the message. This is not great, but an async condition cannot be added,
because the async portion is resolved when the message is read. Blowing up on access means that the
stack trace will point at the message access (not great, but not as bad as it could be).
This means that if a pipeline is assigned far away from where the message is read, the error will look
nonsensical and will lead to issues being filed that are really caused by problems in user code.
import {
Msg,
MsgHdrs,
Payload,
PublishOptions,
ReviverFn,
} from "../src/mod.ts";
import { MsgImpl } from "../nats-base-client/msg.ts";
import { Publisher } from "../nats-base-client/core.ts";
import { TD } from "../nats-base-client/encoders.ts";
/**
 * A writable copy of a {@link Msg}.
 *
 * The constructor copies the source message's fields into plain properties so
 * that a caller can reassign any of them afterwards. The copy is shallow:
 * `data` and `headers` keep referencing the source's objects until they are
 * reassigned.
 */
export class MutableMsg implements Msg {
  data: Uint8Array;
  subject: string;
  sid: number;
  reply: string | undefined;
  headers: MsgHdrs | undefined;
  publisher: Publisher;

  /**
   * @param src message whose fields are copied. `publisher` is read through a
   *   cast to `MsgImpl`, so `src` is assumed to expose a `publisher` property.
   */
  constructor(src: Msg) {
    this.data = src.data;
    this.subject = src.subject;
    this.reply = src.reply;
    this.headers = src.headers;
    this.sid = src.sid;
    // `publisher` is not part of the Msg interface, hence the cast.
    this.publisher = (src as MsgImpl).publisher;
  }

  /**
   * Decodes `data` as text and parses it as JSON.
   *
   * @param reviver optional reviver forwarded to `JSON.parse`.
   * @returns the parsed value, typed as `T` without runtime validation.
   * @throws SyntaxError if the decoded text is not valid JSON.
   */
  json<T>(reviver?: ReviverFn): T {
    // Use the shared decoder, same as string(), instead of allocating one per call.
    return JSON.parse(TD.decode(this.data), reviver);
  }

  /**
   * Publishes a response to this message's reply subject.
   *
   * @param payload response body; an empty payload is sent when omitted.
   * @param opts publish options forwarded to the publisher.
   * @returns `true` if a response was published, `false` if the message has
   *   no reply subject.
   */
  respond(payload?: Payload, opts?: PublishOptions): boolean {
    // Without a reply subject there is nowhere to publish to.
    if (!this.reply) {
      return false;
    }
    this.publisher.publish(this.reply, payload ?? new Uint8Array(0), opts);
    return true;
  }

  /** Decodes `data` as text using the shared `TD` decoder. */
  string(): string {
    return TD.decode(this.data);
  }
}
/** Contract for anything that can turn one message into another. */
export interface Pipelines {
/** Takes a message `m` and returns a message. */
transform(m: Msg): Msg;
}
/** A single pipeline stage: a function that takes a message and returns a message. */
export type PipelineFn = (msg: Msg) => Msg;
/**
 * An ordered list of {@link PipelineFn} stages.
 *
 * `transform` does not run the stages itself; it wraps the message in a
 * `PipelineMsg` built from the message and the stored stages.
 */
export class Pipeline implements Pipelines {
  private readonly stages: PipelineFn[];

  /**
   * @param pipeline the stages, in the order they were passed.
   */
  constructor(...pipeline: PipelineFn[]) {
    this.stages = pipeline;
  }

  /**
   * Wraps `m` together with this pipeline's stages.
   *
   * @param m the message to wrap.
   * @returns a new `PipelineMsg` for `m`.
   */
  transform(m: Msg): Msg {
    const wrapped = new PipelineMsg(m, ...this.stages);
    return wrapped;
  }
}
/**
 * A {@link Msg} that applies a list of pipeline stages lazily.
 *
 * Nothing runs at construction time. The first accessor that needs the
 * transformed message runs every stage in order, feeding each stage the
 * previous stage's result, and caches the final message. Later accesses reuse
 * the cached result. Any error thrown by a stage therefore surfaces at the
 * point where the message is first read.
 */
export class PipelineMsg implements Msg {
  private readonly src: Msg;
  private readonly pipeline: PipelineFn[];
  // Cached pipeline output; stays undefined until a run completes.
  private transformed: Msg | undefined;

  /**
   * @param m the source message handed to the first stage.
   * @param pipeline the stages to apply, in order.
   */
  constructor(m: Msg, ...pipeline: PipelineFn[]) {
    this.src = m;
    this.pipeline = pipeline;
  }

  /**
   * Runs the pipeline if it has not completed yet.
   *
   * @throws whatever a stage throws, or an `Error` if a stage returns
   *   `null`/`undefined` instead of a message.
   */
  init(): void {
    this.resolve();
  }

  /** Returns the cached transformed message, running the pipeline on first use. */
  private resolve(): Msg {
    if (this.transformed !== undefined) {
      return this.transformed;
    }
    let current = this.src;
    for (const [index, stage] of this.pipeline.entries()) {
      const next: Msg | null | undefined = stage(current);
      // Fail here with the stage position rather than with an opaque
      // "cannot read properties of undefined" at some later accessor.
      if (next == null) {
        throw new Error(
          `pipeline stage ${index} returned ${String(next)} instead of a Msg`,
        );
      }
      current = next;
    }
    this.transformed = current;
    return current;
  }

  get data(): Uint8Array {
    return this.resolve().data;
  }

  get subject(): string {
    return this.resolve().subject;
  }

  get reply(): string | undefined {
    return this.resolve().reply;
  }

  /** Read straight from the source message; does not run the pipeline. */
  get sid(): number {
    return this.src.sid;
  }

  get headers(): MsgHdrs | undefined {
    return this.resolve().headers;
  }

  /** Delegates to the transformed message's `json`. */
  json<T>(reviver?: ReviverFn): T {
    return this.resolve().json(reviver);
  }

  /** Delegates to the transformed message's `respond`. */
  respond(payload?: Payload, opts?: PublishOptions): boolean {
    return this.resolve().respond(payload, opts);
  }

  /** Delegates to the transformed message's `string`. */
  string(): string {
    return this.resolve().string();
  }
}
import { connect, headers, Msg } from "../src/mod.ts";
import { MutableMsg, Pipeline } from "./msg_pipeline.ts";
// Connect to the NATS server at demo.nats.io:4222.
const nc = await connect({ servers: "demo.nats.io:4222" });
/**
 * Pipeline stage that sets the `X-Transformed` header to `yes`.
 *
 * The incoming message's headers are copied into a fresh header set first, so
 * the source message is left untouched.
 *
 * @param m the message to tag.
 * @returns a `MutableMsg` copy of `m` carrying the new header set.
 */
function addHeader(m: Msg): Msg {
  const h = headers();
  const existing = m.headers;
  if (existing) {
    // Copy every value of every key; a key may carry several values.
    // NOTE(review): only key/value pairs are copied — confirm whether any
    // status code/description on the source headers must be preserved too.
    for (const key of existing.keys()) {
      for (const value of existing.values(key)) {
        h.append(key, value);
      }
    }
  }
  h.set("X-Transformed", "yes");
  const mm = new MutableMsg(m);
  mm.headers = h;
  return mm;
}
/**
 * Pipeline stage that reverses the message's text payload.
 *
 * @param m the message whose `string()` value is reversed.
 * @returns a `MutableMsg` copy of `m` whose `data` is the encoded reversed text.
 */
function reverse(m: Msg): Msg {
  const mm = new MutableMsg(m);
  // Spreading a string iterates by code point, so surrogate pairs (emoji and
  // other astral characters) stay intact; split("") would tear them apart.
  const flipped = [...m.string()].reverse().join("");
  mm.data = new TextEncoder().encode(flipped);
  return mm;
}
// Stages run in the order given: addHeader first, then reverse.
const pipeline = new Pipeline(addHeader, reverse);

// Responder: transforms each message received on "q" and sends the result back.
nc.subscribe("q", {
  callback(err, msg) {
    // Don't touch msg when the subscription reports an error.
    if (err) {
      console.error("subscription error:", err);
      return;
    }
    const m = pipeline.transform(msg);
    // javascript doesn't have respondMsg() or publishMsg()
    msg.respond(m.data, { headers: m.headers });
  },
});

try {
  const m = await nc.request("q", "hello");
  console.log("X-Transformed:", m.headers?.get("X-Transformed"));
  console.log(m.string());
} finally {
  // Close the connection even if the request fails.
  await nc.close();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment