Skip to content

Instantly share code, notes, and snippets.

@abueide
Created September 22, 2021 16:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save abueide/f086f6e0cc00c5bd8daf793cc66e0b96 to your computer and use it in GitHub Desktop.
Save abueide/f086f6e0cc00c5bd8daf793cc66e0b96 to your computer and use it in GitHub Desktop.
package com.abysl.realms.server.network.services
import com.abysl.realms.server.network.ServerSettings
import com.abysl.realms.shared.services.Service
import io.ktor.network.selector.*
import io.ktor.network.sockets.*
import io.ktor.network.tls.certificates.*
import io.ktor.utils.io.*
import io.ktor.utils.io.core.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.io.File
import java.util.concurrent.ConcurrentLinkedQueue
/**
 * Binds a TCP and a UDP socket on `settings.address` and collects length-prefixed
 * packets from both into an internal queue that is exposed through [unprocessedPackets].
 *
 * Wire format, as read by this class: a 2-byte length prefix followed by that many
 * payload bytes. The prefix is treated as unsigned (0..65535).
 * NOTE(review): confirm the sender also writes the prefix as an unsigned 16-bit value.
 *
 * @param settings supplies the address both sockets are bound to.
 * @param start forwarded to [Service]; its exact semantics are defined there.
 */
class ServerConnectionService(settings: ServerSettings = ServerSettings(), start: Boolean = false) : Service(start) {
    private val _socket = aSocket(ActorSelectorManager(Dispatchers.IO))
    private val _settings = MutableStateFlow(settings)
    private val _udp = _socket.udp().bind(settings.address)
    private val _tcp = _socket.tcp().bind(settings.address)
    private val _connections = MutableStateFlow(listOf<Socket>())
    private val _unprocessedPackets = ConcurrentLinkedQueue<ByteArray>()

    /**
     * Cold flow that drains whatever is queued at collection time and then completes.
     * Collect it again to pick up packets that arrived later.
     */
    val unprocessedPackets = flow {
        // poll() is the single atomic "take if present" operation; checking size first
        // is O(n) on ConcurrentLinkedQueue and races with other collectors, which could
        // make poll() return null after the check succeeded.
        while (true) {
            val packet = _unprocessedPackets.poll() ?: break
            emit(packet)
        }
    }

    /** Starts the base service, then begins accepting TCP connections and reading UDP datagrams. */
    override fun start() {
        super.start()
        acceptConnections()
        listenUdp()
    }

    // Accepts connections and reads each accepted socket on its own child coroutine.
    private fun acceptConnections() {
        CoroutineScope(Dispatchers.IO).launch {
            // supervisorScope keeps readers as children of the acceptor (so cancelling the
            // acceptor cancels them) without letting one failed reader cancel the others.
            supervisorScope {
                while (isRunning) {
                    val tcpSocket = _tcp.accept()
                    launch { listenTcp(tcpSocket) }
                }
            }
        }
    }

    // Reads length-prefixed packets from one TCP socket until the service stops or the
    // peer disconnects, then closes the socket.
    private suspend fun listenTcp(socket: Socket) {
        try {
            val readChannel = socket.openReadChannel()
            while (isRunning) {
                // readShort() suspends until two bytes are available, so there is no need
                // to spin on availableForRead (which pinned an IO thread at 100% CPU).
                val size = readChannel.readShort().toInt() and 0xFFFF
                val packet = ByteArray(size)
                readChannel.readFully(packet)
                _unprocessedPackets.add(packet)
            }
        } catch (e: kotlinx.coroutines.channels.ClosedReceiveChannelException) {
            // Peer closed the connection; nothing more to read.
        } catch (e: java.io.IOException) {
            // Connection reset or other transport failure; drop this connection.
        } finally {
            socket.close()
        }
    }

    // Reads length-prefixed packets from the UDP socket; malformed datagrams are dropped.
    private fun listenUdp() {
        CoroutineScope(Dispatchers.IO).launch {
            while (isRunning) {
                val dg = _udp.receive()
                val payload = dg.packet
                try {
                    // A datagram too short for its own header or declared length would make
                    // the reads below throw and kill the listener, so skip it instead.
                    if (payload.remaining >= 2) {
                        val size = payload.readShort().toInt() and 0xFFFF
                        if (payload.remaining >= size) {
                            val packet = ByteArray(size)
                            payload.readFully(packet)
                            _unprocessedPackets.add(packet)
                        }
                    }
                } finally {
                    // Frees any bytes left unread (short, oversized or trailing data).
                    payload.release()
                }
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment