Skip to content

Instantly share code, notes, and snippets.

@ethereumdegen
Last active April 29, 2023 22:07
Show Gist options
  • Save ethereumdegen/53eecf33a7da4ed11ccfee9d3279f8b0 to your computer and use it in GitHub Desktop.
Save ethereumdegen/53eecf33a7da4ed11ccfee9d3279f8b0 to your computer and use it in GitHub Desktop.
A singleton websocket server and client implementation with app-wide pub and sub
/*
Websocket Client
let socket = await connect( beamServerUrl ) // this module exports the function as connect
//once the server connects us , try to join a room
subscribeToMessageType('init', async (message:string):Promise<string|undefined>=>{
let join = await joinRooms(['pendingtx'])
return undefined
})
//we can listen for messages from the server and do things
subscribeToMessageType('pendingtx', async (message:string)=>{
//do stuff here !!
return JSON.stringify({type:"pong",data:"hi"})
})
// we can send messages to the server
emitMessage( JSON.stringify({ type:'hello', data:'hi' }) )
*/
import { io , Socket} from "socket.io-client";
// Module-level singleton socket; stays undefined until connect() assigns it.
let socket: Socket<any>|undefined = undefined
// Id received in the server's "init" event; undefined until that event arrives.
let socketUid:string|undefined = undefined
// Run dotenv's config() before the environment variable below is read.
require('dotenv').config()
// Optional shared key; when set, connect() sends it in an "authenticate" event.
const WEBSOCKET_ACCESS_KEY = process.env.WEBSOCKET_ACCESS_KEY
// Set to true by the socket's "connect" event and back to false by "disconnect".
export let isConnected = false
// Registry of message handlers keyed by message type. Other modules add their
// callbacks through subscribeToMessageType; each callback receives the raw
// message string and may resolve to a reply string (or undefined for no reply).
let messageSubscriptions:Map<string,Array<(msg:string)=>Promise<string|undefined>>> = new Map()

/**
 * Registers `handler` for messages of type `messageType`.
 * Handlers registered for the same type are kept in registration order.
 *
 * @param messageType value of the message's `type` field to listen for
 * @param handler async callback invoked with the raw message string
 */
export function subscribeToMessageType(messageType:string, handler:(msg:string)=>Promise<string|undefined>){
  const registered = messageSubscriptions.get(messageType)
  if(!registered){
    // first handler for this type: start a new list
    messageSubscriptions.set(messageType, [handler])
    return
  }
  registered.push(handler)
}
/**
 * Drops every handler registered for `messageType` by replacing its handler
 * list with a fresh empty one (the key itself stays in the registry).
 */
export function unsubscribeAllToMessageType(messageType:string){
  const noHandlers: Array<(msg:string)=>Promise<string|undefined>> = []
  messageSubscriptions.set(messageType, noHandlers)
}
/**
 * Dispatches a received message to the handlers subscribed to its type.
 *
 * The message is logged, parsed as JSON, and its `type` field selects the
 * handler list. Handlers run one after another; whenever one resolves to a
 * truthy reply and a socket exists, the reply is emitted as a "message" event.
 *
 * @param message raw JSON string; a JSON.parse failure propagates to the caller
 */
export async function handleReceivedMessage(message:string): Promise<void>
{
  console.log("received: %s", message);
  const messageType = JSON.parse(message).type
  const subscribers = messageSubscriptions.get(messageType)
  if(subscribers){
    for(const subscriber of subscribers){
      const reply = await subscriber(message)
      // nothing to send back, or nowhere to send it
      if(!reply || !socket) continue
      socket.emit("message", reply);
    }
  }
}
/**
 * Sends `message` to the server as a "message" event.
 * Does nothing when no socket has been created yet.
 */
export async function emitMessage(message:string){
  socket?.emit("message", message);
}
/**
 * Creates the socket.io client for `serverUrl`, stores it as the module-level
 * socket and registers the event handlers:
 *  - "connect":    marks the module connected and, when an access key is
 *                  configured, sends it in an "authenticate" event
 *  - "init":       stores the id sent by the server and dispatches a synthetic
 *                  `{type:"init"}` message to subscribers
 *  - "message":    dispatches the message to subscribers
 *  - "event":      logs the payload
 *  - "disconnect": marks the module disconnected
 *
 * @param serverUrl URL passed to socket.io's `io()`
 * @returns the created socket; the promise resolves right away and does not
 *          wait for the "connect" event
 */
export async function connect(serverUrl: string) : Promise<any>{
  // Keep a local reference so the handlers below always act on the socket that
  // fired the event, even if the module-level variable is reassigned later.
  const client = io( serverUrl );
  socket = client

  client.on("connect", () => {
    console.log("connected");
    isConnected = true
    if(WEBSOCKET_ACCESS_KEY){
      client.emit("authenticate", WEBSOCKET_ACCESS_KEY);
    }
  });

  client.on("init", async (data:string) => {
    socketUid = data
    // Subscriber code runs inside this await; catch failures here so a throwing
    // handler does not surface as an unhandled promise rejection (same policy
    // as the "message" handler below).
    try{
      await handleReceivedMessage(JSON.stringify({type:"init",data:socketUid}))
    }catch(e){
      console.error(e)
    }
  });

  client.on("message", async (data:string) => {
    try{
      await handleReceivedMessage(data)
    }catch(e){
      console.error(e)
    }
  });

  client.on("event", (data:string) => {
    console.log(data);
  });

  client.on("disconnect", () => {
    console.log("disconnected");
    isConnected = false
  });

  return client;
}
/**
 * Asks the server to add this socket to the given rooms by emitting a "join"
 * event carrying the JSON-encoded room list.
 * Logs an error and sends nothing when no socket has been created.
 */
export async function joinRooms(roomArray:string[]){
  if(!socket){
    console.error("socket not initialized")
    return
  }
  socket.emit("join", JSON.stringify(roomArray));
}
/**
 * Asks the server to remove this socket from the given rooms by emitting a
 * "leave" event carrying the JSON-encoded room list.
 * Logs an error and sends nothing when no socket has been created.
 */
export async function leaveRooms(roomArray:string[]){
  if(!socket){
    console.error("socket not initialized")
    return
  }
  socket.emit("leave", JSON.stringify(roomArray));
}
/*
Websocket Server
start() // this module exports the function as start
// we can listen for client messages
subscribeToMessageType('add_pending_tx', async (message:any) : Promise<string|undefined>=> {
let txData = JSON.parse(message)
let txDataStringified = JSON.stringify( txData.data )
//push to redis
pushTxDataToRedis(txDataStringified)
return undefined
})
//we can broadcast to a room
broadcastToRoom(roomName, JSON.stringify({
type: roomName,
data: txDataStringified
}))
https://socket.io/docs/v4/namespaces/
https://socket.io/docs/v3/rooms/
*/
import { Server } from "socket.io";
require('dotenv').config()
// Shared key that clients must present in an "authenticate" event; read from the environment.
const WEBSOCKET_ACCESS_KEY = process.env.WEBSOCKET_ACCESS_KEY
// Authentication is enforced only when a non-empty key is configured.
// The earlier check (`typeof KEY != undefined`) compared the string produced by
// typeof with undefined, which is always true, so a server without a key
// rejected every client. An empty key is treated as "not configured" because
// an empty stored key could never pass the truthiness checks used for auth.
const accessKeyRequired = WEBSOCKET_ACCESS_KEY !== undefined && WEBSOCKET_ACCESS_KEY !== ''
// Port handed to server.listen().
const PORT = 8010
// The socket.io Server instance used by this module, constructed with an empty
// options object. It only starts listening when start() calls server.listen().
const server = new Server({ /* options */ });
/**
 * Shape of a message once it has been parsed.
 * NOTE(review): not referenced elsewhere in the visible code; presumably it
 * describes the result of JSON.parse on a message string — confirm with callers.
 */
export interface ISocketPayload {
  /** Message type tag. */
  type: string;
  /** Optional payload; its shape is not constrained here. */
  data?: any;
}
/**
 * Record describing one user: an id plus the access key, if any.
 * NOTE(review): presumably the shape of the entries stored in `users` — confirm.
 */
interface UserDefinition {
  /** Identifier of the user. */
  userId: string;
  /** Access key of the user, or undefined when none is recorded. */
  accessKey: string | undefined;
}
// Handler registry for inbound client messages, keyed by message type. Each
// handler receives the raw message string and may resolve to a reply string.
let messageSubscriptions:Map<string,Array<(msg:string)=>Promise<string|undefined>>> = new Map()

/**
 * Adds `handler` to the list of callbacks for `messageType`, creating the list
 * on first use. Handlers stay in the order they were registered.
 *
 * @param messageType value of the message's `type` field to listen for
 * @param handler async callback invoked with the raw message string
 */
export function subscribeToMessageType(messageType:string, handler:(msg:string)=>Promise<string|undefined>){
  const known = messageSubscriptions.get(messageType)
  if(!known){
    messageSubscriptions.set(messageType, [handler])
    return
  }
  known.push(handler)
}
/**
 * Removes all handlers for `messageType` by storing a new empty handler list
 * under that key.
 */
export function unsubscribeAllToMessageType(messageType:string){
  const emptyList: Array<(msg:string)=>Promise<string|undefined>> = []
  messageSubscriptions.set(messageType, emptyList)
}
/**
 * Routes one client message to the handlers subscribed to its type.
 *
 * The message is logged and parsed as JSON; its `type` field selects the
 * handler list. Handlers run sequentially, and every truthy reply is emitted
 * back on `socket` as a "message" event. The function itself never returns a
 * value, so the promise always resolves to undefined.
 *
 * @param message raw JSON string from the client; a JSON.parse failure propagates
 * @param socket  socket the message arrived on, used for replies
 */
export async function handleInboundMessage(message:string, socket:any): Promise<string|undefined>
{
  console.log("received: %s", message);
  const messageType = JSON.parse(message).type
  const subscribers = messageSubscriptions.get(messageType)
  if(!subscribers){
    return
  }
  for(const subscriber of subscribers){
    const reply = await subscriber(message)
    // skip when the handler produced no reply or there is no socket to answer on
    if(!reply || !socket) continue
    socket.emit("message", reply);
  }
}
/**
 * Broadcasts `message` by emitting it as a "message" event on the server
 * instance, without restricting it to any room.
 */
export function broadcast(message:string){
  const eventName = "message"
  server.emit(eventName, message);
}
/**
 * Broadcasts `message` as a "message" event, restricted to the given room.
 *
 * @param room    name of the room to target
 * @param message payload to emit
 */
export function broadcastToRoom(room:string, message:string){
  const roomTarget = server.to(room)
  roomTarget.emit("message", message);
}
// Connected sockets keyed by socket id. accessKey stays undefined until the
// socket sends a matching key in an "authenticate" event; the entry is removed
// when the socket disconnects.
let users: Record<string, { userId: string; accessKey: string | undefined }> = {}

// Turns the JSON-encoded room list sent by a client into an array of room names.
// Accepts a JSON array or a single JSON string and keeps only string entries.
// Malformed JSON is logged and, like any non-string input, yields an empty list,
// so bad client input cannot throw inside a socket event handler.
function parseRoomList(raw: unknown): string[] {
  if (typeof raw !== 'string') return []
  try {
    const parsed: unknown = JSON.parse(raw)
    const candidates: unknown[] = Array.isArray(parsed) ? parsed : [parsed]
    return candidates.filter((room): room is string => typeof room === 'string')
  } catch (e) {
    console.error(e)
    return []
  }
}

/**
 * Registers the connection handlers on the socket.io server and starts
 * listening on PORT.
 *
 * Per connected socket:
 *  - the socket id is sent back in an "init" event
 *  - "authenticate" stores the key when it matches, otherwise disconnects
 *  - "message" and "join" require prior authentication when a key is configured
 *  - "rooms" replies with the socket's current rooms as an array
 *  - "leave" removes the socket from each listed room
 *  - "disconnect" drops the socket's entry from `users`
 *
 * @returns the socket.io server instance
 */
export function start(){
  server.on("connection", (socket) => {
    const userId = socket.id
    const user: { userId: string; accessKey: string | undefined } = { userId, accessKey: undefined }
    users[userId] = user
    socket.emit("init", userId);

    // Disconnects the socket and returns true when a key is required but this
    // socket has not authenticated yet.
    const rejectIfUnauthorized = (): boolean => {
      if(accessKeyRequired && !user.accessKey){
        socket.disconnect()
        return true
      }
      return false
    }

    socket.on("authenticate", (accessKey:string) => {
      if(!accessKeyRequired) return
      // strict comparison: a non-string value must never match the key
      if(accessKey === WEBSOCKET_ACCESS_KEY){
        user.accessKey = accessKey
        console.log('authed user for socket')
      }else{
        socket.disconnect()
      }
    })

    socket.on("message", async (message) => {
      if(rejectIfUnauthorized()) return
      console.log('got msg', message)
      // Client input may be malformed JSON and handlers may throw; catch here so
      // one bad message cannot become an unhandled promise rejection.
      try{
        const response = await handleInboundMessage(message, socket);
        if(response){
          socket.emit("message", response);
        }
      }catch(e){
        console.error(e)
      }
    })

    socket.on("rooms", () => {
      // socket.rooms is a Set, which socket.io does not serialize; send an array
      socket.emit("rooms", Array.from(socket.rooms))
    })

    //join and leave rooms
    socket.on("join", (roomArrayStringified) => {
      if(rejectIfUnauthorized()) return
      const roomArray = parseRoomList(roomArrayStringified);
      socket.join(roomArray);
      console.log("joined rooms", roomArray);
    })

    socket.on("leave", (roomArrayStringified) => {
      const roomArray = parseRoomList(roomArrayStringified);
      // socket.leave() takes one room at a time
      for(const room of roomArray){
        socket.leave(room);
      }
      console.log("leaving rooms", roomArray);
    })

    socket.on("disconnect", () => {
      // forget the user so the map does not grow with every connection
      delete users[userId]
    })
  });

  server.listen(PORT);

  // "connection_error" is emitted by the underlying engine, which exists once
  // the server is attached by listen() above.
  server.engine.on("connection_error", (err: { req: unknown; code: number; message: string; context: unknown }) => {
    console.log(err.req); // the request object
    console.log(err.code); // the error code, for example 1
    console.log(err.message); // the error message, for example "Session ID unknown"
    console.log(err.context); // some additional error context
  });

  // Report only whether a key is enforced; the key itself is a secret and must not be logged.
  console.log(`WebSocket server is running on ws://localhost:${PORT} (access key ${accessKeyRequired ? 'required' : 'not required'})`);
  return server
}
export default server ;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment