@olivermrbl
Last active November 2, 2023 15:11
Proposal: Workflow SDK

Workflow SDK

An SDK on top of our existing workflow and orchestration tooling, simplifying and improving the developer experience of managing workflows.

Basic Example

In the example below, we define a simple workflow with two steps.

import { createWorkflow, createWorkflowStep } from "@medusajs/workflows"

const myHandler1 = createWorkflowStep("first", myHandler1Invoke, myHandler1Compensate)
const myHandler2 = createWorkflowStep("second", myHandler2Invoke, { saveResponse: false })

const myWorkflow = createWorkflow("myWorkflow", async ({ data, container, context }) => {
  const first = await myHandler1(data)
  await myHandler2(first)
})

const input = { data: "someData" }
const container = {}
const context = { manager: "manager" }

myWorkflow.run(input, container, context)

This example illustrates a few key points:

  • Little to no boilerplate code is needed
  • Workflows are created using simple JavaScript
  • Data can be passed from one function to another as you would normally do

Obviously, this is a very simple example. However, it should be clear that the SDK will eliminate a significant amount of boilerplate code and provide a more intuitive API to manage workflows.

Reference

New APIs in the proposal

  • createWorkflow: Create a workflow and build its step definition
  • createWorkflowStep: Create a workflow step
  • replaceAction: Replace a workflow step

Creating and building a workflow

This proposal comes with a new createWorkflow util:

type MyWorkflowInput = {}
type MyWorkflowOutput = {}

const myWorkflow = createWorkflow<MyWorkflowInput, MyWorkflowOutput>(
  "myWorkflow", 
  async ({ data, container, context }) => {
    const first = await myHandler1(data)
    await myHandler2(first)
  }
)

The createWorkflow util is responsible for creating the workflow and building its step definition.

In the function body, we scan for handlers created using the createWorkflowStep util and register them as steps in the definition.

Additionally, data can be passed between handlers in this "registration step". The functionality is similar to that of the pipe function in existing workflows.

In the first iteration, the registration step only scans for handlers, so all other logic in the function is ignored and has no effect. However, we could eventually support any type of JavaScript code, for example if-then statements. See Discussion at the bottom of the proposal.

Under the hood, the createWorkflow util creates all necessary boilerplate code:

export const myWorkflowName = "myWorkflow"

export const steps: TransactionStepsDefinition = {}

export const handlers = new Map([])

WorkflowManager.register(myWorkflowName, steps, handlers)

export const myWorkflow = exportWorkflow(myWorkflowName)
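As a rough illustration of the registration step described above, here is a minimal sketch of how calling a step inside the workflow function could record it in the definition. Everything in it is an assumption made for illustration: the simplified StepDefinition type, the activeBuilder variable, and the StepRef placeholder are hypothetical, and the composer is kept synchronous for brevity, whereas the proposal uses an async function.

```typescript
// Hypothetical, simplified types; the real TransactionStepsDefinition has more fields.
type StepDefinition = { action?: string; noCompensation?: boolean; next?: StepDefinition }
type StepHandlers = {
  invoke: (input: unknown) => unknown
  compensate?: (input: unknown) => unknown
}
type StepRef = { __step: string } // placeholder returned while building
type Builder = { tail: StepDefinition; handlers: Map<string, StepHandlers> }

// Set only while a workflow function is being scanned.
let activeBuilder: Builder | null = null

function createWorkflowStep(
  name: string,
  invoke: StepHandlers["invoke"],
  compensate?: StepHandlers["compensate"]
) {
  // Calling the returned function does not run the handler; it registers the step.
  return (_input: unknown): StepRef => {
    const builder = activeBuilder
    if (!builder) {
      throw new Error(`Step "${name}" was called outside of createWorkflow`)
    }
    const node: StepDefinition = { action: name }
    if (!compensate) node.noCompensation = true
    builder.tail.next = node
    builder.tail = node
    builder.handlers.set(name, compensate ? { invoke, compensate } : { invoke })
    return { __step: name }
  }
}

function createWorkflow(name: string, composer: (args: { data: StepRef }) => void) {
  const steps: StepDefinition = {}
  const builder: Builder = { tail: steps, handlers: new Map() }
  activeBuilder = builder
  try {
    composer({ data: { __step: "input" } })
  } finally {
    activeBuilder = null
  }
  return { name, steps, handlers: builder.handlers }
}

const first = createWorkflowStep("first", (x) => x, () => undefined)
const second = createWorkflowStep("second", (x) => x)

const built = createWorkflow("myWorkflow", ({ data }) => {
  const result = first(data)
  second(result)
})

console.log(JSON.stringify(built.steps))
// {"next":{"action":"first","next":{"action":"second","noCompensation":true}}}
```

Because the step functions only return placeholders during this pass, any other logic in the workflow function sees no real data, which is one way to understand why it is ignored in the first iteration.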

Creating a workflow step

After having created the workflow, you add workflow steps using a new util createWorkflowStep:

import { createWorkflowStep, createWorkflow } from "@medusajs/workflows"

type Handler1Input = {}
type Handler1Output = {}

const myHandler1 = createWorkflowStep<Handler1Input, Handler1Output>(
  "first", 
  myHandler1Invoke, 
  myHandler1Compensate
)

const myWorkflow = createWorkflow("myWorkflow", async ({ data, context }) => {
  const first = await myHandler1(data)
})

As seen in the snippet above, the util accepts the following:

  • first -> workflow step name
  • myHandler1Invoke -> invoke function of the step
  • myHandler1Compensate -> compensate function of the step
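The proposal does not spell out what an invoke/compensate pair looks like, so the following is only a sketch under assumptions: the handler signatures, the ReserveInput/ReserveOutput types, and the in-memory stock map are all hypothetical. The handlers are kept synchronous for brevity; real handlers would most likely be async.

```typescript
// Hypothetical handler shapes; the proposal does not specify their signatures.
type ReserveInput = { sku: string; quantity: number }
type ReserveOutput = { sku: string; reserved: number }

// Stand-in for an external system such as an inventory service.
const stock = new Map<string, number>([["shirt", 5]])

// Invoke: performs the step's work and returns data for the next step.
function reserveInvoke(input: ReserveInput): ReserveOutput {
  const available = stock.get(input.sku) ?? 0
  if (available < input.quantity) {
    throw new Error(`Not enough stock for ${input.sku}`)
  }
  stock.set(input.sku, available - input.quantity)
  return { sku: input.sku, reserved: input.quantity }
}

// Compensate: undoes the work of invoke when a later step fails.
function reserveCompensate(input: ReserveInput): void {
  stock.set(input.sku, (stock.get(input.sku) ?? 0) + input.quantity)
}
```

Under these assumptions, the pair would be passed as the second and third arguments of createWorkflowStep.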

An alternative method signature of the util could look like:

const myHandler1 = createWorkflowStep<Handler1Input, Handler1Output>("first", {
  invoke: myHandler1Invoke, 
  compensating: myHandler1Compensate,
  onComplete: () => console.log("hello world")
})

The alternative would allow for more options without having to juggle correctly positioned method arguments.
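If both signatures were supported, the util could normalize its arguments into one options object before doing anything else. The sketch below assumes exactly that; normalizeStepArgs and the StepOptions type are hypothetical names, and the option keys simply mirror the ones used in this proposal.

```typescript
type Handler<I, O> = (input: I) => O | Promise<O>

// Option names mirror the ones used in this proposal; the exact set is an assumption.
type StepOptions<I, O> = {
  invoke: Handler<I, O>
  compensating?: Handler<I, unknown>
  onComplete?: () => void
  maxRetries?: number
  retryInterval?: number
  saveResponse?: boolean
}

type StepConfig<I, O> = Omit<StepOptions<I, O>, "invoke" | "compensating">

function normalizeStepArgs<I, O>(
  invokeOrOptions: Handler<I, O> | StepOptions<I, O>,
  compensateOrConfig?: Handler<I, unknown> | StepConfig<I, O>
): StepOptions<I, O> {
  // Object form: createWorkflowStep("first", { invoke, compensating, ... })
  if (typeof invokeOrOptions !== "function") {
    return invokeOrOptions
  }
  // Positional form with a compensate function as the third argument
  if (typeof compensateOrConfig === "function") {
    return { invoke: invokeOrOptions, compensating: compensateOrConfig }
  }
  // Positional form with a config object (or nothing) as the third argument
  return { invoke: invokeOrOptions, ...compensateOrConfig }
}
```

The rest of the util would then only ever deal with the object form, which keeps new options from changing the positional signature.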

Under the hood, the createWorkflowStep will extend the workflow definition and handlers as follows:

export const steps: TransactionStepsDefinition = {
  next: {
    action: "first"
  }
}

export const handlers = new Map([
  [
    "first",
    {
      invoke: myHandler1Invoke,
      compensate: myHandler1Compensate
    }
  ]
])

In the example above, the pipe function is left out for the sake of simplicity.

Adding another workflow step

type Handler2Input = {}
type Handler2Output = {}

const myHandler2 = createWorkflowStep<Handler2Input, Handler2Output>(
  "second", 
  myHandler2Invoke, 
  { maxRetries: 3, retryInterval: 1000 }
)

const myWorkflow = createWorkflow("myWorkflow", async ({ data, container, context }) => {
  const first = await myHandler1(data)
  await myHandler2(first)
})

Under the hood

export const steps: TransactionStepsDefinition = {
  next: {
    action: "first",
    next: {
      action: "second",
      noCompensation: true, // handler was created without a compensating action
      maxRetries: 3,
      retryInterval: 1000
    }
  }
}

export const handlers = new Map([
  [
    "first",
    {
      invoke: myHandler1Invoke,
      compensate: myHandler1Compensate
    }
  ],
  [
    "second",
    {
      invoke: myHandler2Invoke,
    }
  ],
])
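To make the mapping from registered steps to the nested definition concrete, here is a minimal sketch that folds an ordered list of steps into the `next` chain shown above. The RegisteredStep shape and the toDefinition helper are hypothetical, and StepDefinition is a simplified stand-in for TransactionStepsDefinition.

```typescript
// Simplified stand-in for TransactionStepsDefinition (assumption).
type StepDefinition = {
  action?: string
  noCompensation?: boolean
  maxRetries?: number
  retryInterval?: number
  saveResponse?: boolean
  next?: StepDefinition
}

// Hypothetical record of what createWorkflowStep registered, in call order.
type RegisteredStep = {
  name: string
  hasCompensation: boolean
  config?: Pick<StepDefinition, "maxRetries" | "retryInterval" | "saveResponse">
}

function toDefinition(registered: RegisteredStep[]): StepDefinition {
  // Fold from the last step backwards so each node can embed its successor.
  const chain = registered.reduceRight<StepDefinition | undefined>((next, step) => {
    const node: StepDefinition = { action: step.name }
    if (!step.hasCompensation) node.noCompensation = true
    Object.assign(node, step.config)
    if (next) node.next = next
    return node
  }, undefined)
  return chain ? { next: chain } : {}
}

const steps = toDefinition([
  { name: "first", hasCompensation: true },
  { name: "second", hasCompensation: false, config: { maxRetries: 3, retryInterval: 1000 } },
])

console.log(JSON.stringify(steps, null, 2))
```

Here noCompensation is derived from the absence of a compensate function rather than set by the developer, matching the comment in the definition above.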

Creating a workflow transformative step

To transform data between workflow steps, we will use the same createWorkflowStep utility to create a new step in the workflow definition. The goal is to make the SDK so intuitive and simple that you only ever need to understand the concept of a step.

Until now, using steps for transformation, validation, and core handler logic would result in an unmaintainably long definition. To solve this, we chose to allow for multiple handlers in a single step composed using the pipe function. This is all good and fine.

However, abstracting the handler definitions and providing a simple API allows us to use steps for all business logic without having to worry about maintainability, readability, etc.

Overall, this will make workflows more composable.

// myTransformer is the plain transformation function; the step gets its own name
const myTransformerStep = createWorkflowStep("myTransformer", myTransformer)

const myWorkflow = createWorkflow("myWorkflow", async ({ data, container, context }) => {
  const first = await myHandler1(data)
  const transformedData = await myTransformerStep(first)
  await myHandler2(transformedData)
})

Under the hood

export const steps: TransactionStepsDefinition = {
  next: {
    action: "first",
    next: {
      action: "myTransformer",
      next: {
        action: "second",
        noCompensation: true, // handler was created without a compensating action
        maxRetries: 3,
        retryInterval: 1000
      }
    },
  }
}

export const handlers = new Map([
  [
    "first",
    {
      invoke: myHandler1Invoke,
      compensate: myHandler1Compensate
    }
  ],
  [
    "myTransformer",
    {
      invoke: myTransformer,
    }
  ],
  [
    "second",
    {
      invoke: myHandler2Invoke,
    }
  ],
])

Extending a workflow

To extend a workflow, you will use a combination of the already presented createWorkflowStep util and the existing appendAction method of the WorkflowManager.

const myHandler3 = createWorkflowStep("third", myHandler3Invoke, { saveResponse: false })

myWorkflow.appendAction("third", "second", myHandler3)

Under the hood

export const steps: TransactionStepsDefinition = {
  next: {
    action: "first",
    next: {
      action: "myTransformer",
      next: {
        action: "second",
        noCompensation: true, // handler was created without a compensating action
        maxRetries: 3,
        retryInterval: 1000,
        next: {
          action: "third",
          noCompensation: true, // handler was created without a compensating action
          saveResponse: false,
        }
      }
    },
  }
}

export const handlers = new Map([
  [
    "first",
    {
      invoke: myHandler1Invoke,
      compensate: myHandler1Compensate
    }
  ],
  [
    "myTransformer",
    {
      invoke: myTransformer,
    }
  ],
  [
    "second",
    {
      invoke: myHandler2Invoke,
    }
  ],
  [
    "third",
    {
      invoke: myHandler3Invoke,
    }
  ],
])
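The effect of appending on the nested definition can be sketched as a small tree operation. This is not the WorkflowManager implementation; findStep and appendStep are hypothetical helpers, and keeping an existing successor behind the new step is an assumption about the desired behavior.

```typescript
// Simplified stand-in for TransactionStepsDefinition (assumption).
type StepDefinition = {
  action?: string
  noCompensation?: boolean
  saveResponse?: boolean
  next?: StepDefinition
}

// Walk the chain until the node with the given action is found.
function findStep(node: StepDefinition | undefined, action: string): StepDefinition | undefined {
  if (!node) return undefined
  if (node.action === action) return node
  return findStep(node.next, action)
}

// Insert `newStep` directly after `parentAction`, keeping any existing successor.
function appendStep(root: StepDefinition, parentAction: string, newStep: StepDefinition): void {
  const parent = findStep(root, parentAction)
  if (!parent) {
    throw new Error(`Step "${parentAction}" not found`)
  }
  const inserted: StepDefinition = { ...newStep }
  if (parent.next) inserted.next = parent.next
  parent.next = inserted
}

const definition: StepDefinition = {
  next: { action: "first", next: { action: "second", noCompensation: true } },
}

appendStep(definition, "second", { action: "third", noCompensation: true, saveResponse: false })

console.log(JSON.stringify(definition))
```

Note that the final position of "third" depends on what the chain looks like at the time of the call, so two extensions targeting the same parent would end up ordered by which one ran last.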

Replacing a step

To replace a step in the workflow, you'll again use the util createWorkflowStep in combination with the replaceAction of the WorkflowManager.

You cannot use appendAction to replace a step.

Incorrect

const myNewHandler2 = createWorkflowStep("second", myHandler2Invoke)
myWorkflow.appendAction("first", "second", myNewHandler2)

Correct

const myNewHandler2 = createWorkflowStep("second", myHandler2Invoke)
myWorkflow.replaceAction("second", myNewHandler2)
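One way to read the difference between the two calls: replacing keeps the step's position in the definition and only swaps the handlers registered under its name. The sketch below illustrates that reading; replaceHandlers is a hypothetical helper, not the replaceAction implementation, and rejecting unknown step names is an assumption.

```typescript
type StepHandlers = {
  invoke: (input: unknown) => unknown
  compensate?: (input: unknown) => unknown
}

// Swap the handlers of an existing step; the step definition itself is untouched.
function replaceHandlers(
  handlers: Map<string, StepHandlers>,
  action: string,
  replacement: StepHandlers
): void {
  if (!handlers.has(action)) {
    throw new Error(`Cannot replace unknown step "${action}"`)
  }
  handlers.set(action, replacement)
}

const originalInvoke = (input: unknown) => input
const newInvoke = (input: unknown) => ({ replaced: true, input })

const handlers = new Map<string, StepHandlers>([["second", { invoke: originalInvoke }]])

replaceHandlers(handlers, "second", { invoke: newInvoke })
```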

Proof of Concept

Discussion

Can we support all JavaScript, e.g. if-else, in the workflow build step?

const myWorkflow = createWorkflow("myWorkflow", async ({ data, container, context }) => {
  const first = await myHandler1(data)
  
  if (!first.someArray.length) {
    someHandler()
  } else {
    await myHandler2(first)
  }
})

Unrelated to the Workflow SDK

  • Where do we place custom workflows in a Medusa project? Following our established patterns, one solution would be to use the file system with a /workflows directory and add a related loader.
  • Where do we place workflow overrides in a Medusa project? Following our established patterns, one solution would be to use the file system with a /workflows directory and add a related loader.
  • Where do we place workflow modifications in a Medusa project? This is different from a loading mechanism, as suggested above, so we might need to come up with a new pattern.
@adrien2p

Overall, it looks very nice and simple. Of course, as mentioned in the gist, some configuration etc. is left aside for simplicity.
I mentioned to seb that it would be nice to have a skip option, which would be used to skip a step based on more global rules, as opposed to the if-then. Both are valid and I think both could be available. My thinking is more about the final workflow representation, let's say in the UI. Having a skip based on global things such as a feature flag would allow us to display to the user which steps will run and which ones will be skipped. When it comes to if-then, it really depends on the data, and we can't give that information to the user before we get to the data; in that case it is managed by the step handler. Though, if the if-then was extracted somewhere else, we could display to the user the conditions under which a step runs one handler or another. I would say that in that case both handlers in the step example should be part of two different steps that run under certain conditions. It would allow us to display the execution constraints to the user instead of having them hidden in the step handler upon creation. For example, if a user wants to extend a workflow in a visual fashion, we would be able to show all the necessary information.

About workflow extension, one thing we will have to tackle is that the user has no idea whether the workflow has already been extended when adding a new step. Therefore, the complexity could be that the user adds a new step but the step ends up in the wrong place if the workflow is extended elsewhere during development. Have you thought about something to resolve that? Or maybe this is not something to resolve and the user just needs to read the documentation of what they are installing.

@fPolic

fPolic commented Oct 11, 2023

Looks very elegant, love the transformer concept!
I've noticed that we often have a case where the compensate handler of a step needs different data than the invoke. How would we write createWorkflow in that case: e.g.

const myWorkflow = createWorkflow("myWorkflow", async ({ data, container, context }) => {
  const first = await myHandler1(data)
  const second = await myHandler2(data)
  await myHandler3(first) // -> but the compensate of this handler needs `second` data, data from its invoke or some other data; how do we pass that?
})

@olivermrbl
Author

olivermrbl commented Nov 2, 2023

Idea for compensating actions (stolen from Temporal).

I am just riffing in writing here, so that I don't forget the ideas

Single compensating action:

const workflowWithCompensatingAction = createWorkflow(
  "Checkout",
  async ({ data, container, context }) => {
    const handler1Result = await myHandler1(data)

    try {
      const secondResult = await myHandler2(handler1Result)

      return secondResult // return value of the entire workflow
    } catch (error) {
      await myHandler2Compensate(handler1Result)
    }
  }
)

Multiple compensating actions:

const workflowWithManyCompensatingActions = createWorkflow(
  "Checkout",
  async ({ data, container, context }) => {

    let handler1Result
    try {
      handler1Result = await myHandler1(data)
    } catch (error) {
      await myHandler1Compensate(data)
    }

    try {
      const secondResult = await myHandler2(handler1Result)

      return secondResult // return value of the entire workflow
    } catch (error) {
      await myHandler2Compensate(handler1Result)
    }
  }
)

@olivermrbl
Author

olivermrbl commented Nov 2, 2023

Parallel actions:

const workflowWithParallelActions = createWorkflow(
  "Checkout",
  async ({ data, container, context }) => {

    await Promise.all([
      myHandler1(data),
      myHandler2(data)
    ])
  }
)

@olivermrbl
Author

olivermrbl commented Nov 2, 2023

Idea: Query-builder approach

const checkout = createWorkflow("CheckoutFlow")

checkout.addStep("authorize-payment", {
  execute: myHandler1,
  compensate: myHandler1Compensate
})

checkout.addParallelSteps([
  {
    name: "calculate-risk-score",
    execute: myHandler1,
    compensate: myHandler1Compensate
  },
  {
    name: "calculate-totals",
    execute: myHandler2,
    compensate: myHandler2Compensate
  }
])

@olivermrbl
Author

olivermrbl commented Nov 2, 2023

API Reference with a "query-builder approach"

// Creating a workflow
const checkout = createWorkflow("CheckoutFlow")

// Adding a step
checkout.addStep({
  name: "myHandler1",
  execute: myHandler1,
})

// Adding a step with a compensating action
checkout.addStep({
  name: "myHandler2",
  execute: myHandler2,
  compensate: myHandler2Compensate
})

// Adding a transformative step
// This is no different from a regular step. With an exceptional DX, this should not be an issue
checkout.addStep({
  name: "transformer",
  execute: transformer,
  input: "myHandler2",
})

// Adding a step pulling data from a previous step
checkout.addStep({
  name: "myHandler2",
  execute: myHandler2,
  compensate: myHandler2Compensate,
  input: "transformer",
  inputAlias: "data"
})



// Adding a step pulling data from multiple previous steps
checkout.addStep({
  name: "myHandler2",
  execute: myHandler2,
  compensate: myHandler2Compensate,
  input: ["myHandler1", "someOtherHandler"],
  inputAlias: "data"
})

// Adding a step with a compensating action pulling data from a previous step
checkout.addStep({
  name: "myHandler2",
  execute: myHandler2,
  compensate: myHandler2Compensate,
  compensationInput: "myHandler1",
  compensationInputAlias: "data"
})

// Adding parallel steps
checkout.addParallelSteps([
  {
    execute: myHandler1,
    compensate: myHandler1Compensate
  },
  {
    execute: myHandler2,
    compensate: myHandler2Compensate
  }
])

// Running a workflow
await checkout.run({
  input: {}
})

// Appending a step
checkout.appendStep("myHandler1", {
  name: "myHandler2",
  execute: myHandler2,
  input: "myHandler1",
  inputAlias: "data"
})

// Prepending a step
checkout.prependStep("myHandler1", {
  name: "myHandler2",
  execute: myHandler2,
  input: "myHandler1",
  inputAlias: "data"
})

// Replacing a step
checkout.replace("myHandler1", {
  name: "myReplacement",
  execute: myReplacement,
})

// Building a full definition
const checkout = createWorkflow("CheckoutFlow")

checkout
  .addStep({
    name: "myHandler1",
    execute: myHandler1,
  })
  .addParallelSteps([
    {
      name: "myHandler2",
      execute: myHandler2,
    },
    {
      name: "myHandler3",
      execute: myHandler3,
    },
  ])
  .addStep({
    name: "transformer",
    execute: transformer,
    input: "myHandler1",
  })
  .addStep({
    name: "myHandler4",
    execute: myHandler4,
    input: ["myHandler2", "transformer"],
    inputAlias: "data",
    compensate: myHandler4Compensate,
    compensationInput: ["myHandler1", "myHandler2", "myHandler3"],
    compensationInputAlias: "data",
  })
  .addStep({
    name: "prepareResponse",
    execute: prepareResponse,
  })
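To get a feel for how little machinery the query-builder approach needs, here is a minimal sketch of a builder with a runner that compensates completed steps in reverse order. It is an illustration under assumptions, not the proposed implementation: WorkflowBuilder and StepConfig are hypothetical, only the `input` option is handled, and the runner is synchronous for brevity, whereas a real one would await handlers and run parallel steps concurrently.

```typescript
// Hypothetical step shape, loosely following the option names used above.
type StepConfig = {
  name: string
  execute: (input: unknown) => unknown
  compensate?: (input: unknown) => void
  input?: string | string[] // name(s) of earlier steps whose results feed this step
}

class WorkflowBuilder {
  private stages: StepConfig[][] = []

  constructor(readonly name: string) {}

  addStep(step: StepConfig): this {
    this.stages.push([step])
    return this
  }

  addParallelSteps(steps: StepConfig[]): this {
    this.stages.push(steps)
    return this
  }

  // Synchronous for brevity; a real runner would await handlers and
  // run the steps of one stage concurrently, e.g. with Promise.all.
  run(workflowInput: unknown): Record<string, unknown> {
    const results: Record<string, unknown> = {}
    const completed: { step: StepConfig; input: unknown }[] = []

    const resolveInput = (step: StepConfig): unknown => {
      const ref = step.input
      if (ref === undefined) return workflowInput
      return Array.isArray(ref) ? ref.map((name) => results[name]) : results[ref]
    }

    try {
      for (const stage of this.stages) {
        for (const step of stage) {
          const input = resolveInput(step)
          results[step.name] = step.execute(input)
          completed.push({ step, input })
        }
      }
    } catch (error) {
      // Undo completed steps in reverse order, then surface the failure.
      for (const { step, input } of completed.reverse()) {
        step.compensate?.(input)
      }
      throw error
    }
    return results
  }
}

const log: string[] = []

const failingCheckout = new WorkflowBuilder("CheckoutFlow")
  .addStep({
    name: "authorize",
    execute: () => "auth_1",
    compensate: () => {
      log.push("void authorize")
    },
  })
  .addStep({
    name: "capture",
    input: "authorize",
    execute: () => {
      throw new Error("capture failed")
    },
  })

try {
  failingCheckout.run({})
} catch {
  // log now holds the compensations that ran
}
```

In this sketch the compensate function receives the same input its step was executed with; passing different data to compensation, as `compensationInput` suggests, would need an extra lookup at that point.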
