Last active
June 14, 2022 08:03
-
-
Save ingoogni/b29fde37f74216e57abf7ed7812d2a52 to your computer and use it in GitHub Desktop.
Experimenting with pub/sub bus, using asyncstreams in Nim
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import std/[os, asyncdispatch, asyncstreams, strutils, oids, tables, options] | |
type | |
PSBusException* = object of Defect | |
Topic* = string | |
MsgDist* = enum | |
mtPS, #pub/sub | |
mtRR, #public request(/reply) | |
mtR #private reply | |
MsgKind = enum | |
msgString, | |
msgInt, | |
msgFloat, | |
msgBlob, | |
msgNull | |
MsgHeader* = object | |
topic*: Topic | |
replyId*: string | |
msgDist*: MsgDist | |
MsgBody* = object | |
case msgKind*: MsgKind | |
of msgString: | |
strMsg*: string | |
of msgInt: | |
intMsg*: int | |
of msgFloat: | |
floatMsg*: float64 | |
of msgBlob: | |
blobMsg*: seq[byte] | |
of msgNull: | |
discard | |
Message* = object | |
header*: MsgHeader | |
body*: MsgBody | |
Channel* = object | |
stream*: FutureStream[Message] | |
name*: string | |
id*: string | |
subscription*: seq[Topic] | |
Bus* = ref object of RootObj | |
register*: Table[Channel.id, Channel] | |
subscriber*: Table[Topic, seq[Channel.id]] | |
fsBus*: FutureStream[Message] | |
#bus is the input channel/stream for the broker. | |
#all messages to be distributed should be written to this channel. | |
var bus* = Bus(fsBus: newFutureStream[Message](fromProc = "main")) | |
# kind of 'nicked' from tiny-sql https://github.com/GULPF/tiny_sqlite | |
proc toMsgBody*[T: string](msg: T): MsgBody = | |
MsgBody(msgKind: msgString, strMsg: msg) | |
proc fromMsgBody*(msgbody: MsgBody, T: typedesc[string]): string = | |
msgbody.strMsg | |
proc toMsgBody*[T: Ordinal](msg: T): MsgBody = | |
MsgBody(msgKind: msgInt, intMsg: msg) | |
proc fromMsgBody*(msgbody: MsgBody, T: typedesc[Ordinal]): T = | |
msgbody.intMsg.T | |
proc toMsgBody*[T: SomeFloat](msg: T): MsgBody = | |
MsgBody(msgKind: msgFloat, floatMsg: msg) | |
proc fromMsgBody*(msgbody: MsgBody, T: typedesc[SomeFloat]): float64 = | |
msgbody.floatMsg | |
proc toMsgBody*[T: seq[byte]](msg: T): MsgBody = | |
MsgBody(msgKind: msgBlob, blobMsg: msg) | |
proc fromMsgBody*(msgbody: MsgBody, T: typedesc[seq[byte]]): seq[byte] = | |
msgbody.blobMsg | |
proc toMsgBody*[T: Option](msg: T): MsgBody = | |
if msg.isNone: | |
MsgBody(msgKind: msgNull) | |
else: | |
toMsgBody(msg.get) | |
proc fromMsgBody*[T](msgbody: MsgBody, _: typedesc[Option[T]]): Option[T] = | |
if msgbody.msgKind == msgNull: | |
none(T) | |
else: | |
some(msgbody.fromMsgBody(T)) | |
proc toMsgBody*[T: type(nil)](msg: T): MsgBody = | |
MsgBody(msgKind: msgNull) | |
proc toMsgBodys*(msgs: varargs[MsgBody, toMsgBody]): seq[MsgBody] = | |
@msgs | |
proc `$`*(msg: MsgBody): string = | |
result.add "MsgBody[" | |
case msg.msgKind | |
of msgString: result.addQuoted msg.strMsg | |
of msgInt: result.add $msg.intMsg | |
of msgFloat: result.add $msg.floatMsg | |
of msgBlob: result.add "<blob>" | |
of msgNull: result.add "nil" | |
result.add "]" | |
proc `==`*(a, b: MsgBody): bool = | |
if a.msgKind != b.msgKind: | |
false | |
else: | |
case a.msgKind | |
of msgString: a.strMsg == b.strMsg | |
of msgInt: a.intMsg == b.intMsg | |
of msgFloat: a.floatMsg == b.floatMsg | |
of msgBlob: a.blobMsg == b.blobMsg | |
of msgNull: true | |
proc broker*(bus:Bus=bus){.async.}= | |
## Simple broker. It receives messages and sends it to subscribers based on | |
## the topic. If a message is of type "reply" it wil not look up all topic | |
## subscribers but send it directly to the channel.id in the message. | |
## | |
while true: | |
let (hasContent, msg) = await bus.fsBus.read | |
if hasContent: | |
if msg.header.msgDist == mtR: | |
await bus.register[msg.header.replyId].stream.write(msg) | |
else: | |
for id in bus.subscriber[msg.header.topic]: | |
await bus.register[id].stream.write(msg) | |
proc logSubscribe(channel: Channel, bus: Bus=bus)= | |
var logmsg = channel.name & " has id: " & channel.id & "\n" | |
if bus.register[channel.id].subscription != @[]: | |
logmsg &= "subscribed to channels:\n" | |
for subscription in bus.register[channel.id].subscription: | |
logmsg &= " " & subscription & "\n" | |
echo logmsg | |
proc pub*[T](topic:Topic, msg:T, replyId:string = "", msgDist:MsgDist = mtPS, bus:Bus=bus) {.async.} = | |
## Assembles the message header and body and posts the message to the broker. | |
## | |
## topic : The topic string for the message | |
## msg : The content of the mesage | |
## replyId : The channel.id, it is needed for the req/rep pattern with a | |
## direct reply to the sender of the request. | |
## msgDist : Message distribution pattern, pub/sub, req/rep or rep. (the proc | |
## that replies to a rep/req changes may change it to rep to create | |
## a 'private' message) | |
let header = MsgHeader( | |
topic: topic, | |
replyId: replyId, | |
msgDist: msgDist | |
) | |
let message = Message( | |
header: header, | |
body: toMsgBody msg | |
) | |
await bus.fsBus.write message | |
proc pubRep*[T](topic:Topic, msg:T, replyId:string, bus:Bus=bus){.async.}= | |
## publishes a private reply through the broker, to the channel with the id = replyId | |
await pub(topic, msg, replyId, msgDist = mtR) | |
proc pubPop*[T](topic:Topic, msg:T, replyId:string, bus:Bus=bus):Future[Message]{.async.} = | |
## publishes a message/request and waits for and retreives the message of a private response. | |
await pub(topic, msg, replyId, msgDist = mtRR) | |
await sleepAsync(0) | |
let (hasContent, returnMsg) = await bus.register[replyId].stream.read | |
if hasContent: | |
return returnMsg | |
proc subscribeChannel*(name:string, subscription:seq[Topic] = @[], bus:Bus=bus):Future[(bool, Channel)]{.async.} = | |
## Creates a channel, registers it and at the broker. Subscribes to messages. | |
## | |
## name: name of the proc that subscribes | |
## subscription: topics to listen for | |
## | |
let id = $genOid() | |
var channel = Channel( | |
name: name, | |
subscription: subscription, | |
stream: newFutureStream[Message](fromProc = name), | |
id: id | |
) | |
bus.register[id] = channel | |
for subscription in channel.subscription: | |
if subscription in bus.subscriber: | |
bus.subscriber[subscription].add channel.id | |
else: | |
bus.subscriber[subscription] = @[channel.id] | |
await pubRep("", channel.name, channel.id) #send message to self trough broker | |
await sleepAsync(0) | |
let (hasContent, msg) = await channel.stream.read | |
if hasContent and fromMsgBody(msg.body, string) == channel.name: | |
var subscribed = true | |
logSubscribe(channel) | |
return (subscribed, channel) | |
else: | |
raise newException(PSBusException, "Channel" & channel.name & "fails to subscribe.") | |
proc unsubscribeChannel*(channelId:string, bus:Bus=bus)= | |
for topic in bus.register[channelId].subscription: | |
for i, id in bus.subscriber[topic]: | |
if id == channelId: | |
bus.subscriber[topic].delete(i) | |
if bus.subscriber[topic].len == 0: | |
bus.subscriber.del(topic) | |
bus.register.del(channelId) | |
proc closeBus*(bus:Bus=bus)= | |
for channel in bus.register.values: | |
complete channel.stream | |
echo "complete : " & channel.name | |
clear bus.register | |
clear bus.subscriber | |
proc ctrlc*() {.noconv.}= | |
echo "Ctrl+C, shutting down" | |
closeBus() #MEH! | |
setControlCHook(ctrlc) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import std/[asyncdispatch, asyncstreams, random, strutils, times] #logging] | |
import psbus | |
randomize() | |
proc serveTime(bus:Bus=bus){.async, gcsafe.} = | |
## serves current time on request | |
let | |
res = await subscribeChannel("serveTime", @["req.time"]) | |
channel = res[1] | |
var subscribed = res[0] | |
while subscribed: | |
let (hasContent, msg) = await channel.stream.read | |
if hasContent: | |
let message = now().utc.format("ddd, d MMM yyyy HH:mm:ss") | |
if msg.body == toMsgBody("time?"): | |
await "time.req".pubRep(message, msg.header.replyId) | |
elif msg.body == toMsgBody(10): | |
await "time.ten".pub(message) | |
if finished channel.stream: | |
subscribed = false | |
proc tenSeconds(bus:Bus=bus){.async, gcsafe.} = | |
## requests time every 10 seconds, does nothing with it. | |
## Only publishes to the bus but does not subscribe/receive | |
while true: | |
await "req.time".pub(10) | |
await sleepAsync(10000) | |
proc timeRequest(bus:Bus=bus){.async, gcsafe.} = | |
## requests current time and echo's it on reception. It does not listen to any public channel. | |
## It publishes a public request and receives a private reply to its own stream. | |
let | |
res = await subscribeChannel("timeRequest", @[""]) | |
channel = res[1] | |
var subscribed = res[0] | |
while subscribed: | |
let interval = rand(10) * 1000 | |
await sleepAsync(interval) | |
let msg = await "req.time".pubPop("time?", channel.id) | |
echo channel.name & " : " & msg.header.topic & " -> " & $msg.body & "\n" | |
if finished channel.stream: | |
subscribed = false | |
proc echoTime(bus:Bus=bus){.async, gcsafe.} = | |
## receives all messages with a topic string that starts with time. | |
## It does not 'see' he time.req from servTime as it is a private reply | |
let | |
res = await subscribeChannel("echoTime", @["time.req", "time.ten"]) | |
channel = res[1] | |
var subscribed = res[0] | |
while subscribed: | |
let (hasContent, msg) = await channel.stream.read | |
if hasContent: | |
echo channel.name & " : " & msg.header.topic & " -> " & $msg.body & "\n" | |
if finished channel.stream: | |
subscribed = false | |
proc main (bus:Bus=bus){.async, gcsafe.} = | |
asyncCheck broker() | |
asyncCheck echoTime() | |
asyncCheck serveTime() | |
asyncCheck timeRequest() | |
asyncCheck tenSeconds() | |
asyncCheck main() | |
try: #MEH! | |
runForever() | |
except: | |
quit() | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import std/[asyncdispatch, asyncstreams, os, tables, random, monotimes, sugar] | |
import psbus, tiny_sqlite | |
randomize() | |
#set up database | |
const | |
queryTable = { | |
"begin": "BEGIN;", | |
"commit": "COMMIT;", | |
"db.write.test": "INSERT INTO test VALUES($1);" | |
}.toTable | |
fn = "test.db" | |
if fileExists fn: | |
removeFile fn | |
let conn = openDatabase(fn, dbReadWrite) | |
conn.exec("PRAGMA journal_mode=WAL;") #set once (on DB creation) | |
conn.exec("CREATE TABLE IF NOT EXISTS test(val integer);") | |
conn.close() | |
proc dbWrite(path: string, queryTable:Table[string,string], bus: Bus=bus){.async.}= | |
let | |
dbWriteConn = openDatabase(fn, dbReadWrite, cacheSize = 0) | |
dbWriteStatement = collect(initTable(len(queryTable))): | |
for key, value in queryTable: {key: dbWriteConn.stmt(value)} | |
res = await subscribeChannel("dbWrite", @["db.write.test"]) | |
channel = res[1] | |
var subscribed = res[0] | |
while subscribed: | |
var queueLen = len channel.stream | |
if queueLen > 0: | |
echo "BEGIN" | |
let s = getMonoTime() | |
exec dbWriteStatement["begin"] | |
while queuelen > 0: | |
let | |
(_, msg) = await channel.stream.read | |
data = toDbValues(fromMsgBody(msg.body, int)) | |
dbWriteStatement["db.write.test"].exec(data) | |
dec queueLen | |
exec dbWriteStatement["commit"] | |
echo (getMonoTime() - s) | |
echo "COMMIT\n" | |
await sleepAsync(0) | |
elif finished channel.stream: | |
echo "finish : " & channel.name | |
for statement in dbWriteStatement.values: | |
finalize statement | |
echo "close : dbWriteConn" | |
close dbWriteConn | |
subscribed = false | |
proc poster(bus:Bus=bus){.async.} = | |
var n = 0 | |
while n < 10 : | |
echo "n " & $n | |
let nInserts:int = rand(100000) | |
echo "number of inserts : " & $nInserts | |
var i = nInserts | |
echo "START" | |
let s = getMonoTime() | |
while i > 0: | |
await "db.write.test".pub(i) | |
dec i | |
echo (getMonoTime() - s) | |
echo "STOP" | |
await sleepAsync(0) | |
inc n | |
closeBus() | |
proc main (bus:Bus=bus){.async.} = | |
asyncCheck broker() | |
asyncCheck dbWrite(fn, queryTable) | |
asyncCheck poster() | |
asyncCheck main() | |
try: #MEH! | |
runForever() | |
except: | |
quit() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment