Last active
October 16, 2023 09:56
-
-
Save ingoogni/d040454fbe0a339e01a808a25af3b5bc to your computer and use it in GitHub Desktop.
Experimenting with Server Sent Events in Nim
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncdispatch, asynchttpserver, asyncnet, nativesockets, net | |
import logging, oids, strformat, strutils, tables, times | |
#import sse_db | |
type
  SSEClient = object
    ## Book-keeping record for one connected Server-Sent-Events client.
    fs: FutureStream[string] ## per-client queue of outgoing SSE frames
    clientClosed: int        ## set to 0 on creation; not read in this file
    req: Request             ## the request that opened the event stream
# Module-wide state shared by every handler in this file.
var
  # Connected SSE clients, keyed by the integer value of their socket fd.
  sseClients {.global.} = initTable[int, SSEClient]()
  # Single inbound stream: every message written here is fanned out to clients.
  fsInput {.global.} = newFutureStream[string](fromProc = "main")
  # Log to the console.
  consoleLog = newConsoleLogger()

consoleLog.addHandler()
proc logConnect(req: Request, resp: int, lenResp: int) {.async.} =
  ## Writes one access-log style line for `req` at `info` level.
  ##
  ## Layout: `fd ip:port - user [timestamp] "METHOD path protocol" resp lenResp`
  ## where `resp` is the response status code and `lenResp` the body length
  ## as supplied by the caller.
  let (peerIp, port) = req.client.getPeerAddr()
  let
    fd = int(req.client.getFd)
    user = if req.url.username.len == 0: "-" else: req.url.username
    stamp = now().utc.format("d/MMM/yyyy:HH:mm:ss")
    requestLine = $req.reqMethod & " " & req.url.path & " " & req.protocol.orig

  var line = $fd & " " & peerIp & ":" & $port & " - " & user
  line.add " [" & stamp & "] "
  line.add "\"" & requestLine & "\" "
  line.add $resp & " " & $lenResp
  info line
proc msgSSEDisconnect(req: Request): string =
  ## Builds (but does not log) the access-log style line announcing that the
  ## SSE connection belonging to `req` was closed.
  ##
  ## Layout: `fd ip:port - user [timestamp] "SSE CLOSE path protocol" `
  ## (note the trailing space).
  let (peerIp, port) = req.client.getPeerAddr()
  let
    fd = int(req.client.getFd)
    user = if req.url.username.len == 0: "-" else: req.url.username
    stamp = now().utc.format("d/MMM/yyyy:HH:mm:ss")

  result = $fd & " " & peerIp & ":" & $port & " - " & user
  result.add " [" & stamp & "] "
  result.add "\"SSE CLOSE " & req.url.path & " " & req.protocol.orig & "\" "
proc sseHeaders(): HttpHeaders {.inline.} =
  ## Returns the response headers used to open a Server-Sent-Events stream.
  ##
  ## The `Date` value is the current UTC time with a two-digit day of month
  ## (e.g. `Sun, 06 Nov 1994 08:49:37 GMT`), as the HTTP-date grammar requires.
  let stamp = now().utc.format("ddd, dd MMM yyyy HH:mm:ss")
  result = {
    "Date": stamp & " GMT",
    "Access-Control-Allow-Origin": "*",
    "Connection": "keep-alive",
    "Content-type": "text/event-stream; charset=utf-8",
    "Cache-Control": "no-cache",
    "X-Accel-Buffering": "no",
    # NOTE(review): presumably present (and empty) so that `respond` does not
    # append its own `Content-Length: 0` to the open-ended stream - confirm
    # against asynchttpserver.respond.
    "Content-Length": ""
  }.newHttpHeaders()
proc error404(req: Request) {.async, gcsafe.} =
  ## Answers `req` with a 404 whose body names the requested path; the
  ## request is logged before the response is sent.
  let body = "path " & req.url.path & " not found"
  await logConnect(req, 404, body.len)
  await respond(req, Http404, body)
proc initSSEClient(req: Request) {.async.} =
  ## Registers `req` in `sseClients` under its socket fd with a fresh
  ## outgoing stream, then queues the two opening frames on that stream:
  ## the reconnect delay (`retry: 5000`) and a `connect` event carrying the fd.
  let
    fd = int(req.client.getFd)
    stream = newFutureStream[string](fromProc = "main")

  sseClients[fd] = SSEClient(fs: stream, clientClosed: 0, req: req)

  await stream.write("retry: 5000\n\n")
  await stream.write("event: connect\ndata: connected " & $fd & "\n\n")
proc sseTime(interval: float = 10) {.async.} =
  ## Test data provider: forever writes a `time` event holding the current
  ## UTC date-time to `fsInput`, pausing between writes.
  ## interval: pause in seconds, default = 10s.
  let pauseMs = interval * 1000
  while true:
    let stamp = now().utc.format("ddd, d MMM yyyy HH:mm:ss")
    # No terminating blank line here; the frame is written as-is to fsInput.
    await fsInput.write("event: time\ndata: " & stamp)
    await sleepAsync(pauseMs)
proc sseHeartbeat(interval: float = 15) {.async.} =
  ## Keep-alive provider: after every pause it writes an empty SSE comment
  ## line (":" followed by a blank line) to `fsInput`, so connections do not
  ## sit idle for too long.
  ## interval: pause in seconds, default = 15s.
  const heartbeatFrame = ":\n\n"
  let pauseMs = interval * 1000
  while true:
    await sleepAsync(pauseMs)
    await fsInput.write(heartbeatFrame)
proc sseFanOut() {.async, gcsafe.} = #unsafe!
  ## Distributes every message read from `fsInput` to the stream of each
  ## client currently registered in `sseClients`.
  ##
  ## As every message passes here, this is where a message id is added:
  ## a payload that does not start with ":" gets an `id:` line (a fresh Oid)
  ## and the terminating blank line appended. Payloads starting with ":"
  ## (comment lines such as the heartbeat) are forwarded unchanged.
  ##
  ## The proc returns once `fsInput` has been completed and drained.
  var recipients: seq[int]
  while true:
    let (hasContent, payload) = await fsInput.read
    if not hasContent:
      # `read` reports no content only after the stream has ended, so no
      # further message can arrive; stop instead of polling in a tight loop.
      break

    var data = payload
    if not payload.startsWith(":"):
      let id = $genOid()
      data = &"{payload}\nid: {id}\n\n"
      #put(id, data)

    # Work on a snapshot of the keys: clients may be added or pruned while
    # this proc is suspended in `await`, and a Table must not change while
    # it is being iterated.
    recipients.setLen(0)
    for k in sseClients.keys:
      recipients.add k
    for k in recipients:
      # Skip clients that were removed since the snapshot was taken.
      if k in sseClients:
        await sseClients[k].fs.write(data)
proc input(req: Request) {.async, gcsafe.} =
  ## HTTP entry point for publishing: the request body (one message in SSE
  ## format, without an id - the id is added later on) is written to
  ## `fsInput` for distribution to listening clients, and the request is
  ## answered with an empty 200.
  let message = req.body
  await fsInput.write(message)
  await respond(req, Http200, "")
proc pruneClients(fd: AsyncFD): bool {.gcsafe.} =
  ## Read-callback that drops the SSE client registered under `fd`.
  ##
  ## If `fd` is a key of `sseClients`, the entry is removed, the socket is
  ## closed, a disconnect line is logged and `true` is returned. Otherwise
  ## nothing happens and the result is `false`.
  let key = int(fd)
  if key notin sseClients:
    return false

  # Building the log line queries the peer address, which raises OSError
  # when the connection is already gone. That must not prevent the cleanup
  # below, so fall back to a reduced message.
  var msg: string
  try:
    msg = msgSSEDisconnect(sseClients[key].req)
  except CatchableError as err:
    msg = $key & " - \"SSE CLOSE\" (details unavailable: " & err.msg & ")"

  sseClients.del(key)
  fd.closeSocket()
  info(msg)
  result = true
proc sse(req: Request) {.async, gcsafe.} =
  ## SSE emitter for one client connection.
  ##
  ## Sends the event-stream response headers, registers the client in
  ## `sseClients`, logs the connect, installs `pruneClients` as read-callback
  ## on the socket and then forwards every frame that arrives on the client's
  ## own stream to the socket.
  let fd = int(req.client.getFd)

  await req.respond(Http200, "", sseHeaders())
  await initSSEClient(req)
  await logConnect(req, 200, 0)

  # Keep a reference to this client's stream instead of looking it up in the
  # table on every iteration: `pruneClients` may delete the entry (making the
  # lookup raise KeyError), and a later connection may register a different
  # stream under the same fd number.
  let stream = sseClients[fd].fs

  #SSE opens a single unidirectional channel, so reads fail.
  #If reads don't fail, the client is/has disconnected.
  addRead(req.client.getFd.AsyncFD, pruneClients)

  # TODO: replay missed events, starting from the request's "last-event-id"
  # header:
  #if id != "":
  #  await req.client.send(getdownfrom(id))

  while not req.client.isClosed:
    let (hasContent, data) = await stream.read
    if not hasContent:
      # The stream has ended; nothing more will ever be delivered, so leave
      # the loop rather than spinning on an exhausted stream.
      break
    await req.client.send(data)
proc dispatch(req: Request) {.async, gcsafe.} =
  ## Request router: "/sse/" opens an event stream, "/input/" publishes a
  ## message, every other path is answered by `error404`.
  let route = req.url.path
  #if route == "/index/":
  #  await index(req)
  if route == "/sse/":
    await sse(req)
  elif route == "/input/":
    await input(req)
  else:
    await error404(req)
proc main(address = "192.168.1.4", port = Port(8088)) {.async.} =
  ## Starts the background tasks (heartbeat, test time source, fan-out) and
  ## then serves HTTP requests forever, routing each one through `dispatch`.
  ##
  ## address: interface to bind to, default = "192.168.1.4".
  ## port: TCP port to listen on, default = 8088.
  #two "internal" dataproviders
  asyncCheck sseHeartbeat(5)
  asyncCheck sseTime(1)
  asyncCheck sseFanOut()

  var server = newAsyncHttpServer(reuseAddr = false, reusePort = false)
  server.listen(port, address)
  echo "asynchttpserver listening at ", address, ":", $port

  while true:
    if server.shouldAcceptRequest():
      await server.acceptRequest(dispatch)
    else:
      # Too many open connections: yield to the dispatcher for a while so
      # descriptors can be released, instead of re-entering `poll()` from
      # inside a running async proc.
      await sleepAsync(500)
# Schedule the server coroutine, then hand this thread to the async
# dispatcher; `runForever` does not return.
main().asyncCheck
runForever()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment