Skip to content

Instantly share code, notes, and snippets.

@JUSTINMKAUFMAN
Created July 1, 2019 17:12
Show Gist options
  • Save JUSTINMKAUFMAN/032e3d004350d9a4dae2cd767085753a to your computer and use it in GitHub Desktop.
Save JUSTINMKAUFMAN/032e3d004350d9a4dae2cd767085753a to your computer and use it in GitHub Desktop.
Swift SSE EventSource Server for Vapor
import Foundation
import NIO
import Vapor
/**
SSE EventSource server implementation for Swift Vapor 3
Usage:
```
// Create a route for all incoming SSE connection
// requests and save a reference to the new stream
router.get("sse", String.parameter) { request -> SSEStream<SSEEvent> in
let stream = SSEStream<SSEEvent>(on: request)
self.streams.add(stream)
return stream
}
// Schedule an event to be published on the stream
let event = SSEEvent(type: .put, id: eventId, data: eventData)
stream.schedule(event)
```
*/
enum SSEEventType: String, Content {
case put, ping, patch, delete
}
public struct SSEEvent: Content {
let type: SSEEventType
let id: String?
let data: String?
init(type: SSEEventType,
id: String? = nil,
data: String? = nil) {
self.type = type
self.id = id
self.data = data
}
}
class SSEStream<ResponseType>: ResponseEncodable where ResponseType: Encodable {
private let chunkedStream: HTTPChunkedStream
private let allocator: ByteBufferAllocator
private let response: Response
private let request: Request
init(on request: Request) {
self.request = request
response = Response(using: request)
chunkedStream = HTTPChunkedStream(on: request)
response.http.headers.add(name: .contentType, value: "text/event-stream")
response.http.headers.add(name: .transferEncoding, value: "chunked")
response.http.headers.add(name: .cacheControl, value: "no-cache")
response.http.headers.add(name: .connection, value: "keep-alive")
response.http.headers.add(name: .accessControlAllowOrigin, value: "*")
response.http.status = .ok
response.http.body = chunkedStream.convertToHTTPBody()
allocator = ByteBufferAllocator()
}
func encode(for req: Request) throws -> EventLoopFuture<Response> {
return req.future(response)
}
public func schedule(_ event: SSEEvent,
initialDelay: TimeAmount = .seconds(0),
repeatInterval: TimeAmount? = nil) {
if let repeatInterval = repeatInterval {
response.eventLoop.scheduleRepeatedTask(initialDelay: initialDelay, delay: repeatInterval) { [unowned self] task -> Future<Void> in
guard !self.chunkedStream.isClosed else {
task.cancel()
return self.request.future()
}
let response = self.eventFuture(for: event)
response.whenFailure { _ in task.cancel() }
return response
}
} else {
request.eventLoop.scheduleTask(in: initialDelay) { [unowned self] () -> Future<Void> in
guard !self.chunkedStream.isClosed else { return self.request.future() }
return self.eventFuture(for: event)
}
}
}
@discardableResult
func succeed(with content: ResponseType) throws -> Future<Void> {
let encoder = JSONEncoder()
let data = try encoder.encode(content)
let stream = chunkedStream
var contentBuffer = allocator.buffer(capacity: data.count)
contentBuffer.write(bytes: data)
return stream.eventLoop.submit {
stream.write(.chunk(contentBuffer))
.then { stream.write(.end) }
}.then { $0 }
}
}
private extension SSEStream {
func startPulse(for request: Request) {
var keepAliveBuffer: ByteBuffer = allocator.buffer(capacity: 1)
keepAliveBuffer.write(string: "\n")
request.eventLoop.scheduleRepeatedTask(initialDelay: .seconds(0), delay: .seconds(0)) { [unowned self] task -> Future<Void> in
guard !self.chunkedStream.isClosed else {
task.cancel()
return request.future()
}
let response = self.chunkedStream.write(.chunk(keepAliveBuffer))
response.whenFailure { _ in task.cancel() }
return response
}
}
@discardableResult
func eventFuture(for event: SSEEvent) -> Future<Void> {
let data: [UInt8] = Array(payload(for: event).utf8)
var contentBuffer = allocator.buffer(capacity: data.count)
contentBuffer.write(bytes: data)
let stream = chunkedStream
return stream.eventLoop.submit {
stream.write(.chunk(contentBuffer))
}.then { $0 }
}
func payload(for event: SSEEvent) -> String {
var payload: String = ""
if let id = event.id { payload += "id:\(id)\n" }
if let eventType = event.type { payload += "event:\(eventType)\n" }
payload += "data:\(event.data ?? "")\n\n"
return payload
}
}
final class SSEStreamManager<T: Encodable> {
private var streams: [SSEStream<T>] = []
private let queue = DispatchQueue(label: "SSE-\(T.self)-Queue")
func add(_ stream: SSEStream<T>) {
queue.async { [weak self] in
self?.streams.append(stream)
}
}
func pop() -> SSEStream<T>? {
return queue.sync { [weak self] in
guard let self = self, !self.streams.isEmpty else { return nil }
return self.streams.removeFirst()
}
}
func publish(_ event: T) {
streams.forEach { stream in
if let event = event as? SSEEvent {
stream.schedule(event)
}
}
}
}
@fappelman
Copy link

fappelman commented Oct 8, 2020

Thanks this was really useful for me. There is a compilation error on line 137 (event.type is not optional).

if let eventType = event.type { payload += "event:\(eventType)\n" }

So initially I added the line

payload += "id:\(id)\n"

unconditionally. With that in place it didn't work. I never got a callback. After removing the line altogether it works.

What is the intention of that line? Should it work when this line is included?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment