Skip to content

Instantly share code, notes, and snippets.

@jthomas
Last active May 14, 2019 09:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jthomas/327214b107cc5b5a572eab8199f73e75 to your computer and use it in GitHub Desktop.
Save jthomas/327214b107cc5b5a572eab8199f73e75 to your computer and use it in GitHub Desktop.
Example showing how to handle intermittent action failures for a large number of invocations using Redis
"use strict";
// Fraction (0..1) of invocations that should randomly fail; 0.25 is roughly one in four.
const ERROR_RATE = 0.25
// Delay in milliseconds (0 -> 10 seconds) before results are returned.
// NOTE(review): this is evaluated once, when the module is loaded, so every invocation
// served by the same loaded module shares one delay value - confirm that a fresh
// delay per invocation was not intended.
const DELAY_MS = Math.random() * 10000
/**
 * Decide whether the current invocation should be made to fail.
 * @returns {boolean} true when a fresh random draw falls below ERROR_RATE.
 */
function should_fail () {
  const roll = Math.random()
  return roll < ERROR_RATE
}
/**
 * Action entry point: adds two numbers, failing at random and answering late.
 * @param {{a: number, b: number}} params - operands to add.
 * @returns {Promise<{sum: number}>} resolves with the sum after DELAY_MS milliseconds.
 * @throws {Error} synchronously, with message 'failed!', when should_fail() says so.
 */
function main(params) {
  if (should_fail()) {
    throw new Error('failed!')
  }
  const { a, b } = params
  const sum = a + b
  // setTimeout forwards its extra argument to the callback, i.e. resolve({ sum }).
  return new Promise(resolve => setTimeout(resolve, DELAY_MS, { sum }))
}
const redis = require('redis')
const jobs = require('./job_store.js')
// Redis connection shared by every job-store call in this script.
// Declared with `const` so that it is a module-local binding rather than an
// implicit global (which is also a ReferenceError in strict mode).
const client = redis.createClient()
// Connection-level errors are only logged; they do not stop the script.
client.on("error", function (err) {
  console.log("Error " + err);
})
// Set up Apache OpenWhisk Client SDK.
// The API host and key are taken from the environment instead of being typed
// into the source, so credentials never end up in version control.
// Export __OW_API_HOST and __OW_API_KEY before running this script.
const openwhisk = require('openwhisk');
const options = {
  apihost: process.env.__OW_API_HOST,
  api_key: process.env.__OW_API_KEY
}
// Random integer in the range 0..99 (inclusive), used to build sample input.
const rand_int = () => {
  const scaled = Math.random() * 100
  return Math.floor(scaled)
}
// Script entry point: creates one job per generated input, invokes the action
// for it and records the resulting activation in Redis so it can be polled later.
;(async () => {
  // 10 invocations, 15 second maximum polling time per activation, 5 maximum retries
  const max_duration = 1000 * 15
  const maximum_retries = 5
  const invocations = 10
  const action = 'sum'

  // Generate input data for N invocations.
  const input = new Array(invocations).fill(null).map(() => {
    return { a: rand_int(), b: rand_int() }
  })

  try {
    // Fire invocation and store activation details in Redis.
    // Each entry is a promise for the id of the job it created; all run concurrently.
    const pending_job_ids = input.map(async params => {
      const id = await jobs.create(client, action, params, maximum_retries)
      console.log(`created job (#${id}) for ${action} with params: ${JSON.stringify(params)}`)
      // Polling deadline is measured from just before the invocation is sent.
      const expires = (Date.now()) + max_duration
      const activation = await ow.actions.invoke({name: action, params})
      console.log(`invoked action for job (#${id}), expires at: ${expires}`)
      await jobs.store_activation(client, id, activation.activationId, expires)
      return id
    })

    const results = await Promise.all(pending_job_ids)
    console.log('job ids created for invocations', results)
  } catch (err) {
    // Without this handler a Redis or OpenWhisk failure would surface only as
    // an unhandled promise rejection.
    console.error('failed to create jobs for invocations', err)
    process.exitCode = 1
  }
})();
"use strict";
// Redis key of the set holding the ids of jobs that are still active.
const ACTIVE_JOBS = 'active_activations'
// Redis key of the integer counter that is incremented to allocate each new job id.
const JOB_COUNTER = 'job_counter'
// Allocate the next job id by incrementing the shared Redis counter.
// Resolves with the counter's new value; rejects with the Redis error.
const next_id = async client => {
  const id = await new Promise((resolve, reject) => {
    client.incr(JOB_COUNTER, (err, result) => {
      if (err) {
        reject(err)
      } else {
        resolve(result)
      }
    })
  })
  return id
}
// Create job in Redis for invocation.
// Stores the action name, the JSON-encoded invocation parameters and the number
// of retries available in the hash `<id>_details`.
// Resolves with the new job's id; rejects if allocating the id or writing the
// hash fails.
const create = async (client, action, params, retries) => {
  // Awaited outside of any Promise executor so that a failure here rejects the
  // returned promise instead of being lost inside an async executor.
  const id = await next_id(client)
  const job_key = `${id}_details`
  const fields = [
    'action', action,
    'params', JSON.stringify(params),
    'retries', retries
  ]
  // Only the callback-style hmset call needs adapting to a promise.
  return new Promise((resolve, reject) => {
    client.hmset(job_key, fields, (err) => {
      if (err) return reject(err)
      resolve(id)
    })
  })
}
// Fetch the ids of all active invocation jobs (members of the ACTIVE_JOBS set).
const active = async client => {
  const members = await new Promise((resolve, reject) => {
    client.smembers(ACTIVE_JOBS, (err, ids) => (err ? reject(err) : resolve(ids)))
  })
  return members
}
// Look up the in-flight activation for an invocation job.
// Activations are appended to the list `<job>_activations`, so the newest one
// is the final element; resolves with undefined when the list is empty.
const latest_activation = async (client, job) => {
  const list_key = `${job}_activations`
  const newest = await new Promise((resolve, reject) => {
    // -1..-1 selects only the last element of the list.
    client.lrange(list_key, -1, -1, (err, entries) => {
      if (err) {
        reject(err)
        return
      }
      resolve(entries[0])
    })
  })
  return newest
}
// Check whether the polling deadline stored under `<job>_expiry` has passed.
// Resolves with false when no parseable deadline is stored, because any
// comparison against NaN is false.
const activation_expired = async (client, job) => {
  const expiry_key = `${job}_expiry`
  return new Promise((resolve, reject) => {
    client.get(expiry_key, (err, expiry) => {
      if (err) {
        reject(err)
        return
      }
      const deadline = parseInt(expiry, 10)
      resolve(deadline < Date.now())
    })
  })
}
// Read every field of the job's `<job>_details` hash.
const details = async (client, job) => {
  const hash_key = `${job}_details`
  const record = await new Promise((resolve, reject) => {
    client.hgetall(hash_key, (err, fields) => (err ? reject(err) : resolve(fields)))
  })
  return record
}
// Record a new activation for a job in a single MULTI transaction:
//  - add the job to the active jobs set,
//  - push the activation id onto the end of the job's activation list,
//  - overwrite the job's polling deadline.
// Resolves with no value once the transaction has executed.
const store_activation = async (client, job, activation_id, expiry) => {
  const activations_key = `${job}_activations`
  const expiry_key = `${job}_expiry`
  return new Promise((resolve, reject) => {
    const transaction = client.multi()
    transaction.sadd(ACTIVE_JOBS, job)
    transaction.rpush(activations_key, activation_id)
    transaction.set(expiry_key, expiry)
    transaction.exec((err) => {
      if (err) {
        reject(err)
      } else {
        resolve()
      }
    })
  })
}
// Report whether the job's stored `retries` counter is still above zero.
const has_retries = async (client, job) => {
  const hash_key = `${job}_details`
  const remaining = await new Promise((resolve, reject) => {
    client.hget(hash_key, 'retries', (err, retries) => {
      if (err) {
        reject(err)
        return
      }
      resolve(parseInt(retries, 10))
    })
  })
  return remaining > 0
}
// Consume one retry: subtract 1 from the `retries` field of the job's details hash.
// Resolves with no value.
const used_retry = async (client, job) => {
  const hash_key = `${job}_details`
  await new Promise((resolve, reject) => {
    client.hincrby(hash_key, 'retries', -1, (err) => (err ? reject(err) : resolve()))
  })
}
// Mark a job as finished in a single MULTI transaction: store the JSON-encoded
// action result (null when none is given) on the job's details hash and remove
// the job from the active set. Resolves with no value.
const has_finished = async (client, job, result = null) => {
  const hash_key = `${job}_details`
  const encoded = JSON.stringify(result)
  return new Promise((resolve, reject) => {
    const transaction = client.multi()
    transaction.hset(hash_key, 'result', encoded)
    transaction.srem(ACTIVE_JOBS, job)
    transaction.exec((err) => {
      if (err) {
        reject(err)
      } else {
        resolve()
      }
    })
  })
}
module.exports = { create, active, latest_activation, activation_expired,
details, store_activation, has_retries, used_retry, has_finished }
"use strict";
const redis = require('redis')
const jobs = require('./job_store.js')
// Polling window, in milliseconds, granted to each retried activation (15 seconds).
const max_duration = 15 * 1000
// Redis connection used for all job-store calls.
const client = redis.createClient()
// Connection errors are only logged.
const report_redis_error = err => {
  console.log("Error " + err);
}
client.on("error", report_redis_error)
// Set up Apache OpenWhisk Client SDK.
// `openwhisk`, `options` and `ow` are each declared exactly once; declaring a
// `const` binding twice in the same scope is a SyntaxError.
// The API host and key are read from the environment so that no credential is
// ever stored in the source. Any key that has been committed to source control
// must be treated as compromised and rotated.
const openwhisk = require('openwhisk');
const options = {
  apihost: process.env.__OW_API_HOST,
  api_key: process.env.__OW_API_KEY
}
const ow = openwhisk(options)
// Fetch the activation record for an activation id.
// Resolves with the record, or with null when the platform answers 404.
// Any other error is re-thrown to the caller.
const retrieve_activation = async id => {
  try {
    const activation = await ow.activations.get({ name: id })
    console.log(`activation result (${id}) now available!`)
    return activation
  } catch (err) {
    const not_found = err.statusCode === 404
    if (!not_found) {
      throw err
    }
    console.log(`activation result (${id}) is not available yet.`)
    return null
  }
}
// Re-run a job's invocation from the parameters stored in Redis.
// When the job has no retries left it is finished (with no result) and removed
// from the active set. Otherwise the action is invoked again, the new
// activation is recorded with a fresh polling deadline, and one retry is consumed.
const retry_job = async (client, id) => {
  const can_retry = await jobs.has_retries(client, id)
  if (!can_retry) {
    console.log(`job (#${id}) has no more retries`)
    await jobs.has_finished(client, id)
    return
  }

  console.log(`retrying job (#${id})`)
  // Deadline is measured from before the stored details are fetched.
  const expires = (Date.now()) + max_duration
  const { action, params } = await jobs.details(client, id)
  const activation = await ow.actions.invoke({ name: action, params: JSON.parse(params) })
  console.log(`invoked action for job (#${id}), expires at: ${expires}`)
  await jobs.store_activation(client, id, activation.activationId, expires)
  await jobs.used_retry(client, id)
}
// Retrieve all in-flight invocations from Redis and check for results.
// For each invocation, based on activation result active job might
// be either finished, retried or left to complete.
// Every active job is examined on each call; a job whose result is not yet
// available never prevents the jobs after it from being checked.
// Resolves with no value; rejects if a Redis or OpenWhisk call fails.
const main = async () => {
  const active_jobs = await jobs.active(client)
  console.log('retrieved the following active jobs:', active_jobs)

  for (const job of active_jobs) {
    const activation_id = await jobs.latest_activation(client, job)
    console.log(`job (#${job}) latest activation => ${activation_id}`)
    const result = await retrieve_activation(activation_id)

    // if result is unavailable and polling deadline reached -> retry job.
    // if result is unavailable and polling deadline not reached -> do nothing.
    if (!result) {
      console.log(`job (#${job}) activation is not available`)
      if (await jobs.activation_expired(client, job)) {
        console.log(`job (#${job}) activation has expired`)
        await retry_job(client, job)
      }
      // Move on to the next job; returning here would leave every remaining
      // active job unchecked.
      continue
    }

    console.log(`job (#${job}) activation result is available`)
    // if result is available and succeeded -> finish job and store result.
    // if result is available and failed -> retry job
    if (result.response.success) {
      console.log(`job (#${job}) activation was successful`)
      await jobs.has_finished(client, job, result.response.result)
    } else {
      console.log(`job (#${job}) activation was not successful`)
      await retry_job(client, job)
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment