Skip to content

Instantly share code, notes, and snippets.

@wallyqs
Last active October 27, 2021 22:42
Show Gist options
  • Save wallyqs/c259c7c77c74880c487ff63a3268d12f to your computer and use it in GitHub Desktop.
Save wallyqs/c259c7c77c74880c487ff63a3268d12f to your computer and use it in GitHub Desktop.
NATS::Client and NATS::JetStream (nats-pure.rb v2.0.0)
require 'nats'
# Connect to server that has JetStream support, e.g.
#
# nats-server -js
#
# To include in Gemfile:
#
# source "https://rubygems.org"
#
# gem 'nats-pure', '2.0.0.pre.alpha'
#
nc = NATS.connect("localhost")
# Create JetStream context.
js = nc.jetstream
# Create Stream that will persist messages from foo subject.
begin
info = js.add_stream(name: "sample-stream", subjects: ["foo"])
rescue => e
puts "Error: #{e}"
end
# Send 10 messages and wait to get an ack that they have been persisted.
10.times do |i|
ack = js.publish("foo", "hello world: #{i}", timeout: 2)
puts "Published: #{ack.seq}"
end
# Create pull based consumer.
psub = js.pull_subscribe("foo", "psub")
# Fetch 3 messages from consumer.
msgs = psub.fetch(3)
msgs.each do |msg|
puts " ACK: Stream Seq: #{msg.metadata.sequence.stream} || Consumer Seq: #{msg.metadata.sequence.consumer}"
msg.ack
end
# Get latest consumer info.
cinfo = psub.consumer_info
puts "Consumer '#{cinfo.name}' Pending Messages: #{cinfo.num_pending}"
# Subscribe is now dispatched a NATS::Msg that may include headers
nc.subscribe("hello") do |msg|
puts "Received on '#{msg.subject}': Data: #{msg.data} || Header: #{msg.header}"
msg.respond("OK") if msg.reply
end
sub = nc.subscribe("hello")
# Can use publish to send a message with headers.
nc.publish("hello", header: { 'quux': 'quuz'})
nc.publish_msg(NATS::Msg.new(subject: "hello", data: "world", header:{ 'foo': 'bar'}))
# Request also supports publishing with headers.
msg = nc.request("hello", header: { 'a': 'b'})
puts "Response #{msg.data}"
msg = nc.request_msg(NATS::Msg.new(subject: "hello", data: "world!!!", header:{ 'foo': 'bar'}))
puts "Response #{msg.data}"
# Can also use iterator style to consume messages now.
msg = sub.next_msg
puts "Received on '#{msg.subject}': Data: #{msg.data} || Header: #{msg.header}"
msg = sub.next_msg(timeout: 2)
puts "Received on '#{msg.subject}': Data: #{msg.data} || Header: #{msg.header}"
begin
sub.next_msg(timeout: 1)
rescue NATS::Timeout => e
# puts "Timeout since no new messages yet: #{e}"
end
nc.flush
nc.close
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment