-
-
Save niv/7576e958ff0dce050a436c28934c504c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncnet, asyncdispatch, json, strutils, critbits, logging | |
type | |
SID* = uint | |
NatsClient* = ref object | |
address: string | |
port: Port | |
socket: AsyncSocket | |
lastSID: SID | |
subs: CritBitTree[SID] | |
# from INFO: | |
maxPayload: uint | |
connectFuture: Future[bool] | |
## Completes when the socket is up. | |
## Contains false if we should try again immediately (future replaced). | |
registered: bool | |
## True if the initial registration was sent. | |
registeredFuture: Future[bool] | |
## Completes when we registered and subscribed to all we want. | |
## Contains false if we should try again immediately (future replaced). | |
NatsMsg* = tuple | |
sid: uint | |
subject: string | |
replyTo: string | |
payload: string | |
ProtocolError* = object of Exception | |
const NewLine = "\r\n" | |
proc newNatsClient*(address: string, port: Port): Future[NatsClient] {.async.} | |
## Create a new NATS client and connect immediately. | |
proc subscribe*(self: NatsClient, subject: string): Future[SID] {.async.} | |
## Add a subscription to a subject. | |
## Returned future will block until the subscription was delivered. | |
proc push*(self: NatsClient, subject, payload: string) {.async.} | |
## Publish a message to NATS. | |
## Will block until a connection was made and we registered. | |
proc popMessage*(self: NatsClient): Future[NatsMsg] {.async.} | |
## Get the next message off the queue. | |
## Will block until a connection was made and we registered. | |
# Internal stuff | |
template expect(cond: bool, msg: string) = | |
if not cond: raise newException(ProtocolError, msg) | |
proc send(self: NatsClient, payload: string) {.async.} | |
## Send payload to server. Will error if connection was lost. | |
proc read(self: NatsClient, size: int): Future[string] {.async.} | |
## Read from server. Will error if connection was lost. | |
proc readLine(self: NatsClient): Future[string] {.async.} | |
## Read from server. Will error if connection was lost. | |
proc readCommand(self: NatsClient, expectCommand = ""): | |
Future[tuple[command: string, payload: string]] {.async.} | |
## Read from server. Will error if connection was lost. | |
proc parseInfo(self: NatsClient, payload: string) {.async.} | |
## Parse INFO line; if we are not registered yet, we will do so here. | |
proc connect(self: NatsClient) {.async.} | |
## Re/connect to the remote server. Will block until socket is open. | |
proc popMessageInternal(self: NatsClient): Future[NatsMsg] {.async.} | |
## Read a message off; raises exceptions. | |
proc newNatsClient*(address: string, port: Port): Future[NatsClient] {.async.} = | |
new(result) | |
result.address = address | |
result.port = port | |
result.connectFuture = newFuture[bool]("nats.connected") | |
result.registeredFuture = newFuture[bool]("nats.registered") | |
result.registered = false | |
await result.connect() | |
proc subscribe*(self: NatsClient, subject: string): Future[SID] {.async.} = | |
if self.subs.contains(subject): | |
return self.subs[subject] | |
let sid = self.lastSID | |
self.lastSID += 1 | |
self.subs[subject] = sid | |
if self.registered: | |
# We are already registered. Send out immediately. | |
# otherwise it would be lost | |
await self.send("SUB " & subject & " " & $sid) | |
result = sid | |
proc push*(self: NatsClient, subject, payload: string) {.async.} = | |
while true: | |
if await self.registeredFuture: break | |
await self.send("PUB " & subject & " " & $payload.len & NewLine & payload) | |
proc popMessage*(self: NatsClient): Future[NatsMsg] {.async.} = | |
while true: | |
let popfut = popMessageInternal(self) | |
yield popfut | |
if popfut.failed: | |
await self.connect() | |
else: | |
result = popfut.read | |
break | |
proc popMessageInternal(self: NatsClient): Future[NatsMsg] {.async.} = | |
while true: | |
let (cmd, payload) = await self.readCommand() | |
case cmd | |
of "+OK": discard # we turned these off. | |
of "+ERR": raise newException(ProtocolError, payload) | |
of "INFO": await self.parseInfo(payload) | |
of "PING": await self.send("PONG") | |
# MSG <subject> <sid> [reply-to] <#bytes>\r\n[payload]\r\n | |
of "MSG": | |
let parts = payload.split(" ") | |
expect(parts.len == 3 or parts.len == 4, "MSG incomplete or not understood") | |
let (subject, sid) = (parts[0], parts[1].parseUInt) | |
let (replyTo, size) = | |
if parts.len == 4: (parts[2], parts[3].parseInt) | |
else: ("", parts[2].parseInt) | |
let data = await self.read(size + 2) | |
return (sid: sid, subject: subject, replyTo: replyTo, payload: data[0..<data.len-2]).NatsMsg | |
else: | |
expect(false, "Unhandled protocol message: " & cmd & payload.escape) | |
proc connect(self: NatsClient) {.async.} = | |
if not isNil(self.socket) and not self.socket.isClosed: | |
return | |
self.socket = nil | |
# Tell all awaiters to retry with the new future. | |
let oldConnect = self.connectFuture | |
let oldReg = self.registeredFuture | |
if not oldConnect.finished: oldConnect.complete(false) | |
if not oldReg.finished: oldReg.complete(false) | |
self.connectFuture = newFuture[bool]("nats.connected") | |
self.registeredFuture = newFuture[bool]("nats.registered") | |
self.registered = false | |
var attempt = 0 | |
proc falloff(): int = min(5, attempt) * 200 | |
while true: | |
debug "nats: attempting to connect to ", self.address, ":", self.port.int | |
let fut = asyncnet.dial(self.address, self.port, buffered = true) | |
yield fut | |
if not fut.failed: | |
self.socket = fut.read | |
assert(not self.socket.isClosed) | |
break | |
error "nats: connection failed (" & fut.error.msg.split("\n")[0].strip & "), " & | |
"trying again in " & $falloff() & " ms" | |
attempt += 1 | |
await sleepAsync(falloff()) | |
self.connectFuture.complete(true) | |
info "nats: connection up" | |
proc parseInfo(self: NatsClient, payload: string) {.async.} = | |
let j = parseJson(payload) | |
self.maxPayload = uint j["max_payload"].getNum | |
if not self.registered: | |
let connect = %*{ | |
"verbose": false, | |
"pedantic": true | |
} | |
waitFor self.send("CONNECT " & $connect) | |
# Also register all subs we have (so far). | |
for sub, sid in self.subs: | |
waitFor self.send("SUB " & sub & " " & $sid) | |
debug "nats: Initial registration complete." | |
self.registered = true | |
self.registeredFuture.complete(true) | |
proc send(self: NatsClient, payload: string) {.async.} = | |
while true: | |
if await self.connectFuture: break | |
await self.socket.send(payload & NewLine) | |
debug "-> ", payload.escape | |
proc read(self: NatsClient, size: int): Future[string] {.async.} = | |
while true: | |
if await self.connectFuture: break | |
result = await self.socket.recv(size) | |
if result.len != size: | |
self.socket.close() # force close socket, clean up state. this is something screwy in asyncnet? | |
raise newException(IOError, "not enough data to read (socket closed?)") | |
debug "<- [", size, "]: ", result.escape | |
proc readLine(self: NatsClient): Future[string] {.async.} = | |
while true: | |
if await self.connectFuture: break | |
let p = await self.socket.recvLine() | |
if p.len == 0: | |
self.socket.close() # force close socket, clean up state. this is something screwy in asyncnet? | |
raise newException(IOError, "not enough data to read (socket closed?)") | |
result = p.strip | |
debug "<- ", result.escape | |
proc readCommand(self: NatsClient, expectCommand = ""): | |
Future[tuple[command: string, payload: string]] {.async.} = | |
let cmdline = await self.readLine() | |
let cmdrem = cmdline.strip.split(" ", 1) | |
if expectCommand != "": | |
expect(cmdrem[0].tolower == expectCommand.tolower, | |
"needed to read " & expectCommand & " but got " & cmdrem[0].escape) | |
result = (command: cmdrem[0], payload: (if cmdrem.len > 1: cmdrem[1] else: "")) | |
when isMainModule: | |
addHandler newConsoleLogger() | |
let c = waitFor newNatsClient("127.0.0.1", Port 4222) | |
proc test() {.async.} = | |
let sid = await c.subscribe("test") | |
while true: | |
let msg = await c.popMessage() | |
echo "GOT: ", msg | |
proc timer() {.async.} = | |
while true: | |
await c.push("test", "HELLO!!11") | |
await sleepAsync(1000) | |
asyncCheck test() | |
asyncCheck timer() | |
runForever() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment