Skip to content

Instantly share code, notes, and snippets.

@abueide
Created September 22, 2021 16:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save abueide/948073fe8102b65d0a55c8e0446b163a to your computer and use it in GitHub Desktop.
Save abueide/948073fe8102b65d0a55c8e0446b163a to your computer and use it in GitHub Desktop.
package com.abysl.realms.client.network
import com.abysl.realms.shared.services.Service
import io.ktor.network.selector.*
import io.ktor.network.sockets.*
import io.ktor.network.tls.*
import io.ktor.utils.io.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.net.ConnectException
@OptIn(ExperimentalCoroutinesApi::class)
class ClientConnectionService(
settings: ClientSettings = ClientSettings(),
initStart: Boolean = false): Service(initStart) {
private val _socket = aSocket(ActorSelectorManager(Dispatchers.IO))
private val _settings = MutableStateFlow(settings)
private val _udp: MutableStateFlow<ConnectedDatagramSocket?> = MutableStateFlow(null)
private val _udpWrite: StateFlow<ByteWriteChannel?> = runBlocking {
_udp.mapLatest { it?.openWriteChannel() }.stateIn(CoroutineScope(Dispatchers.IO))
}
private val _udpRead: StateFlow<ByteReadChannel?> = runBlocking {
_udp.mapLatest { it?.openReadChannel() }.stateIn(CoroutineScope(Dispatchers.IO))
}
private val _tcp: MutableStateFlow<Socket?> = MutableStateFlow(null)
private val _tcpWrite: StateFlow<ByteWriteChannel?> = runBlocking {
_tcp.mapLatest { it?.openWriteChannel() }.stateIn(CoroutineScope(Dispatchers.IO))
}
private val _tcpRead: StateFlow<ByteReadChannel?> = runBlocking {
_tcp.mapLatest { it?.openReadChannel() }.stateIn(CoroutineScope(Dispatchers.IO))
}
val connected: StateFlow<Boolean> = runBlocking {
flow {
if (isRunning) {
emit(checkConnected())
} else {
emit(false)
}
}.stateIn(CoroutineScope(Dispatchers.IO))
}
val availableUdpPackets = packetFlow(_udpRead)
val availableTcpPackets = packetFlow(_tcpRead)
override fun start() {
super.start()
connectSockets()
connected.onEach {
if(!it && isRunning){
disconnectSockets()
connectSockets()
}
}.launchIn(CoroutineScope(Dispatchers.IO))
}
fun connect(settings: ClientSettings? = null) {
if (settings != null) _settings.value = settings
stop()
start()
}
fun sendTcp(bytes: ByteArray) {
CoroutineScope(Dispatchers.IO).launch {
_tcpWrite.value ?: println("Failed to send packet, connection not established")
_tcpWrite.value?.writeShort(bytes.size.toShort())
_tcpWrite.value?.writeFully(bytes)
_udpWrite.value?.flush()
}
}
fun sendUdp(bytes: ByteArray) {
println(connected.value)
CoroutineScope(Dispatchers.IO).launch {
_udpWrite.value ?: println("Failed to send packet, connection")
_udpWrite.value?.writeShort(bytes.size.toShort())
_udpWrite.value?.writeFully(bytes)
_udpWrite.value?.flush()
}
}
private fun connectSockets() {
CoroutineScope(Dispatchers.IO).launch {
while (isRunning && !connected.value) {
try {
_tcp.value = _socket.tcp().connect(_settings.value.address)
_udp.value = _socket.udp().connect(_settings.value.address)
} catch (e: ConnectException) {
println("Couldn't connect to socket, trying again in 5 seconds")
disconnectSockets()
}
delay(5000)
}
}
}
private fun disconnectSockets(){
_tcp.value?.close()
_udp.value?.close()
_tcp.value = null
_udp.value = null
}
private fun packetFlow(readFlow: Flow<ByteReadChannel?>) = flow {
readFlow.mapLatest { readChannel ->
while (isRunning) {
if (readChannel != null && readChannel.availableForRead > 0) {
val size = readChannel.readShort()
val packet = ByteArray(size.toInt())
readChannel.readFully(packet)
emit(packet)
}
}
}
}
fun checkConnected(): Boolean {
if(anyNull(
_tcp.value,
_tcpRead.value,
_tcpWrite.value,
_udp.value,
_udpRead.value,
_udpWrite.value,
)) return false
return _tcpRead.value?.isClosedForRead != true
&& _tcpWrite.value?.isClosedForWrite != true
&& _udpWrite.value?.isClosedForWrite != true
}
private fun anyNull(vararg values: Any?): Boolean{
return values.any { it == null }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment