Skip to content

Instantly share code, notes, and snippets.

@niv

niv/nats.nim Secret

Created November 30, 2017 17:24
Show Gist options
  • Save niv/7576e958ff0dce050a436c28934c504c to your computer and use it in GitHub Desktop.
Save niv/7576e958ff0dce050a436c28934c504c to your computer and use it in GitHub Desktop.
import asyncnet, asyncdispatch, json, strutils, critbits, logging
type
SID* = uint
NatsClient* = ref object
address: string
port: Port
socket: AsyncSocket
lastSID: SID
subs: CritBitTree[SID]
# from INFO:
maxPayload: uint
connectFuture: Future[bool]
## Completes when the socket is up.
## Contains false if we should try again immediately (future replaced).
registered: bool
## True if the initial registration was sent.
registeredFuture: Future[bool]
## Completes when we registered and subscribed to all we want.
## Contains false if we should try again immediately (future replaced).
NatsMsg* = tuple
sid: uint
subject: string
replyTo: string
payload: string
ProtocolError* = object of Exception
const NewLine = "\r\n"
proc newNatsClient*(address: string, port: Port): Future[NatsClient] {.async.}
## Create a new NATS client and connect immediately.
proc subscribe*(self: NatsClient, subject: string): Future[SID] {.async.}
## Add a subscription to a subject.
## Returned future will block until the subscription was delivered.
proc push*(self: NatsClient, subject, payload: string) {.async.}
## Publish a message to NATS.
## Will block until a connection was made and we registered.
proc popMessage*(self: NatsClient): Future[NatsMsg] {.async.}
## Get the next message off the queue.
## Will block until a connection was made and we registered.
# Internal stuff
template expect(cond: bool, msg: string) =
if not cond: raise newException(ProtocolError, msg)
proc send(self: NatsClient, payload: string) {.async.}
## Send payload to server. Will error if connection was lost.
proc read(self: NatsClient, size: int): Future[string] {.async.}
## Read from server. Will error if connection was lost.
proc readLine(self: NatsClient): Future[string] {.async.}
## Read from server. Will error if connection was lost.
proc readCommand(self: NatsClient, expectCommand = ""):
Future[tuple[command: string, payload: string]] {.async.}
## Read from server. Will error if connection was lost.
proc parseInfo(self: NatsClient, payload: string) {.async.}
## Parse INFO line; if we are not registered yet, we will do so here.
proc connect(self: NatsClient) {.async.}
## Re/connect to the remote server. Will block until socket is open.
proc popMessageInternal(self: NatsClient): Future[NatsMsg] {.async.}
## Read a message off; raises exceptions.
proc newNatsClient*(address: string, port: Port): Future[NatsClient] {.async.} =
new(result)
result.address = address
result.port = port
result.connectFuture = newFuture[bool]("nats.connected")
result.registeredFuture = newFuture[bool]("nats.registered")
result.registered = false
await result.connect()
proc subscribe*(self: NatsClient, subject: string): Future[SID] {.async.} =
if self.subs.contains(subject):
return self.subs[subject]
let sid = self.lastSID
self.lastSID += 1
self.subs[subject] = sid
if self.registered:
# We are already registered. Send out immediately.
# otherwise it would be lost
await self.send("SUB " & subject & " " & $sid)
result = sid
proc push*(self: NatsClient, subject, payload: string) {.async.} =
while true:
if await self.registeredFuture: break
await self.send("PUB " & subject & " " & $payload.len & NewLine & payload)
proc popMessage*(self: NatsClient): Future[NatsMsg] {.async.} =
while true:
let popfut = popMessageInternal(self)
yield popfut
if popfut.failed:
await self.connect()
else:
result = popfut.read
break
proc popMessageInternal(self: NatsClient): Future[NatsMsg] {.async.} =
while true:
let (cmd, payload) = await self.readCommand()
case cmd
of "+OK": discard # we turned these off.
of "+ERR": raise newException(ProtocolError, payload)
of "INFO": await self.parseInfo(payload)
of "PING": await self.send("PONG")
# MSG <subject> <sid> [reply-to] <#bytes>\r\n[payload]\r\n
of "MSG":
let parts = payload.split(" ")
expect(parts.len == 3 or parts.len == 4, "MSG incomplete or not understood")
let (subject, sid) = (parts[0], parts[1].parseUInt)
let (replyTo, size) =
if parts.len == 4: (parts[2], parts[3].parseInt)
else: ("", parts[2].parseInt)
let data = await self.read(size + 2)
return (sid: sid, subject: subject, replyTo: replyTo, payload: data[0..<data.len-2]).NatsMsg
else:
expect(false, "Unhandled protocol message: " & cmd & payload.escape)
proc connect(self: NatsClient) {.async.} =
if not isNil(self.socket) and not self.socket.isClosed:
return
self.socket = nil
# Tell all awaiters to retry with the new future.
let oldConnect = self.connectFuture
let oldReg = self.registeredFuture
if not oldConnect.finished: oldConnect.complete(false)
if not oldReg.finished: oldReg.complete(false)
self.connectFuture = newFuture[bool]("nats.connected")
self.registeredFuture = newFuture[bool]("nats.registered")
self.registered = false
var attempt = 0
proc falloff(): int = min(5, attempt) * 200
while true:
debug "nats: attempting to connect to ", self.address, ":", self.port.int
let fut = asyncnet.dial(self.address, self.port, buffered = true)
yield fut
if not fut.failed:
self.socket = fut.read
assert(not self.socket.isClosed)
break
error "nats: connection failed (" & fut.error.msg.split("\n")[0].strip & "), " &
"trying again in " & $falloff() & " ms"
attempt += 1
await sleepAsync(falloff())
self.connectFuture.complete(true)
info "nats: connection up"
proc parseInfo(self: NatsClient, payload: string) {.async.} =
let j = parseJson(payload)
self.maxPayload = uint j["max_payload"].getNum
if not self.registered:
let connect = %*{
"verbose": false,
"pedantic": true
}
waitFor self.send("CONNECT " & $connect)
# Also register all subs we have (so far).
for sub, sid in self.subs:
waitFor self.send("SUB " & sub & " " & $sid)
debug "nats: Initial registration complete."
self.registered = true
self.registeredFuture.complete(true)
proc send(self: NatsClient, payload: string) {.async.} =
while true:
if await self.connectFuture: break
await self.socket.send(payload & NewLine)
debug "-> ", payload.escape
proc read(self: NatsClient, size: int): Future[string] {.async.} =
while true:
if await self.connectFuture: break
result = await self.socket.recv(size)
if result.len != size:
self.socket.close() # force close socket, clean up state. this is something screwy in asyncnet?
raise newException(IOError, "not enough data to read (socket closed?)")
debug "<- [", size, "]: ", result.escape
proc readLine(self: NatsClient): Future[string] {.async.} =
while true:
if await self.connectFuture: break
let p = await self.socket.recvLine()
if p.len == 0:
self.socket.close() # force close socket, clean up state. this is something screwy in asyncnet?
raise newException(IOError, "not enough data to read (socket closed?)")
result = p.strip
debug "<- ", result.escape
proc readCommand(self: NatsClient, expectCommand = ""):
Future[tuple[command: string, payload: string]] {.async.} =
let cmdline = await self.readLine()
let cmdrem = cmdline.strip.split(" ", 1)
if expectCommand != "":
expect(cmdrem[0].tolower == expectCommand.tolower,
"needed to read " & expectCommand & " but got " & cmdrem[0].escape)
result = (command: cmdrem[0], payload: (if cmdrem.len > 1: cmdrem[1] else: ""))
when isMainModule:
addHandler newConsoleLogger()
let c = waitFor newNatsClient("127.0.0.1", Port 4222)
proc test() {.async.} =
let sid = await c.subscribe("test")
while true:
let msg = await c.popMessage()
echo "GOT: ", msg
proc timer() {.async.} =
while true:
await c.push("test", "HELLO!!11")
await sleepAsync(1000)
asyncCheck test()
asyncCheck timer()
runForever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment