Created
September 22, 2021 16:47
-
-
Save abueide/948073fe8102b65d0a55c8e0446b163a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.abysl.realms.client.network | |
import com.abysl.realms.shared.services.Service | |
import io.ktor.network.selector.* | |
import io.ktor.network.sockets.* | |
import io.ktor.network.tls.* | |
import io.ktor.utils.io.* | |
import kotlinx.coroutines.* | |
import kotlinx.coroutines.flow.* | |
import java.net.ConnectException | |
@OptIn(ExperimentalCoroutinesApi::class) | |
class ClientConnectionService( | |
settings: ClientSettings = ClientSettings(), | |
initStart: Boolean = false): Service(initStart) { | |
private val _socket = aSocket(ActorSelectorManager(Dispatchers.IO)) | |
private val _settings = MutableStateFlow(settings) | |
private val _udp: MutableStateFlow<ConnectedDatagramSocket?> = MutableStateFlow(null) | |
private val _udpWrite: StateFlow<ByteWriteChannel?> = runBlocking { | |
_udp.mapLatest { it?.openWriteChannel() }.stateIn(CoroutineScope(Dispatchers.IO)) | |
} | |
private val _udpRead: StateFlow<ByteReadChannel?> = runBlocking { | |
_udp.mapLatest { it?.openReadChannel() }.stateIn(CoroutineScope(Dispatchers.IO)) | |
} | |
private val _tcp: MutableStateFlow<Socket?> = MutableStateFlow(null) | |
private val _tcpWrite: StateFlow<ByteWriteChannel?> = runBlocking { | |
_tcp.mapLatest { it?.openWriteChannel() }.stateIn(CoroutineScope(Dispatchers.IO)) | |
} | |
private val _tcpRead: StateFlow<ByteReadChannel?> = runBlocking { | |
_tcp.mapLatest { it?.openReadChannel() }.stateIn(CoroutineScope(Dispatchers.IO)) | |
} | |
val connected: StateFlow<Boolean> = runBlocking { | |
flow { | |
if (isRunning) { | |
emit(checkConnected()) | |
} else { | |
emit(false) | |
} | |
}.stateIn(CoroutineScope(Dispatchers.IO)) | |
} | |
val availableUdpPackets = packetFlow(_udpRead) | |
val availableTcpPackets = packetFlow(_tcpRead) | |
override fun start() { | |
super.start() | |
connectSockets() | |
connected.onEach { | |
if(!it && isRunning){ | |
disconnectSockets() | |
connectSockets() | |
} | |
}.launchIn(CoroutineScope(Dispatchers.IO)) | |
} | |
fun connect(settings: ClientSettings? = null) { | |
if (settings != null) _settings.value = settings | |
stop() | |
start() | |
} | |
fun sendTcp(bytes: ByteArray) { | |
CoroutineScope(Dispatchers.IO).launch { | |
_tcpWrite.value ?: println("Failed to send packet, connection not established") | |
_tcpWrite.value?.writeShort(bytes.size.toShort()) | |
_tcpWrite.value?.writeFully(bytes) | |
_udpWrite.value?.flush() | |
} | |
} | |
fun sendUdp(bytes: ByteArray) { | |
println(connected.value) | |
CoroutineScope(Dispatchers.IO).launch { | |
_udpWrite.value ?: println("Failed to send packet, connection") | |
_udpWrite.value?.writeShort(bytes.size.toShort()) | |
_udpWrite.value?.writeFully(bytes) | |
_udpWrite.value?.flush() | |
} | |
} | |
private fun connectSockets() { | |
CoroutineScope(Dispatchers.IO).launch { | |
while (isRunning && !connected.value) { | |
try { | |
_tcp.value = _socket.tcp().connect(_settings.value.address) | |
_udp.value = _socket.udp().connect(_settings.value.address) | |
} catch (e: ConnectException) { | |
println("Couldn't connect to socket, trying again in 5 seconds") | |
disconnectSockets() | |
} | |
delay(5000) | |
} | |
} | |
} | |
private fun disconnectSockets(){ | |
_tcp.value?.close() | |
_udp.value?.close() | |
_tcp.value = null | |
_udp.value = null | |
} | |
private fun packetFlow(readFlow: Flow<ByteReadChannel?>) = flow { | |
readFlow.mapLatest { readChannel -> | |
while (isRunning) { | |
if (readChannel != null && readChannel.availableForRead > 0) { | |
val size = readChannel.readShort() | |
val packet = ByteArray(size.toInt()) | |
readChannel.readFully(packet) | |
emit(packet) | |
} | |
} | |
} | |
} | |
fun checkConnected(): Boolean { | |
if(anyNull( | |
_tcp.value, | |
_tcpRead.value, | |
_tcpWrite.value, | |
_udp.value, | |
_udpRead.value, | |
_udpWrite.value, | |
)) return false | |
return _tcpRead.value?.isClosedForRead != true | |
&& _tcpWrite.value?.isClosedForWrite != true | |
&& _udpWrite.value?.isClosedForWrite != true | |
} | |
private fun anyNull(vararg values: Any?): Boolean{ | |
return values.any { it == null } | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment