Skip to content

Instantly share code, notes, and snippets.

@klg71
Last active June 3, 2023 16:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save klg71/c672e06d590382121cb2c5448009eb5b to your computer and use it in GitHub Desktop.
Save klg71/c672e06d590382121cb2c5448009eb5b to your computer and use it in GitHub Desktop.
chat server with websockets and flows
MIT License
Copyright (c) [year] [fullname]
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
// MIT License see License file
package net.mayope.raids.chatserver.chat.websocket
import kotlinx.coroutines.flow.MutableSharedFlow
import net.mayope.raids.chatserver.chatclient.ChatMessage
import java.util.UUID
internal data class Room(val id: UUID,
val name: String,
val messages: MutableSharedFlow<ChatMessage>){
suspend fun sendMessage(chatMessage: ChatMessage) {
messages.emit(chatMessage)
}
}
// MIT License see License file
package net.mayope.raids.chatserver.chat.service
import kotlinx.coroutines.flow.MutableSharedFlow
import net.mayope.raids.chatserver.chat.websocket.Room
import net.mayope.raids.chatserver.chatclient.ChatMessage
import net.mayope.raids.common.getLogger
import org.springframework.stereotype.Service
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import javax.annotation.PreDestroy
@Service
internal class RoomService(
private val userRoomHost: UserRoomHost,
) {
private val logger = getLogger(RoomService::class.java)
private val rooms = ConcurrentHashMap<UUID, Room>()
@PreDestroy
fun tearDownRooms() {
rooms.values.forEach {
closeRoom(it.id)
}
}
fun createRoom(id: UUID, name: String) {
val messageFlow = MutableSharedFlow<ChatMessage>()
rooms[id] = Room(id, name, messageFlow)
logger.info("Create new room: {}", name)
}
fun deleteAll() {
rooms.keys().toList().forEach {
userRoomHost.removeAllUsersFromRoom(it)
}
logger.info("Deleted all rooms")
}
fun closeRoom(id: UUID) {
userRoomHost.removeAllUsersFromRoom(id)
rooms.remove(id)
logger.info("Closed room: {}", id)
}
fun currentRooms() =
rooms.iterator().asSequence().toList().map {
it.value
}.associateBy { it.id }
fun roomsForIds(roomIds: Set<UUID>): Set<Room> {
val currentRooms = currentRooms()
return roomIds.map {
currentRooms[it] ?: error("Room with id: $it does not exist")
}.toSet()
}
}
// MIT License see License file
package net.mayope.raids.chatserver.chat.websocket
import com.fasterxml.jackson.databind.ObjectMapper
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import net.mayope.raids.chatserver.chatclient.ChatEvent
import net.mayope.raids.chatserver.chatclient.ChatEventType
import net.mayope.raids.chatserver.chatclient.ChatMessage
import org.springframework.web.socket.BinaryMessage
import org.springframework.web.socket.WebSocketSession
import java.util.UUID
internal suspend fun WebSocketSession.sendJoin(room: Room, objectMapper: ObjectMapper, sendMutex: Mutex) {
ChatEvent(room.name, room.id, "", "", ChatEventType.ROOM_JOIN).let {
objectMapper.writeValueAsBytes(it)
}.let {
BinaryMessage(it)
}.let {
sendMutex.withLock {
sendMessage(it)
}
}
}
internal suspend fun WebSocketSession.sendLeave(room: Room, objectMapper: ObjectMapper, sendMutex: Mutex) {
ChatEvent(room.name, room.id, "", "", ChatEventType.ROOM_JOIN).let {
objectMapper.writeValueAsBytes(it)
}.let {
BinaryMessage(it)
}.let {
if (isOpen) {
sendMutex.withLock {
sendMessage(it)
}
}
}
}
internal suspend fun WebSocketSession.sendMessage(room: Room,
objectMapper: ObjectMapper,
sendMutex: Mutex,
message: ChatMessage) {
ChatEvent(room.name, room.id, message.from, message.message, chatEventVolume = message.chatEventVolume)
.let {
objectMapper.writeValueAsBytes(it).let {
BinaryMessage(it)
}.let {
runBlocking {
sendMutex.withLock {
sendMessage(it)
}
}
}
}
}
internal suspend fun WebSocketSession.sendPrivateMessage(
userId: UUID,
objectMapper: ObjectMapper,
sendMutex: Mutex,
message: ChatMessage) {
ChatEvent("private", userId, message.from, message.message, chatEventVolume = message.chatEventVolume)
.let {
objectMapper.writeValueAsBytes(it).let {
BinaryMessage(it)
}.let {
runBlocking {
sendMutex.withLock {
sendMessage(it)
}
}
}
}
}
// MIT License see License file
package net.mayope.raids.chatserver.chat.websocket
import com.fasterxml.jackson.databind.ObjectMapper
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import net.mayope.raids.chatserver.chat.service.RoomService
import net.mayope.raids.chatserver.chat.service.UserRoomHost
import net.mayope.raids.chatserver.chatclient.ChatMessage
import net.mayope.raids.common.getLogger
import org.springframework.web.socket.WebSocketSession
import java.util.UUID
internal class UserInRoom(val room: Room, val receiveJob: Job)
internal class UserChatHandler(private val username: String,
private val userId: UUID,
private val objectMapper: ObjectMapper,
private val session: WebSocketSession,
private val scope: CoroutineScope,
private val userRoomHost: UserRoomHost,
private val roomService: RoomService,
private val privateInbox: SharedFlow<ChatMessage>) {
private val joinedRooms = mutableMapOf<UUID, UserInRoom>()
private val logger = getLogger(UserChatHandler::class.java)
private val sendMutex = Mutex()
init {
scope.launch {
roomWatcher()
onPrivateMessage()
}
}
private suspend fun roomWatcher() {
while (session.isOpen) {
updatePlayerRooms()
delay(1000)
}
}
private suspend fun updatePlayerRooms() {
userRoomHost.playerRooms(userId).let {
roomService.roomsForIds(it)
}.let {
updateRooms(it)
}
}
private suspend fun onPrivateMessage() {
privateInbox.collect {
session.sendPrivateMessage(userId, objectMapper, sendMutex, it)
}
}
suspend fun sendMessage(roomId: UUID, message: String) {
joinedRooms[roomId]?.room?.sendMessage(ChatMessage(username, message))
}
private suspend fun updateRooms(newRooms: Set<Room>) {
joinNewRooms(newRooms)
leaveMissingRooms(newRooms)
}
private suspend fun leaveMissingRooms(newRooms: Set<Room>) {
val newRoomIds = newRooms.map { it.id }
joinedRooms.filterNot {
newRoomIds.contains(it.key)
}.forEach {
leaveRoom(it.value.room)
}
}
private suspend fun joinNewRooms(newRooms: Set<Room>) {
newRooms.filterNot {
joinedRooms.contains(it.id)
}.forEach {
joinRoom(it)
}
}
private suspend fun joinRoom(room: Room) {
session.sendJoin(room, objectMapper, sendMutex)
logger.info("User: {} joined room: {}", username, room.id)
val receiveJob = scope.launch {
room.messages.collect {
session.sendMessage(room, objectMapper, sendMutex, it)
}
}
joinedRooms[room.id] = UserInRoom(room, receiveJob)
}
private suspend fun leaveRoom(room: Room) {
if (!joinedRooms.contains(room.id)) {
return
}
joinedRooms[room.id]?.receiveJob?.cancel()
session.sendLeave(room, objectMapper, sendMutex)
logger.info("User: {} left room: {}", username, room.id)
joinedRooms.remove(room.id)
}
suspend fun disconnect() {
scope.cancel()
joinedRooms.forEach {
leaveRoom(it.value.room)
}
logger.info("User: {} disconnected", username)
}
}
// MIT License see License file
package net.mayope.raids.chatserver.chat.service
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.springframework.stereotype.Service
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
@Service
internal class UserRoomHost {
private val playerRooms = ConcurrentHashMap<UUID, Set<UUID>>()
private val roomMutex = Mutex()
fun playerRooms(playerId: UUID) = playerRooms[playerId] ?: emptySet()
fun assignPlayerToRoom(roomId: UUID, userId: UUID) {
if (playerRooms[userId]?.contains(roomId) == true) {
return
}
runBlocking {
roomMutex.withLock {
val existingRooms = playerRooms[userId] ?: emptySet()
playerRooms[userId] = existingRooms + setOf(roomId)
}
}
}
fun removeUserFromRoom(roomId: UUID, userId: UUID) {
if (playerRooms[userId]?.contains(roomId) == false) {
return
}
runBlocking {
roomMutex.withLock {
val existingRooms = playerRooms[userId] ?: emptySet()
playerRooms[userId] = existingRooms.filter { it != roomId }.toSet()
}
}
}
fun removeAllUsersFromRoom(roomId: UUID) {
runBlocking {
roomMutex.withLock {
playerRooms.keys.forEach { userId ->
val existingRooms = playerRooms[userId] ?: emptySet()
playerRooms[userId] = existingRooms.filter { it != roomId }.toSet()
}
}
}
}
}
// MIT License see License file
package net.mayope.raids.chatserver.chat.websocket
import kotlinx.coroutines.CoroutineScope
import org.springframework.web.socket.WebSocketSession
import java.util.UUID
internal data class UserSession(val session: WebSocketSession,
val playerId: UUID,
val username: String,
val userChatHandler: UserChatHandler,
val scope: CoroutineScope)
// MIT License see License file
package net.mayope.raids.chatserver.chat.websocket
import com.fasterxml.jackson.databind.ObjectMapper
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.runBlocking
import net.mayope.raids.chatserver.chat.service.PrivateInboxHolder
import net.mayope.raids.chatserver.chat.service.RoomService
import net.mayope.raids.chatserver.chat.service.UserRoomHost
import net.mayope.raids.chatserver.chat.service.WorldRoomService
import net.mayope.raids.chatserver.chatclient.ChatMessage
import org.springframework.security.oauth2.jwt.JwtDecoder
import org.springframework.stereotype.Component
import org.springframework.web.socket.WebSocketSession
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
@Component
internal class UserSessionHolder(
private val objectMapper: ObjectMapper,
private val userRoomHost: UserRoomHost,
private val roomService: RoomService,
private val dispatcher: CoroutineDispatcher,
private val privateInboxHolder: PrivateInboxHolder,
private val decoder: JwtDecoder,
private val worldRoomService: WorldRoomService
) {
private val sessions = ConcurrentHashMap<UUID, UserSession>()
fun connectedUser(playerId: UUID) = sessions[playerId]
fun createSession(playerId: UUID, session: WebSocketSession) {
val scope = CoroutineScope(dispatcher)
val username = playerName(session)
val privateInbox = MutableSharedFlow<ChatMessage>()
val handler =
UserChatHandler(username, playerId, objectMapper, session, scope, userRoomHost, roomService, privateInbox)
privateInboxHolder.register(playerId, privateInbox)
UserSession(session, playerId, username, handler, scope).let {
sessions[playerId] = it
}
worldRoomService.addPlayer(playerId)
}
internal fun playerName(session: WebSocketSession) =
session.uri?.query?.split("&")?.associate {
it.split("=").let {
it.first() to it[1]
}
}?.get("authorization")?.let {
decoder.decode(it)
}?.getClaimAsString("preferred_username") ?: error("could not determine username")
fun tearDownSession(playerId: UUID) {
sessions[playerId]?.let {
runBlocking {
it.userChatHandler.disconnect()
}
it.scope.cancel()
}
privateInboxHolder.unregister(playerId)
sessions.remove(playerId)
worldRoomService.removePlayer(playerId)
}
}
// MIT License see License file
package net.mayope.raids.chatserver.chat.websocket
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import kotlinx.coroutines.runBlocking
import net.mayope.raids.chatserver.chat.service.PrivateInboxHolder
import net.mayope.raids.chatserver.chatclient.ChatMessage
import net.mayope.raids.chatserver.chatclient.MessageType
import net.mayope.raids.chatserver.chatclient.PostMessageEvent
import org.springframework.stereotype.Component
import org.springframework.web.socket.BinaryMessage
import org.springframework.web.socket.CloseStatus
import org.springframework.web.socket.WebSocketSession
import org.springframework.web.socket.handler.BinaryWebSocketHandler
import java.util.UUID
@Component
class WebSocketChatHandler internal constructor(
private val objectMapper: ObjectMapper,
private val userSessionHolder: UserSessionHolder,
private val privateInboxHolder: PrivateInboxHolder,
) : BinaryWebSocketHandler() {
override fun afterConnectionEstablished(session: WebSocketSession) {
userSessionHolder.createSession(playerId(session), session)
}
override fun afterConnectionClosed(session: WebSocketSession, status: CloseStatus) {
userSessionHolder.tearDownSession(playerId(session))
}
override fun handleBinaryMessage(session: WebSocketSession, message: BinaryMessage) {
val event: PostMessageEvent = objectMapper.readValue(message.payload.array())
val playerId = playerId(session)
runBlocking {
when (event.type) {
MessageType.ROOM ->
userSessionHolder.connectedUser(playerId)?.userChatHandler?.sendMessage(
event.targetId, event.message
)
MessageType.PRIVATE -> {
val playerName = userSessionHolder.connectedUser(playerId)?.username ?: return@runBlocking
privateInboxHolder.sendMessageTo(event.targetId, ChatMessage(playerName, event.message))
}
MessageType.COMMAND -> {
}
}
}
}
private fun playerId(session: WebSocketSession) =
session.principal?.name?.let {
UUID.fromString(it)
} ?: error("Player has no sessionId")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment