@deepj
Created April 15, 2020 18:50
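# frozen_string_literal: true

require 'async/io'
require 'async/io/stream'
require 'async/io/protocol/line'
require 'async/pool/controller'
require 'async/await'

module Async
  module NATS
    # NATS terminates every protocol line with carriage return + line feed.
    CRLF = "\r\n"

    # Default endpoint: a NATS server on localhost, port 4222.
    def self.local_endpoint
      Async::IO::Endpoint.tcp('localhost', 4222)
    end

    class Client
      include Async::Await

      def initialize(endpoint = NATS.local_endpoint, **options)
        @endpoint = endpoint
        @pool = connect(**options)
      end

      def ping
        call("PING")
      end

      # Send each argument as its own line and return the next line from the server.
      def call(*arguments)
        @pool.acquire do |connection|
          # write_lines takes a splat, so the arguments array must be expanded.
          connection.write_lines(*arguments)
          connection.stream.flush

          # Async::IO::Protocol::Line reads with read_line (it has no read_response).
          # Assumes one line per reply; the server's initial INFO line is not skipped here.
          return connection.read_line
        end
      end

      private

      # Build a connection pool; each pooled resource is a line protocol over a TCP stream.
      def connect(**options)
        Async::Pool::Controller.wrap(**options) do
          peer = @endpoint.connect
          # Disable Nagle's algorithm so small commands are sent immediately.
          peer.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
          stream = Async::IO::Stream.new(peer)

          Async::IO::Protocol::Line.new(stream, CRLF)
        end
      end
    end
  end
end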
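
The client above exchanges CRLF-terminated text lines with the server. As a minimal sketch of that exchange, the following uses only Ruby's standard library and a stand-in server instead of the async gems and a real NATS instance. It assumes the usual NATS behaviour: the server greets each new connection with an `INFO` line and answers `PING` with `PONG`. The names `start_fake_server`, `ping` and `ping_demo`, and the empty `{}` INFO payload, are hypothetical placeholders and not part of the gist.

```ruby
require 'socket'

CRLF = "\r\n"

# Hypothetical stand-in for a NATS server, for illustration only:
# sends an INFO line on connect and answers each PING with PONG.
def start_fake_server
  server = TCPServer.new('127.0.0.1', 0) # port 0 lets the OS pick a free port
  thread = Thread.new do
    client = server.accept
    client.write("INFO {}#{CRLF}") # placeholder payload; a real server sends JSON details
    while (line = client.gets(CRLF))
      client.write("PONG#{CRLF}") if line.chomp(CRLF) == 'PING'
    end
    client.close
  end
  [server, thread]
end

# Send one CRLF-terminated PING and return the reply without its terminator.
def ping(socket)
  socket.write("PING#{CRLF}")
  socket.gets(CRLF).chomp(CRLF)
end

# Connect, consume the greeting, ping once, and return both lines.
def ping_demo
  server, thread = start_fake_server
  socket = TCPSocket.new('127.0.0.1', server.addr[1])
  socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
  info = socket.gets(CRLF).chomp(CRLF) # the greeting arrives before any reply
  reply = ping(socket)
  socket.close
  thread.join
  server.close
  [info, reply]
end
```

Calling `ping_demo` returns the greeting and the reply as a two-element array. Reading the `INFO` line before sending `PING` matters: a client that reads a single line straight after its first command would receive the greeting instead of the reply.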