This guide shows you how to produce a batch forecast for every item in your database.
The code snippets in this guide are intended to be run interactively as you follow along. The snippets are also interrelated, so outputs are automatically propagated from snippet to snippet, as needed. This is an experimental format that we're eager to receive feedback on. Please send us a note with any thoughts you have.
The forecast API you'll be using is based on the prophet package.
Batch forecasts
Most forecasts are produced by an API that predicts one time series at a time. Since each item has its own time series, you'll need a way to call the forecast API once for each item.
Preparing the data and invoking the API can be quite slow if done serially. It would be much faster to do this in large, parallel batches. Ideally you would run these batches in a repeatable and scalable fashion that makes it easy to inspect and analyze after the fact.
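To make the serial-versus-parallel trade-off concrete, here is a minimal Python sketch. It does not use the Batch Job API: `forecast_item` is a hypothetical stand-in for one forecast call that simply sleeps to mimic network latency. The sketch only illustrates why overlapping per-item calls tends to be faster than issuing them one by one.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def forecast_item(item_id: str) -> dict:
    """Hypothetical stand-in for one forecast API call; sleeps to mimic latency."""
    time.sleep(0.05)
    return {"item_id": item_id, "yhat": 0.0}


def forecast_serial(item_ids: list[str]) -> list[dict]:
    # One call at a time: total time grows linearly with the number of items.
    return [forecast_item(i) for i in item_ids]


def forecast_parallel(item_ids: list[str], workers: int = 8) -> list[dict]:
    # Calls overlap while each one waits; map() returns results in input order.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(forecast_item, item_ids))


if __name__ == "__main__":
    items = [str(10000 + n) for n in range(16)]

    start = time.perf_counter()
    forecast_serial(items)
    serial_seconds = time.perf_counter() - start

    start = time.perf_counter()
    forecast_parallel(items)
    parallel_seconds = time.perf_counter() - start

    print(f"serial: {serial_seconds:.2f}s, parallel: {parallel_seconds:.2f}s")
```

A hand-rolled pool like this still leaves retries, rate limiting, and result storage to you, which is the gap the Batch Job API described next is meant to fill.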
The Batch Job API works with any tabular data and any GraphQL API. Under the hood it uses an SQLite database to store all configuration, fetched data, query results, and API outputs.
This guide uses the Batch Job API to prepare your data and manage invocation of the forecasting API.
Each job is operated out of a separate SQLite database. Jobs are created via the createJob mutation.
To keep things fast and easy to debug, you'll use multiple jobs. The first job will just import and prepare the historical data. The second job will use the output of the first and actually do the forecast.
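Because each job lives in its own SQLite database, one plausible way for a second job to read the first job's output is SQLite's `ATTACH DATABASE`. The sketch below shows that mechanism in plain Python. Whether the Batch Job API implements its hand-off exactly this way is an assumption; the file name, sample rows, and helper functions are made up for illustration. The alias `orders` is chosen to match the `orders.orders` table reference used by the forecast job later in this guide.

```python
import os
import sqlite3
import tempfile


def build_orders_db(path: str) -> None:
    """Create a stand-in for the first job's database with an `orders` table."""
    con = sqlite3.connect(path)
    con.execute("CREATE TABLE orders (item_id TEXT, ds TEXT, y INTEGER)")
    con.executemany(
        "INSERT INTO orders VALUES (?, ?, ?)",
        [("10002", "2020-01-01", 3), ("10080", "2020-01-01", 1)],
    )
    con.commit()
    con.close()


def distinct_items(orders_path: str) -> list[str]:
    """Open a fresh database and read the first job's table through ATTACH."""
    con = sqlite3.connect(":memory:")
    # The alias "orders" makes the attached table addressable as orders.orders.
    con.execute("ATTACH DATABASE ? AS orders", (orders_path,))
    rows = con.execute(
        "SELECT DISTINCT item_id FROM orders.orders ORDER BY item_id"
    ).fetchall()
    con.close()
    return [row[0] for row in rows]


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        db_path = os.path.join(tmp, "orders_job.db")
        build_orders_db(db_path)
        print(distinct_items(db_path))
```

Keeping the prepared data in its own database means the second job can be rerun or debugged without importing and aggregating the raw export again.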
Since you're working with a raw export from your traditional database, you need to do some basic cleanup and aggregation. A single SQL statement calculates how many of each item you sold on each date.
mutation createOrdersJob(
  $url: String! = "https://gist.github.com/ajbouh/fb9037f723a9706f05f9f43d680a19c6/raw/aec94cd1f3191c7213e346871e615548f5228668/sales.csv"
) @api(id: "batch") {
  createJob(
    input: {
      tasks: [
        # Task 1: fetch the raw CSV export into a table named raw_orders.
        {
          fetch: {
            name: "raw_orders"
            format: CSV
            formatOptions: {
              columns: {
                name: "quantity"
                type: INTEGER
              }
            }
            url: $url
          }
        },
        # Task 2: total the quantity sold per item per date into an orders table.
        {
          sql: {
            query: """
            CREATE TABLE orders
            AS SELECT
              sum(Quantity) as y,
              item_id,
              ds
            FROM (
              SELECT
                StockCode as item_id,
                Quantity,
                InvoiceDate as ds
              FROM raw_orders
              WHERE
                Quantity > 0
              ORDER BY
                ds ASC
            )
            GROUP BY
              item_id,
              ds
            ;
            """
          }
        }
      ]
    }
  ) {
    awaitFinish(timeoutSeconds: 120) {
      id @export(as: "ordersJob")
    }
  }
}
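To see what the aggregation step produces, you can try the same SQL on a handful of rows in a local in-memory SQLite database. The sketch below is illustrative only: the sample rows are invented, and the statement is the one from the job above minus the inner `ORDER BY`, which does not change the grouped sums. Note how the row with a negative quantity is filtered out before summing.

```python
import sqlite3

# Same aggregation as the job's sql task, without the inner ORDER BY.
AGGREGATE_SQL = """
CREATE TABLE orders AS
SELECT sum(Quantity) AS y, item_id, ds
FROM (
    SELECT StockCode AS item_id, Quantity, InvoiceDate AS ds
    FROM raw_orders
    WHERE Quantity > 0
)
GROUP BY item_id, ds;
"""


def aggregate(rows):
    """Load (StockCode, Quantity, InvoiceDate) rows and return (item_id, ds, y)."""
    con = sqlite3.connect(":memory:")
    con.execute(
        "CREATE TABLE raw_orders (StockCode TEXT, Quantity INTEGER, InvoiceDate TEXT)"
    )
    con.executemany("INSERT INTO raw_orders VALUES (?, ?, ?)", rows)
    con.execute(AGGREGATE_SQL)
    result = con.execute(
        "SELECT item_id, ds, y FROM orders ORDER BY item_id, ds"
    ).fetchall()
    con.close()
    return result


if __name__ == "__main__":
    # Invented sample rows; the negative quantity is dropped by the WHERE clause.
    sample = [
        ("10002", 2, "2020-01-01"),
        ("10002", 3, "2020-01-01"),
        ("10002", 1, "2020-01-02"),
        ("10002", -1, "2020-01-02"),
        ("10080", 4, "2020-01-01"),
    ]
    for row in aggregate(sample):
        print(row)
```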
Using the returned id, you can query the job status.
query ordersJobStatus(
  $job: String! @import(from: "ordersJob")
) @api(id: "batch") {
  job(id: $job) {
    status
    error {
      message
    }
  }
}
You can also wait for the job to complete.
query ordersJobFinalStatus(
  $job: String! @import(from: "ordersJob")
) @api(id: "batch") {
  job(id: $job) {
    awaitFinish(timeoutSeconds: 120) {
      status
      error {
        message
      }
    }
  }
}
You can also run ad hoc SQL queries against the job's database.
query ordersJobPeekItems(
  $job: String! @import(from: "ordersJob"),
  $itemId: String! = "10002"
) @api(id: "batch") {
  job(id: $job) {
    first_ten_items: query(sql: "SELECT distinct item_id FROM orders LIMIT 10;")
    sales_for_single_item: query(
      sql: "SELECT ds, y FROM orders WHERE item_id = ? AND y > 0 LIMIT 10;"
      parameters: [$itemId]
      style: ROW_OBJECTS
    ) @export(as: "item_id_10_sales")
  }
}
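The `?` placeholder in the query above is standard SQLite parameter binding: the value is passed separately from the SQL text rather than pasted into it. The following sketch shows the same pattern with Python's `sqlite3` module. Returning each row as a dictionary keyed by column name is an assumption about what a `ROW_OBJECTS`-style result roughly looks like, and the helper name and sample rows are hypothetical.

```python
import sqlite3


def sales_for_item(con: sqlite3.Connection, item_id: str, limit: int = 10) -> list[dict]:
    """Return up to `limit` (ds, y) rows for one item as plain dictionaries."""
    con.row_factory = sqlite3.Row  # rows become addressable by column name
    cursor = con.execute(
        "SELECT ds, y FROM orders WHERE item_id = ? AND y > 0 LIMIT ?",
        (item_id, limit),  # bound values are never interpreted as SQL
    )
    return [dict(row) for row in cursor]


if __name__ == "__main__":
    con = sqlite3.connect(":memory:")
    con.execute("CREATE TABLE orders (item_id TEXT, ds TEXT, y INTEGER)")
    # Invented sample rows for illustration.
    con.executemany(
        "INSERT INTO orders VALUES (?, ?, ?)",
        [("10002", "2020-01-01", 5), ("10080", "2020-01-01", 4)],
    )
    print(sales_for_item(con, "10002"))
    # A value that looks like SQL is treated as an ordinary string and matches nothing.
    print(sales_for_item(con, "10002' OR '1'='1"))
```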
At this point you might want to try calling the forecast API with the data for a single item.
query forecastSingleItem(
  $records: [PastEventInput!]! @import(from: "item_id_10_sales.rows"),
  $futureEvents: [FutureEvent!]! = [{ds: "2020-05-10"}],
  $seasonalityPriorScale: Float = 10,
  $holidaysPriorScale: Float = 10,
  $changepointPriorScale: Float = 0.05,
  $intervalWidth: Float = 0.8,
) @api(id: "prophet") {
  historyFromRecords(records: $records) {
    fitProphet(
      seasonalityPriorScale: $seasonalityPriorScale,
      holidaysPriorScale: $holidaysPriorScale,
      changepointPriorScale: $changepointPriorScale,
      intervalWidth: $intervalWidth,
    ) {
      forecast(futureRecords: $futureEvents) {
        history {
          metrics {
            rmse
            mape
            mdape
          }
        }
        future {
          records {
            ds
            yhat
            yhatLower
            yhatUpper
          }
        }
      }
    }
  }
}
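The `rmse`, `mape`, and `mdape` fields in the query above are common forecast error metrics: root mean squared error, mean absolute percentage error, and median absolute percentage error. The sketch below computes the textbook versions so you can sanity-check the values you get back. How the forecast API computes them internally (for example, as fractions or percentages, and over which portion of the history) is not stated in this guide, so treat the exact correspondence as an assumption. The percentage metrics require non-zero actual values, which the `y > 0` filter used earlier provides.

```python
import math
from statistics import median


def rmse(actual, predicted):
    """Root mean squared error, in the same units as the data."""
    squared = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared) / len(squared))


def mape(actual, predicted):
    """Mean absolute percentage error, as a fraction (0.1 means 10%)."""
    ratios = [abs((a - p) / a) for a, p in zip(actual, predicted)]
    return sum(ratios) / len(ratios)


def mdape(actual, predicted):
    """Median absolute percentage error; less sensitive to outliers than MAPE."""
    return median(abs((a - p) / a) for a, p in zip(actual, predicted))


if __name__ == "__main__":
    # Invented numbers for illustration.
    y = [10, 20, 40]
    yhat = [12, 18, 40]
    print(f"rmse={rmse(y, yhat):.4f} mape={mape(y, yhat):.4f} mdape={mdape(y, yhat):.4f}")
```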
Using the id of the orders job, you'll define a new job that invokes the forecast API once for each item.
mutation createForecastJob(
  $futureDates: [Any!]! = ["2021-05-20"],
  $ordersJob: String! @import(from: "ordersJob")
  $forecastURL: String! @apiURL(id: "prophet"),
  $forecastURLAuthToken: String! @apiToken(id: "prophet")
) @api(id: "batch") {
  createJob(
    input: {
      secrets: [
        {
          key: "forecast_auth_token"
          value: $forecastURLAuthToken
        }
      ]
      rateLimits: [
        {
          key: "prophet"
          limit: 10
          durationSeconds: 60
          burst: 10
        }
      ]
      tasks: [
        # Make the orders job's tables available under the name "orders".
        {
          attach: {
            name: "orders"
            jobSetID: $ordersJob
          }
        }
        # Pick the items to forecast (limited to 10 here).
        {
          sql: {
            query: "CREATE TABLE items AS SELECT distinct item_id FROM orders.orders limit 10;"
          }
        }
        # Store the dates to forecast in a future_dates table.
        {
          literal: {
            name: "future_dates"
            columns: [
              {
                name: "ds"
                type: TEXT
                values: $futureDates
              }
            ]
          }
        }
        # Call the forecast API using rows from the items table.
        {
          graphql: {
            name: "forecasts"
            url: $forecastURL
            rateLimitKey: "prophet",
            fromTable: "items"
            columns: ["item_id"]
            authorization: {
              bearer: {
                tokenFromSecret: "forecast_auth_token"
              }
            }
            vars: [
              {
                name: "records",
                valueFromSQL: {
                  query: "SELECT ds, y FROM orders.orders WHERE item_id = ? AND y > 0",
                  parameters: ["item_id"],
                }
              }
              {
                name: "futureEvents"
                valueFromSQL: {
                  query: "SELECT ds FROM future_dates"
                }
              }
            ]
            operation: """
            query forecasts(
              $records: [PastEventInput!]!,
              $futureEvents: [FutureEvent!]!,
              $seasonalityPriorScale: Float = 10,
              $holidaysPriorScale: Float = 10,
              $changepointPriorScale: Float = 0.05,
              $intervalWidth: Float = 0.8,
            ) {
              historyFromRecords(records: $records) {
                fitProphet(
                  seasonalityPriorScale: $seasonalityPriorScale,
                  holidaysPriorScale: $holidaysPriorScale,
                  changepointPriorScale: $changepointPriorScale,
                  intervalWidth: $intervalWidth,
                ) {
                  forecast(futureRecords: $futureEvents) {
                    history {
                      metrics {
                        rmse
                        mape
                        mdape
                      }
                    }
                    future {
                      records {
                        ds
                        yhat
                        yhatLower
                        yhatUpper
                      }
                    }
                  }
                }
              }
            }
            """
          }
        }
      ]
    }
  ) {
    awaitFinish(timeoutSeconds: 120) {
      id @export(as: "forecastJob")
      status
      error {
        message
      }
    }
  }
}
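The `rateLimits` entry in the job above combines a `limit`, a `durationSeconds`, and a `burst`. A common way to interpret such a triple is a token bucket: tokens refill at `limit / durationSeconds` per second, and at most `burst` tokens can accumulate. The sketch below implements that interpretation. The Batch Job API's actual algorithm is not described in this guide, so the token-bucket reading, the class name, and the injectable clock are all assumptions made for illustration.

```python
import time


class TokenBucket:
    """Token bucket: refills at limit/duration_seconds per second, capped at burst."""

    def __init__(self, limit, duration_seconds, burst, clock=time.monotonic):
        self.rate = limit / duration_seconds  # tokens added per second
        self.capacity = burst                 # maximum stored tokens
        self.tokens = float(burst)            # start with a full bucket
        self.clock = clock                    # injectable for testing
        self.updated = clock()

    def try_acquire(self) -> bool:
        """Take one token if available; return whether the call may proceed."""
        now = self.clock()
        elapsed = now - self.updated
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.updated = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False


if __name__ == "__main__":
    # Same numbers as the job's rateLimits entry, driven by a fake clock.
    fake_now = [0.0]
    bucket = TokenBucket(limit=10, duration_seconds=60, burst=10, clock=lambda: fake_now[0])
    allowed = sum(bucket.try_acquire() for _ in range(12))
    print(f"allowed immediately: {allowed} of 12")
    fake_now[0] = 7.0  # a little over one refill interval later
    print(f"allowed after 7s: {bucket.try_acquire()}")
```

Under this reading, the settings in this guide would let an initial burst of 10 forecast calls through and then admit roughly one more call every 6 seconds.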
query pollStatus(
  $job: String! @import(from: "forecastJob")
) @api(id: "batch") {
  job(id: $job) {
    status
    error {
      message
    }
  }
}
query getUI(
  $job: String! @import(from: "forecastJob")
) @api(id: "batch") {
  job(id: $job) {
    ui @export(as: "forecastUIURL")
  }
}