Skip to content

Instantly share code, notes, and snippets.

@jacobdam
Created September 17, 2023 05:24
Show Gist options
  • Save jacobdam/0979b899a6647b862a2268d62307f83d to your computer and use it in GitHub Desktop.
Save jacobdam/0979b899a6647b862a2268d62307f83d to your computer and use it in GitHub Desktop.
WIP - Simple implementation of a workflow engine
import assert from "node:assert";
import { AsyncLocalStorage } from "node:async_hooks";
// database
/**
 * Persisted state of a single workflow instance.
 */
interface WorkflowInst {
  /**
   * Number of steps that have finished and been saved. On a later run, step
   * calls with a lower index are skipped instead of executed again.
   */
  completedSteps: number;
  /**
   * Name and params of every completed step, in execution order. Kept for
   * debugging and used to check that a re-run issues the same step calls.
   * `params` is `unknown` because each step defines its own parameter type.
   */
  recordedStepCalls: { name: string; params: unknown }[];
}
// In-memory stand-in for a real database, keyed by workflow instance id.
// It is created without a prototype so that ids such as "constructor",
// "toString" or "__proto__" behave like ordinary keys instead of resolving to
// (or overwriting) members inherited from Object.prototype.
const db: { [_: string]: WorkflowInst } = Object.create(null);
/**
 * Looks up a workflow instance in the in-memory `db`.
 *
 * @param instId Workflow instance id used as the storage key.
 * @returns The stored instance, or `null` when nothing is stored under `instId`.
 */
async function getWorkflowInstanceFromDB(
  instId: string
): Promise<WorkflowInst | null> {
  // Only own keys count as stored records. A bare `db[instId]` lookup would
  // also find inherited members (e.g. "constructor", "toString") and hand them
  // back as if they were workflow instances.
  if (!Object.prototype.hasOwnProperty.call(db, instId)) {
    return null;
  }
  return db[instId] ?? null;
}
/**
 * Stores `inst` in the in-memory `db` under `instId`, replacing whatever was
 * stored under that id before.
 *
 * @param instId Workflow instance id used as the storage key.
 * @param inst Instance state to store (kept by reference, not copied).
 */
async function saveWorkflowInstanceToDB(
  instId: string,
  inst: WorkflowInst
): Promise<void> {
  Object.assign(db, { [instId]: inst });
}
// Per-run state that is shared through asyncLocalStorage while a workflow executes.
interface WorkflowContext {
// Index of the step call being processed in this run; runWorkflow starts it at
// -1 and _runStep increments it before handling each step.
currentStep: number;
// Id of the workflow instance; used as the key when saving to the database.
instId: string;
// Instance state loaded from the database, or a fresh one when none was stored.
workflowInst: WorkflowInst;
}
// workflow engine
// Holds the WorkflowContext of the running workflow: runWorkflow installs it
// via run(), and _runStep and log read it back via getStore().
const asyncLocalStorage = new AsyncLocalStorage();
/**
 * Runs (or resumes) the workflow instance identified by `instId`.
 *
 * The instance state is loaded from the database, or created fresh when none
 * is stored, saved once up front, and then `workflowFn` is executed with a
 * WorkflowContext installed in `asyncLocalStorage` so that steps and `log`
 * can find it.
 *
 * @param instId Id of the workflow instance.
 * @param workflowFn The workflow body; may be async.
 * @param params Argument passed to `workflowFn`.
 * @returns A promise that settles when `workflowFn` has finished.
 */
async function runWorkflow<TParams>(
  instId: string,
  workflowFn: (_: TParams) => void,
  params: TParams
): Promise<void> {
  const stored = await getWorkflowInstanceFromDB(instId);
  const workflowInst: WorkflowInst = stored ?? {
    completedSteps: 0,
    recordedStepCalls: [],
  };
  await saveWorkflowInstanceToDB(instId, workflowInst);

  // currentStep starts below zero because each step increments it before use.
  const ctx: WorkflowContext = { currentStep: -1, instId, workflowInst };
  await asyncLocalStorage.run(ctx, () => workflowFn(params));
}
/**
 * Runs one step of the current workflow, or skips it when an earlier run of
 * the same workflow instance already completed it.
 *
 * Steps are identified by their call order: every call advances
 * `ctx.currentStep`. If that index is below `completedSteps`, the step is not
 * executed again; instead the call is checked against the recorded one.
 * Otherwise the step runs, is recorded, and the instance is saved.
 *
 * @param stepFn The step implementation; may be async. Its `stepName`
 *   property (falling back to the function's own `name`) identifies the step.
 * @param params Argument passed to `stepFn`.
 * @throws Error when called outside a workflow context.
 * @throws AssertionError when a replayed step call differs from the recorded
 *   one (different name or params) or has no recorded call at all.
 */
async function _runStep<TParams>(
  stepFn: (_: TParams) => void,
  params: TParams
): Promise<void> {
  const ctx = asyncLocalStorage.getStore() as WorkflowContext | undefined;
  if (!ctx) {
    // Without this guard the failure is an opaque "cannot read properties of
    // undefined" on the next line.
    throw new Error(
      "step must be called inside a workflow started by runWorkflow"
    );
  }

  // Prefer an explicitly attached step name; fall back to the function's own
  // name so the recorded call (and the replay check) is never keyed on
  // `undefined`.
  const stepName: string =
    (stepFn as typeof stepFn & { stepName?: string }).stepName ?? stepFn.name;
  const { workflowInst } = ctx;

  ctx.currentStep++;
  if (ctx.currentStep < workflowInst.completedSteps) {
    // Ensure the step completed in a previous run had the same name and params.
    const recordedStepCall = workflowInst.recordedStepCalls[ctx.currentStep];
    assert.ok(recordedStepCall, "missing recorded call for completed step");
    assert.equal(
      recordedStepCall.name,
      stepName,
      "step function must be the same"
    );
    assert.deepEqual(
      recordedStepCall.params,
      params,
      "step params must be the same"
    );
    return;
  }

  await stepFn(params);
  // Record and persist only after the step finished without throwing.
  workflowInst.recordedStepCalls.push({ name: stepName, params });
  workflowInst.completedSteps++;
  await saveWorkflowInstanceToDB(ctx.instId, workflowInst);
}
/**
 * Wraps a plain function so that calling it runs as a tracked workflow step.
 *
 * Originally sketched annotation syntax:
 *   @Step()
 *   async function someStep(params: { foo: string }) {
 *   }
 * Current usage is the wrapper form:
 *   const someStep = Step(async function someStep(params) { ... });
 *
 * @param stepFn The step implementation; may be async.
 * @param fnName Optional step name; defaults to `stepFn.name`.
 * @returns A function that runs `stepFn` through `_runStep` and resolves once
 *   the step has finished or was skipped. It carries a `stepName` property.
 */
function Step<T>(stepFn: (_: T) => void, fnName?: string) {
  const stepName = fnName || stepFn.name;
  // _runStep reads `stepName` from the function it is handed, so give it a
  // callable that carries the name instead of the bare `stepFn`.
  const namedStepFn = Object.assign((params: T) => stepFn(params), {
    stepName,
  });
  const fn = function step(params: T): Promise<void> {
    // Return the promise so `await someStep(...)` really waits for the step
    // and failures propagate to the workflow instead of floating.
    return _runStep(namedStepFn, params);
  };
  fn.stepName = stepName;
  return fn;
}
/**
 * Writes a console line prefixed with the id of the workflow instance that is
 * currently running, taken from `asyncLocalStorage`.
 *
 * @param args Values to print after the instance id.
 */
function log(...args: unknown[]) {
  const store = asyncLocalStorage.getStore() as WorkflowContext;
  const line = [store.instId, ...args];
  console.log(...line);
}
// user code
/**
 * Returns a promise that resolves after the given delay.
 *
 * @param miliseconds Delay in milliseconds, passed to `setTimeout`.
 */
function sleep(miliseconds: number): Promise<void> {
  return new Promise<void>((wake) => {
    setTimeout(wake, miliseconds);
  });
}
/**
 * Example workflow: always runs step1, and additionally runs step2 when
 * `params` is "bar". Logs a line at the start and at the end.
 */
async function workflow1(params: string) {
  const tag = "workflow1";
  log(tag, "start", params);

  await step1(params);
  const needsSecondStep = params === "bar";
  if (needsSecondStep) {
    await step2(params);
  }

  log(tag, "end", params);
}
// Example step: logs, waits one second, then logs again.
const step1 = Step(async function step1(params: string) {
  const tag = "step1";
  log(tag, "start", params);
  await sleep(1000);
  log(tag, "end", params);
});
// Example step: only logs its params.
const step2 = Step(async function step2(params: string) {
  const tag = "step2";
  log(tag, params);
});
// Start the two demo workflow instances back to back. Neither call is
// awaited, so both instances run concurrently.
const demoRuns: [string, string][] = [
  ["wf1-01", "foo"],
  ["wf1-02", "bar"],
];
for (const [instId, params] of demoRuns) {
  void runWorkflow(instId, workflow1, params);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment