Skip to content

Instantly share code, notes, and snippets.

@kadeer
Created November 25, 2023 01:18
Show Gist options
  • Save kadeer/5f35fd4048249b967e13bbbd4085d031 to your computer and use it in GitHub Desktop.
Save kadeer/5f35fd4048249b967e13bbbd4085d031 to your computer and use it in GitHub Desktop.
q-up with xstate and data loader
import { createMachine, DoneInvokeEvent, interpret } from "xstate";
import DataLoader from "dataloader";
import { chunk } from "lodash";
type Mode = "DryRun" | "Live";
type AuthUser = { id: string };
type BatchRecord = { email: string; active: boolean };
type AppRecord = { id: string; email: string; active: boolean };
type DBAction = "NOTHING" | "CREATE" | "UPDATE" | "DELETE";
type SalesforceAction =
| "NOTHING"
| "CREATE_CONTACT"
| "UPDATE_CONTACT"
| "DELETE_CONTACT"
| "ACTIVATE_CONTACT"
| "DEACTIVATE_CONTACT";
type ActiveCampaignAction =
| "NOTHING"
| "CREATE_CONTACT"
| "UPDATE_CONTACT"
| "DELETE_CONTACT"
| "ACTIVATE_CONTACT"
| "DEACTIVATE_CONTACT";
type Auth0Action =
| "NOTHING"
| "CREATE_USER"
| "UPDATE_USER"
| "DELETE_USER"
| "ACTIVATE_USER"
| "DEACTIVATE_USER";
type ElasticSearchAction =
| "NOTHING"
| "CREATE_USER"
| "UPDATE_USER"
| "DELETE_USER"
| "ACTIVATE_USER"
| "DEACTIVATE_USER";
type GetDBActionDoneInvokeEventData = {
module: "db";
index: number;
action: DBAction;
};
type GetDBActionDoneInvokeEvent =
DoneInvokeEvent<GetDBActionDoneInvokeEventData>;
type GetSalesforceActionDoneInvokeEventData = {
module: "salesforce";
index: number;
action: SalesforceAction;
};
type GetSalesforceActionDoneInvokeEvent =
DoneInvokeEvent<GetSalesforceActionDoneInvokeEventData>;
type GetActiveCampaignActionDoneInvokeEventData = {
module: "activeCampaign";
index: number;
action: ActiveCampaignAction;
};
type GetActiveCampaignActionDoneInvokeEvent =
DoneInvokeEvent<GetActiveCampaignActionDoneInvokeEventData>;
type GetAuth0ActionDoneInvokeEventData = {
module: "auth0";
index: number;
action: Auth0Action;
};
type GetAuth0ActionDoneInvokeEvent =
DoneInvokeEvent<GetAuth0ActionDoneInvokeEventData>;
type ServiceActionDoneInvokeEventData =
| GetDBActionDoneInvokeEventData
| GetSalesforceActionDoneInvokeEventData
| GetActiveCampaignActionDoneInvokeEventData
| GetAuth0ActionDoneInvokeEventData;
const loaders = {
db: {
user: {
find: new DataLoader<string, AppRecord | null>((keys) =>
Promise.resolve([
null,
{ id: "", email: "", active: true },
] as (AppRecord | null)[]),
),
create: new DataLoader((keys) => Promise.resolve(keys)),
update: new DataLoader((keys) => Promise.resolve(keys)),
delete: new DataLoader((keys) => Promise.resolve(keys)),
},
activeCampaign: {
contact: {
find: new DataLoader((keys) => Promise.resolve(keys)),
create: new DataLoader((keys) => Promise.resolve(keys)),
update: new DataLoader((keys) => Promise.resolve(keys)),
delete: new DataLoader((keys) => Promise.resolve(keys)),
activate: new DataLoader((keys) => Promise.resolve(keys)),
deactivate: new DataLoader((keys) => Promise.resolve(keys)),
},
},
salesforce: {
contact: {
find: new DataLoader((keys) => Promise.resolve(keys)),
create: new DataLoader((keys) => Promise.resolve(keys)),
update: new DataLoader((keys) => Promise.resolve(keys)),
delete: new DataLoader((keys) => Promise.resolve(keys)),
activate: new DataLoader((keys) => Promise.resolve(keys)),
deactivate: new DataLoader((keys) => Promise.resolve(keys)),
},
},
},
activeCampaign: {
contact: {
find: new DataLoader((keys) => Promise.resolve(keys)),
create: new DataLoader((keys) => Promise.resolve(keys)),
update: new DataLoader((keys) => Promise.resolve(keys)),
delete: new DataLoader((keys) => Promise.resolve(keys)),
activate: new DataLoader((keys) => Promise.resolve(keys)),
deactivate: new DataLoader((keys) => Promise.resolve(keys)),
},
},
auth0: {},
salesforce: {},
};
const rootMachine = createMachine({
tsTypes: {} as import("./index.typegen").Typegen0,
id: "root",
initial: "init",
predictableActionArguments: true,
schema: {} as {
context: {
index: number;
mode: Mode;
user: AuthUser;
input: BatchRecord;
record?: AppRecord;
dataLoaders: typeof loaders;
};
},
states: {
init: {
invoke: {
src: async (context) => {
// determine action to take based on context
// if the record is not in the DB, then return "create"
// if the record is in the DB and the control instruction is to update the record, then return "update"
// if the record is in the DB and the control instruction is to delete the record, then return "delete"
// if record needs to go from unprovisioned to provisioned, then return "provision"
// if record needs to go from provisioned to unprovisioned, then return "de-provision"
const record = await context.dataLoaders.db.user.find.load(
context.input.email,
);
const params = {
module: "db",
index: context.index,
} as const;
switch (true) {
// the record is not in the DB
case !record:
return Promise.resolve<GetDBActionDoneInvokeEventData>({
...params,
action: "CREATE",
});
// the record is in the DB but the active flags don't match
case context.input.active !== record?.active:
return Promise.resolve<GetDBActionDoneInvokeEventData>({
...params,
action: "UPDATE",
});
default:
return Promise.resolve<GetDBActionDoneInvokeEventData>({
...params,
action: "NOTHING",
});
}
},
onDone: [
{
cond: (c, e: GetDBActionDoneInvokeEvent) => c.mode === "DryRun",
target: "provisioning",
},
{
cond: (c, e: GetDBActionDoneInvokeEvent) =>
e.data.action === "CREATE",
target: "creatingDBRecord",
},
{
cond: (c, e: GetDBActionDoneInvokeEvent) =>
e.data.action === "UPDATE",
target: "updatingDBRecord",
},
{
cond: (c, e: GetDBActionDoneInvokeEvent) =>
e.data.action === "DELETE",
target: "deletingDBRecord",
},
],
},
},
creatingDBRecord: {
invoke: {
src: (context) => {
// create the record in the DB
return context.dataLoaders.db.user.create.load({});
},
onDone: {
target: "provisioning",
},
},
},
updatingDBRecord: {
invoke: {
src: (context) => {
// update the record in the DB
return context.dataLoaders.db.user.update.load({});
},
onDone: {
target: "provisioning",
},
},
},
deletingDBRecord: {
invoke: {
src: (context) => {
// delete the record in the DB
return context.dataLoaders.db.user.delete.load({});
},
onDone: {
target: "provisioning",
},
},
},
provisioning: {
type: "parallel",
states: {
salesforce: {
initial: "checking",
states: {
checking: {
invoke: {
src: (context) => {
// check if the record exists in salesforce
return Promise.resolve<GetSalesforceActionDoneInvokeEventData>(
{
module: "salesforce",
index: context.index,
action: "CREATE_CONTACT",
},
);
},
onDone: [
{
cond: (c) => c.mode === "DryRun",
target: "success",
},
{
cond: (c, e: GetSalesforceActionDoneInvokeEvent) =>
e.data.action === "CREATE_CONTACT",
target: "creatingContact",
},
{
cond: (c, e: GetSalesforceActionDoneInvokeEvent) =>
e.data.action === "UPDATE_CONTACT",
target: "updatingContact",
},
{
cond: (c, e: GetSalesforceActionDoneInvokeEvent) =>
e.data.action === "DELETE_CONTACT",
target: "deletingContact",
},
{
cond: (c, e: GetSalesforceActionDoneInvokeEvent) =>
e.data.action === "ACTIVATE_CONTACT",
target: "activatingContact",
},
{
cond: (c, e: GetSalesforceActionDoneInvokeEvent) =>
e.data.action === "DEACTIVATE_CONTACT",
target: "deactivatingContact",
},
],
},
},
creatingContact: {
invoke: {
src: (context) => {
// create the record in salesforce
return Promise.resolve();
},
onDone: {
target: "success",
},
},
},
updatingContact: {
invoke: {
src: (context) => {
// update the record in salesforce
return Promise.resolve();
},
onDone: {
target: "success",
},
},
},
activatingContact: {
invoke: {
src: (context) => {
// activate the record in salesforce
return Promise.resolve();
},
onDone: {
target: "success",
},
},
},
deactivatingContact: {
invoke: {
src: (context) => {
// deactivate the record in salesforce
return Promise.resolve();
},
onDone: {
target: "success",
},
},
},
deletingContact: {
invoke: {
src: (context) => {
// delete the record in salesforce
return Promise.resolve();
},
onDone: {
target: "success",
},
},
},
success: {},
},
},
activeCampaign: {
initial: "checking",
states: {
checking: {
invoke: {
src: async (context) => {
// check if the record exists in activeCampaign
const record =
await context.dataLoaders.db.activeCampaign.contact.find.load(
context.input.email,
);
const params = {
module: "activeCampaign",
index: context.index,
} as const;
switch (true) {
// the record is not in the DB
case !record:
return Promise.resolve<GetActiveCampaignActionDoneInvokeEventData>(
{
...params,
action: "CREATE_CONTACT",
},
);
// the record is in the DB but the active flags don't match
case context.input.active !== context.record?.active:
return Promise.resolve<GetActiveCampaignActionDoneInvokeEventData>(
{
...params,
action: "UPDATE_CONTACT",
},
);
default:
return Promise.resolve<GetActiveCampaignActionDoneInvokeEventData>(
{
...params,
action: "NOTHING",
},
);
}
},
onDone: [
{
cond: (c) => c.mode === "DryRun",
target: "success",
},
{
cond: (_, e: GetActiveCampaignActionDoneInvokeEvent) =>
e.data.action === "CREATE_CONTACT",
target: "creatingContact",
},
{
cond: (_, e: GetActiveCampaignActionDoneInvokeEvent) =>
e.data.action === "UPDATE_CONTACT",
target: "updatingContact",
},
{
cond: (_, e: GetActiveCampaignActionDoneInvokeEvent) =>
e.data.action === "DELETE_CONTACT",
target: "deletingContact",
},
{
cond: (_, e: GetActiveCampaignActionDoneInvokeEvent) =>
e.data.action === "ACTIVATE_CONTACT",
target: "activatingContact",
},
{
cond: (_, e: GetActiveCampaignActionDoneInvokeEvent) =>
e.data.action === "DEACTIVATE_CONTACT",
target: "deactivatingContact",
},
],
},
},
creatingContact: {},
updatingContact: {},
activatingContact: {},
deactivatingContact: {},
deletingContact: {},
success: {},
},
},
auth0: {
initial: "checking",
states: {
checking: {
invoke: {
src: (context) => {
// check if the record exists in auth0
return Promise.resolve<GetAuth0ActionDoneInvokeEventData>({
module: "auth0",
index: context.index,
action: "CREATE_USER",
});
},
onDone: [
{
cond: (c) => c.mode === "DryRun",
target: "success",
},
],
},
},
creatingUser: {},
updatingUser: {},
activatingUser: {},
deactivatingUser: {},
deletingUser: {},
success: {},
},
},
},
},
},
});
// Runs the machine with the interpreter for a single record
// interpret(
// rootMachine.withContext({
// index: 0,
// mode: "DryRun",
// user: {
// id: "123",
// },
// record: {
// id: "123",
// },
// dataLoaders: loaders,
// }),
// ).start();
async function run() {
const batchSize = 50;
const records: BatchRecord[] = [
{ email: "1@test.com", active: true },
{ email: "2@test.com", active: true },
];
const actions: ServiceActionDoneInvokeEventData[][] = [];
const pooledMachines = new Array(Math.min(batchSize, records.length))
.fill(0)
.map(() => {
const interpreter = interpret(rootMachine, {
deferEvents: false,
devTools: false,
});
interpreter.onTransition((state, event) => {
if ("data" in event) {
const actionEvent = event as
| GetDBActionDoneInvokeEvent
| GetActiveCampaignActionDoneInvokeEvent
| GetAuth0ActionDoneInvokeEvent
| GetSalesforceActionDoneInvokeEvent;
if (!actions[actionEvent.data.index]) {
actions[actionEvent.data.index] = [];
}
console.log(actionEvent.data);
actions[actionEvent.data.index].push(actionEvent.data);
}
});
return interpreter;
});
let i = 0;
for await (const batch of chunk(records, batchSize)) {
const runPromises = pooledMachines.map((m) => {
m.start({
...m.initialState,
context: {
index: i,
dataLoaders: loaders,
mode: "DryRun",
user: {
id: "123",
},
input: batch[i],
},
});
i++;
});
await Promise.all(runPromises);
}
}
run();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment