Skip to content

Instantly share code, notes, and snippets.

@TobyEalden
Created March 29, 2019 15:14
Show Gist options
  • Save TobyEalden/9bce0efb207669f15600d043e9a74163 to your computer and use it in GitHub Desktop.
Save TobyEalden/9bce0efb207669f15600d043e9a74163 to your computer and use it in GitHub Desktop.
Databot subscription
module.exports = (function() {
"use strict";
/**
* Databot input requirements:
* {
* queueId: "<dataset id to be monitored>",
* tdxServer: "e.g. http://tdx.nqm-1.com",
* ddpServer: "e.g. http://ddp.nqm-1.com"
* }
*/
const databot = function(input, output, context) {
const {TDXConnections, Tracker} = require("@nqminds/nqm-tdx-client");
const WebSocket = require("ws");
const tdxConfig = {
accessToken: context.authToken,
tdxServer: input.tdxServer,
ddpServer: input.ddpServer
};
const tdxConnections = new TDXConnections(tdxConfig, WebSocket);
// This will be called any time the subscription changes.
const processQueue = function(pending) {
if (pending) {
output.debug("pending %s", pending.id);
}
};
// Subscription query, status is pending.
const lookup = {status: "pending"};
// Subscription options, most recent document.
const options = {limit: 1, sort: {createdAt: -1}};
// Alternative 1 => track any changes to the subscription. The tracker will automatically re-run whenever
// the matched set changes.
const trackQueue = function() {
Tracker.autorun(() => {
// Start subscription.
const sub = tdxConnections.defaultTDX.subscribe("datasetData", [input.queueId, lookup, options]);
if (sub.ready()) {
// When subscription is ready, check queue - n.b. this will be called repeatedly as changes occur.
const pending = tdxConnections.defaultTDX.cache.data.findOne({_d: input.queueId, ...lookup}, options);
processQueue(pending);
}
});
};
// Alternative 2 => observer a cursor, gives fine grained control.
const observeQueue = function() {
// Start subscription.
const sub = tdxConnections.defaultTDX.subscribe("datasetData", [input.queueId, lookup, options]);
// Get a cursor.
const cursor = tdxConnections.defaultTDX.cache.data.find({_d: input.queueId, ...lookup}, options);
// Observe changes in the cursor.
cursor.observe({
added: (doc) => {
output.debug("doc added");
processQueue(doc);
},
changed: (doc) => {
output.debug("doc changed");
processQueue(doc);
},
removed: (doc) => {
output.debug("doc removed");
}
});
};
// This event fires once the authentication succeedes.
tdxConnections.on("user", (connection) => {
output.debug(`tdx connection ${connection.tdx.tdxServer} initialised for ${connection.user.username}`);
// observeQueue();
trackQueue();
});
// Something went wrong.
tdxConnections.on("error", (err) => {
output.debug("tdx client error [%s]", err.message);
});
// Initialise the TDX connection with the databot auth token.
return tdxConnections.setToken(context.authToken)
.catch((err) => {
output.abort(err.message);
});
}
const input = require("@nqminds/nqm-databot-utils").input;
input.pipe(databot);
}());
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment