Last active
February 15, 2024 16:17
-
-
Save aricart/18ae2f305f263ec915330684a3029698 to your computer and use it in GitHub Desktop.
Message Pipeline
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This example does a couple of things that are not obvious.
If none of the fields of the message are read, there is no cost, because the pipeline never runs.
If a stage of the pipeline fails, it throws; since the pipeline doesn't run until the message is read, the error
surfaces when the user accesses the message. This is not great, but we cannot add an async condition,
because the async portion is resolved when the message is read. Blowing up on access means that the
stack trace will point at the message access (not great, but not as bad as it could be).
This means that if a pipeline is assigned far away from where the message is read, the error will look nonsensical and will lead
to issues being filed for what are really user-code problems.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { | |
Msg, | |
MsgHdrs, | |
Payload, | |
PublishOptions, | |
ReviverFn, | |
} from "../src/mod.ts"; | |
import { MsgImpl } from "../nats-base-client/msg.ts"; | |
import { Publisher } from "../nats-base-client/core.ts"; | |
import { TD } from "../nats-base-client/encoders.ts"; | |
/**
 * A mutable copy of a {@link Msg}.
 *
 * Pipeline stages use it to derive a new message from an existing one: every
 * field is copied from the source message and can then be reassigned.
 * The copy is shallow: `data` and `headers` still reference the source's
 * objects until they are reassigned.
 */
export class MutableMsg implements Msg {
  data: Uint8Array;
  subject: string;
  sid: number;
  reply: string | undefined;
  headers: MsgHdrs | undefined;
  publisher: Publisher;

  /**
   * @param src the message whose fields are copied into this instance.
   */
  constructor(src: Msg) {
    this.data = src.data;
    this.subject = src.subject;
    this.reply = src.reply;
    this.headers = src.headers;
    this.sid = src.sid;
    // `publisher` is not part of the `Msg` interface, so it is read through a
    // cast. NOTE(review): a source that does not carry a `publisher` property
    // leaves this field undefined, and `respond()` will then fail.
    this.publisher = (src as MsgImpl).publisher;
  }

  /**
   * Decodes `data` as text and parses it as JSON.
   *
   * @param reviver optional reviver forwarded to `JSON.parse`.
   * @returns the parsed value, typed as `T` without runtime validation.
   * @throws SyntaxError when the payload is not valid JSON.
   */
  json<T>(reviver?: ReviverFn): T {
    // Use the shared decoder, the same one `string()` relies on.
    return JSON.parse(TD.decode(this.data), reviver);
  }

  /**
   * Publishes a response to this message's reply subject.
   *
   * @param payload response body; an empty payload is sent when omitted.
   * @param opts publish options forwarded to the publisher.
   * @returns `true` when a response was published, `false` when the message
   * has no reply subject and nothing was sent.
   */
  respond(payload?: Payload, opts?: PublishOptions): boolean {
    if (!this.reply) {
      // Nothing to respond to: do not publish to an undefined subject.
      return false;
    }
    this.publisher.publish(this.reply, payload ?? new Uint8Array(0), opts);
    return true;
  }

  /**
   * @returns `data` decoded as text with the shared decoder.
   */
  string(): string {
    return TD.decode(this.data);
  }
}
/**
 * Contract for anything that can turn an incoming message into a
 * transformed message.
 */
export interface Pipelines {
  /**
   * @param m the message to transform.
   * @returns the transformed message.
   */
  transform(m: Msg): Msg;
}
/**
 * A single pipeline stage: receives a message and returns the message that
 * replaces it.
 */
export type PipelineFn = (msg: Msg) => Msg;
/**
 * An ordered list of {@link PipelineFn} stages that can be applied to
 * messages.
 */
export class Pipeline implements Pipelines {
  /** Stages in the order they were supplied to the constructor. */
  private readonly stages: PipelineFn[];

  /**
   * @param pipeline the stages, in the order they should be applied.
   */
  constructor(...pipeline: PipelineFn[]) {
    this.stages = pipeline;
  }

  /**
   * Wraps `m` in a {@link PipelineMsg} configured with this pipeline's
   * stages. No stage is invoked here.
   *
   * @param m the message to transform.
   * @returns the wrapping message.
   */
  transform(m: Msg): Msg {
    const wrapped = new PipelineMsg(m, ...this.stages);
    return wrapped;
  }
}
/**
 * A message that applies its pipeline lazily.
 *
 * The stages run the first time a transformed field or method is used, and at
 * most once: the result is cached on success, and the thrown error is cached
 * and re-thrown on failure. `sid` is read from the source message and never
 * triggers the pipeline.
 */
export class PipelineMsg implements Msg {
  private readonly src: Msg;
  private readonly pipeline: PipelineFn[];
  /** Result of the pipeline once it has completed successfully. */
  private transformed?: Msg;
  /** Error thrown by the pipeline, boxed so that any thrown value can be cached. */
  private failure?: { readonly error: unknown };

  /**
   * @param m the source message.
   * @param pipeline the stages to apply, in order.
   */
  constructor(m: Msg, ...pipeline: PipelineFn[]) {
    this.src = m;
    this.pipeline = pipeline;
  }

  /**
   * Runs the pipeline if it has not run yet.
   *
   * @throws whatever a stage throws, or a `TypeError` when a stage returns
   * `null`/`undefined`. The same error is re-thrown on every later call.
   */
  init(): void {
    this.resolve();
  }

  /**
   * Returns the transformed message, running the pipeline on first use.
   */
  private resolve(): Msg {
    if (this.transformed !== undefined) {
      return this.transformed;
    }
    if (this.failure !== undefined) {
      // Do not re-run stages (and their side effects) after a failure.
      throw this.failure.error;
    }
    try {
      let current = this.src;
      for (const [stage, fn] of this.pipeline.entries()) {
        const next = fn(current);
        if (next == null) {
          // Name the offending stage instead of failing later with an opaque
          // "cannot read properties of undefined" at the access site.
          throw new TypeError(
            `pipeline stage ${stage} did not return a message`,
          );
        }
        current = next;
      }
      this.transformed = current;
      return current;
    } catch (error) {
      this.failure = { error };
      throw error;
    }
  }

  /** Payload of the transformed message. */
  get data(): Uint8Array {
    return this.resolve().data;
  }

  /** Subject of the transformed message. */
  get subject(): string {
    return this.resolve().subject;
  }

  /** Reply subject of the transformed message, if any. */
  get reply(): string | undefined {
    return this.resolve().reply;
  }

  /** `sid` of the source message; does not run the pipeline. */
  get sid(): number {
    return this.src.sid;
  }

  /** Headers of the transformed message, if any. */
  get headers(): MsgHdrs | undefined {
    return this.resolve().headers;
  }

  /** Delegates to the transformed message's `json()`. */
  json<T>(reviver?: ReviverFn): T {
    return this.resolve().json(reviver);
  }

  /** Delegates to the transformed message's `respond()`. */
  respond(payload?: Payload, opts?: PublishOptions): boolean {
    return this.resolve().respond(payload, opts);
  }

  /** Delegates to the transformed message's `string()`. */
  string(): string {
    return this.resolve().string();
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { connect, headers, Msg } from "../src/mod.ts"; | |
import { MutableMsg, Pipeline } from "./msg_pipeline.ts"; | |
// Open the connection used by both the responder and the requester below.
const nc = await connect({
  servers: "demo.nats.io:4222",
});
/**
 * Pipeline stage that returns a copy of `m` carrying an `X-Transformed: yes`
 * header.
 *
 * The source message's headers are copied rather than modified, so `m` is
 * left untouched.
 *
 * @param m the incoming message.
 * @returns a new message with the extra header set.
 */
function addHeader(m: Msg): Msg {
  const h = headers();
  if (m.headers) {
    // Copy every value of every key so multi-valued headers survive.
    // NOTE(review): only key/value pairs are copied; if the source headers
    // carry a status code/description it is not preserved — confirm whether
    // that matters for messages reaching this stage.
    for (const [key, values] of m.headers) {
      for (const value of values) {
        h.append(key, value);
      }
    }
  }
  h.set("X-Transformed", "yes");
  const mm = new MutableMsg(m);
  mm.headers = h;
  return mm;
}
/**
 * Pipeline stage that returns a copy of `m` whose payload is the source
 * payload's text reversed.
 *
 * @param m the incoming message.
 * @returns a new message with the reversed text as its data.
 */
function reverse(m: Msg): Msg {
  const mm = new MutableMsg(m);
  // Iterate by code point, not UTF-16 code unit, so characters outside the
  // BMP (e.g. emoji) are not split into unpaired surrogates.
  const reversed = Array.from(m.string()).reverse().join("");
  mm.data = new TextEncoder().encode(reversed);
  return mm;
}
// Stages are applied in the order given: first addHeader, then reverse.
const pipeline = new Pipeline(addHeader, reverse);

// Responder: transforms each request and sends the transformed payload and
// headers back to the requester.
nc.subscribe("q", {
  callback(err, msg) {
    if (err) {
      // On a subscription error there is no usable message to transform or
      // respond to; report it instead of failing inside the pipeline.
      console.error(`subscription error: ${err.message}`);
      return;
    }
    const m = pipeline.transform(msg);
    // javascript doesn't have respondMsg() or publishMsg()
    // Reading m.data / m.headers is what actually runs the pipeline.
    msg.respond(m.data, { headers: m.headers });
  },
});

// Requester: send one request and print the transformed response.
const m = await nc.request("q", "hello");
console.log("X-Transformed:", m.headers?.get("X-Transformed"));
console.log(m.string());
await nc.close();
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment