@ajbouh
Last active July 31, 2021 22:11
api gist example

Guide to batch forecasting

Introduction

This guide will teach you how to do a batch forecast for every item you have in your database.

How this guide works

The code snippets in this guide are intended to be run interactively as you follow along. The snippets are also interrelated, so outputs are automatically propagated from snippet to snippet, as needed. This is an experimental format that we're eager to receive feedback on. Please send us a note with any thoughts you have.

The Forecast API

The forecast API you'll be using is based on the prophet package.

``` card api=prophet type=about features ```
Batch forecasts

Most forecasts are produced by an API that produces predictions for one timeseries at a time. Since each item has its own timeseries, you'll need a way to call the forecast API once for each item.

Preparing the data and invoking the API can be quite slow if done serially. It would be much faster to do this in large, parallel batches. Ideally you would run these batches in a repeatable and scalable fashion that makes it easy to inspect and analyze after the fact.
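The gain from batching can be sketched in a few lines. The snippet below is illustrative only: `fake_forecast` is a hypothetical stand-in for one HTTP call to the forecast API, and the worker count is arbitrary.

```python
from concurrent.futures import ThreadPoolExecutor

def fake_forecast(item_id):
    # Stand-in for one per-item call to the forecast API.
    return {"item_id": item_id, "yhat": 1.0}

item_ids = [f"item-{i}" for i in range(20)]

# Serial: one call at a time.
serial = [fake_forecast(i) for i in item_ids]

# Batched/parallel: many calls in flight at once.
with ThreadPoolExecutor(max_workers=8) as pool:
    parallel = list(pool.map(fake_forecast, item_ids))
```

With real network calls, the parallel version finishes roughly `max_workers` times faster, while `pool.map` still returns results in input order.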

The Generic Batch Job API

The Batch Job API works with any tabular data and any GraphQL API. Under the hood it uses an SQLite database to store all configuration, fetched data, query results, and API outputs.

``` card api=batch type=about features ```

This guide uses the batch job API to prepare your data and manage invocation of the forecasting API.

Each job is operated out of a separate SQLite database. Jobs are created via the createJob mutation.


Overview

To keep things fast and easy to debug, you'll use multiple jobs. The first job will just import and prepare the historical data. The second job will use the output of the first and actually do the forecast.

Import raw order data from CSV and aggregate it on a per-item and per-day basis

Since you're working with a raw export from your traditional database, you need to do some basic cleanup and aggregation. We'll calculate how many of each item you sold on each date via a single SQL statement.
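The aggregation can be previewed locally before running it as a job. This is a minimal sketch using Python's built-in sqlite3 module and a few made-up rows; only the SQL mirrors the job below.

```python
import sqlite3

# In-memory database with the same shape as the fetched raw_orders table.
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE raw_orders (StockCode TEXT, Quantity INTEGER, InvoiceDate TEXT)")
con.executemany(
    "INSERT INTO raw_orders VALUES (?, ?, ?)",
    [("10002", 3, "2020-01-01"),
     ("10002", 2, "2020-01-01"),
     ("10002", -1, "2020-01-01"),  # returns are filtered out by Quantity > 0
     ("10002", 5, "2020-01-02")],
)
# Per-item, per-day totals, as in the job's CREATE TABLE orders step.
rows = con.execute("""
    SELECT sum(Quantity) AS y, StockCode AS item_id, InvoiceDate AS ds
    FROM raw_orders
    WHERE Quantity > 0
    GROUP BY item_id, ds
    ORDER BY ds
""").fetchall()
```

Note how the negative-quantity row (a return) is excluded before summing.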

mutation createOrdersJob(
  $url: String! = "https://gist.github.com/ajbouh/fb9037f723a9706f05f9f43d680a19c6/raw/aec94cd1f3191c7213e346871e615548f5228668/sales.csv"
) @api(id: "batch") {
  createJob(
    input: {
      tasks: [
        {
          fetch: {
            name: "raw_orders"
            format: CSV
            formatOptions: {
              columns: {
                name: "quantity"
                type: INTEGER
              }
            }
            url: $url
          }
        },
        {
          sql: {
            query: """
              CREATE TABLE orders
              AS SELECT
                sum(Quantity) as y,
                item_id,
                ds
              FROM (
                SELECT
                  StockCode as item_id,
                  Quantity,
                  InvoiceDate as ds
                FROM raw_orders
                WHERE
                  Quantity > 0
                ORDER BY
                  ds ASC
              )
              GROUP BY
                item_id,
                ds
              ;
            """
          }
        }
      ]
    }
  ) {
    awaitFinish(timeoutSeconds: 120) {
      id @export(as: "ordersJob")
    }
  }
}

Examine the aggregated data

Using the returned id, you can query the job status.

query ordersJobStatus(
  $job: String! @import(from: "ordersJob")
) @api(id: "batch") {
  job(id: $job) {
    status
    error {
      message
    }
  }
}

You can also wait for the job to finish.

query ordersJobFinalStatus(
  $job: String! @import(from: "ordersJob")
) @api(id: "batch") {
  job(id: $job) {
    awaitFinish(timeoutSeconds: 120) {
      status
      error {
        message
      }
    }
  }
}
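awaitFinish is a server-side wait, but the same idea can be expressed client-side as a generic polling loop. This is a sketch under assumptions: `get_status` is a hypothetical callable standing in for the status query, and the status strings are illustrative.

```python
import time

def await_finish(get_status, timeout_seconds=120, poll_interval=0.01):
    # Poll until the job leaves RUNNING or the timeout elapses.
    deadline = time.monotonic() + timeout_seconds
    while time.monotonic() < deadline:
        status = get_status()
        if status != "RUNNING":
            return status
        time.sleep(poll_interval)
    return "TIMEOUT"

# Simulated job that finishes on the third poll.
polls = iter(["RUNNING", "RUNNING", "DONE"])
result = await_finish(lambda: next(polls))
```

In practice you'd put a real GraphQL status query behind `get_status` and use a longer poll interval.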

And make ad hoc SQL queries

query ordersJobPeekItems(
  $job: String! @import(from: "ordersJob"),
  $itemId: String! = "10002"
) @api(id: "batch") {
  job(id: $job) {
    first_ten_items: query(sql: "SELECT distinct item_id FROM orders LIMIT 10;")
    sales_for_single_item: query(
      sql: "SELECT ds, y FROM orders WHERE item_id = ? AND y > 0 LIMIT 10;"
      parameters: [$itemId]
      style: ROW_OBJECTS
    ) @export(as: "item_id_10_sales")
  }
}
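The `?` placeholder plus the `parameters` list follows SQLite's positional parameter binding. The same pattern locally, with toy data (the table contents here are illustrative only):

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE orders (ds TEXT, y INTEGER, item_id TEXT)")
con.executemany(
    "INSERT INTO orders VALUES (?, ?, ?)",
    [("2020-01-01", 5, "10002"),
     ("2020-01-02", 0, "10002"),   # y = 0, filtered out below
     ("2020-01-01", 7, "99999")],
)
# Positional `?` binding, as in the job's `parameters: [$itemId]`.
rows = con.execute(
    "SELECT ds, y FROM orders WHERE item_id = ? AND y > 0 LIMIT 10;",
    ("10002",),
).fetchall()
```

Binding parameters this way avoids string interpolation and the SQL-injection risks that come with it.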

At this point you might want to try calling the forecast API with your item data.

query forecastSingleItem(
  $records: [PastEventInput!]! @import(from: "item_id_10_sales.rows"),
  $futureEvents: [FutureEvent!]! = [{ds: "2020-05-10"}],
  $seasonalityPriorScale: Float = 10,
  $holidaysPriorScale: Float = 10,
  $changepointPriorScale: Float = 0.05,
  $intervalWidth: Float = 0.8,
) @api(id: "prophet") {
  historyFromRecords(records: $records) {
    fitProphet(
      seasonalityPriorScale: $seasonalityPriorScale,
      holidaysPriorScale: $holidaysPriorScale,
      changepointPriorScale: $changepointPriorScale,
      intervalWidth: $intervalWidth,
    ) {
      forecast(futureRecords: $futureEvents) {
        history {
          metrics {
            rmse
            mape
            mdape
          }
        }
        future {
          records {
            ds
            yhat
            yhatLower
            yhatUpper
          }
        }
      }
    }
  }
}
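The history metrics returned here measure how well the fitted model reproduces the training data. Their standard definitions (expressed as fractions; the API's exact implementation may differ) can be computed in a few lines with illustrative numbers:

```python
import math
import statistics

def fit_metrics(y, yhat):
    # rmse: root mean squared error; mape/mdape: mean/median absolute
    # percentage error. Assumes no y value is zero.
    errs = [a - p for a, p in zip(y, yhat)]
    rmse = math.sqrt(sum(e * e for e in errs) / len(errs))
    ape = [abs(e) / abs(a) for e, a in zip(errs, y)]
    return {"rmse": rmse,
            "mape": sum(ape) / len(ape),
            "mdape": statistics.median(ape)}

m = fit_metrics(y=[10.0, 20.0, 40.0], yhat=[12.0, 18.0, 40.0])
```

Low values on all three suggest the model tracks history closely; a large gap between mape and mdape hints at a few badly-fit points skewing the mean.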

Use aggregated order data to make future forecasts

Using the returned id, you'll define a new job that invokes the forecast API.

mutation createForecastJob(
  $futureDates: [Any!]! = ["2021-05-20"],
  $ordersJob: String! @import(from: "ordersJob")
  $forecastURL: String! @apiURL(id: "prophet"),
  $forecastURLAuthToken: String! @apiToken(id: "prophet")
) @api(id: "batch") {
  createJob(
    input: {
      secrets: [
        {
          key: "forecast_auth_token"
          value: $forecastURLAuthToken
        }
      ]
      rateLimits: [
        {
          key: "prophet"
          limit: 10
          durationSeconds: 60
          burst: 10
        }
      ]
      tasks: [
        {
          attach: {
            name: "orders"
            jobSetID: $ordersJob
          }
        }
        {
          sql: {
            query: "CREATE TABLE items AS SELECT distinct item_id FROM orders.orders limit 10;"
          }
        }
        {
          literal: {
            name: "future_dates"
            columns: [
              {
                name: "ds"
                type: TEXT
                values: $futureDates
              }
            ]
          }
        }
        {
          graphql: {
            name: "forecasts"
            url: $forecastURL
            rateLimitKey: "prophet",
            fromTable: "items"
            columns: ["item_id"]
            authorization: {
              bearer: {
                tokenFromSecret: "forecast_auth_token"
              }
            }
            vars: [
              {
                name: "records",
                valueFromSQL: {
                  query: "SELECT ds, y FROM orders.orders WHERE item_id = ? AND y > 0",
                  parameters: ["item_id"],
                }
              }
              {
                name: "futureEvents"
                valueFromSQL: {
                  query: "SELECT ds FROM future_dates"
                }
              }
            ]
            operation: """
              query forecasts(
                $records: [PastEventInput!]!,
                $futureEvents: [FutureEvent!]!,
                $seasonalityPriorScale: Float = 10,
                $holidaysPriorScale: Float = 10,
                $changepointPriorScale: Float = 0.05,
                $intervalWidth: Float = 0.8,
              ) {
                historyFromRecords(records: $records) {
                  fitProphet(
                    seasonalityPriorScale: $seasonalityPriorScale,
                    holidaysPriorScale: $holidaysPriorScale,
                    changepointPriorScale: $changepointPriorScale,
                    intervalWidth: $intervalWidth,
                  ) {
                    forecast(futureRecords: $futureEvents) {
                      history {
                        metrics {
                          rmse
                          mape
                          mdape
                        }
                      }
                      future {
                        records {
                          ds
                          yhat
                          yhatLower
                          yhatUpper
                        }
                      }
                    }
                  }
                }
              }
            """
          }
        }
      ]
    }
  ) {
    awaitFinish(timeoutSeconds: 120) {
      id @export(as: "forecastJob")
      status
      error {
        message
      }
    }
  }
}
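The rateLimits entry above (10 calls per 60 seconds, burst of 10) describes token-bucket semantics. A minimal sketch of that behavior, not the Batch Job API's actual implementation:

```python
class TokenBucket:
    # `limit` tokens are refilled per `duration_seconds`, and the bucket
    # holds at most `burst` tokens, so up to `burst` calls can go at once.
    def __init__(self, limit, duration_seconds, burst):
        self.rate = limit / duration_seconds
        self.burst = burst
        self.tokens = float(burst)
        self.last = 0.0

    def allow(self, now):
        # Refill for elapsed time, cap at burst, then spend one token.
        self.tokens = min(self.burst, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

bucket = TokenBucket(limit=10, duration_seconds=60, burst=10)
# Eleven back-to-back calls at t=0: the burst admits ten, then throttles.
results = [bucket.allow(now=0.0) for _ in range(11)]
```

After the burst is spent, calls are admitted at the steady refill rate of one every six seconds.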

Poll for forecast status

query pollStatus(
  $job: String! @import(from: "forecastJob")
) @api(id: "batch") {
  job(id: $job) {
    status
    error {
      message
    }
  }
}

Get UI URL for forecast

query getUI(
  $job: String! @import(from: "forecastJob")
) @api(id: "batch") {
  job(id: $job) {
    ui @export(as: "forecastUIURL")
  }
}
You can also run this query with ordersJob as the id to get a UI for the first job.