Skip to content

Instantly share code, notes, and snippets.

@ingoogni
Last active October 16, 2023 09:56
Show Gist options
  • Save ingoogni/d040454fbe0a339e01a808a25af3b5bc to your computer and use it in GitHub Desktop.
Save ingoogni/d040454fbe0a339e01a808a25af3b5bc to your computer and use it in GitHub Desktop.
Experimenting with Server Sent Events in Nim
import asyncdispatch, asynchttpserver, asyncnet, nativesockets, net
import logging, oids, strformat, strutils, tables, times
#import sse_db
type
  SSEClient = object
    ## State kept for one connected Server-Sent-Events subscriber.
    fs: FutureStream[string] ## per-client queue of already formatted SSE text
    clientClosed: int        ## set to 0 on creation; its purpose is not
                             ## evident from the code that uses this type
    req: Request             ## the HTTP request that opened the subscription
# Connected subscribers, keyed by the integer value of the client socket's
# file descriptor.
var sseClients {.global.}: Table[int, SSEClient]

# The single inbound stream: producers write SSE text here and the fan-out
# loop reads it.
var fsInput {.global.} = newFutureStream[string](fromProc = "main")

# Route all log output to the console.
var consoleLog = newConsoleLogger()
addHandler(consoleLog)
proc logConnect(req: Request, resp:int, lenResp:int) {.async.} =
  ## Emits one access-log style line for `req` at `info` level.
  ##
  ## The line consists of: socket fd, peer `ip:port`, the URL username (or
  ## `-` when empty), a UTC timestamp, the quoted request line, the response
  ## status `resp` and the response body length `lenResp`.
  let fdText = $int(req.client.getFd)
  let (peerIp, port) = req.client.getPeerAddr()
  let user = if req.url.username == "": "-" else: req.url.username
  let stamp = now().utc.format("d/MMM/yyyy:HH:mm:ss")

  var line = fdText & " "
  line.add peerIp & ":" & $port & " - "
  line.add user
  line.add " [" & stamp & "] "
  # Quoted request line: METHOD path PROTOCOL
  line.add "\"" & $req.reqMethod & " " & req.url.path & " "
  line.add req.protocol.orig & "\" "
  line.add $resp & " " & $lenResp
  info line
proc msgSSEDisconnect(req: Request):string =
  ## Builds (but does not log) the log line announcing that the SSE
  ## subscriber behind `req` has gone away.
  ##
  ## Layout: socket fd, peer `ip:port`, URL username (or `-`), UTC timestamp
  ## and a quoted pseudo request line starting with `SSE CLOSE`.
  result = $int(req.client.getFd) & " "
  let (peerIp, port) = req.client.getPeerAddr()
  result.add peerIp & ":" & $port & " - "
  result.add(if req.url.username == "": "-" else: req.url.username)
  result.add " [" & now().utc.format("d/MMM/yyyy:HH:mm:ss") & "] "
  result.add "\"SSE CLOSE " & req.url.path & " " & req.protocol.orig & "\" "
proc sseHeaders(): HttpHeaders {.inline.} =
  ## Returns the response headers used to open a Server-Sent-Events stream.
  ##
  ## The `Date` header is rendered in the HTTP fixed-length date format,
  ## which requires a two-digit day of month (`dd`), e.g.
  ## `Sun, 06 Nov 1994 08:49:37 GMT`.
  let stamp = now().utc.format("ddd, dd MMM yyyy HH:mm:ss")
  result = {
    "Date": &"{stamp} GMT",
    "Access-Control-Allow-Origin": "*",
    "Connection": "keep-alive",
    "Content-type": "text/event-stream; charset=utf-8",
    "Cache-Control": "no-cache",
    # Asks buffering reverse proxies (nginx) to pass events through at once.
    "X-Accel-Buffering": "no",
    # NOTE(review): presumably present (and empty) so that `respond` does not
    # append a computed Content-Length to the open-ended stream - confirm.
    "Content-Length": ""
  }.newHttpHeaders()
proc error404(req: Request) {.async, gcsafe.}=
  ## Fallback handler: logs the request and answers with HTTP 404 and a short
  ## plain-text body naming the requested path.
  let body = &"path {req.url.path} not found"
  await logConnect(req, 404, body.len)
  await req.respond(Http404, body)
proc initSSEClient(req:Request){.async.}=
  ## Registers `req` as an SSE subscriber under its socket fd and queues the
  ## two opening messages on the subscriber's own stream: the reconnect
  ## delay (`retry`) and a `connect` event carrying the fd.
  let fd = int(req.client.getFd)
  let stream = newFutureStream[string](fromProc = "main")
  sseClients[fd] = SSEClient(fs: stream, clientClosed: 0, req: req)

  let hello = &"event: connect\ndata: connected {fd}\n\n"
  await sseClients[fd].fs.write("retry: 5000\n\n")
  await sseClients[fd].fs.write(hello)
proc sseTime(interval: float = 10){.async.} =
  ## Test data provider: forever writes a `time` event holding the current
  ## UTC date-time to the shared input stream.
  ## interval: pause between two events in seconds, default = 10s.
  let pauseMs = interval * 1000
  while true:
    let stamp = now().utc.format("ddd, d MMM yyyy HH:mm:ss")
    # No terminating blank line here: the fan-out appends the id and the
    # event terminator.
    await fsInput.write(&"event: time\ndata: {stamp}")
    await sleepAsync(pauseMs)
proc sseHeartbeat(interval: float = 15){.async.} =
  ## Keeps otherwise idle connections alive by periodically writing an SSE
  ## comment line without content (`:`) to the shared input stream.
  ## interval: pause between two heartbeats in seconds, default = 15s.
  let pauseMs = interval * 1000
  while true:
    # Sleep first, so the first heartbeat goes out one interval after start.
    await sleepAsync(pauseMs)
    await fsInput.write(":\n\n")
proc sseFanOut() {.async, gcsafe.}= #unsafe!
  ## Distributes every message read from the shared input stream to the
  ## stream of each registered client.
  ##
  ## As every message passes here, this is where a message id is added:
  ## payloads that are not SSE comments (do not start with `:`) get an
  ## `id:` line and the terminating blank line appended. Comment payloads
  ## are forwarded unchanged.
  ##
  ## The loop ends when the input stream is finished, since no further data
  ## can arrive on it.
  while true:
    # Awaiting the read already hands control back to the dispatcher, so no
    # explicit (re-entrant) `poll()` is needed anywhere in this loop.
    let (hasContent, payload) = await fsInput.read
    if not hasContent:
      break

    var data = payload
    if not payload.startsWith(":"):
      let id = $genOid()
      data = &"{payload}\nid: {id}\n\n"
      #put(id, data)

    # Work on a snapshot of the keys: the registry may be modified (clients
    # connecting or being pruned) while this proc is suspended in `await`,
    # and a table must not change while it is being iterated.
    var targets = newSeqOfCap[int](sseClients.len)
    for fd in sseClients.keys:
      targets.add fd
    for fd in targets:
      if fd in sseClients:
        await sseClients[fd].fs.write(data)
proc input(req: Request) {.async, gcsafe.} =
  ## Accepts one message in SSE format (without an `id` line) as the request
  ## body and queues it for distribution to the listening clients; the event
  ## id is added later on by the fan-out. One message per request.
  ## Always answers with an empty HTTP 200.
  let message = req.body
  await fsInput.write(message)
  await req.respond(Http200, "")
proc pruneClients(fd: AsyncFD):bool {.gcsafe.}=
  ## Read-readiness callback for an SSE client socket: removes the client
  ## registered under `fd`, ends its stream, closes its socket and logs the
  ## disconnect.
  ##
  ## Always returns true, which tells the dispatcher to drop this read watch.
  result = true
  let key = int(fd)
  if key notin sseClients:
    return

  let client = sseClients[key]
  # Building the log line asks the socket for its peer address, which can
  # fail once the peer is gone; that must not prevent the cleanup below.
  let msg =
    try:
      msgSSEDisconnect(client.req)
    except CatchableError:
      $key & " - \"SSE CLOSE\" (peer address unavailable)"

  sseClients.del(key)
  # Finish the per-client stream so that a reader waiting on it is woken up
  # with "no content" instead of waiting forever.
  client.fs.complete()
  # Close through the AsyncSocket rather than the raw descriptor, so that
  # `isClosed` on the request's socket reports the disconnect.
  client.req.client.close()
  info(msg)
proc sse(req: Request) {.async, gcsafe.} =
  ## SSE emitter: answers `req` with the event-stream headers, registers the
  ## client and then forwards everything that arrives on the client's own
  ## stream to its socket until the client is closed or deregistered.
  let fd = int(req.client.getFd)

  await req.respond(Http200, "", sseHeaders())
  await initSSEClient(req)
  await logConnect(req, 200, 0)

  # SSE opens a single unidirectional channel, so the client is not expected
  # to send anything: the socket becoming readable is taken as the sign that
  # the client is disconnecting or has disconnected.
  addRead(req.client.getFd.AsyncFD, pruneClients)

  # Replay of missed events is not implemented; a reconnecting client's
  # `last-event-id` header is currently ignored.
  #   let id = req.headers.getOrDefault("last-event-id")
  #   if id != "": await req.client.send(getdownfrom(id))

  # Hold on to this client's own stream: the fd number may be handed to a
  # new connection once this one has been pruned.
  let stream = sseClients[fd].fs
  while not req.client.isClosed:
    let (hasContent, data) = await stream.read
    # The client may have been pruned while this proc was suspended, so
    # re-check before touching the socket.
    if not hasContent or req.client.isClosed:
      break
    if fd notin sseClients or sseClients[fd].fs != stream:
      break
    await req.client.send(data)
proc dispatch(req: Request){.async, gcsafe.} =
  ## Request router: hands `req` to the handler that matches its URL path
  ## exactly; every other path is answered by `error404`.
  let route = req.url.path
  #if route == "/index/":
  #  await index(req)
  if route == "/sse/":
    await sse(req)
  elif route == "/input/":
    await input(req)
  else:
    await error404(req)
proc main(address = "192.168.1.4", port = Port(8088)) {.async.} =
  ## Starts the internal data providers and the fan-out loop, then serves
  ## HTTP requests forever.
  ## address: interface to bind to, default = "192.168.1.4".
  ## port: TCP port to listen on, default = 8088.
  #two "internal" dataproviders
  asyncCheck sseHeartbeat(5)
  asyncCheck sseTime(1)
  asyncCheck sseFanOut()

  var server = newAsyncHttpServer(reuseAddr = false, reusePort = false)
  server.listen(port, address)
  echo &"asynchttpserver listening at {address}:{port}"
  while true:
    if server.shouldAcceptRequest():
      await server.acceptRequest(dispatch)
    else:
      # Too many open descriptors: give connections time to close. Sleeping
      # yields to the dispatcher without calling `poll()` re-entrantly.
      await sleepAsync(500)
# Program entry: start `main` as a background task (asyncCheck raises if its
# future ends with an error) and run the dispatcher loop forever.
asyncCheck main()
runForever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment