Skip to content

Instantly share code, notes, and snippets.

@mdiin
Created May 22, 2024 06:45
Show Gist options
  • Save mdiin/34cbeb29700911b1c22270626fde1d01 to your computer and use it in GitHub Desktop.
Save mdiin/34cbeb29700911b1c22270626fde1d01 to your computer and use it in GitHub Desktop.
NATS durable consumer debugging
package io.o.s.n
import io.nats.client.JetStreamReader
import io.nats.client.JetStreamSubscription
import io.nats.client.Message
import org.slf4j.LoggerFactory
import org.springframework.scheduling.annotation.Async
import org.springframework.stereotype.Component
/**
 * Pulls messages from a JetStream pull subscription in a loop and hands each one to a callback.
 *
 * [run] is annotated with [Async]; presumably it is meant to be invoked through the Spring proxy
 * so the blocking loop runs off the caller's thread (confirm async support is enabled).
 */
@Component
class ContinuousSubscriptionPuller {
    private val log = LoggerFactory.getLogger(ContinuousSubscriptionPuller::class.java)

    /**
     * Waits for the next message from [rdr].
     *
     * @return the next message, or `null` when the pull loop should stop: the reader returned
     *   `null`, the client reported an illegal state (connection closed), or the waiting thread
     *   was interrupted.
     */
    private fun readNextMessage(rdr: JetStreamReader): Message? =
        try {
            // Timeout 0: nats.java documents this as "wait without a deadline" — confirm for the client version in use.
            rdr.nextMessage(0)
        } catch (ex: IllegalStateException) {
            log.warn(
                "Connection was closed, subscriber is shutting down. See https://github.com/nats-io/nats.java/issues/602#issuecomment-1409448389"
            )
            null
        } catch (ex: InterruptedException) {
            // Keep the interrupt visible to whoever owns this thread, then end the loop quietly.
            Thread.currentThread().interrupt()
            log.warn("Interrupted while waiting for the next message, subscriber is shutting down.")
            null
        }

    /**
     * Invokes [block] for [msg] and acks the message when the callback returns normally.
     *
     * A failing callback must not end the pull loop, so its exception is logged and the message
     * is left un-acked (JetStream is expected to redeliver it per the consumer's ack settings).
     *
     * @return `true` to keep pulling, `false` when the thread was interrupted and the loop should end.
     */
    private fun deliver(msg: Message, block: (Message) -> Unit): Boolean =
        try {
            block(msg)
            msg.ack()
            true
        } catch (ex: InterruptedException) {
            Thread.currentThread().interrupt()
            log.warn("SUB: Interrupted while handling a message, subscriber is shutting down.")
            false
        } catch (ex: Exception) {
            log.error("SUB: Message handler failed; message was not acked", ex)
            true
        }

    /**
     * Reads messages from [sub] until no further message can be obtained, passing each to [block].
     *
     * Every message is logged by kind (JetStream, status, or other) before the callback runs.
     * The message is acked here after [block] returns without throwing.
     *
     * @param sub the pull subscription to read from.
     * @param block callback invoked once per received message.
     */
    @Async
    fun run(sub: JetStreamSubscription, block: (Message) -> Unit) {
        log.info("SUB: Beginning {} {}", sub.subject, sub.queueName)
        val rdr = sub.reader(BATCH_SIZE, REPULL_AT)
        var msg = readNextMessage(rdr)
        while (msg != null) {
            log.info("SUB: Message received")
            when {
                msg.isJetStream -> log.info("JS MSG")
                msg.isStatusMessage -> log.info("STATUS: {}", msg.status)
                else -> log.info("???: {}", String(msg.data))
            }
            if (!deliver(msg, block)) break
            msg = readNextMessage(rdr)
        }
    }

    private companion object {
        /** Batch size passed to `JetStreamSubscription.reader`. */
        const val BATCH_SIZE = 10

        /** Re-pull threshold passed to `JetStreamSubscription.reader`. */
        const val REPULL_AT = 9
    }
}
package io.o.s.n.s
import io.nats.client.Message
import io.nats.client.PullSubscribeOptions
import io.o.s.n.ContinuousSubscriptionPuller
import java.nio.charset.StandardCharsets
import java.time.Duration
import java.util.concurrent.ExecutionException
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.DisposableBean
import org.springframework.stereotype.Component
/**
 * Subscribes to `survey.*.published.*` through a durable pull consumer named `surveys` and
 * processes each message via [handleMessage].
 *
 * The connection and subscription are created during construction, and the pull loop is started
 * from the `init` block through [ContinuousSubscriptionPuller.run]. On bean destruction the
 * connection is drained.
 *
 * NOTE(review): connecting and starting the loop inside the constructor hands out `this` (via the
 * callback lambda) before construction has finished; consider moving start-up to a lifecycle callback.
 */
@Component
class SurveyPublishedToRespondentSubscriber(
    private val subscriptionPuller: ContinuousSubscriptionPuller,
) : DisposableBean {
    // Declared first so it is initialized before the init block starts the pull loop.
    private val log = LoggerFactory.getLogger(SurveyPublishedToRespondentSubscriber::class.java)
    private val connection = Nats.connect(...)
    private val pullOptions = PullSubscribeOptions.builder().durable("surveys").build()
    private val subject = "survey.*.published.*"
    private val subscription = connection.jetStream().subscribe(subject, pullOptions)

    init {
        // Flush with a 1 s timeout before the pull loop is started.
        connection.flush(Duration.ofSeconds(1))
        subscriptionPuller.run(subscription) { msg -> handleMessage(msg) }
    }

    /**
     * Handles a single received message.
     *
     * NOTE(review): ContinuousSubscriptionPuller.run also calls `msg.ack()` after this callback
     * returns, so acking here as well is redundant — decide which side owns the acknowledgement.
     */
    private fun handleMessage(msg: Message) {
        log.debug("Message received: {}", msg)
        // ... work ending in msg.ack() or msg.nak()
    }

    /**
     * Drains the connection and waits (bounded by [DRAIN_TIMEOUT]) for the drain to finish, so
     * shutdown does not continue while the drain is still running.
     */
    override fun destroy() {
        try {
            val drained = connection.drain(DRAIN_TIMEOUT).get(DRAIN_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)
            if (drained != true) {
                log.warn("NATS connection did not finish draining within {}", DRAIN_TIMEOUT)
            }
        } catch (ex: InterruptedException) {
            // Preserve the interrupt for the thread running the shutdown.
            Thread.currentThread().interrupt()
            log.warn("Interrupted while draining the NATS connection")
        } catch (ex: TimeoutException) {
            log.warn("Timed out while draining the NATS connection", ex)
        } catch (ex: ExecutionException) {
            log.warn("Draining the NATS connection failed", ex)
        }
    }

    private companion object {
        /** Upper bound for draining the connection on shutdown. */
        val DRAIN_TIMEOUT: Duration = Duration.ofMinutes(5)
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment