Created
March 17, 2017 01:25
-
-
Save nickhsharp/43ed4e36d7c89a5be8ec5354fa1d1534 to your computer and use it in GitHub Desktop.
AWS Step Functions: Hacks & Shims P.3 - Sub Steps with Activity Poller and SubStepCloser
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"use strict"; | |
const crypto = require("crypto");

const AWS = require("aws-sdk");
const _get = require("lodash.get");
// Step Functions client: long-polls activities and reports task failures.
const step = new AWS.StepFunctions({
  region: "us-west-2",
});

// Lambda client used to hand each sub step off to the StepStarter function.
// It must be named `lambda`, because that is the identifier _handler invokes;
// the earlier spelling `labmda` left that reference undefined at runtime.
const lambda = new AWS.Lambda({
  region: "us-west-2",
});

// Worker name sent to Step Functions with every getActivityTask call.
const pollerName = "SubStepPoller";

// Activity ARN -> boolean. Truthy while the poll loop for that ARN should
// schedule another poll once the current one completes.
const polling = {};
/**
 * Select the part of the parsed activity input named by
 * `meta.instructions.InputPath`.
 *
 * @param {Object} raw - Parsed activity input.
 * @returns {*} `raw` itself when InputPath is missing or is exactly "$";
 *   otherwise the value found at that path, or `{}` when nothing is there.
 */
function _gatherInputPathData(raw) {
  const inputPath = _get(raw, "meta.instructions.InputPath");

  // if no InputPath is sent, then we default to all
  if (!inputPath || inputPath === "$") {
    return raw;
  }

  // lodash.get is close enough to JsonPath for these examples, but it does not
  // understand the root marker. Strip "$." as a unit: leaving the "." behind
  // makes lodash look up an empty-string key first, so nothing would match.
  let path = inputPath;
  if (typeof path === "string") {
    if (path.startsWith("$.")) {
      path = path.slice(2);
    } else if (path.startsWith("$")) {
      // e.g. "$[0]" - only the root marker has to go
      path = path.slice(1);
    }
  }

  return _get(raw, path, {});
}
/**
 * Handle one result of a getActivityTask poll by handing it to StepStarter.
 *
 * @param {Object} activity - Poll result; `taskToken` is absent when the poll
 *   timed out, and `input` is the JSON string given to the activity.
 * @returns {Promise} Resolves with nothing when there was no task, resolves
 *   with the Lambda invoke response once StepStarter has been invoked, and
 *   rejects when the input is not a JSON object or the invoke fails.
 */
function _handler(activity) {
  // no taskToken means the activity poll just timed out, so there's nothing to do;
  // resolving lets the loop trigger another long poll
  if (!activity.taskToken) {
    return Promise.resolve();
  }

  // JSON parse the input
  let raw;
  try {
    raw = JSON.parse(activity.input);
  } catch (e) {
    return Promise.reject(new Error("ActivityPoller requires an object as its input."));
  }
  // JSON.parse happily returns primitives ("5", "null"); the poller needs an object
  if (raw === null || typeof raw !== "object") {
    return Promise.reject(new Error("ActivityPoller requires an object as its input."));
  }

  // gather the input data obeying $.meta.instructions.InputPath
  const input = _gatherInputPathData(raw);
  // gather the target resource
  const target = _get(raw, "meta.instructions.Resource");

  // create the standardized Step Function payload
  const payload = {
    initial: input,
    meta: {
      // fresh name for the sub step
      name: crypto.randomUUID(),
      stepArn: target,
      // the token comes from the polled activity; it is what later lets the
      // sub step's outcome be reported against this activity
      taskToken: activity.taskToken,
      parentStep: {
        name: _get(raw, "meta.name"),
        stepArn: _get(raw, "meta.stepArn"),
      },
    },
  };

  // invoke Step Starter asynchronously ("Event" invocation type)
  return lambda.invoke({
    FunctionName: "StepStarter",
    InvocationType: "Event",
    Payload: JSON.stringify({
      input: payload,
      target: target,
    }),
  }).promise();
}
/**
 * Run one getActivityTask poll for `arn`, pass the result to _handler, and
 * schedule the next poll while `polling[arn]` is still set.
 *
 * @param {string} arn - Activity ARN to poll.
 * @returns {Promise<void>} Settles once this single poll cycle is done; it
 *   never rejects, so callers may safely ignore it.
 */
function _doPoll(arn) {
  return step.getActivityTask({
    activityArn: arn,
    workerName: pollerName,
  }).promise().then((activity) => {
    return _handler(activity).catch((err) => {
      // failures in the activityPoller handler need to be sent back up
      // to stepFunctions to avoid activities lost in limbo.
      // The token comes from the activity we just polled, and .promise() is
      // what actually dispatches the request with this SDK.
      return step.sendTaskFailure({
        taskToken: activity.taskToken,
        error: err.name,
        cause: err.message,
      }).promise();
    });
  }).then(() => {
    // after the handler is done handing it off to StepStarter (or its failure
    // has been reported), trigger another poll if we are still polling;
    // nextTick keeps the next poll off the current call stack
    if (polling[arn]) {
      process.nextTick(() => {
        _doPoll(arn);
      });
    }
  }).catch((err) => {
    // The poll itself, or the failure report, did not go through. Mark the
    // ARN as stopped so a later start(arn) can restart the loop, instead of
    // leaving it flagged as polling with nothing running.
    polling[arn] = false;
    console.error(`${pollerName}: polling stopped for ${arn}`, err);
  });
}
/**
 * Begin polling the given activity ARN, unless a poll loop for it is already
 * flagged as running.
 *
 * @param {string} arn - Activity ARN to poll.
 */
function start(arn) {
  const alreadyPolling = Boolean(polling[arn]);
  if (alreadyPolling) {
    return;
  }

  polling[arn] = true;
  _doPoll(arn);
}
/**
 * Ask the poll loop for the given activity ARN to wind down.
 *
 * The poll currently in flight is deliberately left alone (stopping it
 * immediately would be bad due to a bug in Activities); clearing the flag
 * only keeps the next poll from being triggered once the current one is done.
 *
 * @param {string} arn - Activity ARN to stop polling.
 */
function stop(arn) {
  polling[arn] = false;
}
module.exports = { | |
start, | |
stop | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"use strict"; | |
const AWS = require('aws-sdk'); | |
// Step Functions client the handler uses to send the task outcome.
const step = new AWS.StepFunctions({ region: 'us-west-2' });
/**
 * SubStepCloser: report the outcome of a sub step execution back to the
 * activity identified by `event.meta.taskToken`.
 *
 * @param {Object} event - Execution output; `event.meta.taskToken` marks it as
 *   a sub step, `event.meta.success === false` marks it as failed, and
 *   `event.report` is the payload sent back.
 * @param {Object} context - Lambda context (unused).
 * @param {Function} callback - Called with `(null)` when the execution was
 *   not a sub step, `(null, event)` once the outcome has been sent, or `(err)`
 *   when sending it failed.
 */
module.exports.handler = (event, context, callback) => {
  // if execution was not a Sub Step then no-op return;
  if (!event.meta || !event.meta.taskToken) {
    return callback(null);
  }

  // default success condition is true
  const failed = event.meta.success === false;
  if (!failed) {
    // set onto the returned event, because Choice states are frustratingly strict
    event.meta.success = true;
  }

  // JSON.stringify(undefined) yields undefined, which would drop the field
  // from the request entirely; fall back to an empty object.
  const report = JSON.stringify(event.report === undefined ? {} : event.report);

  // invoke the Success or Failure of the Activity from the Parent Step Function.
  // sendTaskFailure takes `error`/`cause` rather than `output`.
  const request = failed
    ? step.sendTaskFailure({
      taskToken: event.meta.taskToken,
      error: "SubStepFailed",
      // NOTE(review): the API caps `cause` at 32768 characters - confirm
      cause: report.slice(0, 32768),
    })
    : step.sendTaskSuccess({
      taskToken: event.meta.taskToken,
      output: report,
    });

  // Two-argument then(): an exception thrown by the success callback must not
  // fall into the error branch and invoke the callback a second time.
  return request.promise().then(() => {
    callback(null, event);
  }, (err) => {
    callback(err);
  });
};
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment