Created
November 25, 2023 01:18
-
-
Save kadeer/5f35fd4048249b967e13bbbd4085d031 to your computer and use it in GitHub Desktop.
q-up with xstate and data loader
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { createMachine, DoneInvokeEvent, interpret } from "xstate";
import DataLoader from "dataloader";
import { chunk } from "lodash";
/**
 * Execution mode carried on the machine context. The machine's `DryRun`
 * guards route straight past every create/update/delete/activate state.
 */
type Mode = "DryRun" | "Live";

/** Identity stored on the machine context as `user`. */
type AuthUser = { id: string };

/** One input row of a batch: the e-mail used as lookup key and its active flag. */
type BatchRecord = { email: string; active: boolean };

/** Record shape resolved by the `db.user.find` loader. */
type AppRecord = { id: string; email: string; active: boolean };

/** Outcome of the database check performed in the machine's `init` state. */
type DBAction = "NOTHING" | "CREATE" | "UPDATE" | "DELETE";

/** Outcome of the Salesforce check in the `provisioning.salesforce` region. */
type SalesforceAction =
  | "NOTHING"
  | "CREATE_CONTACT"
  | "UPDATE_CONTACT"
  | "DELETE_CONTACT"
  | "ACTIVATE_CONTACT"
  | "DEACTIVATE_CONTACT";

/** Outcome of the ActiveCampaign check in the `provisioning.activeCampaign` region. */
type ActiveCampaignAction =
  | "NOTHING"
  | "CREATE_CONTACT"
  | "UPDATE_CONTACT"
  | "DELETE_CONTACT"
  | "ACTIVATE_CONTACT"
  | "DEACTIVATE_CONTACT";

/** Outcome of the Auth0 check in the `provisioning.auth0` region. */
type Auth0Action =
  | "NOTHING"
  | "CREATE_USER"
  | "UPDATE_USER"
  | "DELETE_USER"
  | "ACTIVATE_USER"
  | "DEACTIVATE_USER";

/** Action vocabulary for Elasticsearch; declared here but not yet referenced by the machine. */
type ElasticSearchAction =
  | "NOTHING"
  | "CREATE_USER"
  | "UPDATE_USER"
  | "DELETE_USER"
  | "ACTIVATE_USER"
  | "DEACTIVATE_USER";
/**
 * Common shape of the payload every "check" service resolves with.
 *
 * - `module` tags which service produced the payload (the union discriminant).
 * - `index` is copied from the machine context so results can be grouped per record.
 * - `action` is the step that service decided on.
 */
type ServiceActionData<TModule extends string, TAction extends string> = {
  module: TModule;
  index: number;
  action: TAction;
};

/** Payload resolved by the `init` state's database check. */
type GetDBActionDoneInvokeEventData = ServiceActionData<"db", DBAction>;
/** `done.invoke` event carrying the database check payload. */
type GetDBActionDoneInvokeEvent =
  DoneInvokeEvent<GetDBActionDoneInvokeEventData>;

/** Payload resolved by the Salesforce check. */
type GetSalesforceActionDoneInvokeEventData = ServiceActionData<
  "salesforce",
  SalesforceAction
>;
/** `done.invoke` event carrying the Salesforce check payload. */
type GetSalesforceActionDoneInvokeEvent =
  DoneInvokeEvent<GetSalesforceActionDoneInvokeEventData>;

/** Payload resolved by the ActiveCampaign check. */
type GetActiveCampaignActionDoneInvokeEventData = ServiceActionData<
  "activeCampaign",
  ActiveCampaignAction
>;
/** `done.invoke` event carrying the ActiveCampaign check payload. */
type GetActiveCampaignActionDoneInvokeEvent =
  DoneInvokeEvent<GetActiveCampaignActionDoneInvokeEventData>;

/** Payload resolved by the Auth0 check. */
type GetAuth0ActionDoneInvokeEventData = ServiceActionData<
  "auth0",
  Auth0Action
>;
/** `done.invoke` event carrying the Auth0 check payload. */
type GetAuth0ActionDoneInvokeEvent =
  DoneInvokeEvent<GetAuth0ActionDoneInvokeEventData>;

/** Any check payload; discriminate on `module` to recover the concrete action type. */
type ServiceActionDoneInvokeEventData =
  | GetDBActionDoneInvokeEventData
  | GetSalesforceActionDoneInvokeEventData
  | GetActiveCampaignActionDoneInvokeEventData
  | GetAuth0ActionDoneInvokeEventData;
/**
 * Builds a placeholder loader whose batch function resolves every key to
 * itself, so it always yields exactly one result per requested key.
 */
const createEchoLoader = () =>
  new DataLoader((keys) => Promise.resolve(keys));

/** Builds the six placeholder loaders that every contact-style service exposes. */
const createContactLoaders = () => ({
  find: createEchoLoader(),
  create: createEchoLoader(),
  update: createEchoLoader(),
  delete: createEchoLoader(),
  activate: createEchoLoader(),
  deactivate: createEchoLoader(),
});

/**
 * Registry of DataLoader instances handed to each machine through its
 * context (`dataLoaders`). Everything here is stub data: only the shape of
 * the registry is meaningful.
 *
 * The machine reads `db.user.*` and `db.activeCampaign.contact.find`; the
 * remaining entries are kept so `typeof loaders` stays unchanged.
 */
const loaders = {
  db: {
    user: {
      // DataLoader requires the resolved array to have the same length and
      // order as `keys`. The stub therefore derives one entry per key
      // (alternating "missing" and "found") instead of resolving a fixed
      // two-element array, which only worked for batches of exactly two keys.
      find: new DataLoader<string, AppRecord | null>((keys) =>
        Promise.resolve(
          keys.map(
            (_key, position): AppRecord | null =>
              position % 2 === 0 ? null : { id: "", email: "", active: true },
          ),
        ),
      ),
      create: createEchoLoader(),
      update: createEchoLoader(),
      delete: createEchoLoader(),
    },
    activeCampaign: {
      contact: createContactLoaders(),
    },
    salesforce: {
      contact: createContactLoaders(),
    },
  },
  activeCampaign: {
    contact: createContactLoaders(),
  },
  auth0: {},
  salesforce: {},
};
/**
 * Machine that processes a single batch record.
 *
 * Flow:
 * 1. `init` looks the record up through `dataLoaders.db.user.find` and
 *    resolves with the database action to take.
 * 2. Unless the mode is `DryRun`, the matching `*DBRecord` state performs
 *    that action (a `NOTHING` action goes straight on).
 * 3. `provisioning` runs the `salesforce`, `activeCampaign` and `auth0`
 *    regions in parallel; each one checks, optionally acts, and ends in its
 *    own `success` state.
 *
 * Every check resolves with a `{ module, index, action }` payload, so a
 * transition listener can collect the planned actions per record.
 *
 * No `onError` transitions are declared: a rejected service leaves the
 * machine in the state that invoked it.
 */
const rootMachine = createMachine({
  tsTypes: {} as import("./index.typegen").Typegen0,
  id: "root",
  initial: "init",
  predictableActionArguments: true,
  schema: {} as {
    context: {
      index: number;
      mode: Mode;
      user: AuthUser;
      input: BatchRecord;
      record?: AppRecord;
      dataLoaders: typeof loaders;
    };
  },
  states: {
    init: {
      invoke: {
        // Decides the database action:
        //   - no stored record              -> CREATE
        //   - stored record, flags differ   -> UPDATE
        //   - otherwise                     -> NOTHING
        // DELETE is part of DBAction and is routed below, but nothing here
        // produces it yet.
        src: async (context): Promise<GetDBActionDoneInvokeEventData> => {
          const record = await context.dataLoaders.db.user.find.load(
            context.input.email,
          );
          const params = {
            module: "db",
            index: context.index,
          } as const;

          if (!record) {
            return { ...params, action: "CREATE" };
          }
          if (context.input.active !== record.active) {
            return { ...params, action: "UPDATE" };
          }
          return { ...params, action: "NOTHING" };
        },
        onDone: [
          {
            // A dry run never touches the database.
            cond: (c) => c.mode === "DryRun",
            target: "provisioning",
          },
          {
            cond: (c, e: GetDBActionDoneInvokeEvent) =>
              e.data.action === "CREATE",
            target: "creatingDBRecord",
          },
          {
            cond: (c, e: GetDBActionDoneInvokeEvent) =>
              e.data.action === "UPDATE",
            target: "updatingDBRecord",
          },
          {
            cond: (c, e: GetDBActionDoneInvokeEvent) =>
              e.data.action === "DELETE",
            target: "deletingDBRecord",
          },
          {
            // Nothing to change in the database: continue with provisioning
            // instead of staying in `init` forever.
            cond: (c, e: GetDBActionDoneInvokeEvent) =>
              e.data.action === "NOTHING",
            target: "provisioning",
          },
        ],
      },
    },
    creatingDBRecord: {
      invoke: {
        // Placeholder: the loader is called with an empty object.
        src: (context) => context.dataLoaders.db.user.create.load({}),
        onDone: {
          target: "provisioning",
        },
      },
    },
    updatingDBRecord: {
      invoke: {
        // Placeholder: the loader is called with an empty object.
        src: (context) => context.dataLoaders.db.user.update.load({}),
        onDone: {
          target: "provisioning",
        },
      },
    },
    deletingDBRecord: {
      invoke: {
        // Placeholder: the loader is called with an empty object.
        src: (context) => context.dataLoaders.db.user.delete.load({}),
        onDone: {
          target: "provisioning",
        },
      },
    },
    provisioning: {
      type: "parallel",
      states: {
        salesforce: {
          initial: "checking",
          states: {
            checking: {
              invoke: {
                // Placeholder check: always reports CREATE_CONTACT.
                src: (context) =>
                  Promise.resolve<GetSalesforceActionDoneInvokeEventData>({
                    module: "salesforce",
                    index: context.index,
                    action: "CREATE_CONTACT",
                  }),
                onDone: [
                  {
                    cond: (c) => c.mode === "DryRun",
                    target: "success",
                  },
                  {
                    cond: (c, e: GetSalesforceActionDoneInvokeEvent) =>
                      e.data.action === "CREATE_CONTACT",
                    target: "creatingContact",
                  },
                  {
                    cond: (c, e: GetSalesforceActionDoneInvokeEvent) =>
                      e.data.action === "UPDATE_CONTACT",
                    target: "updatingContact",
                  },
                  {
                    cond: (c, e: GetSalesforceActionDoneInvokeEvent) =>
                      e.data.action === "DELETE_CONTACT",
                    target: "deletingContact",
                  },
                  {
                    cond: (c, e: GetSalesforceActionDoneInvokeEvent) =>
                      e.data.action === "ACTIVATE_CONTACT",
                    target: "activatingContact",
                  },
                  {
                    cond: (c, e: GetSalesforceActionDoneInvokeEvent) =>
                      e.data.action === "DEACTIVATE_CONTACT",
                    target: "deactivatingContact",
                  },
                  {
                    // Nothing to do counts as success for this region.
                    cond: (c, e: GetSalesforceActionDoneInvokeEvent) =>
                      e.data.action === "NOTHING",
                    target: "success",
                  },
                ],
              },
            },
            creatingContact: {
              invoke: {
                // Placeholder for creating the contact in Salesforce.
                src: () => Promise.resolve(),
                onDone: {
                  target: "success",
                },
              },
            },
            updatingContact: {
              invoke: {
                // Placeholder for updating the contact in Salesforce.
                src: () => Promise.resolve(),
                onDone: {
                  target: "success",
                },
              },
            },
            activatingContact: {
              invoke: {
                // Placeholder for activating the contact in Salesforce.
                src: () => Promise.resolve(),
                onDone: {
                  target: "success",
                },
              },
            },
            deactivatingContact: {
              invoke: {
                // Placeholder for deactivating the contact in Salesforce.
                src: () => Promise.resolve(),
                onDone: {
                  target: "success",
                },
              },
            },
            deletingContact: {
              invoke: {
                // Placeholder for deleting the contact in Salesforce.
                src: () => Promise.resolve(),
                onDone: {
                  target: "success",
                },
              },
            },
            success: {},
          },
        },
        activeCampaign: {
          initial: "checking",
          states: {
            checking: {
              invoke: {
                // Looks the contact up by e-mail and decides what to do.
                src: async (
                  context,
                ): Promise<GetActiveCampaignActionDoneInvokeEventData> => {
                  const record =
                    await context.dataLoaders.db.activeCampaign.contact.find.load(
                      context.input.email,
                    );
                  const params = {
                    module: "activeCampaign",
                    index: context.index,
                  } as const;

                  if (!record) {
                    return { ...params, action: "CREATE_CONTACT" };
                  }
                  // NOTE(review): this compares against `context.record`,
                  // which nothing in this machine assigns, rather than the
                  // `record` loaded just above (the `init` check uses its
                  // loaded record). Looks like a copy-paste slip, but the
                  // contact loader is untyped, so it is left as-is; confirm
                  // the intended source of truth.
                  if (context.input.active !== context.record?.active) {
                    return { ...params, action: "UPDATE_CONTACT" };
                  }
                  return { ...params, action: "NOTHING" };
                },
                onDone: [
                  {
                    cond: (c) => c.mode === "DryRun",
                    target: "success",
                  },
                  {
                    cond: (_, e: GetActiveCampaignActionDoneInvokeEvent) =>
                      e.data.action === "CREATE_CONTACT",
                    target: "creatingContact",
                  },
                  {
                    cond: (_, e: GetActiveCampaignActionDoneInvokeEvent) =>
                      e.data.action === "UPDATE_CONTACT",
                    target: "updatingContact",
                  },
                  {
                    cond: (_, e: GetActiveCampaignActionDoneInvokeEvent) =>
                      e.data.action === "DELETE_CONTACT",
                    target: "deletingContact",
                  },
                  {
                    cond: (_, e: GetActiveCampaignActionDoneInvokeEvent) =>
                      e.data.action === "ACTIVATE_CONTACT",
                    target: "activatingContact",
                  },
                  {
                    cond: (_, e: GetActiveCampaignActionDoneInvokeEvent) =>
                      e.data.action === "DEACTIVATE_CONTACT",
                    target: "deactivatingContact",
                  },
                  {
                    // Nothing to do counts as success for this region.
                    cond: (_, e: GetActiveCampaignActionDoneInvokeEvent) =>
                      e.data.action === "NOTHING",
                    target: "success",
                  },
                ],
              },
            },
            // The action states below are still empty placeholders: they
            // invoke nothing and have no transition to `success`.
            creatingContact: {},
            updatingContact: {},
            activatingContact: {},
            deactivatingContact: {},
            deletingContact: {},
            success: {},
          },
        },
        auth0: {
          initial: "checking",
          states: {
            checking: {
              invoke: {
                // Placeholder check: always reports CREATE_USER.
                src: (context) =>
                  Promise.resolve<GetAuth0ActionDoneInvokeEventData>({
                    module: "auth0",
                    index: context.index,
                    action: "CREATE_USER",
                  }),
                // Routed the same way as the sibling regions so each
                // declared action state is reachable.
                onDone: [
                  {
                    cond: (c) => c.mode === "DryRun",
                    target: "success",
                  },
                  {
                    cond: (_, e: GetAuth0ActionDoneInvokeEvent) =>
                      e.data.action === "CREATE_USER",
                    target: "creatingUser",
                  },
                  {
                    cond: (_, e: GetAuth0ActionDoneInvokeEvent) =>
                      e.data.action === "UPDATE_USER",
                    target: "updatingUser",
                  },
                  {
                    cond: (_, e: GetAuth0ActionDoneInvokeEvent) =>
                      e.data.action === "DELETE_USER",
                    target: "deletingUser",
                  },
                  {
                    cond: (_, e: GetAuth0ActionDoneInvokeEvent) =>
                      e.data.action === "ACTIVATE_USER",
                    target: "activatingUser",
                  },
                  {
                    cond: (_, e: GetAuth0ActionDoneInvokeEvent) =>
                      e.data.action === "DEACTIVATE_USER",
                    target: "deactivatingUser",
                  },
                  {
                    // Nothing to do counts as success for this region.
                    cond: (_, e: GetAuth0ActionDoneInvokeEvent) =>
                      e.data.action === "NOTHING",
                    target: "success",
                  },
                ],
              },
            },
            // The action states below are still empty placeholders: they
            // invoke nothing and have no transition to `success`.
            creatingUser: {},
            updatingUser: {},
            activatingUser: {},
            deactivatingUser: {},
            deletingUser: {},
            success: {},
          },
        },
      },
    },
  },
});
// Runs the machine with the interpreter for a single record
// interpret(
//   rootMachine.withContext({
//     index: 0,
//     mode: "DryRun",
//     user: {
//       id: "123",
//     },
//     input: { email: "1@test.com", active: true },
//     record: {
//       id: "123",
//       email: "1@test.com",
//       active: true,
//     },
//     dataLoaders: loaders,
//   }),
// ).start();
/** Regions of the `provisioning` parallel state that must each reach `success`. */
const PROVISIONING_REGIONS = ["salesforce", "activeCampaign", "auth0"] as const;

/**
 * Narrows an arbitrary event payload to a check result.
 *
 * Not every event that has a `data` key carries one: the placeholder action
 * services resolve with `undefined`, and error events carry the error.
 */
function isServiceActionData(
  data: unknown,
): data is ServiceActionDoneInvokeEventData {
  if (typeof data !== "object" || data === null) {
    return false;
  }
  const candidate = data as Record<string, unknown>;
  return (
    typeof candidate.module === "string" &&
    typeof candidate.index === "number" &&
    typeof candidate.action === "string"
  );
}

/**
 * Runs the machine once per record, at most `batchSize` records at a time,
 * and collects the action every check decided on.
 *
 * Each record gets its own interpreter. A pool of reusable interpreters does
 * not work with XState v4: `start()` is a no-op on a running interpreter and
 * `stop()` removes its listeners.
 *
 * A record counts as finished once all three provisioning regions are in
 * `success`. With the machine as written that is guaranteed in `DryRun` mode,
 * which is the mode used here.
 *
 * @returns The collected check payloads, grouped by record index
 *   (`result[index]` lists the payloads for `records[index]`).
 */
async function run(): Promise<ServiceActionDoneInvokeEventData[][]> {
  const batchSize = 50;
  const records: BatchRecord[] = [
    { email: "1@test.com", active: true },
    { email: "2@test.com", active: true },
  ];
  const actions: ServiceActionDoneInvokeEventData[][] = [];

  // Index of the current batch's first record within `records`.
  let offset = 0;

  for (const batch of chunk(records, batchSize)) {
    const runs = batch.map((input, slot) => {
      const service = interpret(
        rootMachine.withContext({
          // Global position, so results line up with `records`.
          index: offset + slot,
          dataLoaders: loaders,
          mode: "DryRun",
          user: {
            id: "123",
          },
          input,
        }),
        {
          deferEvents: false,
          devTools: false,
        },
      );

      // Registered before `start()` so no transition is missed.
      const finished = new Promise<void>((resolve) => {
        service.onTransition((state, event) => {
          const payload: unknown =
            "data" in event ? (event as { data?: unknown }).data : undefined;

          if (isServiceActionData(payload)) {
            if (!actions[payload.index]) {
              actions[payload.index] = [];
            }
            console.log(payload);
            actions[payload.index].push(payload);
          }

          const reached = new Set(state.toStrings());
          const allRegionsDone = PROVISIONING_REGIONS.every((region) =>
            reached.has(`provisioning.${region}.success`),
          );
          if (allRegionsDone) {
            resolve();
          }
        });
      });

      service.start();
      return { service, finished };
    });

    // All machines of a batch start in the same tick, so their loader calls
    // are batched together; the next batch starts only after this one is done.
    await Promise.all(runs.map(({ finished }) => finished));

    // Stopped only after the whole batch settled, never from inside a listener.
    for (const { service } of runs) {
      service.stop();
    }

    offset += batch.length;
  }

  return actions;
}

run();
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment