Skip to content

Instantly share code, notes, and snippets.

@diegoeche
Created October 25, 2017 15:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save diegoeche/fc144dcf0007e3561fb5776fca0d4bd6 to your computer and use it in GitHub Desktop.
Save diegoeche/fc144dcf0007e3561fb5776fca0d4bd6 to your computer and use it in GitHub Desktop.
const axios = require('axios')
const sleep = require('sleep')
const base64 = require('base-64')
// The application id and the broker base URL are read from the environment.
const {
  GEENY_APPLICATION_ID: appId,
  GEENY_APPLICATION_BROKER_URL: host
} = process.env

// Settings for the broker endpoints: which application and message type to
// read, and how the shard iterator should be created.
const brokerConfig = {
  appId,
  messageTypeId: '54121087-14f1-4c2a-835f-117681618cc9', // incoming Develco messageType
  iteratorType: 'LATEST',
  maxBatchSize: 10
}
/**
 * Simple wrapper to send a message to the central log.
 *
 * The entry is sent to the parent process as
 * `{ cmd: "log", message, date }`, where `date` is the string returned by
 * `Date()`.
 *
 * `process.send` only exists when this process was spawned with an IPC
 * channel. Without one the entry is written to stderr instead, so that
 * logging an error can never itself throw and hide the original failure.
 *
 * @param {string} message - Text to record.
 */
function log (message) {
  const entry = { cmd: "log", message: message, date: Date() }
  if (typeof process.send === 'function') {
    process.send(entry)
    return
  }
  console.error(`[${entry.date}] ${entry.message}`)
}
/**
 * Issue an HTTP request through axios with JSON `Accept` and `Content-Type`
 * headers and resolve with the response body.
 *
 * @param {string} method - HTTP method passed to axios.
 * @param {string} url - Target URL.
 * @param {*} [data] - Request body passed to axios.
 * @returns {Promise<*>} The `data` property of the axios response.
 * @throws Whatever `axios.request` rejects with; errors are not intercepted.
 */
async function request (method, url, data) {
  const jsonHeaders = {
    'Accept': 'application/json',
    'Content-Type': 'application/json'
  }
  const { data: body } = await axios.request({
    method,
    url,
    headers: jsonHeaders,
    data
  })
  return body
}
/**
 * Fetch the shards of the configured message type.
 *
 * Sends a GET to `${host}/${appId}/messageType/${messageTypeId}` and returns
 * the `shards` property of the response body.
 *
 * @returns {Promise<*>} The `shards` value from the broker response.
 * @throws Whatever `request` rejects with.
 */
async function getShards () {
  const { appId, messageTypeId } = brokerConfig
  const { shards } = await request('get', `${host}/${appId}/messageType/${messageTypeId}`)
  return shards
}
/**
 * Ask the broker for an iterator on one shard.
 *
 * POSTs the shard id together with the configured iterator type and maximum
 * batch size to the message type's `/iterator` endpoint.
 *
 * @param {*} shardId - Identifier of the shard to read from.
 * @returns {Promise<*>} The `shardIterator` value from the broker response.
 * @throws Rethrows any failure after reporting it through `log`.
 */
async function getIterator (shardId) {
  try {
    const { appId, messageTypeId, iteratorType, maxBatchSize } = brokerConfig
    const endpoint = `${host}/${appId}/messageType/${messageTypeId}/iterator`
    const { shardIterator } = await request('post', endpoint, {
      shardId,
      iteratorType,
      maxBatchSize
    })
    return shardIterator
  } catch (err) {
    log(`Error in getIterator, ERROR: ${err.message}`)
    throw err
  }
}
/**
 * Poll the broker for messages, starting at `iterator`, until a request or
 * the callback fails.
 *
 * Each batch is fetched from the message type's `/iterator/<iterator>`
 * endpoint. Every message is passed to `processData` as
 * `{ userId, thingId, data }`, where `data` is the base64-decoded payload.
 * Polling then continues with the `nextIterator` value of the response.
 *
 * The loop is awaited from start to finish, so the returned promise stays
 * pending while polling is healthy and rejects on the first failure. A
 * fire-and-forget recursive call would instead turn later failures into
 * unhandled rejections that the caller can never react to.
 *
 * @param {*} iterator - Iterator to start reading from.
 * @param {function(Object): void} processData - Called once per message.
 * @returns {Promise<never>} Never resolves; rejects when polling fails.
 * @throws Rethrows any failure after reporting it through `log`.
 */
async function getMessages (iterator, processData) {
  let currentIterator = iterator
  try {
    for (;;) {
      const url = `${host}/${brokerConfig.appId}/messageType/${brokerConfig.messageTypeId}/iterator/${currentIterator}`
      const data = await request('get', url)
      if (data.messages.length > 0) {
        for (const message of data.messages) {
          processData({
            userId: message.userId,
            thingId: message.thingId,
            data: base64.decode(message.payload)
          })
        }
        // Short pause after a non-empty batch before asking for more.
        sleep.msleep(250)
      } else {
        // Nothing new: back off longer before polling again.
        sleep.sleep(1)
      }
      currentIterator = data.nextIterator
    }
  } catch (err) {
    log(`Error in getMessages: ${err.message}`)
    throw err
  }
}
/**
 * Start consuming messages and retry whenever anything fails.
 *
 * Looks up the shards, opens an iterator on the first shard and hands it to
 * `getMessages`. On any error the failure is logged, the process pauses for
 * one second and the whole sequence is attempted again.
 *
 * The retry is a loop that reuses the same `processData` callback. Retrying
 * without the callback would leave it undefined after the first failure.
 *
 * @param {function(Object): void} processData - Called once per message.
 * @returns {Promise<void>} Resolves only if `getMessages` resolves.
 */
async function start (processData) {
  for (;;) {
    try {
      const shards = await getShards()
      const iterator = await getIterator(shards[0].shardId)
      await getMessages(iterator, processData)
      return
    } catch (err) {
      log(`Error at start(), ERROR: ${err.message}`)
      sleep.sleep(1)
    }
  }
}
// Entry point: every item handed to the callback is serialized to JSON and
// sent to the parent process as an "add" command.
start((data) => {
  const message = JSON.stringify(data)
  process.send({ cmd: "add", message })
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment