Skip to content

Instantly share code, notes, and snippets.

@nickhsharp
Created March 17, 2017 01:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nickhsharp/43ed4e36d7c89a5be8ec5354fa1d1534 to your computer and use it in GitHub Desktop.
Save nickhsharp/43ed4e36d7c89a5be8ec5354fa1d1534 to your computer and use it in GitHub Desktop.
AWS Step Functions: Hacks & Shims P.3 - Sub Steps with Activity Poller and SubStepCloser
"use strict";
const _get = require("lodash.get");
const AWS = require("aws-sdk");
const step = new AWS.StepFunctions({
region: "us-west-2",
});
const labmda = new AWS.Lambda({
region: "us-west-2",
});
const pollerName = "SubStepPoller";
const polling = {};
function _gatherInputPathData(raw) {
let inputPath = _get(raw, "meta.instructions.InputPath");
// if no InputPath is sent, then we default to all
if(!inputPath || inputPath = "$") {
return raw;
}
// the lodash get function is pretty much JsonPath
// enough for the purposes of our examples, but it doesn't like the $. at the beginning
if(inputPath[0] == "$") {
inputPath = inputPath.substr(1);
}
return _get(raw, inputPath, {});
}
// handle the activity which was returned from the poll
function _handler(activity) {
// no taskToken means the activity poll just timed out, but their's nothing to do
if (!activity.taskToken) {
// returning will allow the loop to trigger another 60s long poll
return Promise.resolve();
}
// JSON parse the input
let raw;
try {
raw = JSON.parse(activity.input);
} catch(e) {
// activity poller expects an object not a primitive
return Promise.reject(new Error("ActivityPoller requires an object as its input."));
}
// gather the input data obeying the $.meta.instructions.InputPath
let input = _gatherInputPathData(raw);
// gather the target resource
let target = _get(raw, "meta.instructions.Resource");
// create the standardized Step Function payload
let payload = {
initial: input,
meta: {
name: uuid(),
stepArn: target,
taskToken: taskToken,
parentStep: {
name: _get(raw, 'meta.name'),
stepArn: _get(raw, 'meta.stepArn'),
}
}
};
// invoke Step Starter
return lambda.invoke({
FunctionName: 'StepStarter',
InvocationType: 'Event',
Payload: JSON.stringify({
input: payload,
target: target
}),
}).promise();
}
// do the actual poll.
function _doPoll(arn) {
step.getActivityTask({
activityArn: arn,
workerName: pollerName,
}).promise().then(handler).then(() => {
// after the handler is done handing it off to StepStarter
// if we are still polling then trigger up another
// use nextTick to break the stack so you don't overflow
if(polling[arn]) {
process.nextTick(() => {
_doPoll(arn);
});
}
}).catch((err) => {
// failures in the activityPoller handler need to be sent back up
// to stepFunctions to avoid activities lost in limbo.
step.sendTaskFailure({
taskToken: event.meta.taskToken,
error: err.name,
cause: err.message,
})
})
}
// start polling for a specific activity arn
function start(arn) {
if(!polling[arn]) {
polling[arn] = true;
_doPoll(arn);
}
}
// stop polling for a specific activity arn
// don't stop immediately, due to a bug in Activities that would be bad
// just don't trigger the next poll after your current one is done
function stop(arn) {
polling[arn] = false;
}
module.exports = {
start,
stop
}
"use strict";
const AWS = require('aws-sdk');
const step = new AWS.StepFunctions({
region: 'us-west-2',
});
module.exports.handler = (event, context, callback) => {
// if execution was not a Sub Step then no-op return;
if(!event.meta || !event.meta.taskToken) {
return callback(null);
}
// default success condition is true
let method = "sendTaskSuccess";
if(event.meta.success === false) {
method = "sendTaskFailure";
} else {
// set onto the returned event, because Choice states are frustratingly strict
event.meta.success = true;
}
// invoke the Success or Failure of the Activity from the Parent Step Function
return step[method]({
taskToken: event.meta.taskToken,
output: JSON.stringify(event.report),
}).promise().then(() => {
callack(null, event);
}).catch((err) => {
callback(err);
});
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment