Skip to content

Instantly share code, notes, and snippets.

@ingoogni
Last active June 14, 2022 08:03
Show Gist options
  • Save ingoogni/b29fde37f74216e57abf7ed7812d2a52 to your computer and use it in GitHub Desktop.
Save ingoogni/b29fde37f74216e57abf7ed7812d2a52 to your computer and use it in GitHub Desktop.
Experimenting with pub/sub bus, using asyncstreams in Nim
import std/[os, asyncdispatch, asyncstreams, strutils, oids, tables, options]
type
  PSBusException* = object of Defect ## Raised when a channel fails its subscription handshake.
  Topic* = string ## Topic name used to route a message to its subscribers.
  MsgDist* = enum ## Distribution pattern of a message.
    mtPS, ## pub/sub
    mtRR, ## public request(/reply)
    mtR   ## private reply
  MsgKind = enum ## Discriminator for the payload stored in a `MsgBody`.
    msgString,
    msgInt,
    msgFloat,
    msgBlob,
    msgNull
  MsgHeader* = object ## Routing information carried by every message.
    topic*: Topic     ## topic the message is published on
    replyId*: string  ## id of the channel a private reply is delivered to
    msgDist*: MsgDist ## how the broker distributes the message
  MsgBody* = object ## Tagged payload; exactly one branch is active.
    case msgKind*: MsgKind
    of msgString: strMsg*: string
    of msgInt: intMsg*: int
    of msgFloat: floatMsg*: float64
    of msgBlob: blobMsg*: seq[byte]
    of msgNull: discard
  Message* = object ## Header plus body, the unit sent over the bus.
    header*: MsgHeader
    body*: MsgBody
  Channel* = object ## A registered receiver with its own inbound stream.
    stream*: FutureStream[Message] ## messages delivered to this channel
    name*: string                  ## human readable name
    id*: string                    ## key under which it is registered
    subscription*: seq[Topic]      ## topics the channel listens to
  Bus* = ref object of RootObj ## Broker state.
    register*: Table[Channel.id, Channel]      ## channel id -> channel
    subscriber*: Table[Topic, seq[Channel.id]] ## topic -> subscribed ids
    fsBus*: FutureStream[Message]              ## input stream of the broker
# `bus` is the default Bus instance used by the procs in this module.
# Its `fsBus` stream is the input channel of the broker: every message that
# has to be distributed should be written to that stream.
var bus* = Bus(fsBus: newFutureStream[Message](fromProc = "main"))
# The MsgBody conversion helpers below are adapted ('nicked') from tiny_sqlite: https://github.com/GULPF/tiny_sqlite
proc toMsgBody*[T: string](msg: T): MsgBody =
  ## Wraps a string in a `msgString` body.
  result = MsgBody(msgKind: msgString, strMsg: msg)

proc fromMsgBody*(msgbody: MsgBody, T: typedesc[string]): string =
  ## Returns the `strMsg` payload of `msgbody`.
  result = msgbody.strMsg
proc toMsgBody*[T: Ordinal](msg: T): MsgBody =
  ## Wraps any ordinal value (integers, `char`, `bool`, enums) in a `msgInt`
  ## body. `ord` is used so that non-integer ordinals are accepted as well,
  ## mirroring the conversion back to `T` done by `fromMsgBody`.
  MsgBody(msgKind: msgInt, intMsg: ord(msg))

proc fromMsgBody*(msgbody: MsgBody, T: typedesc[Ordinal]): T =
  ## Returns the `intMsg` payload of `msgbody` converted to the ordinal
  ## type `T`.
  T(msgbody.intMsg)
proc toMsgBody*[T: SomeFloat](msg: T): MsgBody =
  ## Wraps a floating point value in a `msgFloat` body.
  result = MsgBody(msgKind: msgFloat, floatMsg: msg)

proc fromMsgBody*(msgbody: MsgBody, T: typedesc[SomeFloat]): float64 =
  ## Returns the `floatMsg` payload of `msgbody`; the result is always
  ## `float64`, whichever float type is passed as `T`.
  result = msgbody.floatMsg
proc toMsgBody*[T: seq[byte]](msg: T): MsgBody =
  ## Wraps a byte sequence in a `msgBlob` body.
  result = MsgBody(msgKind: msgBlob, blobMsg: msg)

proc fromMsgBody*(msgbody: MsgBody, T: typedesc[seq[byte]]): seq[byte] =
  ## Returns the `blobMsg` payload of `msgbody`.
  result = msgbody.blobMsg
proc toMsgBody*[T: Option](msg: T): MsgBody =
  ## Converts an `Option`: a present value is converted with the matching
  ## `toMsgBody` overload, an absent value becomes a `msgNull` body.
  if msg.isSome:
    toMsgBody(msg.get)
  else:
    MsgBody(msgKind: msgNull)

proc fromMsgBody*[T](msgbody: MsgBody, _: typedesc[Option[T]]): Option[T] =
  ## Converts a body back to `Option[T]`: `msgNull` yields `none(T)`, any
  ## other kind is converted with `fromMsgBody(T)` and wrapped in `some`.
  if msgbody.msgKind != msgNull:
    some(msgbody.fromMsgBody(T))
  else:
    none(T)
proc toMsgBody*[T: type(nil)](msg: T): MsgBody =
  ## Converts a literal `nil` to a `msgNull` body.
  result = MsgBody(msgKind: msgNull)
proc toMsgBodys*(msgs: varargs[MsgBody, toMsgBody]): seq[MsgBody] =
  ## Converts every argument with `toMsgBody` and collects the resulting
  ## bodies in a seq, preserving argument order.
  result = newSeqOfCap[MsgBody](msgs.len)
  for body in msgs:
    result.add body
proc `$`*(msg: MsgBody): string =
  ## Renders a body as `MsgBody[<payload>]`. Strings are quoted, blobs are
  ## shown as `<blob>` and the null body as `nil`.
  var payload = ""
  case msg.msgKind
  of msgString:
    payload.addQuoted msg.strMsg
  of msgInt:
    payload.add $msg.intMsg
  of msgFloat:
    payload.add $msg.floatMsg
  of msgBlob:
    payload.add "<blob>"
  of msgNull:
    payload.add "nil"
  "MsgBody[" & payload & "]"
proc `==`*(a, b: MsgBody): bool =
  ## Two bodies are equal when they have the same kind and the payloads of
  ## that kind compare equal; two null bodies are always equal.
  if a.msgKind != b.msgKind:
    return false
  # Same kind on both sides, so reading the matching branch of `b` is safe.
  case a.msgKind
  of msgString:
    result = a.strMsg == b.strMsg
  of msgInt:
    result = a.intMsg == b.intMsg
  of msgFloat:
    result = a.floatMsg == b.floatMsg
  of msgBlob:
    result = a.blobMsg == b.blobMsg
  of msgNull:
    result = true
proc broker*(bus: Bus = bus) {.async.} =
  ## Simple broker. It receives messages from `bus.fsBus` and sends them to
  ## the subscribers of the message topic. If a message is a private reply
  ## (`mtR`) it does not look up the topic subscribers but sends the message
  ## directly to the channel whose id is `header.replyId`.
  ##
  ## Messages for which there is no recipient (a topic without subscribers,
  ## or an id that is not (or no longer) registered) are dropped.
  ## The broker returns once `bus.fsBus` has been completed and drained.
  while true:
    let (hasContent, msg) = await bus.fsBus.read
    if not hasContent:
      # `read` only reports "no content" for a completed, empty stream;
      # looping on would spin without ever suspending.
      break
    if msg.header.msgDist == mtR:
      let target = msg.header.replyId
      if target in bus.register:
        await bus.register[target].stream.write(msg)
    else:
      # Work on a copy: channels may (un)subscribe while a write is awaited.
      let recipients = bus.subscriber.getOrDefault(msg.header.topic)
      for id in recipients:
        if id in bus.register:
          await bus.register[id].stream.write(msg)
proc logSubscribe(channel: Channel, bus: Bus = bus) =
  ## Echoes the name and id of `channel`, followed by the topics that are
  ## stored for it in `bus.register` (if there are any).
  let topics = bus.register[channel.id].subscription
  var report = channel.name & " has id: " & channel.id & "\n"
  if topics.len > 0:
    report.add "subscribed to channels:\n"
    for topic in topics:
      report.add " " & topic & "\n"
  echo report
proc pub*[T](topic: Topic, msg: T, replyId: string = "",
             msgDist: MsgDist = mtPS, bus: Bus = bus) {.async.} =
  ## Assembles the message header and body and posts the message to the
  ## broker by writing it to `bus.fsBus`.
  ##
  ## topic   : The topic string for the message.
  ## msg     : The content of the message, converted with `toMsgBody`.
  ## replyId : The channel.id; it is needed for the req/rep pattern with a
  ##           direct reply to the sender of the request.
  ## msgDist : Message distribution pattern: pub/sub, req/rep or rep. (The
  ##           proc that replies to a req/rep may change it to rep to create
  ##           a 'private' message.)
  let envelope = Message(
    header: MsgHeader(topic: topic, replyId: replyId, msgDist: msgDist),
    body: toMsgBody(msg)
  )
  await bus.fsBus.write(envelope)
proc pubRep*[T](topic: Topic, msg: T, replyId: string,
                bus: Bus = bus) {.async.} =
  ## Publishes a private reply (`mtR`) through the broker, addressed to the
  ## channel with the id `replyId`.
  ##
  ## The message is posted on the `bus` that is passed in, not
  ## unconditionally on the module-level default bus.
  await pub(topic, msg, replyId, msgDist = mtR, bus = bus)
proc pubPop*[T](topic: Topic, msg: T, replyId: string,
                bus: Bus = bus): Future[Message] {.async.} =
  ## Publishes a request (`mtRR`) and waits for and retrieves the next
  ## message that arrives on the stream of the channel registered under
  ## `replyId`.
  ##
  ## The request is published on the same `bus` the reply is read from.
  ## Raises `KeyError` if `replyId` is not registered on `bus`. If the
  ## channel stream yields no content, a default `Message` is returned.
  await pub(topic, msg, replyId, msgDist = mtRR, bus = bus)
  await sleepAsync(0) # let other pending tasks run before reading the reply
  let (hasContent, returnMsg) = await bus.register[replyId].stream.read
  if hasContent:
    return returnMsg
proc subscribeChannel*(name: string, subscription: seq[Topic] = @[],
                       bus: Bus = bus): Future[(bool, Channel)] {.async.} =
  ## Creates a channel, registers it at the broker and subscribes it to the
  ## given topics.
  ##
  ## name         : name of the proc that subscribes
  ## subscription : topics to listen for
  ## bus          : the bus to register on; also used for the handshake
  ##
  ## To verify the registration the channel sends its own name to itself as
  ## a private reply through the broker and reads it back. On success
  ## `(true, channel)` is returned, otherwise `PSBusException` is raised.
  let channel = Channel(
    name: name,
    subscription: subscription,
    stream: newFutureStream[Message](fromProc = name),
    id: $genOid()
  )
  bus.register[channel.id] = channel
  for topic in channel.subscription:
    bus.subscriber.mgetOrPut(topic, newSeq[string]()).add channel.id
  await pubRep("", channel.name, channel.id, bus = bus) # message to self
  await sleepAsync(0)
  let (hasContent, msg) = await channel.stream.read
  # Check the kind first: reading `strMsg` from another kind of body would
  # raise a FieldDefect instead of the documented PSBusException.
  if hasContent and msg.body.msgKind == msgString and
      fromMsgBody(msg.body, string) == channel.name:
    logSubscribe(channel, bus)
    return (true, channel)
  else:
    raise newException(PSBusException,
      "Channel " & channel.name & " fails to subscribe.")
proc unsubscribeChannel*(channelId: string, bus: Bus = bus) =
  ## Removes the channel with id `channelId` from every topic it is
  ## subscribed to and from the register of `bus`. Topics that are left
  ## without subscribers are removed as well.
  ##
  ## Unsubscribing an id that is not registered is a no-op.
  if channelId notin bus.register:
    return
  for topic in bus.register[channelId].subscription:
    if topic notin bus.subscriber:
      continue
    # Look the position up first instead of deleting while iterating over
    # the seq, which changes its length during the iteration.
    let idx = bus.subscriber[topic].find(channelId)
    if idx >= 0:
      bus.subscriber[topic].delete(idx)
    if bus.subscriber[topic].len == 0:
      bus.subscriber.del(topic)
  bus.register.del(channelId)
proc closeBus*(bus: Bus = bus) =
  ## Completes the stream of every registered channel, echoing the name of
  ## each, and then empties both the register and the subscriber table.
  ## `bus.fsBus` itself is not completed here.
  for channel in bus.register.values:
    channel.stream.complete()
    echo("complete : " & channel.name)
  bus.register.clear()
  bus.subscriber.clear()
proc ctrlc*() {.noconv.} =
  ## Ctrl+C hook: announces the shutdown and closes the default bus.
  echo("Ctrl+C, shutting down")
  closeBus() #MEH!

# Install the hook when the module is loaded.
setControlCHook(ctrlc)
import std/[asyncdispatch, asyncstreams, random, strutils, times] #logging]
import psbus
# Initialise the random number generator of std/random before `rand` is
# used by `timeRequest` below.
randomize()
proc serveTime(bus: Bus = bus) {.async, gcsafe.} =
  ## Serves the current UTC time on request. It subscribes to "req.time" and
  ## - answers a "time?" body with a private reply on topic "time.req",
  ## - answers a body equal to 10 with a public message on topic "time.ten".
  ## The loop stops once the channel stream is finished.
  ## Note: the `bus` parameter is not used; all calls go to the default bus.
  let (ok, channel) = await subscribeChannel("serveTime", @["req.time"])
  var subscribed = ok
  while subscribed:
    let (hasContent, msg) = await channel.stream.read
    if hasContent:
      let stamp = now().utc.format("ddd, d MMM yyyy HH:mm:ss")
      if msg.body == toMsgBody("time?"):
        await pubRep("time.req", stamp, msg.header.replyId)
      elif msg.body == toMsgBody(10):
        await pub("time.ten", stamp)
    if finished(channel.stream):
      subscribed = false
proc tenSeconds(bus: Bus = bus) {.async, gcsafe.} =
  ## Publishes the value 10 on topic "req.time" every 10 seconds, forever.
  ## It only publishes to the bus; it does not subscribe or receive.
  ## Note: the `bus` parameter is not used; `pub` posts on the default bus.
  while true:
    await pub("req.time", 10)
    await sleepAsync(10000)
proc timeRequest(bus: Bus = bus) {.async, gcsafe.} =
  ## Requests the current time at random intervals (0 to 10 seconds) and
  ## echoes the answer on reception. It publishes a public request on
  ## "req.time" and receives the private reply on its own stream.
  ## The loop stops once the channel stream is finished.
  ## Note: the `bus` parameter is not used; all calls go to the default bus.
  let (ok, channel) = await subscribeChannel("timeRequest", @[""])
  var subscribed = ok
  while subscribed:
    await sleepAsync(rand(10) * 1000)
    let reply = await pubPop("req.time", "time?", channel.id)
    echo channel.name & " : " & reply.header.topic & " -> " & $reply.body & "\n"
    subscribed = not finished(channel.stream)
proc echoTime(bus: Bus = bus) {.async, gcsafe.} =
  ## Subscribes to the topics "time.req" and "time.ten" and echoes every
  ## message it receives. It does not 'see' the "time.req" message from
  ## serveTime, as that one is sent as a private reply.
  ## The loop stops once the channel stream is finished.
  ## Note: the `bus` parameter is not used; the default bus is subscribed to.
  let (ok, channel) = await subscribeChannel("echoTime",
                                             @["time.req", "time.ten"])
  var subscribed = ok
  while subscribed:
    let (hasContent, msg) = await channel.stream.read
    if hasContent:
      echo channel.name & " : " & msg.header.topic & " -> " & $msg.body & "\n"
    subscribed = not finished(channel.stream)
proc main(bus: Bus = bus) {.async, gcsafe.} =
  ## Starts the broker and the demo tasks, in this order, without waiting
  ## for any of them. All of them run on the default bus; the `bus`
  ## parameter is not used.
  asyncCheck(broker())
  asyncCheck(echoTime())
  asyncCheck(serveTime())
  asyncCheck(timeRequest())
  asyncCheck(tenSeconds())
# Start the demo and drive the dispatcher; any exception that escapes
# `runForever` ends the program through `quit()`.
asyncCheck(main())
try: #MEH!
  runForever()
except:
  quit()
import std/[asyncdispatch, asyncstreams, os, tables, random, monotimes, sugar]
import psbus, tiny_sqlite
# Initialise the random number generator of std/random before `rand` is
# used by `poster` below.
randomize()
# Set up the database: the statements used by the writer, keyed by name,
# and the database file name.
const
  queryTable = {
    "begin": "BEGIN;",
    "commit": "COMMIT;",
    "db.write.test": "INSERT INTO test VALUES($1);"
  }.toTable
  fn = "test.db"

# Always start from a fresh database file.
if fileExists(fn):
  removeFile(fn)

let conn = openDatabase(fn, dbReadWrite)
exec(conn, "PRAGMA journal_mode=WAL;") #set once (on DB creation)
exec(conn, "CREATE TABLE IF NOT EXISTS test(val integer);")
close(conn)
proc dbWrite(path: string, queryTable: Table[string, string],
             bus: Bus = bus) {.async.} =
  ## Database writer. Opens the database at `path`, prepares every statement
  ## in `queryTable` and subscribes to the topic "db.write.test" on `bus`.
  ## All messages that are queued on the channel at one moment are written
  ## within a single BEGIN/COMMIT transaction; each message body is read as
  ## an int and inserted with the "db.write.test" statement.
  ##
  ## When the channel stream is finished the prepared statements are
  ## finalized and the connection is closed.
  let
    dbWriteConn = openDatabase(path, dbReadWrite, cacheSize = 0)
    dbWriteStatement = collect(initTable(len(queryTable))):
      for key, value in queryTable: {key: dbWriteConn.stmt(value)}
    res = await subscribeChannel("dbWrite", @["db.write.test"], bus)
    channel = res[1]
  var subscribed = res[0]
  while subscribed:
    # Suspend until a message arrives or the stream is completed; polling
    # the queue length without awaiting would block the dispatcher while
    # the queue is empty.
    let (hasContent, first) = await channel.stream.read
    if hasContent:
      echo "BEGIN"
      let s = getMonoTime()
      exec dbWriteStatement["begin"]
      var msg = first
      while true:
        let data = toDbValues(fromMsgBody(msg.body, int))
        dbWriteStatement["db.write.test"].exec(data)
        # Drain only what is queued right now; these reads do not suspend.
        if len(channel.stream) == 0:
          break
        let (_, next) = await channel.stream.read
        msg = next
      exec dbWriteStatement["commit"]
      echo (getMonoTime() - s)
      echo "COMMIT\n"
    else:
      # No content means the stream is finished and drained: clean up.
      echo "finish : " & channel.name
      for statement in dbWriteStatement.values:
        finalize statement
      echo "close : dbWriteConn"
      close dbWriteConn
      subscribed = false
proc poster(bus: Bus = bus) {.async.} =
  ## Publishes ten batches of a random number (0 to 100000) of integers on
  ## the topic "db.write.test", counting down from the batch size to 1, and
  ## echoes the time each batch took. Closes the default bus afterwards.
  ## Note: the `bus` parameter is not used.
  for n in 0 ..< 10:
    echo "n " & $n
    let nInserts: int = rand(100000)
    echo "number of inserts : " & $nInserts
    echo "START"
    let started = getMonoTime()
    for i in countdown(nInserts, 1):
      await pub("db.write.test", i)
    echo (getMonoTime() - started)
    echo "STOP"
    await sleepAsync(0)
  closeBus()
proc main(bus: Bus = bus) {.async.} =
  ## Starts the broker, the database writer and the poster, in this order,
  ## without waiting for any of them. The `bus` parameter is not used.
  asyncCheck(broker())
  asyncCheck(dbWrite(fn, queryTable))
  asyncCheck(poster())
# Start the demo and drive the dispatcher; any exception that escapes
# `runForever` ends the program through `quit()`.
asyncCheck(main())
try: #MEH!
  runForever()
except:
  quit()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment