Skip to content

Instantly share code, notes, and snippets.

@aricart
Last active May 23, 2019 16:35
Show Gist options
  • Save aricart/c2b85e0f9fbda69993194e5cbea8643b to your computer and use it in GitHub Desktop.
Save aricart/c2b85e0f9fbda69993194e5cbea8643b to your computer and use it in GitHub Desktop.
stan queue debug
> mkdir test
// copy the files to this dir
> npm install

in one terminal run the server in protocol debug mode:

> nats-streaming-server -SDV

start a sub.js:

> cd test
> node sub.js -i a -q a -d dn foo
STAN connected!
subscription inbox: _INBOX.SDL2FM0MX5GE8WDQ40HKW3
subscribed to foo

publish a message:

> node pub -n 1 foo
STAN connected!
publishing 0

start a second subscriber, this one will "misbehave" and not ack:

> node sub.js -D -i b -q a -d dn foo
-D was provided, will drop messages
STAN connected!
subscription inbox: _INBOX.5NC1C50TBYWVCB5NXL0UM6
subscribed to foo

Publish some new messages:

> node pub -n 3 foo
STAN connected!
publishing 0
publishing 1
publishing 2

On the server, you should start seeing messages like:

[14005] 2019/05/23 11:11:18.307310 [DBG] STREAM: [Client:a] Connected (Inbox=_INBOX.SDL2FM0MX5GE8WDQ40HKHI)
[14005] 2019/05/23 11:11:18.311437 [INF] STREAM: Channel "foo" has been created
[14005] 2019/05/23 11:11:18.311564 [DBG] STREAM: [Client:a] Started new durable queue subscription, subject=foo, inbox=_INBOX.SDL2FM0MX5GE8WDQ40HKW3, queue=dn:a, subid=1, sending from beginning, seq=1

which shows the first client connecting.

When you publish a message:

[14005] 2019/05/23 11:11:22.224586 [DBG] STREAM: [Client:node-stan-pub] Connected (Inbox=_INBOX.9SGJ8W1AN5Z47H407IBRWJ)
[14005] 2019/05/23 11:11:22.228726 [TRC] STREAM: [Client:node-stan-pub] Received message from publisher subj=foo guid=9SGJ8W1AN5Z47H407IBSNH
# Next you'll see the server delivering:
[14005] 2019/05/23 11:11:22.228755 [TRC] STREAM: [Client:a] Delivering msg to subid=1, subject=foo, seq=1
# acking of the publisher
[14005] 2019/05/23 11:11:22.228772 [TRC] STREAM: [Client:node-stan-pub] Acking Publisher subj=foo guid=9SGJ8W1AN5Z47H407IBSNH
# publisher going away
[14005] 2019/05/23 11:11:22.229837 [DBG] STREAM: [Client:node-stan-pub] Closed (Inbox=_INBOX.9SGJ8W1AN5Z47H407IBRWJ)
# Next the streaming server processes the ack from the subscriber
[14005] 2019/05/23 11:11:22.735270 [TRC] STREAM: [Client:a] Processing ack for subid=1, subject=foo, seq=1

Add a new member to the queue; this one will not acknowledge messages:

[14005] 2019/05/23 11:13:13.805154 [DBG] STREAM: [Client:b] Connected (Inbox=_INBOX.5NC1C50TBYWVCB5NXL0UFD)
[14005] 2019/05/23 11:13:13.808750 [DBG] STREAM: [Client:b] Added member to durable queue subscription, subject=foo, inbox=_INBOX.5NC1C50TBYWVCB5NXL0UM6, queue=dn:a, subid=2

New messages published:

[14005] 2019/05/23 11:13:23.003457 [DBG] STREAM: [Client:node-stan-pub] Connected (Inbox=_INBOX.ZX9WWJ8WI2EFS1X0HHCBJN)
[14005] 2019/05/23 11:13:23.007871 [TRC] STREAM: [Client:node-stan-pub] Received message from publisher subj=foo guid=ZX9WWJ8WI2EFS1X0HHCC2T
[14005] 2019/05/23 11:13:23.007886 [TRC] STREAM: [Client:node-stan-pub] Received message from publisher subj=foo guid=ZX9WWJ8WI2EFS1X0HHCC6N
[14005] 2019/05/23 11:13:23.007897 [TRC] STREAM: [Client:node-stan-pub] Received message from publisher subj=foo guid=ZX9WWJ8WI2EFS1X0HHCCAH
[14005] 2019/05/23 11:13:23.007919 [TRC] STREAM: [Client:a] Delivering msg to subid=1, subject=foo, seq=2
[14005] 2019/05/23 11:13:23.007942 [TRC] STREAM: [Client:node-stan-pub] Acking Publisher subj=foo guid=ZX9WWJ8WI2EFS1X0HHCC2T
[14005] 2019/05/23 11:13:23.007955 [TRC] STREAM: [Client:node-stan-pub] Acking Publisher subj=foo guid=ZX9WWJ8WI2EFS1X0HHCC6N
# Sending the message to the bad subscriber (client b, which never acks)
[14005] 2019/05/23 11:13:23.007965 [TRC] STREAM: [Client:b] Delivering msg to subid=2, subject=foo, seq=3
[14005] 2019/05/23 11:13:23.008026 [TRC] STREAM: [Client:node-stan-pub] Acking Publisher subj=foo guid=ZX9WWJ8WI2EFS1X0HHCCAH
[14005] 2019/05/23 11:13:23.009258 [DBG] STREAM: [Client:node-stan-pub] Closed (Inbox=_INBOX.ZX9WWJ8WI2EFS1X0HHCBJN)
[14005] 2019/05/23 11:13:23.510196 [TRC] STREAM: [Client:a] Processing ack for subid=1, subject=foo, seq=2
[14005] 2019/05/23 11:13:23.510211 [TRC] STREAM: [Client:a] Delivering msg to subid=1, subject=foo, seq=4
[14005] 2019/05/23 11:13:24.012722 [TRC] STREAM: [Client:a] Processing ack for subid=1, subject=foo, seq=4
# Ack times out, the server now re-delivers to the first subscriber
[14005] 2019/05/23 11:13:28.008787 [TRC] STREAM: [Client:a] Redelivering msg to subid=1, subject=foo, seq=3
[14005] 2019/05/23 11:13:28.512242 [TRC] STREAM: [Client:a] Processing ack for subid=1, subject=foo, seq=3
```
{
"name": "test",
"version": "1.0.0",
"description": "",
"main": "pub.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "",
"license": "ISC",
"dependencies": {
"minimist": "^1.2.0",
"node-nats-streaming": "^0.2.2"
}
}
'use strict';
const STAN = require('node-nats-streaming');
const nuid = require('nuid');
const argv = require('minimist')(process.argv.slice(2));
// Command-line configuration for the publisher. Each option falls back to
// its default when the flag is missing (or parses to a falsy value).
const cluster_id = argv.c || "test-cluster";
const client_id = argv.i || "node-stan-pub";
const server = argv.s || 'nats://localhost:4222';
// Number of messages to publish (-n), read as a base-10 integer; default 10.
const count = parseInt(argv.n || 10, 10);
// The first positional argument is the subject; without it we print the
// synopsis and exit.
const [subject] = argv._;
if (!subject) usage();
/**
 * Print the publisher's command-line synopsis and terminate the process.
 * Invoked when the required <subject> argument is missing.
 */
function usage() {
  const synopsis = 'pub [-c clusterId] [-i clientId] [-s server] [-n count] <subject>';
  console.log(synopsis);
  process.exit();
}
// Open the streaming connection; publishing starts once 'connect' fires.
const sc = STAN.connect(cluster_id, client_id, server);

sc.on('connect', () => {
  console.log("STAN connected!");

  // Nothing to publish (count is NaN, zero or negative, e.g. a non-numeric
  // -n value): close right away. Without this guard the loop below never
  // runs, no publish callback ever fires, and the process would hang with
  // the connection left open.
  if (!(count > 0)) {
    sc.close();
    return;
  }

  // publish some messages to the stream - these would come
  // from some other process
  let acked = 0;
  for (let i = 0; i < count; i++) {
    console.log('publishing', i);
    // The payload is the 1-based message number, sent as a string.
    sc.publish(subject, String(i + 1), (err) => {
      if (err) {
        console.log(err);
        process.exit(1);
      }
      acked++;
      // Close the connection only after every publish callback has run.
      if (acked === count) {
        sc.close();
      }
    });
  }
});

// Connection-level errors are only logged.
sc.on('error', (reason) => {
  console.log(reason);
});
#!/usr/bin/env node
/* eslint-disable no-console, no-process-exit */
'use strict';
const STAN = require('node-nats-streaming');
const nuid = require('nuid');
const argv = require('minimist')(process.argv.slice(2));
// Command-line configuration for the subscriber. Each option falls back to
// its default when the flag is missing (or parses to a falsy value).
const cluster_id = argv.c || "test-cluster";
// The default must differ from the publisher script's default
// ("node-stan-pub"): a streaming cluster rejects a second connection that
// reuses a client ID already in use, so running both scripts with defaults
// would otherwise make whichever connects second fail.
const client_id = argv.i || "node-stan-sub";
const durable_name = argv.d || "worker";
const server = argv.s || 'nats://localhost:4222';
// Ack wait in milliseconds (-m), read as a base-10 integer; default 5000.
const maxWait = parseInt(argv.m || "5000", 10);
// Empty string means "no queue group".
const queueGroup = argv.q || "";
// -D: receive messages but never acknowledge them.
const noAck = argv.D || false;
// The first positional argument is the subject; without it we print the
// synopsis and exit.
const subject = argv._[0];
if (!subject) {
  usage();
}
if (noAck) {
  console.log("-D was provided, will drop messages");
}
/**
 * Print the subscriber's command-line synopsis and terminate the process.
 * Invoked when the required <subject> argument is missing.
 */
function usage() {
  const synopsis = 'sub [-c clusterId] [-i clientId] [-s server] [-q queueGroup] [-d durableName] [-m maxWaitMillis] [-D (drop messages)] <subject>';
  console.log(synopsis);
  process.exit();
}
// Open the streaming connection; the subscription is created on 'connect'.
const sc = STAN.connect(cluster_id, client_id, server);

sc.on('connect', () => {
  console.log("STAN connected!");

  // Subscription options: durable, deliver all available messages,
  // manual acknowledgement, at most one unacknowledged message in flight.
  const opts = sc.subscriptionOptions();
  opts.setDurableName(durable_name);
  opts.setDeliverAllAvailable();
  opts.setManualAckMode(true);
  // if a message is not processed (acknowledged) within maxWait
  // milliseconds, it is re-delivered
  opts.setAckWait(maxWait);
  opts.setMaxInFlight(1);

  // Handlers are declared up front and attached below in the same order
  // as before: error, unsubscribed, ready, message.
  const onError = (err) => {
    console.log(`subscription for ${subject} raised an error: ${err}`);
  };

  const onUnsubscribed = () => {
    console.log(`unsubscribed to ${subject}`);
  };

  const onReady = () => {
    console.log(`subscribed to ${subject}`);
  };

  // In drop mode (-D) the message is logged and never acknowledged;
  // otherwise the ack is sent after a simulated 500 ms of work.
  const onMessage = (msg) => {
    if (!noAck) {
      console.log("> processing", `[${msg.getSequence()}]`, msg.getData());
      setTimeout(() => {
        console.log("< done processing", `[${msg.getSequence()}]`);
        msg.ack();
      }, 500);
      return;
    }
    console.log("> dropping", `[${msg.getSequence()}]`, msg.getData());
  };

  // create a subscriber that will act as a queue worker, potentially
  // dividing up the queue into multiple workers via a queueGroup
  const subscription = sc.subscribe(subject, queueGroup, opts);
  console.log("subscription inbox:", subscription.inbox);

  subscription.on('error', onError);
  subscription.on('unsubscribed', onUnsubscribed);
  subscription.on('ready', onReady);
  subscription.on('message', onMessage);
});

// Connection-level errors are only logged.
sc.on('error', (reason) => {
  console.log(reason);
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment