Skip to content

Instantly share code, notes, and snippets.

@andrhamm
Last active December 23, 2021 06:35
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save andrhamm/3db24c60fa320617217e5a3c84a7a2e3 to your computer and use it in GitHub Desktop.
Save andrhamm/3db24c60fa320617217e5a3c84a7a2e3 to your computer and use it in GitHub Desktop.
API Gateway Synchronous Step Function Execution
// handlers/await-state-machine-execution.js
import AWS from 'aws-sdk';
import { snakeCaseObj } from '../lib/common';
const stepfunctions = new AWS.StepFunctions({apiVersion: '2016-11-23'});
const {
STATE_MACHINE_ARN,
} = process.env;
// As gross as this is, AWS will soon release the ability to execute
// a Step Function synchronously and this will no longer be necessary
export const handler = async (input, context, callback) => {
// console.log(`event: ` + JSON.stringify(input, null, 2));
let {
stateMachineArn,
executionInput,
executionName,
} = input;
if (!stateMachineArn) {
stateMachineArn = STATE_MACHINE_ARN;
}
if (!executionInput) {
executionInput = input || {};
}
if (!executionName && input.requestContext) {
executionName = input.requestContext.requestId;
}
const { executionArn } = await stepfunctions.startExecution({
stateMachineArn,
input: JSON.stringify(executionInput),
...executionName && { name: executionName },
}).promise();
let delayMs = 50;
let execution;
let attempt = 0;
do {
await delay(delayMs);
execution = await stepfunctions.describeExecution({ executionArn }).promise();
console.log(`attempt ${++attempt} (${delayMs}ms) ${execution.status}`);
delayMs = 200;
} while (execution.status === 'RUNNING');
if (input.requestContext && input.requestContext.apiId) {
let statusCode;
let body;
let headers;
let headersOverride;
let errorMessage;
let errorType;
let trace;
if (execution.status === 'SUCCEEDED') {
statusCode = 200;
const result = JSON.parse(execution.output);
if (result.Error) {
let { Error: code, Cause: causeJson } = result;
if (causeJson) {
try {
// console.log(`causeJson: ${causeJson}`);
const cause = JSON.parse(causeJson);
({errorMessage, errorType, trace} = cause);
errorMessage = errorMessage.split("\n")[0];
// console.log(`trace: ${JSON.stringify(trace)}`);
({statusCode, headers: headersOverride, code} = JSON.parse(trace.find((l) => l.startsWith('Extra: ')).slice(7)));
} catch (e) {
console.log(`Failed to parse Extra from stack trace: ${e.message}`);
}
} else {
statusCode = error.includes('BadRequest') ? 400 : 500;
}
body = {
message: errorMessage || errorType || error,
code: code,
};
} else if (result.statusCode) {
// console.log('result.statusCode');
({ statusCode, body, headers } = result);
if (!body) {
body = result;
delete body.headers;
delete body.statusCode;
}
} else {
// console.log('else body = result');
body = result;
}
body = snakeCaseObj(body);
} else {
statusCode = 500;
body = {
message: `Internal Server Error`,
code: `EXECUTION_${execution.status}`,
};
}
const resp = {
statusCode,
headers: headers || ({
'Content-Type': 'application/json',
...headersOverride,
}),
body: JSON.stringify(body, null, 2),
};
console.log(`resp: ${JSON.stringify(resp, null, 2)}`);
callback(null, resp);
} else {
console.log(`requestContext and requestContext.apiId not found, normal callback`);
callback(null, execution);
}
}
// lib/errors.js
module.exports = class CustomError extends Error {
constructor (message, extra) {
// Calling parent constructor of base Error class.
super(`${message}\nExtra: ${JSON.stringify(extra)}`);
// Saving class name in the property of our custom error as a shortcut.
this.name = this.constructor.name;
// this.name = 'CustomError'; // might have to do this when minifying
// Capturing stack trace, excluding constructor call from it.
Error.captureStackTrace(this, this.constructor);
}
};
# ...
putJob:
handler: handlers/await-state-machine-execution.handler
description: Idempotent job create/update
timeout: 30
environment:
STATE_MACHINE_ARN: ${self:resources.Outputs.UpdateJobStateMachineARN.Value}
events:
- http:
path: jobs
method: put
# ...
// in any of the step function tasks (lambda functions) when wanting to return an error with API response status code + message, use this custom error... it shoves your extra data into the stack trace so it can be retrieved from the step function error (yeah its janky af but this won't be necessary soon when AWS adds first-class support for synchronously invoking step functions from APIG)
import CustomError from '../lib/errors';
export const handler = async (input, context, callback) => {
const error = new CustomError("Schedule is not a valid schedule expression", {statusCode: 400, code: 'InvalidSchedule'});
callback(error);
}
@colvint
Copy link

colvint commented Aug 27, 2020

Totally gross but apparently necessary if we want to obtain a step function result synchronously. Posting here on the off chance you’ve heard whether step functions support this natively yet?

@andrhamm
Copy link
Author

Totally gross but apparently necessary if we want to obtain a step function result synchronously. Posting here on the off chance you’ve heard whether step functions support this natively yet?

@colvint I haven't heard anything, though I'm not watching as closely as I was at the time of writing this. It is a bit much but it actually worked wayyy better than I expected. It was reasonably fast and I can't recall having any problems from it. Good luck!

@colvint
Copy link

colvint commented Aug 27, 2020

Totally gross but apparently necessary if we want to obtain a step function result synchronously. Posting here on the off chance you’ve heard whether step functions support this natively yet?

@colvint I haven't heard anything, though I'm not watching as closely as I was at the time of writing this. It is a bit much but it actually worked wayyy better than I expected. It was reasonably fast and I can't recall having any problems from it. Good luck!

Gonna need it -- thanks for this pattern. Cheers!

@colvint
Copy link

colvint commented Aug 28, 2020

Using your pattern as inspiration, ended up with:

const promisePoller = require('promise-poller').default // npm package for just this sort of nastiness
const { StepFunctions } = require('aws-sdk')

const { describeExecution } = new StepFunctions()
const TIMER_LABEL = 'step-function-poller'

const checkStateMachineExecution = executionArn => () =>
    new Promise(async (res, rej) => {
        const execution = await describeExecution({ executionArn }).promise()

        console.timeLog(TIMER_LABEL)
        console.dir({ execution })

        execution && execution.status === 'SUCCEEDED' ? res(execution) : rej(execution)
    })

const pollStateMachineExecutionWith  = pollOptions => executionArn =>
    promisePoller({ taskFn: checkStateMachineExecution(executionArn), progressCallback: console.dir, ...pollOptions })

const pollMoreSlowly = pollStateMachineExecutionWith({ interval: 500, retries: 5 })
const pollQuickly = pollStateMachineExecutionWith({ interval: 10, retries: 5 })

console.time(TIMER_LABEL)

Promise.all([
    pollQuickly(HAPPY_STATE_MACHINE_EXECUTION_ARN),
    pollMoreSlowly(SAD_STATE_MACHINE_EXECUTION_ARN),
])
.then(console.dir)
.catch(console.error)

Somewhat tidier but still not ideal. Upside: promise-poller gives us configurable retry and backoff with fewer moving parts.

Behavior is what we want:

image

@colvint
Copy link

colvint commented Aug 28, 2020

Bonus for those who think low-code is decidedly un-shameful and would like to draw their step functions (shout out Bret Victor): https://github.com/colvint/step-functions-draw.io (credit @sakazuki)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment