@vkarpov15
Last active May 16, 2022 20:26
Caching API Requests With Long-Lived Workflows in Temporal

There is no time limit on Temporal Workflow Executions. You can write a Workflow that runs forever, storing some state and responding to Signals and Queries that come in, as long as you remember to Continue As New. One neat use case for long-lived Workflows is caching API requests.

For example, suppose you want to display prices in different currencies based on cached exchange rates. Exchange rate APIs are often expensive for high volumes of requests, so caching can be worthwhile as long as stale data isn't a problem for your use case. You can create a single Temporal Workflow that makes 1 API request per day to get the latest exchange rates for a set of currencies, and stores the most recent result, with no explicit database calls or cron jobs. In this blog post, I'll describe how you can write an API caching Workflow for the moneyconvert.net API with the Temporal TypeScript SDK. You can find the full source code for this example on GitHub.

Getting Started

When you make an HTTP GET request to https://cdn.moneyconvert.net/api/latest.json, the API returns JSON that looks like what you see below.

{
  "table": "latest",
  "rates": {
    "AED": 3.6731,
    "AFN": 87.249991,
    "ALL": 114.227196,
    "AMD": 473.915796,
    ...
  },
  "lastupdate": "2022-05-11T00:46:04.483000+00:00"
}
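To turn the cached `rates` object into displayed prices, you can divide by the source currency's rate and multiply by the target currency's rate. Below is a minimal sketch that assumes every rate is quoted against one common base currency. `LatestRatesResponse` and `convertAmount()` are hypothetical names and not part of the moneyconvert.net API.

```typescript
// Assumed shape of the response shown above
interface LatestRatesResponse {
  table: string;
  rates: Record<string, number>;
  lastupdate: string;
}

// Converts `amount` from currency `from` to currency `to`, assuming every rate is quoted
// against the same base currency: dividing by rates[from] yields base-currency units,
// and multiplying by rates[to] yields units of `to`.
function convertAmount(amount: number, from: string, to: string, rates: Record<string, number>): number {
  const fromRate = rates[from];
  const toRate = rates[to];
  if (fromRate === undefined || toRate === undefined) {
    throw new Error(`No rate for ${fromRate === undefined ? from : to}`);
  }
  return (amount / fromRate) * toRate;
}

// Two rates copied from the sample response above
const sample: LatestRatesResponse = {
  table: 'latest',
  rates: { AED: 3.6731, AFN: 87.249991 },
  lastupdate: '2022-05-11T00:46:04.483000+00:00',
};

console.log(convertAmount(3.6731, 'AED', 'AFN', sample.rates)); // prints 87.249991
```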

Because Temporal Workflows must be deterministic, you can't make an API request directly from a Workflow. You need to create an Activity that makes an API request, and call that Activity from your Workflow. Below is an activities.ts file that makes an HTTP request to the above API endpoint, and returns the exchange rates.

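import axios from 'axios';

// Activities run outside the deterministic Workflow sandbox, so they can make network requests.
export async function getExchangeRates(): Promise<any> {
  const res = await axios.get('https://cdn.moneyconvert.net/api/latest.json');
  // Return only the `rates` object, e.g. { AED: 3.6731, AFN: 87.249991, ... }
  return res.data.rates;
}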
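The Activity returns `any`, so a malformed response would flow straight into the Workflow's state. If you want a guard, one option is to validate the rates inside the Activity before returning them. This is a sketch: `parseRates()` is a hypothetical helper, and it assumes every rate should be a finite, positive number. Throwing from an Activity marks the attempt as failed, so Temporal's default retry policy retries it.

```typescript
// Hypothetical guard: checks that `data.rates` maps currency codes to finite, positive
// numbers and throws otherwise, so a bad response fails the Activity attempt.
function parseRates(data: unknown): Record<string, number> {
  if (typeof data !== 'object' || data === null) {
    throw new Error('Response is not an object');
  }
  const rates = (data as { rates?: unknown }).rates;
  if (typeof rates !== 'object' || rates === null) {
    throw new Error('Response has no rates object');
  }
  const result: Record<string, number> = {};
  for (const [code, value] of Object.entries(rates)) {
    if (typeof value !== 'number' || !Number.isFinite(value) || value <= 0) {
      throw new Error(`Invalid rate for ${code}: ${String(value)}`);
    }
    result[code] = value;
  }
  return result;
}
```

Inside getExchangeRates(), you would then return `parseRates(res.data)` instead of `res.data.rates`.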

Next, let's write a Workflow that calls this Activity once per day. With Temporal Workflows, you can simply write a while (true) {} loop that uses setTimeout() to pause the Workflow until the next time you need to refresh exchange rates. In Workflow code, setTimeout() is backed by a durable Temporal timer, so the pause survives Worker restarts. Writing the Workflow this way may seem counterintuitive, because over the course of our careers we've all learned that writing applications isn't this easy. But with Temporal, it actually is this easy!

import { defineQuery, proxyActivities, setHandler } from '@temporalio/workflow';
import type * as activities from './activities';

const { getExchangeRates } = proxyActivities<typeof activities>({
  startToCloseTimeout: '1 minute',
});

export const getExchangeRatesQuery = defineQuery<any, [string]>('getExchangeRates');

export async function exchangeRatesWorkflow(): Promise<any> {
  let rates: any = null;

  // Register a query handler that allows querying for the current rates
  setHandler(getExchangeRatesQuery, () => rates);

  while (true) {
    // Get the latest rates
    rates = await getExchangeRates();

    // Sleep until tomorrow at 12pm server time, and then get the rates again
    const today = new Date();
    const tomorrow = new Date(today);
    tomorrow.setHours(12, 0, 0, 0);
    tomorrow.setDate(tomorrow.getDate() + 1);
    // setTimeout() in Workflow code is backed by a durable Temporal timer
    await new Promise(resolve => setTimeout(resolve, tomorrow.getTime() - today.getTime()));
  }
}

That's the entire Workflow for caching exchange rates. The full source code is available in this GitHub repo. Notice that there are no explicit references to a database or job queue. This Workflow is almost pure business logic, with minimal references to frameworks or services.
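The sleep duration at the bottom of the loop is plain date arithmetic, so it can be pulled into a pure function and tested outside Temporal. A sketch: `msUntilNextNoon()` is a hypothetical helper name, and, like the Workflow, it uses the local time zone of whatever process runs it.

```typescript
// Returns the milliseconds from `now` until 12:00 (local time) on the next calendar day,
// mirroring the setHours()/setDate() logic in the Workflow.
function msUntilNextNoon(now: Date): number {
  const next = new Date(now.getTime());
  next.setHours(12, 0, 0, 0);
  next.setDate(next.getDate() + 1);
  return next.getTime() - now.getTime();
}
```

Because the target is always tomorrow, a Workflow started at 9am waits about 27 hours for its second fetch rather than 3. If you'd prefer the next noon, you would only add a day when today's noon has already passed.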

To start this Workflow, run a start-workflow.ts script like the one below. The script starts the Workflow and exits, leaving the Workflow running; a Worker polling the exchange-rates Task Queue executes it. Because Temporal allows only one running Workflow Execution with a given workflowId at a time, this script also ensures that only one copy of this Workflow is running: starting it again while the first copy is still running fails instead of creating a duplicate.

import { WorkflowClient } from '@temporalio/client';
import { exchangeRatesWorkflow } from '../workflows';

run().catch((err) => {
  console.error(err);
  process.exit(1);
});

async function run() {
  const client = new WorkflowClient();

  const handle = await client.start(exchangeRatesWorkflow, {
    taskQueue: 'exchange-rates',
    workflowId: 'exchange-rates-workflow',
  });
  console.log(`Started workflow ${handle.workflowId}`);
}

This Workflow is still missing one key detail: Continue As New. There's more about that later in this blog post.

Storing Historical Data

You can do more than just store the latest exchange rates: you can also store previous ones. For example, suppose you want to keep up to 30 days' worth of historical exchange rates. You can store the rates in an in-memory JavaScript Map in your Workflow, as shown below.

const maxNumRates = 30;

// Helper used below: formats a Date as a 'YYYY-MM-DD' key in UTC (one possible implementation)
function toYYYYMMDD(date: Date): string {
  return date.toISOString().slice(0, 10);
}

export async function exchangeRatesWorkflow(): Promise<any> {
  const ratesByDay = new Map<string, any>();

  // Allow querying exchange rates by day, e.g. '2022-05-11'
  setHandler(getExchangeRatesQuery, (date: string) => ratesByDay.get(date));

  while (true) {
    const exchangeRates = await getExchangeRates();
    const today = new Date();
    // Store today's exchange rates
    ratesByDay.set(toYYYYMMDD(today), exchangeRates);
    console.log(toYYYYMMDD(today), exchangeRates);

    // Delete the oldest key if we have more than maxNumRates entries.
    // Map keys iterate in insertion order, so keys[0] is the oldest date.
    const keys = Array.from(ratesByDay.keys());
    if (keys.length > maxNumRates) {
      ratesByDay.delete(keys[0]);
    }

    // Wait until tomorrow at 12pm to refresh the exchange rates
    const tomorrow = new Date(today);
    tomorrow.setHours(12, 0, 0, 0);
    tomorrow.setDate(tomorrow.getDate() + 1);
    await new Promise(resolve => setTimeout(resolve, tomorrow.getTime() - today.getTime()));
  }
}
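The eviction step works because a JavaScript Map iterates its keys in insertion order, so the first key is the oldest date stored. Here is a sketch of the same logic as a standalone function you can test outside a Workflow. `storeRates()` is a hypothetical name, and it assumes dates are inserted in chronological order, as they are in the loop above.

```typescript
// Stores `rates` under `date`, then evicts the oldest entries until at most `maxEntries` remain.
function storeRates<T>(ratesByDay: Map<string, T>, date: string, rates: T, maxEntries: number): void {
  ratesByDay.set(date, rates);
  // Map keys come back in insertion order, so the first `excess` keys are the oldest
  const excess = ratesByDay.size - maxEntries;
  if (excess > 0) {
    for (const key of Array.from(ratesByDay.keys()).slice(0, excess)) {
      ratesByDay.delete(key);
    }
  }
}
```

Using a `while`-style "evict until small enough" rule rather than deleting a single key also copes with a map that arrives oversized, for example one restored from older state.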

Temporal makes ratesByDay durable, even though ratesByDay is just a normal JavaScript variable. That's because Temporal stores the entire history of events for this Workflow. If the machine running exchangeRatesWorkflow() crashes, Temporal can resume the Workflow on another machine by replaying the entire event history.
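To make the replay idea concrete, here is a toy model, not Temporal's actual implementation. If state changes are driven only by recorded results, re-running the same logic over the same history rebuilds the same ratesByDay map on any machine. `RecordedActivityResult` and `replayRatesByDay()` are hypothetical and heavily simplified, and the history values are illustrative.

```typescript
// Hypothetical, heavily simplified "history": just the result each Activity call returned.
interface RecordedActivityResult {
  date: string;
  rates: Record<string, number>;
}

// Re-applies the Workflow's state updates using recorded results instead of calling the API again.
// Because the inputs are fixed, every replay rebuilds exactly the same Map.
function replayRatesByDay(history: RecordedActivityResult[]): Map<string, Record<string, number>> {
  const ratesByDay = new Map<string, Record<string, number>>();
  for (const event of history) {
    ratesByDay.set(event.date, event.rates);
  }
  return ratesByDay;
}

const history: RecordedActivityResult[] = [
  { date: '2022-05-10', rates: { AED: 3.6731 } },
  { date: '2022-05-11', rates: { AED: 3.6731, AFN: 87.249991 } },
];
const onMachineA = replayRatesByDay(history);
const onMachineB = replayRatesByDay(history);
console.log(onMachineA.get('2022-05-11')?.AFN === onMachineB.get('2022-05-11')?.AFN); // prints true
```

The real mechanism is richer: Temporal re-executes your Workflow function itself, feeding it recorded Activity results and timer events from the Event History. That's why Workflow code must be deterministic.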

Continue As New

The exchangeRatesWorkflow can run for an unlimited period of time: days, months, even years. However, Temporal caps a Workflow's Event History at roughly 50,000 events (51,200 by default). Assuming no API errors, each iteration of the while loop adds these events:

  1. EVENT_TYPE_TIMER_FIRED: the setTimeout() resolved and it's time to refresh the exchange rates
  2. EVENT_TYPE_ACTIVITY_TASK_SCHEDULED: the Workflow scheduled the getExchangeRates() Activity
  3. EVENT_TYPE_ACTIVITY_TASK_STARTED: the getExchangeRates() Activity started executing
  4. EVENT_TYPE_ACTIVITY_TASK_COMPLETED: the getExchangeRates() Activity completed successfully
  5. EVENT_TYPE_TIMER_STARTED: the Workflow used setTimeout() to pause until tomorrow

On top of those, every time the Workflow wakes up (once when the timer fires and once when the Activity completes), Temporal records a Workflow Task as three more events: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED, EVENT_TYPE_WORKFLOW_TASK_STARTED, and EVENT_TYPE_WORKFLOW_TASK_COMPLETED. That comes to about 11 events per iteration.

With 1 API request per day, exchangeRatesWorkflow() can run for roughly 4,500 days (over 12 years) before running into the event limit. That's a long time, but you should still handle this limit, and that's what Continue As New is for.
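The run-time estimate is simple division: the history limit divided by the events each iteration adds gives the number of iterations, which at one iteration per day is also the number of days. Here is a small sketch for plugging in your own numbers; both helpers are hypothetical. The events-per-iteration figure is the input most worth double-checking, for example by counting the events in one iteration of a real Event History in the Temporal Web UI.

```typescript
// Days a loop can run before its Event History reaches `historyLimit`.
function daysUntilHistoryLimit(historyLimit: number, eventsPerIteration: number, iterationsPerDay: number): number {
  return Math.floor(historyLimit / eventsPerIteration) / iterationsPerDay;
}

// A Continue As New threshold that spends only `fraction` of the history limit.
function safeMaxIterations(historyLimit: number, eventsPerIteration: number, fraction: number): number {
  return Math.floor((historyLimit * fraction) / eventsPerIteration);
}

console.log(daysUntilHistoryLimit(50000, 4, 1)); // prints 12500
console.log(daysUntilHistoryLimit(50000, 11, 1)); // prints 4545
console.log(safeMaxIterations(50000, 11, 0.2)); // prints 909
```

Sizing the iteration cap as a fraction of the limit, rather than close to it, leaves headroom for Signals, which also add events to the history.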

You can think of Continue As New as restarting your Workflow from a new initial state: it completes the current Workflow Execution and starts a new one with the same Workflow Id, a fresh Event History, and whatever arguments you pass. The only data that exchangeRatesWorkflow() needs to respond to Queries is the ratesByDay map, so exchangeRatesWorkflow() needs to Continue As New with a serialized version of that map, and it needs to be able to resume from that saved state. Below is an updated exchangeRatesWorkflow() that does both.

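import { continueAsNew, defineQuery, proxyActivities, setHandler, sleep } from '@temporalio/workflow';
import type * as activities from './activities';

const { getExchangeRates } = proxyActivities<typeof activities>({
  startToCloseTimeout: '1 minute',
});

export const getExchangeRatesQuery = defineQuery<any, [string]>('getExchangeRates');

const maxNumRates = 30;
// About 11 history events per iteration (timer, Activity, and Workflow Task events),
// so 1,000 iterations (~11,000 events) stays well under the Event History limit
const maxIterations = 1000;

// Formats a Date as a 'YYYY-MM-DD' key in UTC (one possible implementation of this helper)
function toYYYYMMDD(date: Date): string {
  return date.toISOString().slice(0, 10);
}

// `storedRatesByDay` contains the serialized data from Continue As New, if available.
// Otherwise, it defaults to an empty array.
export async function exchangeRatesWorkflow(storedRatesByDay: Array<[string, any]> = []): Promise<void> {
  const ratesByDay = new Map<string, any>(storedRatesByDay);
  setHandler(getExchangeRatesQuery, (date: string) => ratesByDay.get(date));

  for (let i = 0; i < maxIterations; ++i) {
    const exchangeRates = await getExchangeRates();
    const today = new Date();
    ratesByDay.set(toYYYYMMDD(today), exchangeRates);

    // Keep at most maxNumRates days; Map keys iterate in insertion order, so keys[0] is the oldest
    const keys = Array.from(ratesByDay.keys());
    if (keys.length > maxNumRates) {
      ratesByDay.delete(keys[0]);
    }

    // Sleep until tomorrow at 12pm server time, then refresh
    const tomorrow = new Date(today);
    tomorrow.setHours(12, 0, 0, 0);
    tomorrow.setDate(tomorrow.getDate() + 1);
    await sleep(tomorrow.getTime() - today.getTime());
  }

  // After maxIterations, Continue As New with the map serialized as [key, value] pairs.
  // This completes the current run and starts a new one with a fresh Event History.
  await continueAsNew<typeof exchangeRatesWorkflow>(Array.from(ratesByDay.entries()));
}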
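The Workflow passes `Array.from(ratesByDay.entries())` to Continue As New rather than the Map itself because Workflow arguments are serialized, by default as JSON, and `JSON.stringify()` turns a Map into an empty object. Here is a minimal sketch of the round trip in plain TypeScript outside Temporal, using illustrative dates and values.

```typescript
const ratesByDay = new Map<string, Record<string, number>>([
  ['2022-05-10', { AED: 3.6731 }],
  ['2022-05-11', { AED: 3.6731, AFN: 87.249991 }],
]);

// A Map has no enumerable own properties, so JSON drops its contents entirely.
console.log(JSON.stringify(ratesByDay)); // prints {}

// An array of [key, value] pairs survives JSON, and the Map constructor accepts it directly.
const serialized = JSON.stringify(Array.from(ratesByDay.entries()));
const restored = new Map<string, Record<string, number>>(JSON.parse(serialized));
console.log(restored.get('2022-05-11')?.AFN); // prints 87.249991
```

This is the same pair of operations the Workflow relies on: `Array.from(map.entries())` when continuing, and `new Map(storedRatesByDay)` when the next run starts. Because the array preserves insertion order, the restored Map also keeps the oldest date first, so the eviction logic keeps working across runs.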

Moving On

Temporal Workflows make it easy to build stateful caching layers for APIs. With Temporal, you just write the business logic. Temporal handles persisting the data, ensuring only one copy of the Workflow is running, and allowing clients to query the cached API data. Next time you need to build a tool that pings an API on a regular schedule, try building it with Temporal.
