Skip to content

Instantly share code, notes, and snippets.

@sennett-lau
Last active July 19, 2024 17:28
Show Gist options
  • Save sennett-lau/2e045c4715ef8c859be9d116353861e2 to your computer and use it in GitHub Desktop.
Save sennett-lau/2e045c4715ef8c859be9d116353861e2 to your computer and use it in GitHub Desktop.
K6SocketIoBase.js

A JavaScript version of the andrew-delph/K6SocketIoBase.ts

A socket.io wrapper for the k6/experimental/websockets package with promises. This works with v4 of socket.io.

See the list of current functions in K6SocketIoBase.ts

A simple test that waits for an `established` message, then sends a `myping` event, waits for the ack, sends another `myping` and waits for the ack again. It then closes the socket.

/**
 * Numeric packet types found in the first character of every frame
 * (parsed as `type` by `checkResponse`).
 *
 * Frozen so the shared lookup table cannot be modified by accident once it
 * has been exported.
 */
const responseType = Object.freeze({
  open: 0,
  close: 1,
  ping: 2,
  pong: 3,
  message: 4,
  upgrade: 5,
  noop: 6,
})
/**
 * Numeric packet codes found in the second character of a frame
 * (parsed as `code` by `checkResponse`).
 *
 * Frozen so the shared lookup table cannot be modified by accident once it
 * has been exported.
 */
const responseCode = Object.freeze({
  connect: 0,
  disconnect: 1,
  event: 2,
  ack: 3,
  error: 4,
})
module.exports = { responseType, responseCode }
const { responseCode, responseType } = require('./constants.js')
const {
checkResponse,
getArrayFromRequest,
getCallbackId,
} = require('./socket.io.js')
const { uuidv4: uuid } = require('https://jslib.k6.io/k6-utils/1.4.0/index.js')
const { setTimeout, clearTimeout } = require('k6/experimental/timers')
const { check } = require('k6')
/**
 * Socket.IO (v4) client wrapper that speaks the wire format over a
 * WebSocket-like transport.
 *
 * The transport itself is supplied by a subclass, which must implement
 * `connect()`, `on(event, callback)` and `parseMessage(message)`; in this
 * base class those three methods are empty stubs.
 *
 * Frames are handled as strings: the first character is the packet type
 * (`responseType`), the second the packet code (`responseCode`), optionally
 * followed by a numeric ack id and a JSON array payload.
 */
class K6SocketIoBase {
  /**
   * @param {string} url - stored as `this.url` for the transport subclass.
   * @param {object} [params={}] - stored as `this.params`; not read by this class.
   * @param {number} [max_time=0] - delay passed to `setTimeout` after which the
   *   socket is closed automatically; `0` disables the automatic close.
   */
  constructor(url, params = {}, max_time = 0) {
    this.socket = null
    // Incremented for every event sent with an ack callback; used as ack id.
    this.callbackCount = 0
    this.connected = false
    this.onConnect = undefined
    // ack id -> callback waiting for the matching ack frame.
    this.ackCallbackMap = {}
    // event name -> handler invoked for incoming events of that name.
    this.eventMessageHandleMap = {}
    // waiter id -> function that rejects a still-pending promise.
    this.waitingEventMap = {}
    this.url = url
    this.max_time = max_time
    this.params = params
  }

  /** Opens the underlying socket. Stub: implemented by subclasses. */
  connect() {}

  /**
   * Registers a transport-level listener. Stub: implemented by subclasses.
   * @param {string} event
   * @param {Function} callback
   */
  on(event, callback) {}

  /**
   * Extracts the raw frame string from a transport message.
   * Stub: implemented by subclasses.
   */
  parseMessage(message) {}

  /**
   * Stores the socket and wires the message, error and close listeners,
   * plus the optional `max_time` auto-close timer.
   * @param {object} socket - transport object exposing `send` and `close`.
   */
  setSocket(socket) {
    this.socket = socket
    this.on('message', (msg) => {
      this.handleMessage(this.parseMessage(msg))
    })
    let max_time_timeout
    if (this.max_time !== 0) {
      max_time_timeout = setTimeout(() => {
        this.close()
      }, this.max_time)
    }
    this.on('error', () => {
      console.log('error.')
      // Record a failed k6 check so transport errors show up in the results.
      check(false, { error: (r) => r })
      this.socket.close()
    })
    this.on('close', () => {
      clearTimeout(max_time_timeout)
      this.connected = false
      this.failWaitingEvents()
    })
  }

  /** Registers an (empty) listener for the transport `open` event. */
  listen() {
    this.on('open', () => {})
  }

  /** Closes the underlying socket. */
  close() {
    this.socket.close()
  }

  /**
   * @param {Function} callback - invoked (without arguments) when a connect
   *   packet is received.
   */
  setOnConnect(callback) {
    this.onConnect = callback
  }

  /**
   * @param {Function} callback - additional transport `error` listener.
   */
  setOnError(callback) {
    this.on('error', callback)
  }

  /**
   * Dispatches one incoming frame.
   *
   * - open    -> answers with the connect packet `40`.
   * - ping    -> answers with a pong frame so the server keeps the
   *              connection alive.
   * - message -> handled according to its packet code (connect, ack, event).
   * Any other frame type is ignored.
   *
   * @param {string} msg - raw frame as returned by `parseMessage`.
   */
  handleMessage(msg) {
    const { type, code } = checkResponse(msg)
    if (type === responseType.open) {
      this.socket.send('40')
      return
    }
    if (type === responseType.ping) {
      this.socket.send(String(responseType.pong))
      return
    }
    // Packet codes are only meaningful inside message frames.
    if (type !== responseType.message) return

    switch (code) {
      case responseCode.connect: {
        // Flip the flag first so the callback already observes the
        // connected state.
        this.connected = true
        if (this.onConnect != null) this.onConnect()
        break
      }
      case responseCode.ack: {
        const msgObject = getArrayFromRequest(msg)
        const callbackId = getCallbackId(msg)
        const callback = this.ackCallbackMap[callbackId]
        if (callback !== undefined) {
          delete this.ackCallbackMap[callbackId]
          callback(msgObject)
        }
        break
      }
      case responseCode.event: {
        const [event, message] = getArrayFromRequest(msg)
        const callbackId = getCallbackId(msg)
        // Only events that carry an ack id get a reply function.
        const callback = Number.isNaN(callbackId)
          ? undefined
          : (data) => {
              this.sendAck(callbackId, data)
            }
        const eventMessageHandle = this.eventMessageHandleMap[event]
        if (eventMessageHandle !== undefined) {
          eventMessageHandle(message, callback)
        } else if (event !== 'message' && event !== 'activeCount') {
          console.log('no eventMessageHandle:', event)
        }
        break
      }
    }
  }

  /**
   * Registers (or replaces) the handler for an incoming event name.
   * @param {string} event
   * @param {Function} handler - called as `handler(message, callback)`;
   *   `callback` is `undefined` unless the sender requested an ack.
   */
  setEventMessageHandle(event, handler) {
    this.eventMessageHandleMap[event] = handler
  }

  /**
   * Sends an event frame.
   *
   * The payload is produced with `JSON.stringify` so that event names and
   * data are always valid, correctly escaped JSON.
   *
   * @param {string} event
   * @param {*} [data] - omitted from the payload when `undefined`.
   * @param {Function} [callback] - when given, an ack id is attached and the
   *   callback is invoked with the ack payload array.
   */
  send(event, data, callback) {
    const payload = JSON.stringify(data === undefined ? [event] : [event, data])
    let ackId = ''
    if (callback != null) {
      this.callbackCount++
      ackId = this.callbackCount
      this.ackCallbackMap[ackId] = callback
    }
    this.socket.send(
      `${responseType.message}${responseCode.event}${ackId}${payload}`,
    )
  }

  /**
   * Sends an ack frame for a received event.
   * @param {number} callbackId - ack id taken from the received event.
   * @param {*} [data] - ack argument; an empty payload is sent when `undefined`.
   */
  sendAck(callbackId, data) {
    const payload = JSON.stringify(data === undefined ? [] : [data])
    this.socket.send(
      `${responseType.message}${responseCode.ack}${callbackId}${payload}`,
    )
  }

  /**
   * Internal helper: tracks a pending promise so that it is rejected when the
   * socket closes (via `failWaitingEvents`) or, for a positive `timeout`,
   * as soon as that delay elapses.
   *
   * @param {Function} reject - the promise's reject function.
   * @param {number} timeout - delay for `setTimeout`; no timer when not positive.
   * @param {string} timeoutReason - rejection value used when the timer fires.
   * @returns {Function} release function that cancels the timer and stops
   *   tracking the promise.
   */
  trackWaiter(reject, timeout, timeoutReason) {
    const waitingEventId = uuid()
    let timer
    const release = () => {
      clearTimeout(timer)
      delete this.waitingEventMap[waitingEventId]
    }
    this.waitingEventMap[waitingEventId] = (reason) => {
      release()
      reject(reason)
    }
    if (timeout > 0) {
      timer = setTimeout(() => {
        release()
        reject(timeoutReason)
      }, timeout)
    }
    return release
  }

  /**
   * Waits for the next incoming event with the given name. Replaces any
   * handler previously registered for that event.
   *
   * @param {string} event
   * @param {number} [timeout=0] - `0` waits indefinitely.
   * @returns {Promise<{data: *, callback: (Function|undefined), elapsed: number}>}
   *   rejects with a string when the timeout is reached or the socket closes.
   */
  expectMessage(event, timeout = 0) {
    const startTime = Date.now()
    const timeoutReason = `timeout reached for ${event}`
    return new Promise((resolve, reject) => {
      const release = this.trackWaiter(reject, timeout, timeoutReason)
      this.eventMessageHandleMap[event] = (data, callback) => {
        const elapsed = Date.now() - startTime
        release()
        if (timeout === 0 || elapsed < timeout) {
          resolve({ data, callback, elapsed })
        } else {
          reject(timeoutReason)
        }
      }
    })
  }

  /**
   * Sends an event and waits for its ack.
   *
   * @param {string} event
   * @param {*} data
   * @param {number} [timeout=0] - `0` waits indefinitely.
   * @returns {Promise<{data: Array, elapsed: number}>} rejects with a string
   *   when the timeout is reached or the socket closes.
   */
  sendWithAck(event, data, timeout = 0) {
    const startTime = Date.now()
    return new Promise((resolve, reject) => {
      const release = this.trackWaiter(reject, timeout, `timeout reached`)
      this.send(event, data, (callbackData) => {
        const elapsed = Date.now() - startTime
        release()
        if (timeout === 0 || elapsed < timeout) {
          resolve({ data: callbackData, elapsed })
        } else {
          reject(`timeout reached`)
        }
      })
    })
  }

  /** Rejects every still-pending waiter and clears the tracking map. */
  failWaitingEvents() {
    for (const waitingEvent of Object.values(this.waitingEventMap)) {
      waitingEvent('failed wait event.')
    }
    this.waitingEventMap = {}
  }
}
module.exports = { K6SocketIoBase }
const { K6SocketIoBase } = require('./K6SocketIoBase.js')
const { WebSocket } = require('k6/ws')
// Constructor-style WebSocket with addEventListener support.
const {
  WebSocket: ExperimentalWebSocket,
} = require('k6/experimental/websockets')
/**
 * K6SocketIoBase transport backed by the WebSocket class of
 * `k6/experimental/websockets`.
 */
class K6SocketIoExp extends K6SocketIoBase {
  /**
   * Opens a WebSocket to `this.url` and hands it to `setSocket`, which wires
   * the message/error/close listeners.
   */
  connect() {
    // `k6/ws` only offers a callback-style connect() helper; the
    // constructor + addEventListener API used by this class comes from the
    // experimental websockets module.
    this.setSocket(new ExperimentalWebSocket(this.url))
  }

  /**
   * Registers a listener on the underlying WebSocket.
   * @param {string} event
   * @param {Function} callback
   */
  on(event, callback) {
    this.socket.addEventListener(event, callback)
  }

  /**
   * @param {{data: *}} message - WebSocket message event.
   * @returns {*} the event's `data` field (the raw frame).
   */
  parseMessage(message) {
    return message.data
  }
}
module.exports = { K6SocketIoExp }
const { K6SocketIoExp } = require('./K6SocketIoExp.js')
// k6 run options for this script: how long to run and with how many
// virtual users.
export const options = {
  duration: '1m',
  vus: 100,
}
/**
 * Scenario executed by each virtual user: connect, wait for the
 * `established` event, send `myping` twice (waiting for the ack each time),
 * then close the socket.
 */
export default () => {
  const domain = 'localhost:8888'
  const url = `ws://${domain}/socket.io/?EIO=4&transport=websocket`
  const socket = new K6SocketIoExp(url)
  socket.setOnConnect(() => {
    socket
      .expectMessage('established')
      .then(() => socket.sendWithAck('myping', {}))
      .then(() => socket.sendWithAck('myping', {}))
      .catch((error) => {
        // Report the failure instead of discarding it; the socket is still
        // closed below.
        console.error(`socket.io scenario failed: ${error}`)
      })
      .finally(() => {
        socket.close()
      })
  })
  socket.connect()
}
/**
 * Reads the two leading markers of a raw frame.
 *
 * @param {string} response - raw frame.
 * @returns {{type: number, code: number}} the first and second characters
 *   parsed as integers; `NaN` where a character is missing or not a digit.
 */
const checkResponse = (response) => {
  const [type, code] = [response[0], response[1]].map((digit) =>
    Number.parseInt(digit, 10),
  )
  return { type, code }
}
/**
 * Extracts the ack id that follows the two leading marker characters.
 *
 * The id is always parsed as a decimal number (explicit radix), so text
 * such as a `0x` prefix is never interpreted as hexadecimal.
 *
 * @param {string} response - raw frame.
 * @returns {number} the ack id, or `NaN` when the frame carries none.
 */
const getCallbackId = (response) => Number.parseInt(response.slice(2), 10)
/**
 * Pulls the JSON array payload out of a raw frame.
 *
 * @param {string} response - raw frame.
 * @returns {Array} the parsed array spanning the first `[` to the last `]`;
 *   an empty array when no non-empty bracketed section is found.
 * @throws {SyntaxError} when the bracketed section is not valid JSON.
 */
const getArrayFromRequest = (response) => {
  const found = /\[.+\]/.exec(response)
  if (found === null) {
    return []
  }
  return JSON.parse(found[0])
}
module.exports = { checkResponse, getCallbackId, getArrayFromRequest }
@andreav
Copy link

andreav commented Jul 19, 2024

Thank you for the update.
However I'm experiencing the same error.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment