@olivermrbl
Last active November 2, 2023 15:11
Proposal: Workflow SDK

Workflow SDK

An SDK on top of our existing workflow and orchestration tooling, simplifying and improving the developer experience of managing workflows.

Basic Example

In the example below, we define a simple workflow with two steps.

import { createWorkflow, createWorkflowStep } from "@medusajs/workflows"

const myHandler1 = createWorkflowStep("first", myHandler1Invoke, myHandler1Compensate)
const myHandler2 = createWorkflowStep("second", myHandler2Invoke, { saveResponse: false })

const myWorkflow = createWorkflow("myWorkflow", async ({ data, container, context }) => {
  const first = await myHandler1(data)
  await myHandler2(first)
})

const input = { data: "someData" }
const container = {}
const context = { manager: "manager" }

myWorkflow.run(input, container, context)

This example illustrates a few key points:

  • Little to no boilerplate code is needed
  • Workflows are created using simple JavaScript
  • Data can be passed from one function to another as you would normally do

This is a deliberately simple example, but it shows how the SDK removes most of the boilerplate code and provides a more intuitive API for managing workflows.

Reference

New APIs in the proposal

  • createWorkflow: Create a workflow and build its step definition
  • createWorkflowStep: Create a workflow step
  • replaceAction: Replace a workflow step

Creating and building a workflow

This proposal comes with a new createWorkflow util:

type MyWorkflowInput = {}
type MyWorkflowOutput = {}

const myWorkflow = createWorkflow<MyWorkflowInput, MyWorkflowOutput>(
  "myWorkflow", 
  async ({ data, container, context }) => {
    const first = await myHandler1(data)
    await myHandler2(first)
  }
)

The createWorkflow util is responsible for creating the workflow and building its step definition.

In the function body, we scan for handlers created using the createWorkflowStep util and register them as steps in the definition.

Additionally, in this "registration step", data can be passed between handlers. The functionality is similar to that of the pipe function in existing workflows.

In the first iteration, the registration step only scans for handlers, so all other logic in the function is ignored and has no effect. However, we could eventually support arbitrary JavaScript code, for example if-else statements. See Discussion at the bottom of the proposal.

Under the hood, the createWorkflow util creates all necessary boilerplate code:

export const myWorkflowName = "myWorkflow"

export const steps: TransactionStepsDefinition = {}

export const handlers = new Map([])

WorkflowManager.register(myWorkflowName, steps, handlers)

export const myWorkflow = exportWorkflow(myWorkflowName)
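
The registration pass described above could be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the actual implementation: registerSteps, the Registry type, and the placeholder return value are hypothetical names, and the sketch assumes that a step function only records itself (instead of executing) when it is called while the workflow is being defined.

```typescript
// Hypothetical sketch of the "registration step"; these names are not part of the proposal.
type Invoke = (input: unknown) => unknown | Promise<unknown>

type Registry = {
  order: string[] // step names in the order they were called
  handlers: Map<string, { invoke: Invoke; compensate?: Invoke }>
}

// The registry that is currently collecting steps, if any.
let activeRegistry: Registry | null = null

function createWorkflowStep(name: string, invoke: Invoke, compensate?: Invoke) {
  return async (_input: unknown): Promise<unknown> => {
    if (!activeRegistry) {
      throw new Error(`Step "${name}" was called outside of a workflow definition`)
    }
    // During registration the step is only recorded, never executed.
    activeRegistry.order.push(name)
    activeRegistry.handlers.set(name, { invoke, compensate })
    // Return a placeholder that later steps can receive as their input.
    return { __stepOutputOf: name }
  }
}

async function registerSteps(
  composer: (args: { data: unknown }) => Promise<unknown>
): Promise<Registry> {
  const registry: Registry = { order: [], handlers: new Map() }
  activeRegistry = registry
  try {
    // Run the composer once with placeholder input to discover its steps.
    await composer({ data: { __workflowInput: true } })
  } finally {
    activeRegistry = null
  }
  return registry
}
```

Because the composer is only run to discover step calls, any branching on real data inside it would see placeholders rather than values, which is one reason other logic has no effect in a first iteration.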

Creating a workflow step

After having created the workflow, you add workflow steps using a new util createWorkflowStep:

import { createWorkflowStep, createWorkflow } from "@medusajs/workflows"

type Handler1Input = {}
type Handler1Output = {}

const myHandler1 = createWorkflowStep<Handler1Input, Handler1Output>(
  "first", 
  myHandler1Invoke, 
  myHandler1Compensate
)

const myWorkflow = createWorkflow("myWorkflow", async ({ data, context }) => {
  const first = await myHandler1(data)
})

As seen in the snippet above, the util accepts the following:

  • first -> workflow step name
  • myHandler1Invoke -> invoke function of the step
  • myHandler1Compensate -> compensate function of the step

An alternative method signature of the util could look like:

const myHandler1 = createWorkflowStep<Handler1Input, Handler1Output>("first", {
  invoke: myHandler1Invoke, 
  compensating: myHandler1Compensate,
  onComplete: () => console.log("hello world")
})

The alternative would allow for more options without having to juggle correctly positioned method arguments.
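
To illustrate how both signatures could coexist, here is a small sketch that normalizes either form into one options object. The type and function names (StepOptions, normalizeStepOptions) are hypothetical, and the option key is spelled compensate here to match the handler map, whereas the snippet above calls it compensating; the proposal does not settle the exact option names.

```typescript
// Hypothetical types; the proposal does not fix the exact option names.
type StepFn<I, O> = (input: I) => Promise<O> | O

type StepOptions<I, O> = {
  invoke: StepFn<I, O>
  compensate?: StepFn<I, void>
  onComplete?: (output: O) => void
  maxRetries?: number
  retryInterval?: number
  saveResponse?: boolean
}

// Accepts either a bare invoke function or a full options object and
// returns a normalized options object.
function normalizeStepOptions<I, O>(
  invokeOrOptions: StepFn<I, O> | StepOptions<I, O>
): StepOptions<I, O> {
  if (typeof invokeOrOptions === "function") {
    return { invoke: invokeOrOptions }
  }
  if (typeof invokeOrOptions.invoke !== "function") {
    throw new Error("A workflow step requires an invoke function")
  }
  return { ...invokeOrOptions }
}
```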

Under the hood, createWorkflowStep extends the workflow definition and handlers as follows:

export const steps: TransactionStepsDefinition = {
  next: {
    action: "first"
  }
}

export const handlers = new Map([
  [
    "first",
    {
      invoke: myHandler1Invoke,
      compensate: myHandler1Compensate
    }
  ]
])

In the example above, the pipe function is left out for the sake of simplicity.

Adding another workflow step

type Handler2Input = {}
type Handler2Output = {}

const myHandler2 = createWorkflowStep<Handler2Input, Handler2Output>(
  "second", 
  myHandler2Invoke, 
  { maxRetries: 3, retryInterval: 1000 }
)

const myWorkflow = createWorkflow("myWorkflow", async ({ data, container, context }) => {
  const first = await myHandler1(data)
  await myHandler2(first)
})

Under the hood

export const steps: TransactionStepsDefinition = {
  next: {
    action: "first",
    next: {
      action: "second",
      noCompensation: true, // handler was created without a compensating action
      maxRetries: 3,
      retryInterval: 1000
    }
  }
}

export const handlers = new Map([
  [
    "first",
    {
      invoke: myHandler1Invoke,
      compensate: myHandler1Compensate
    }
  ],
  [
    "second",
    {
      invoke: myHandler2Invoke,
    }
  ],
])
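
The nested definition above can be derived mechanically from the ordered list of registered steps. The following is a minimal sketch of that folding, assuming a purely linear flow; StepConfig, StepNode, and buildSteps are hypothetical names, and noCompensation is derived from whether a compensating function was supplied.

```typescript
// Hypothetical sketch; the node shape mirrors the definitions shown above.
type StepConfig = {
  name: string
  hasCompensation: boolean
  maxRetries?: number
  retryInterval?: number
  saveResponse?: boolean
}

type StepNode = {
  action?: string
  noCompensation?: boolean
  maxRetries?: number
  retryInterval?: number
  saveResponse?: boolean
  next?: StepNode
}

// Folds an ordered list of steps into the nested `next` structure.
function buildSteps(configs: StepConfig[]): StepNode {
  const root: StepNode = {}
  let cursor = root
  for (const { name, hasCompensation, ...options } of configs) {
    const node: StepNode = { action: name, ...options }
    if (!hasCompensation) {
      node.noCompensation = true
    }
    cursor.next = node
    cursor = node
  }
  return root
}
```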

Creating a workflow transformative step

To transform data between workflow steps, we will use the same createWorkflowStep utility to create a new step in the workflow definition. The goal is to make the SDK so intuitive and simple that you only ever need to understand the concept of a step.

Until now, using steps for transformation, validation, and core handler logic would result in an unmaintainably long definition. To solve this, we chose to allow for multiple handlers in a single step composed using the pipe function. This is all good and fine.

However, abstracting the handler definitions and providing a simple API allows us to use steps for all business logic without having to worry about maintainability, readability, etc.

Overall, this will make workflows more composable.

const myTransformerStep = createWorkflowStep("myTransformer", myTransformer)

const myWorkflow = createWorkflow("myWorkflow", async ({ data, container, context }) => {
  const first = await myHandler1(data)
  const transformedData = await myTransformerStep(first)
  await myHandler2(transformedData)
})

Under the hood

export const steps: TransactionStepsDefinition = {
  next: {
    action: "first",
    next: {
      action: "myTransformer",
      next: {
        action: "second",
        noCompensation: true, // handler was created without a compensating action
        maxRetries: 3,
        retryInterval: 1000
      }
    },
  }
}

export const handlers = new Map([
  [
    "first",
    {
      invoke: myHandler1Invoke,
      compensate: myHandler1Compensate
    }
  ],
  [
    "myTransformer",
    {
      invoke: myTransformer,
    }
  ],
  [
    "second",
    {
      invoke: myHandler2Invoke,
    }
  ],
])
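
For illustration, the myTransformer function registered above could be an ordinary pure function that reshapes the output of one step into the input of the next. The data shapes below are invented for the example and are not part of the proposal.

```typescript
// Hypothetical shapes, for illustration only.
type FirstStepOutput = {
  items: { id: string; quantity: number; unitPrice: number }[]
}

type SecondStepInput = {
  itemIds: string[]
  total: number
}

// A transformer has no side effects, so it needs no compensating action.
function myTransformer(first: FirstStepOutput): SecondStepInput {
  return {
    itemIds: first.items.map((item) => item.id),
    total: first.items.reduce((sum, item) => sum + item.quantity * item.unitPrice, 0),
  }
}
```

Since nothing needs to be undone, such a step naturally ends up with only an invoke entry in the handler map.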

Extending a workflow

To extend a workflow, you will use a combination of the already presented createWorkflowStep util and the existing appendAction method of the WorkflowManager.

const myHandler3 = createWorkflowStep("third", myHandler3Invoke, { saveResponse: false })

myWorkflow.appendAction("third", "second", myHandler3)

Under the hood

export const steps: TransactionStepsDefinition = {
  next: {
    action: "first",
    next: {
      action: "myTransformer",
      next: {
        action: "second",
        noCompensation: true, // handler was created without a compensating action
        maxRetries: 3,
        retryInterval: 1000,
        next: {
          action: "third",
          noCompensation: true, // handler was created without a compensating action
          saveResponse: false,
        }
      }
    },
  }
}

export const handlers = new Map([
  [
    "first",
    {
      invoke: myHandler1Invoke,
      compensate: myHandler1Compensate
    }
  ],
  [
    "myTransformer",
    {
      invoke: myTransformer,
    }
  ],
  [
    "second",
    {
      invoke: myHandler2Invoke,
    }
  ],
  [
    "third",
    {
      invoke: myHandler3Invoke,
    }
  ],
])
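
As a rough sketch of what appending does to the definition, the function below inserts a node directly after a named parent in the nested structure. It assumes a linear flow and is not the WorkflowManager implementation; StepNode and appendAfter are hypothetical names.

```typescript
// Hypothetical sketch, assuming a linear chain of steps.
type StepNode = {
  action?: string
  noCompensation?: boolean
  saveResponse?: boolean
  next?: StepNode
}

// Inserts `node` directly after the step named `parent`,
// keeping any steps that previously followed the parent.
function appendAfter(root: StepNode, parent: string, node: StepNode): void {
  let cursor: StepNode | undefined = root.next
  while (cursor && cursor.action !== parent) {
    cursor = cursor.next
  }
  if (!cursor) {
    throw new Error(`Cannot append after unknown step "${parent}"`)
  }
  node.next = cursor.next
  cursor.next = node
}
```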

Replacing a step

To replace a step in the workflow, you'll again use the util createWorkflowStep in combination with the replaceAction of the WorkflowManager.

You cannot use appendAction to replace a step.

Incorrect

const myNewHandler2 = createWorkflowStep("second", myHandler2Invoke)
myWorkflow.appendAction("first", "second", myNewHandler2)

Correct

const myNewHandler2 = createWorkflowStep("second", myHandler2Invoke)
myWorkflow.replaceAction("second", myNewHandler2)
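
One way to enforce the distinction between appending and replacing is to treat step names as unique keys. The sketch below assumes exactly that; HandlerRegistry and its methods are hypothetical and only mirror the intent of appendAction and replaceAction.

```typescript
// Hypothetical sketch, assuming step names must be unique within a workflow.
type Handlers = {
  invoke: (input: unknown) => unknown
  compensate?: (input: unknown) => unknown
}

class HandlerRegistry {
  private handlers = new Map<string, Handlers>()

  // Appending fails if the name is already taken.
  append(name: string, handlers: Handlers): void {
    if (this.handlers.has(name)) {
      throw new Error(`Step "${name}" already exists; replace it instead`)
    }
    this.handlers.set(name, handlers)
  }

  // Replacing fails if there is nothing to replace.
  replace(name: string, handlers: Handlers): void {
    if (!this.handlers.has(name)) {
      throw new Error(`Step "${name}" does not exist; append it instead`)
    }
    this.handlers.set(name, handlers)
  }

  get(name: string): Handlers | undefined {
    return this.handlers.get(name)
  }
}
```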

Proof of Concept

Discussion

Can we support all JavaScript, e.g. if-else, in the workflow build step?

const myWorkflow = createWorkflow("myWorkflow", async ({ data, container, context }) => {
  const first = await myHandler1(data)
  
  if (!first.someArray.length) {
    someHandler()
  } else {
    await myHandler2(first)
  }
})

Unrelated to the Workflow SDK

  • Where do we place custom workflows in a Medusa project? Following our established patterns, one solution would be to use the file system with a /workflows directory and add a related loader.
  • Where do we place workflow overrides in a Medusa project? Following our established patterns, one solution would be to use the file system with a /workflows directory and add a related loader.
  • Where do we place workflow modifications in a Medusa project? This is different from a loading mechanism, as suggested above, so we might need to come up with a new pattern.

olivermrbl commented Nov 2, 2023

Idea for compensating actions (stolen from Temporal).

I am just riffing in writing here so that I don't forget the ideas.

Single compensating action:

const workflowWithCompensatingAction = createWorkflow(
  "Checkout",
  async ({ data, container, context }) => {
    const handler1Result = await myHandler1(data)

    try {
      const secondResult = await myHandler2(handler1Result)

      return secondResult // return value of the entire workflow
    } catch (error) {
      await myHandler2Compensate(handler1Result)
    }
  }
)

Multiple compensating actions:

const workflowWithManyCompensatingActions = createWorkflow(
  "Checkout",
  async ({ data, container, context }) => {

    let handler1Result
    try {
      handler1Result = await myHandler1(data)
    } catch (error) {
      await myHandler1Compensate(data)
    }

    try {
      const secondResult = await myHandler2(handler1Result)

      return secondResult // return value of the entire workflow
    } catch (error) {
      await myHandler2Compensate(handler1Result)
    }
  }
)
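
If the try/catch pattern above were generalized, the orchestrator could track completed steps and undo them in reverse order. The following is a sketch of that saga-style loop, not the proposal's mechanism: SagaStep and runWithCompensation are hypothetical names, and unlike the snippets above it compensates only steps that completed, passing each the input it originally received.

```typescript
// Hypothetical sketch of reverse-order compensation.
type SagaStep = {
  name: string
  invoke: (input: unknown) => Promise<unknown>
  compensate?: (input: unknown) => Promise<void>
}

async function runWithCompensation(steps: SagaStep[], input: unknown): Promise<unknown> {
  const completed: { step: SagaStep; input: unknown }[] = []
  let current = input
  try {
    for (const step of steps) {
      const output = await step.invoke(current)
      completed.push({ step, input: current })
      // The output of one step becomes the input of the next.
      current = output
    }
    return current // return value of the entire workflow
  } catch (error) {
    // Undo completed steps in reverse order, then surface the original error.
    for (const { step, input: stepInput } of completed.reverse()) {
      await step.compensate?.(stepInput)
    }
    throw error
  }
}
```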


olivermrbl commented Nov 2, 2023

Parallel actions:

const workflowWithParallelActions = createWorkflow(
  "Checkout",
  async ({ data, container, context }) => {

    await Promise.all([
      myHandler1(data),
      myHandler2(data)
    ])
  }
)
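
One thing to keep in mind with Promise.all is that it rejects as soon as any step rejects, without reporting which of the other steps completed. If parallel steps need compensation, Promise.allSettled may be a better fit. The sketch below assumes that approach; ParallelStep and runParallel are hypothetical names.

```typescript
// Hypothetical sketch of parallel steps with compensation.
type ParallelStep = {
  name: string
  invoke: () => Promise<unknown>
  compensate?: () => Promise<void>
}

type Outcome = { step: ParallelStep; value: unknown }

async function runParallel(steps: ParallelStep[]): Promise<unknown[]> {
  // allSettled waits for every step, so we know exactly which ones succeeded.
  const settled = await Promise.allSettled(
    steps.map(async (step): Promise<Outcome> => ({ step, value: await step.invoke() }))
  )
  const fulfilled = settled.filter(
    (result): result is PromiseFulfilledResult<Outcome> => result.status === "fulfilled"
  )
  const [firstFailure] = settled.filter(
    (result): result is PromiseRejectedResult => result.status === "rejected"
  )
  if (!firstFailure) {
    return fulfilled.map((result) => result.value.value)
  }
  // Compensate only the steps that actually completed, then rethrow.
  await Promise.all(fulfilled.map((result) => result.value.step.compensate?.()))
  throw firstFailure.reason
}
```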


olivermrbl commented Nov 2, 2023

Idea: Query-builder approach

const checkout = createWorkflow("CheckoutFlow")

checkout.addStep("authorize-payment", {
  execute: myHandler1,
  compensate: myHandler1Compensate
})

checkout.addParallelSteps([
  {
    name: "calculate-risk-score",
    execute: myHandler1,
    compensate: myHandler1Compensate
  },
  {
    name: "calculate-totals",
    execute: myHandler2,
    compensate: myHandler2Compensate
  }
])


olivermrbl commented Nov 2, 2023

API Reference with a "query-builder approach"

// Creating a workflow
const checkout = createWorkflow("CheckoutFlow")

// Adding a step
checkout.addStep({
  name: "myHandler1",
  execute: myHandler1,
})

// Adding a step with a compensating action
checkout.addStep({
  name: "myHandler2",
  execute: myHandler2,
  compensate: myHandler2Compensate
})

// Adding a transformative step
// This is no different from a regular step. With an exceptional DX, this should not be an issue
checkout.addStep({
  name: "transformer",
  execute: transformer,
  input: "myHandler2",
})

// Adding a step pulling data from a previous step
checkout.addStep({
  name: "myHandler2",
  execute: myHandler2,
  compensate: myHandler2Compensate,
  input: "transformer",
  inputAlias: "data"
})

// Adding a step pulling data from multiple previous steps
checkout.addStep({
  name: "myHandler2",
  execute: myHandler2,
  compensate: myHandler2Compensate,
  input: ["myHandler1", "someOtherHandler"],
  inputAlias: "data"
})

// Adding a step with a compensating action pulling data from a previous step
checkout.addStep({
  name: "myHandler2",
  execute: myHandler2,
  compensate: myHandler2Compensate,
  compensationInput: "myHandler1",
  compensationInputAlias: "data"
})

// Adding parallel steps
checkout.addParallelSteps([
  {
    execute: myHandler1,
    compensate: myHandler1Compensate
  },
  {
    execute: myHandler2,
    compensate: myHandler2Compensate
  }
])

// Running a workflow
await checkout.run({
  input: {}
})

// Appending a step
checkout.appendStep("myHandler1", {
  name: "myHandler2",
  execute: myHandler2,
  input: "myHandler1",
  inputAlias: "data"
})

// Prepending a step
checkout.prependStep("myHandler1", {
  name: "myHandler2",
  execute: myHandler2,
  input: "myHandler1",
  inputAlias: "data"
})

// Replacing a step
checkout.replace("myHandler1", {
  name: "myReplacement",
  execute: myReplacement,
})

// Building a full definition
const fullCheckout = createWorkflow("CheckoutFlow")

fullCheckout
  .addStep({
    name: "myHandler1",
    execute: myHandler1,
  })
  .addParallelSteps([
    {
      name: "myHandler2",
      execute: myHandler2,
    },
    {
      name: "myHandler3",
      execute: myHandler3,
    },
  ])
  .addStep({
    name: "transformer",
    execute: transformer,
    input: "myHandler1",
  })
  .addStep({
    name: "myHandler4",
    execute: myHandler4,
    input: ["myHandler2", "transformer"],
    inputAlias: "data",
    compensate: myHandler4Compensate,
    compensationInput: ["myHandler1", "myHandler2", "myHandler3"],
    compensationInputAlias: "data",
  })
  .addStep({
    name: "prepareResponse",
    execute: prepareResponse,
  })
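
To make the chaining concrete, here is a minimal sketch of a builder that supports addStep and addParallelSteps and checks input references as steps are added. WorkflowBuilder and describe are hypothetical names, and the rules it enforces (unique step names, inputs may only refer to steps added earlier) are assumptions rather than part of the idea above.

```typescript
// Hypothetical sketch of the query-builder approach.
type StepDefinition = {
  name: string
  execute: (input: unknown) => unknown
  compensate?: (input: unknown) => unknown
  input?: string | string[]
}

function toArray(value: string | string[] | undefined): string[] {
  if (value === undefined) return []
  return Array.isArray(value) ? value : [value]
}

class WorkflowBuilder {
  readonly name: string
  // Each entry is one stage; a stage with several steps runs them in parallel.
  private stages: StepDefinition[][] = []
  private names = new Set<string>()

  constructor(name: string) {
    this.name = name
  }

  addStep(step: StepDefinition): this {
    return this.addParallelSteps([step])
  }

  addParallelSteps(steps: StepDefinition[]): this {
    for (const step of steps) {
      if (this.names.has(step.name)) {
        throw new Error(`Duplicate step name "${step.name}"`)
      }
      for (const source of toArray(step.input)) {
        if (!this.names.has(source)) {
          throw new Error(`Step "${step.name}" reads from unknown step "${source}"`)
        }
      }
    }
    steps.forEach((step) => this.names.add(step.name))
    this.stages.push(steps)
    return this // returning `this` is what enables chaining
  }

  // Returns the step names grouped by stage, in execution order.
  describe(): string[][] {
    return this.stages.map((stage) => stage.map((step) => step.name))
  }
}
```

Validating references at build time means a misspelled input name fails when the workflow is defined rather than when it runs.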
