Skip to content

Instantly share code, notes, and snippets.

@InBrewJ
Last active March 23, 2023 13:40
Show Gist options
  • Save InBrewJ/4c3441fc216a0c55d67d33a155691a19 to your computer and use it in GitHub Desktop.
Save InBrewJ/4c3441fc216a0c55d67d33a155691a19 to your computer and use it in GitHub Desktop.
An evening's noodling into a generic pipeline runner in JS
// ETLite.js is a cool name, eh?
// A slight bastardisation of the 'Chain of Responsibility' pattern
// Inspired by:
// https://github.com/InBrewJ/ts-for-dsa-minimal/blob/master/typescript-tasks/003-fizzbuzz-chain-responsibility/003-fizzbuzz-chain-responsibility.ts
// In a sense, is this a 'Tree of Responsibility'?
// It's kind of like a DAG. It IS a DAG?
// We essentially want a tree of stages
// At the end of the pipeline, dump a report
// top level stage keeps track of how many stages there are? Maybe? Is this important?
// Or is the top level stage the Integrator?
/**
 * Serialises a value to a compact (single-line) JSON string for log output.
 *
 * @param {*} obj - Value to serialise.
 * @returns {string|undefined} Whatever `JSON.stringify(obj)` yields.
 */
const pprint = (obj) => JSON.stringify(obj);
/**
 * Handler that always fails: logs the input it received, then throws.
 * It is not referenced anywhere else in the visible file.
 *
 * @param {*} input - Value logged (via pprint) before throwing.
 * @throws {Error} Always, with the message "You throw me right round".
 */
const throwsError = (input) => {
  const serialisedInput = pprint(input);
  console.log(`throwsError :: input => ${serialisedInput}`);
  throw new Error("You throw me right round");
};
/**
 * Synchronous stage handler: builds a hard-coded data set and a lookup table.
 *
 * Every hard-coded row is merged with `locationsInput`; on a key clash the
 * value from `locationsInput` wins because it is spread last.
 *
 * @param {object} locationsInput - Properties copied onto every row.
 * @returns {{juicyData: object[], lookupTable: Map<number, string>}}
 *   The merged rows and a Map of 1 -> "sunny", 2 -> "raining".
 */
const createLocationsLookupTable = (locationsInput) => {
  const serialisedInput = pprint(locationsInput);
  console.log(
    `createLocationsLookupTable :: sharedInput => ${serialisedInput} (something v synchronous)`
  );

  const baseRows = [{ dataOne: 1, filterMe: "bye" }, { dataOne: 1 }];
  const juicyData = baseRows.map((row) => ({ ...row, ...locationsInput }));

  const lookupTable = new Map();
  lookupTable.set(1, "sunny");
  lookupTable.set(2, "raining");

  return { juicyData, lookupTable };
};
/**
 * Stand-in for a network request: resolves with two hard-coded rows after a
 * 1000 ms timer, logging just before it resolves.
 *
 * @returns {Promise<object[]>} Resolves with
 *   `[{ dataTwo: 2 }, { dataTwo: 2, meFilter: "hey" }]`.
 */
const asyncNetworkCall = async () => {
  const delayMs = 1000;
  const deliver = (resolve) => {
    console.log("resolving inFunctionAsync...");
    resolve([{ dataTwo: 2 }, { dataTwo: 2, meFilter: "hey" }]);
  };
  return new Promise((resolve) => {
    setTimeout(() => deliver(resolve), delayMs);
  });
};
/**
 * Creates the "get locations" stage handler.
 *
 * @param {*} connection - Accepted but not used by the current implementation.
 * @returns {function(*): {shared: string}} Handler that logs its input and
 *   returns the fixed object `{ shared: "sharedKey" }`.
 */
const getLocations = (connection) => {
  return (input) => {
    const serialisedInput = pprint(input);
    console.log(`getLocations :: input ${serialisedInput}`);
    return { shared: "sharedKey" };
  };
};
// do this concurrently with getDataOne (or at least try)
/**
 * Creates the "get charge points" stage handler.
 *
 * The handler awaits `asyncNetworkCall()`, prepends its rows to `juicyData`,
 * and replaces each row's `dataOne` / `dataTwo` with the matching
 * `lookupTable` entry when one exists.
 *
 * @param {*} connection - Accepted but not used by the current implementation.
 * @returns {function({juicyData: object[], lookupTable: Map}): Promise<object[]>}
 *   Async handler resolving with the blended, looked-up rows. It rejects
 *   (after logging) when the fetch or the mapping fails, so the caller sees a
 *   failure instead of an `undefined` result.
 */
const getChargePoints =
  (connection) =>
  async ({ juicyData, lookupTable }) => {
    console.log(`getChargePoints :: juicyData => ${pprint(juicyData)}`);
    console.log(`getChargePoints :: lookupTable => ${Array.from(lookupTable)}`);
    try {
      const inFunctionAsync = await asyncNetworkCall();
      const blendedForNoReason = [...inFunctionAsync, ...juicyData];
      return blendedForNoReason.map((item) => ({
        ...item,
        // `??` (not `||`) so a falsy-but-present lookup value such as "" or 0
        // is kept rather than replaced by the raw key.
        dataOne: lookupTable.get(item.dataOne) ?? item.dataOne,
        dataTwo: lookupTable.get(item.dataTwo) ?? item.dataTwo,
      }));
    } catch (error) {
      console.log("NOPE in getChargePoints");
      console.log(error);
      // Rethrow: swallowing here would make the handler resolve with
      // `undefined`, which would then be passed on as if it were real data.
      throw error;
    }
  };
/**
 * Creates the "push updates" stage handler.
 *
 * @param {*} connection - Accepted but not used by the current implementation.
 * @returns {function(*): Promise<object[]>} Async handler that logs its input
 *   and, after a 2000 ms timer, resolves with
 *   `[{ pushResponse: "pushed, well done!" }]`.
 */
const pushUpdates = (connection) => async (input) => {
  const serialisedInput = pprint(input);
  console.log(`pushUpdates :: pushing ${serialisedInput} to XDX`);

  const acknowledge = (resolve) => {
    console.log("resolving pushUpdates...");
    resolve([{ pushResponse: "pushed, well done!" }]);
  };
  return new Promise((resolve) => {
    setTimeout(() => acknowledge(resolve), 2000);
  });
};
/**
 * One link in a chain of pipeline stages.
 *
 * A stage runs its handler on the incoming value, reports the outcome through
 * the reporter callback, and hands the handler's result to the next stage (if
 * one was set with `setNext`).
 */
class AbstractStage {
  #nextHandler = undefined;
  // Default handler passes the value through unchanged and logs a warning.
  #handlerFn = (e) => {
    // This is perhaps a bad idea...
    console.log(`WARNING -> handler for ${this.#name} is passthrough default!`);
    return e;
  };
  #name = "";
  #reporter = undefined;

  /**
   * @param {string} name - Stage name used in report messages.
   * @param {function(string): void} reporter - Receives one message per
   *   finished or failed execution.
   */
  constructor(name, reporter) {
    this.#name = name;
    this.#reporter = reporter;
  }

  /**
   * Replaces the handler run by `execute`.
   *
   * @param {function(*): *} handlerFn - May return a plain value or a promise.
   */
  setHandler(handlerFn) {
    this.#handlerFn = handlerFn;
  }

  /**
   * Sets the stage that receives this stage's result.
   *
   * @param {AbstractStage} handler - The next stage.
   * @returns {AbstractStage} The same `handler`, so calls can be chained.
   */
  setNext(handler) {
    this.#nextHandler = handler;
    return handler;
  }

  /**
   * Runs this stage and then, on success, the rest of the chain.
   *
   * The returned promise settles only after every downstream stage has
   * settled, so awaiting the first stage awaits the whole chain. A failure in
   * this stage's handler (thrown synchronously or via a rejected promise) is
   * sent to the reporter and stops the chain here; the promise still resolves.
   *
   * @param {*} thing - Input passed to the handler.
   * @returns {Promise<void>}
   * @throws {Error} (as a rejection) when the handler was set to `undefined`.
   */
  async execute(thing) {
    if (this.#handlerFn === undefined) {
      throw new Error("You need to set handlerFn!");
    }
    let result;
    try {
      // The handler may return a promise or a plain value; `await` accepts
      // both, and calling it inside the try also captures synchronous throws.
      result = await this.#handlerFn(thing);
      this.#reporter(
        `${this.#name} stage has finished with response => ${pprint(result)}`
      );
    } catch (error) {
      const reason = error instanceof Error ? error.message : String(error);
      this.#reporter(`${this.#name} errored out! :: ${reason}`);
      return;
    }
    // Outside the try so a downstream failure is not misreported as this
    // stage's error; awaited so callers can tell when the chain has finished.
    if (this.#nextHandler !== undefined) {
      await this.#nextHandler.execute(result);
    }
  }
}
// Effectively nominal types
// Each subclass below has an empty body: it adds no behaviour to
// AbstractStage and exists only so a stage's role is visible from its class.

// General-purpose stage (used in this file for the lookup-table and final report stages).
class Stage extends AbstractStage {}
// Stage whose handler fetches data (used in this file for getLocations / getChargePoints).
class FetchStage extends AbstractStage {}
// Stage whose handler pushes data out (used in this file for pushUpdates).
class PushStage extends AbstractStage {}
/**
 * Stage that reshapes its input with a transformer function before running
 * the normal AbstractStage flow (handler, report, next stage).
 *
 * The transformer may be synchronous or return a promise; its result is
 * awaited before being handed to `super.execute`.
 */
class Transformer extends AbstractStage {
  #transformerFn = undefined;

  /**
   * @param {function(*): *} transformer - Maps the stage input to the value
   *   passed on to the handler.
   */
  setTransformer(transformer) {
    this.#transformerFn = transformer;
  }

  /**
   * Transforms `thing`, then executes the stage with the transformed value.
   *
   * @param {*} thing - Raw stage input.
   * @returns {Promise<void>}
   * @throws {Error} (as a rejection) when no transformer function has been
   *   set. An error thrown by the transformer itself also rejects this
   *   promise; it is not sent to the reporter, because nothing here catches it.
   */
  async execute(thing) {
    if (typeof this.#transformerFn !== "function") {
      throw new Error("You need to set transformerFn!");
    }
    const transformedResult = await this.#transformerFn(thing);
    await super.execute(transformedResult);
  }
}
/**
 * Filter predicate: keeps items that have no `meFilter` value.
 *
 * @param {object} x - Item to inspect.
 * @returns {boolean} True when `x.meFilter` is `undefined`.
 */
const byMeFilter = (x) => {
  return typeof x.meFilter === "undefined";
};
/**
 * Filter predicate: keeps items that have no `filterMe` value.
 *
 * @param {object} x - Item to inspect.
 * @returns {boolean} True when `x.filterMe` is `undefined`.
 */
const byFilterMe = (x) => {
  return typeof x.filterMe === "undefined";
};
/**
 * Assembles the demo chain of stages and runs it.
 *
 * Each stage pushes a message into a shared report through `#reporter`; a
 * final stage prints the collected report to the console.
 */
class Pipeline {
  // First stage of the chain; stays undefined until build() has run.
  #pipeline = undefined;
  // Messages collected from every stage, in the order they were reported.
  #report = [];
  #reporter = (msg) => {
    this.#report.push(msg);
  };

  /**
   * Creates the stages, attaches their handlers, and links them in order:
   * getLocations -> createLocationsLookupTable -> getChargepointsStage ->
   * pushUpdatesStage -> final report stage.
   */
  build() {
    const getLocationsStage = new FetchStage("getLocations", this.#reporter);
    getLocationsStage.setHandler(
      // getLocations = handler with HTTP (or something else?) connection
      // note that an HTTP connection may have pagination (case by case)
      getLocations("locations_connection_object")
    );

    const createLocationsLookupTableStage = new Stage(
      "createLocationsLookupTable",
      this.#reporter
    );
    createLocationsLookupTableStage.setHandler(createLocationsLookupTable);

    const getChargepointsStage = new FetchStage(
      "getChargepointsStage",
      this.#reporter
    );
    getChargepointsStage.setHandler(
      // getChargePoints = handler with HTTP (or something else?) connection
      // note that an HTTP connection may have pagination (case by case)
      getChargePoints("chargepoints_connection_object")
    );

    // Built but currently left out of `stages` below (its entry is commented
    // out there).
    const filterWithinIntervalStage = new Transformer(
      "filterWithinIntervalStage",
      this.#reporter
    );
    // bit weird that you have to set the handler on a Transformer, eh?
    // filterWithinIntervalStage.setHandler((e) => e);
    filterWithinIntervalStage.setTransformer((e) => {
      console.log("Running filterWithinIntervalTransformer");
      return e.filter(byFilterMe).filter(byMeFilter);
    });

    const pushUpdatesStage = new PushStage("pushUpdatesStage", this.#reporter);
    pushUpdatesStage.setHandler(pushUpdates("xdx_connection_object"));

    const stages = [
      getLocationsStage,
      createLocationsLookupTableStage,
      getChargepointsStage,
      // filterWithinIntervalStage,
      pushUpdatesStage,
    ];
    const [firstStage, ...rest] = stages;

    // Last stage: prints everything reported so far, framed by banner lines.
    const final = new Stage("Final stage, report results", this.#reporter);
    final.setHandler(() => {
      const topReportString =
        "*".repeat(30) + " PIPELINE REPORT " + "*".repeat(30);
      console.log(topReportString);
      console.log(this.#report.join("\n"));
      console.log("*".repeat(topReportString.length));
    });

    // setNext returns the stage it was given, so the reduce walks down the
    // chain linking each stage to the one after it, then appends `final`.
    rest
      .reduce(
        (pipelineInProgress, stage) => pipelineInProgress.setNext(stage),
        firstStage
      )
      .setNext(final);
    this.#pipeline = firstStage;

    // alternative pipeline construction, simpler to read but
    // less parametric...
    // getLocationsStage
    //   .setNext(createLocationsLookupTableStage)
    //   .setNext(getChargepointsStage)
    //   .setNext(filterWithinIntervalStage)
    //   .setNext(pushUpdatesStage)
    //   .setNext(final);
    // How do we know when the pipeline has terminated?
    // this.#pipeline = getLocationsStage;
  }

  /**
   * Executes the chain starting from its first stage (called with no input).
   *
   * @returns {Promise<void>} Settles when the first stage's `execute` settles.
   * @throws {Error} (as a rejection) when called before `build()`.
   */
  async run() {
    if (this.#pipeline === undefined) {
      throw new Error("Pipeline has not been built - call build() before run()");
    }
    await this.#pipeline.execute();
  }
}
/// main
// Entry point: build the demo pipeline and run it. `run()` is awaited so a
// rejection reaches the handler below instead of becoming a floating promise.
(async () => {
  const myFirstIntegrator = new Pipeline();
  myFirstIntegrator.build();
  await myFirstIntegrator.run();
})().catch((error) => {
  console.error(error);
  // Keep a non-zero exit status under Node; `process` does not exist in
  // browsers, hence the guard.
  if (typeof process !== "undefined") {
    process.exitCode = 1;
  }
});
// possible syntax for DAG creation:
// presumably a DAG needs some sort of cycle detection to validate DAGness
// graph algo help?
// - traverse graph (DFS?)
// - add vertices to <array>
// - if any duplicates appear in <array> (binary search, nlogn), graph has cycles - not a DAG?
// - does a vertex / node need some hash to record 'uniqueness'?
// each node can have N children (at same level in tree)
/**
 * Sketch of a DAG node: one parent stage plus any number of child stages.
 * `run` is still an unimplemented placeholder.
 */
class PipelineStage {
  // parent must be set first, bc it runs first
  #parent = undefined;
  // queue?
  // or is this a .setNext list Pipeline thing?
  #children = [];

  /**
   * Placeholder; currently does nothing and resolves with `undefined`.
   * Intended behaviour, per the original notes:
   *  - return result from #parent
   *  - if children.length pass result into first stage of each child pipeline
   *  - if no children, return the result
   *
   * @returns {Promise<void>}
   */
  async run() {}

  /**
   * Registers a stage: the first stage added becomes the parent, every later
   * one is appended to the children.
   *
   * @param {*} pipelineStage - Stage to register.
   * @returns {void}
   */
  addStage(pipelineStage) {
    const hasParent = this.#parent !== undefined;
    if (hasParent) {
      this.#children.push(pipelineStage);
    } else {
      this.#parent = pipelineStage;
    }
  }
}
// Sketch of a possible DAG description. The stages named here do not exist
// yet, so they are written as string placeholders: bare identifiers would
// throw a ReferenceError as soon as this statement is evaluated. Swap the
// strings for real stage objects once those are defined.
const dag0 = [
  {
    parent: "sharedFirstStage",
    // outputs of 'parent' are available to first stage of all 'children' pipelines
    children: [
      ["stage1x", "stage2x", "stage3x"], // pipeline stage with only children?
      ["stage1y", "stage2y", "stage3y"], // pipeline stage with only children?
    ],
  },
];
// const dag00 = [
// {
// rootStage: sharedFirstStage,
// // outputs of 'parent' are available to first stage of all 'children' pipelines
// subPipelines: [
// [stage1x, stage2x, stage3x],
// [stage1y, stage2y, stage3y],
// ],
// },
// ];
// const dag = {
// sharedFirstStage: {
// pipeline1: [stage1, stage2, stage3],
// pipeline2: [stage1, stage2, stage3],
// pipeline3: [stage1, stage2, stage3],
// },
// };
// const dag2 = {
// sharedFirstStage: {
// pipeline1: [
// {
// sharedStage1: {
// pipelineSub1: [stageSub1, stageSub2, stageSub3],
// },
// },
// stage2,
// stage3,
// ],
// },
// };
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment