Skip to content

Instantly share code, notes, and snippets.

@scorredoira
Created October 7, 2021 09:38
Show Gist options
  • Save scorredoira/03c8b55094f48fd0e92763eddc0e3aaa to your computer and use it in GitHub Desktop.
Save scorredoira/03c8b55094f48fd0e92763eddc0e3aaa to your computer and use it in GitHub Desktop.
import * as web from "lib/web"
import * as env from "lib/env"
import * as orm from "lib/orm"
let upgrader = websocket.newUpgrader()
let webSockets: WebSocketManager
function init() {
webSockets = new WebSocketManager()
}
web.addRoute({
method: "GET",
url: "//core/ws/connect.json",
filter: web.adminFilter,
handler: c => {
let session = env.getSession()
let con = upgrader.upgrade(c.request)
let tenant = env.currentTenantName()
webSockets.add(tenant, session.userId, con)
}
})
web.addRoute({
method: "GET",
url: "//core/ws/users.json",
filter: web.adminFilter,
handler: c => {
let tenant = env.currentTenantName()
let ids = webSockets.getUsers(tenant)
let users
if (ids) {
users = orm.query("SELECT id, name FROM user WHERE id IN ?", ids)
} else {
users = []
}
c.response.writeJSON(users)
}
})
class WebSocket {
conn: websocket.Connection
private mutex: sync.Mutex
constructor(conn: websocket.Connection) {
this.conn = conn
this.mutex = sync.newMutex()
}
writeJSON(data: any) {
this.mutex.lock()
defer(this.mutex.unlock)
this.conn.writeJSON(data)
}
}
class WebSocketManager {
private sockets: Map<Map<WebSocket[]>>
private mutex: sync.Mutex
constructor() {
this.sockets = {}
this.mutex = sync.newMutex()
}
writeJSON(tenant: string, userIds: number[], data: any) {
let tenantWebSockets = this.sockets[tenant]
if (!tenantWebSockets) {
return
}
for (let userId of userIds) {
let userWebSockets = tenantWebSockets[userId]
if (!userWebSockets) {
continue
}
for (let ws of userWebSockets) {
go(() => ws.writeJSON(data))
}
}
}
getUsers(tenant: string) {
let tenantWebSockets = this.sockets[tenant]
if (!tenantWebSockets) {
return
}
return Object.keys(tenantWebSockets)
}
add(tenant: string, userId: number, conn: websocket.Connection) {
let userWebSockets: WebSocket[]
this.mutex.lock()
try {
let tenantWebSockets = this.sockets[tenant]
if (!tenantWebSockets) {
tenantWebSockets = {}
this.sockets[tenant] = tenantWebSockets
}
userWebSockets = tenantWebSockets[userId]
if (!userWebSockets) {
userWebSockets = []
tenantWebSockets[userId] = userWebSockets
}
} finally {
this.mutex.unlock()
}
let ws = new WebSocket(conn)
userWebSockets.push(ws)
// Ping the cliente at an interval to make sure that the connection is still valid.
// If not, remove it from the list.
while (true) {
try {
ws.writeJSON({ code: "ping" })
time.sleep(2 * time.Second)
} catch (err) {
console.log("CLOSE", err.message)
userWebSockets.remove(ws)
ws.conn.close()
break
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment