Skip to content

Instantly share code, notes, and snippets.

@molenzwiebel
Created June 7, 2020 10:46
Show Gist options
  • Save molenzwiebel/0a46876665b99b2d7fde80b8ed0c262b to your computer and use it in GitHub Desktop.
Save molenzwiebel/0a46876665b99b2d7fde80b8ed0c262b to your computer and use it in GitHub Desktop.
Example Hyperion pipeline plugin in node.js
const zmq = require("zeromq");
// Connect to the plugin manager located at `managerHost`, identify
// ourselves as `id` and request connection information. Returns an
// object of { pull: <sub info>, push: <pub info> }.
async function retrieveConnectionDetails(id, managerHost) {
const sock = new zmq.Request();
await sock.connect(managerHost); // connect to plugin manager
// Request pull info.
await sock.send(JSON.stringify({ id, type: "pull" }));
const [pull] = await sock.receive();
// Request push info.
await sock.send(JSON.stringify({ id, type: "push" }));
const [push] = await sock.receive();
return {
pull: JSON.parse(pull),
push: JSON.parse(push)
};
}
const FIELD = "message"; // which field to uppercase
// Helper that connects/binds sock to the connection
// details in `details`, as sent by the plugin manager.
async function connectSocket(sock, details) {
console.dir(details);
if (details.isBind) {
await sock.bind(details.host);
} else {
await sock.connect(details.host);
}
}
// Endlessly pulls messages from the previous plugin,
// transforms them and pushes them to the next one.
async function runPullPushLoop(connectionDetails) {
const pull = new zmq.Pull();
await connectSocket(pull, connectionDetails.pull);
const push = new zmq.Push();
await connectSocket(push, connectionDetails.push);
// For every incoming message.
for await (const [msg] of pull) {
// Parse...
const obj = JSON.parse(msg);
// transform...
if (obj && obj[FIELD]) {
obj[FIELD] = obj[FIELD].toUpperCase();
}
// and push
await push.send(JSON.stringify(obj));
}
}
(async() => {
const details = await retrieveConnectionDetails("MyPipelinePlugin", "tcp://localhost:3000");
console.log("Starting my pipeline plugin...");
await runPullPushLoop(details);
// The loop is infinite, so we should never reach this.
})();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment