Skip to content

Instantly share code, notes, and snippets.

@VitalyPeryatin
Created September 10, 2019 15:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save VitalyPeryatin/7dbd10c12cc9741024c52c017cd5e5d4 to your computer and use it in GitHub Desktop.
Save VitalyPeryatin/7dbd10c12cc9741024c52c017cd5e5d4 to your computer and use it in GitHub Desktop.
Socket
package ru.energyhouse.energyhouse.data.network.socket.connection_states
import ru.energyhouse.energyhouse.data.network.socket.ConnectResult
import ru.energyhouse.energyhouse.domain.socket.model.SocketAddress
import java.io.IOException
import java.net.InetSocketAddress
import java.net.Socket
abstract class AbstractConnectionState(private val socket: Socket) {
abstract fun connect(): ConnectResult
abstract fun getSocketAddress(): SocketAddress
abstract fun getSuccessMessage(): String
abstract fun getFailMessage(): String
protected fun tryConnectSocketByAddress(host: String, port: Int): ConnectResult {
return try {
socket.connect(InetSocketAddress(host, port), 1000)
ConnectResult(
status = ConnectResult.Status.SUCCESS,
message = getSuccessMessage()
)
} catch (e: IOException) {
ConnectResult(
status = ConnectResult.Status.FAIL,
message = getFailMessage()
)
}
}
}
package ru.energyhouse.energyhouse.data.network.socket.connection_states
import ru.energyhouse.energyhouse.data.network.socket.ConnectResult
import ru.energyhouse.energyhouse.data.repositories.socket.SocketAddressRepository
import ru.energyhouse.energyhouse.domain.socket.model.SocketAddress
import java.net.Socket
class GlobalConnection(
socket: Socket,
private val socketAddressRepository: SocketAddressRepository
) : AbstractConnectionState(socket) {
override fun connect(): ConnectResult {
val socketAddress = getSocketAddress()
return tryConnectSocketByAddress(
host = socketAddress.ip,
port = socketAddress.port
)
}
override fun getSocketAddress(): SocketAddress =
socketAddressRepository.getGlobalAddress()
override fun getSuccessMessage(): String {
return ""
}
override fun getFailMessage(): String {
return ""
}
}
package ru.energyhouse.energyhouse.data.network.socket.connection_states
import ru.energyhouse.energyhouse.data.network.socket.ConnectResult
import ru.energyhouse.energyhouse.data.repositories.socket.SocketAddressRepository
import ru.energyhouse.energyhouse.domain.socket.model.SocketAddress
import java.net.Socket
class LocalConnection(
socket: Socket,
private val socketAddressRepository: SocketAddressRepository
) : AbstractConnectionState(socket) {
override fun connect(): ConnectResult {
val socketAddress = getSocketAddress()
return tryConnectSocketByAddress(
host = socketAddress.ip,
port = socketAddress.port
)
}
override fun getSuccessMessage(): String {
return ""
}
override fun getFailMessage(): String {
return ""
}
override fun getSocketAddress(): SocketAddress =
socketAddressRepository.getLocalAddress()
}
package ru.energyhouse.energyhouse.data.network.socket.connection_states
import ru.energyhouse.energyhouse.data.network.socket.ConnectResult
import ru.energyhouse.energyhouse.domain.socket.model.SocketAddress
import java.net.Socket
class NoNetworkConnection(socket: Socket) : AbstractConnectionState(socket), TerminalState {
override fun connect(): ConnectResult {
return ConnectResult(
status = ConnectResult.Status.FAIL,
message = getFailMessage()
)
}
override fun getSocketAddress(): SocketAddress = SocketAddress()
override fun getSuccessMessage(): String = ""
override fun getFailMessage(): String = ""
}
package ru.energyhouse.energyhouse.data.network.socket.connection_states
interface TerminalState
package ru.energyhouse.energyhouse.data.network.socket
import com.google.gson.annotations.SerializedName
import ru.energyhouse.energyhouse.utils.consts.CONNECT_VALUE
data class ConnectModel(
@SerializedName("operation")
val command: String = CONNECT_VALUE,
var token: String = ""
)
package ru.energyhouse.energyhouse.data.network.socket
data class ConnectResult(
var status: Status = Status.FAIL,
var message: String = ""
) {
enum class Status { IN_PROCESS, SUCCESS, FAIL }
}
package ru.energyhouse.energyhouse.data.network.socket.managers
import io.reactivex.disposables.CompositeDisposable
import io.reactivex.disposables.Disposable
abstract class AbstractManager {
private var disposables = CompositeDisposable()
protected fun saveDisposable(disposable: Disposable) =
disposables.add(disposable)
protected fun releaseDisposables() =
disposables.clear()
}
package ru.energyhouse.energyhouse.data.network.socket.managers
import android.annotation.SuppressLint
import android.util.Log
import com.google.gson.Gson
import io.reactivex.rxkotlin.subscribeBy
import io.reactivex.subjects.PublishSubject
import org.json.JSONException
import org.json.JSONObject
import ru.energyhouse.energyhouse.data.api.authentication.AuthenticateResultResponse
import ru.energyhouse.energyhouse.data.api.authentication.AuthenticationResponse
import ru.energyhouse.energyhouse.data.network.socket.ConnectModel
import ru.energyhouse.energyhouse.data.network.socket.MessageReader
import ru.energyhouse.energyhouse.data.repositories.socket.SocketApplicationDataSource
import ru.energyhouse.energyhouse.data.repositories.user_token.UserTokenRepository
import ru.energyhouse.energyhouse.domain.socket.SocketInteractor
import ru.energyhouse.energyhouse.utils.consts.*
import ru.energyhouse.energyhouse.utils.helpers.HashGeneratorHelper
class AuthenticationManager(
private val reader: MessageReader,
private val userTokenRepository: UserTokenRepository
) : AbstractManager() {
enum class AuthStatus { SUCCESS, IN_PROCESS, END }
private val gson = Gson()
private val connectResultSubject = PublishSubject.create<String>()
private val socketInteractor = SocketInteractor(SocketApplicationDataSource())
fun launchAuthListeners() {
subscribeAuthMessageListener()
subscribeResultMessageListener()
}
fun setInProcessStatus() {
authStatus = AuthStatus.IN_PROCESS
}
@SuppressLint("CheckResult")
private fun subscribeAuthMessageListener() {
saveDisposable(
reader.getMessageObservable()
.filter { message ->
Log.d("AuthBug", "Filter for connectByToken()")
isAuthenticationResponseMessage(message)
}
.map { gson.fromJson(it, AuthenticationResponse::class.java) }
.subscribeBy(
onNext = { authenticationResponse ->
Log.d("AuthBug", "connectByToken()")
connectByToken(authenticationResponse.token)
}
)
)
}
private fun connectByToken(tokenFromServer: String) {
val token = HashGeneratorHelper.getMd5(tokenFromServer +
userTokenRepository.getIntermediateHash())
socketInteractor.sendMessage(getConnectModel(token))
}
private fun getConnectModel(token: String): String =
gson.toJson(ConnectModel(token = token))
@SuppressLint("CheckResult")
private fun subscribeResultMessageListener() {
saveDisposable(
reader.getMessageObservable()
.filter { message ->
Log.d("AuthBug", "Filter with status")
isAuthenticationResultMessage(message)
}
.subscribeBy(
onNext = { message ->
if (hasMessageSuccessStatus(message)) {
saveCorrectLoginPasswordHash()
authStatus = AuthStatus.SUCCESS
finishAuthListeners()
}
connectResultSubject.onNext(message)
}
)
)
}
private fun saveCorrectLoginPasswordHash() {
val hash = userTokenRepository.getIntermediateHash() ?: ""
userTokenRepository.saveTotalHash(hash)
}
fun finishAuthListeners() {
Log.d("AuthBug", "release auth Disposables")
releaseDisposables()
}
fun resetAuth() {
authStatus = AuthStatus.END
}
private fun hasMessageSuccessStatus(message: String): Boolean {
try {
if (isAuthenticationResultMessage(message)) {
val result = gson.fromJson(message, AuthenticateResultResponse::class.java)
return result.status == STATUS_OK_VALUE
}
} catch (e: JSONException) {
e.printStackTrace()
}
return false
}
companion object {
var authStatus = AuthStatus.END
private set
fun isAuthenticationResponseMessage(message: String): Boolean {
try {
val jsonObject = JSONObject(message)
return jsonObject.has(OPERATION_KEY) &&
jsonObject.getString(OPERATION_KEY) == AUTHORIZATION_TOKEN_VALUE
&& jsonObject.has(TOKEN_KEY)
} catch (e: JSONException) {
e.printStackTrace()
}
return false
}
fun isAuthRequestMessage(message: String): Boolean {
try {
val jsonObject = JSONObject(message)
return jsonObject.has(OPERATION_KEY) &&
(jsonObject.getString(OPERATION_KEY) == CONNECT_VALUE)
} catch (e: JSONException) {
e.printStackTrace()
}
return false
}
fun isNotAuthResponseMessage(message: String): Boolean =
!isAuthenticationResponseMessage(message)
fun isNotAuthRequestMessage(message: String): Boolean =
!isAuthRequestMessage(message)
fun isAuthenticationResultMessage(message: String): Boolean {
try {
val jsonObject = JSONObject(message)
return jsonObject.has(OPERATION_KEY) &&
(jsonObject.getString(OPERATION_KEY) == AUTHORIZATION_VALUE ||
jsonObject.getString(OPERATION_KEY) == CONNECT_VALUE) &&
jsonObject.has(STATUS_KEY)
} catch (e: JSONException) {
e.printStackTrace()
}
return false
}
}
}
package ru.energyhouse.energyhouse.data.network.socket.managers
import android.util.Log
import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers
import io.reactivex.subjects.PublishSubject
import ru.energyhouse.energyhouse.data.network.socket.MessageReader
class CommandManager : AbstractManager() {
private val messagePoolSubject = PublishSubject.create<String>()
private var reader: MessageReader? = null
fun launchMessageListeners(reader: MessageReader) {
this.reader = reader
subscribeMessagePool()
}
private fun subscribeMessagePool() {
if (reader != null) {
reader!!
.getMessageObservable()
.subscribeOn(Schedulers.io())
.filter { message ->
Log.d("AuthBug", "Filter for CommandManager")
AuthenticationManager.isNotAuthResponseMessage(message)
}
.subscribe(messagePoolSubject)
}
}
fun getMessageObservable(): Observable<String> =
messagePoolSubject
}
package ru.energyhouse.energyhouse.data.network.socket
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import ru.energyhouse.energyhouse.data.network.socket.managers.AuthenticationManager
class MessageAccessController {
suspend fun waitPermissionToSend() = GlobalScope.launch {
while (isPendingMessage()) {
}
}.join()
companion object {
fun isPendingMessage(): Boolean =
AuthenticationManager.authStatus == AuthenticationManager.AuthStatus.IN_PROCESS
fun isAuthorized(): Boolean =
AuthenticationManager.authStatus == AuthenticationManager.AuthStatus.SUCCESS
}
}
package ru.energyhouse.energyhouse.data.network.socket
import io.reactivex.Observable
import io.reactivex.subjects.PublishSubject
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import ru.energyhouse.energyhouse.data.network.socket.SocketTransceiver.END_DELIMITER
import ru.energyhouse.energyhouse.data.network.socket.SocketTransceiver.START_DELIMITER
import ru.energyhouse.energyhouse.data.network.socket.managers.AuthenticationManager
import java.io.DataInputStream
import java.net.Socket
import java.net.SocketException
import java.nio.charset.StandardCharsets
import kotlin.coroutines.CoroutineContext
class MessageReader(private val socket: Socket) : CoroutineScope {
override val coroutineContext: CoroutineContext = Dispatchers.IO
private val messageSubject = PublishSubject.create<String>()
init {
startMessageListening()
}
fun getMessageObservable(): Observable<String> =
messageSubject.share()
private fun startMessageListening() = launch {
try {
val buffer = ByteArray(20000)
var messageQuery: String
var messageCache = ""
val byteArrayList = mutableListOf<Byte>()
var read: Int
val dataInputStream = DataInputStream(socket.getInputStream())
while (true) {
read = dataInputStream.read(buffer)
if (read == -1) break
byteArrayList.addAll(buffer.sliceArray(0 until read).asList())
messageQuery = String(byteArrayList.toByteArray(), StandardCharsets.UTF_8)
if (hasCompletedMessage(messageQuery)) {
byteArrayList.clear()
messageCache += messageQuery
while (hasCompletedMessage(messageCache)) {
messageCache = getLeftTrimmedMessage(messageCache)
val message = getMessageBody(messageCache)
delayIfIsAuthenticationMessage(message)
messageSubject.onNext(message)
messageCache = getNextPartOfMessage(messageCache)
}
}
}
} catch (e: SocketException) {
SocketTransceiver.closeConnection()
e.printStackTrace()
}
}
/* Необходима задержка для первого сообщения, чтобы наблюдатели,
получающие сообщения, успели подписаться */
private fun delayIfIsAuthenticationMessage(message: String) {
if (AuthenticationManager.isAuthenticationResponseMessage(message)) {
Thread.sleep(500)
}
}
private fun hasCompletedMessage(message: String): Boolean {
return message.contains(START_DELIMITER) && message.contains(END_DELIMITER)
}
private fun getLeftTrimmedMessage(message: String): String {
return message.substring(message.indexOf(START_DELIMITER))
}
private fun getMessageBody(message: String): String {
val starIndex = message.indexOf(START_DELIMITER)
val sharpIndex = message.indexOf(END_DELIMITER)
return message.substring(starIndex + 1, sharpIndex)
}
private fun getNextPartOfMessage(message: String): String {
return message.substring(message.indexOf(END_DELIMITER) + 1)
}
}
package ru.energyhouse.energyhouse.data.network.socket
import ru.energyhouse.energyhouse.data.network.socket.SocketTransceiver.END_DELIMITER
import ru.energyhouse.energyhouse.data.network.socket.SocketTransceiver.START_DELIMITER
import ru.energyhouse.energyhouse.data.repositories.socket.MessageSendingResult
import java.io.BufferedWriter
import java.io.OutputStreamWriter
import java.net.Socket
import java.net.SocketException
class MessageWriter(socket: Socket) {
private var bufferedWriter: BufferedWriter? = null
init {
bufferedWriter = BufferedWriter(OutputStreamWriter(socket.getOutputStream()))
}
fun send(message: String): MessageSendingResult {
return try {
trySend(message)
MessageSendingResult.SUCCESS
} catch (e: SocketException) {
e.printStackTrace()
MessageSendingResult.FAIL
}
}
private fun trySend(message: String) {
bufferedWriter?.run {
write(START_DELIMITER + message + END_DELIMITER)
flush()
} ?: throw SocketException()
}
fun closeConnection() {
bufferedWriter = null
}
}
package ru.energyhouse.energyhouse.data.network.socket
import android.arch.lifecycle.Observer
import ru.energyhouse.energyhouse.data.network.connection.ConnectionLiveData
import ru.energyhouse.energyhouse.data.network.socket.connection_states.*
import ru.energyhouse.energyhouse.data.repositories.socket.SocketAddressRepository
import java.net.Socket
class SocketConnection {
var connectionLive: ConnectionLiveData? = null
var hasNetworkConnection = true
private set
var socket: Socket = Socket()
private set
private val socketAddressRepository = SocketAddressRepository()
private val connectionStateObserver =
Observer<ConnectionLiveData.ServerConnectionType> { serverConnectionType ->
hasNetworkConnection =
serverConnectionType == ConnectionLiveData.ServerConnectionType.ONLINE
}
fun setConnectionStateListener(connectionLiveData: ConnectionLiveData) {
connectionLive = connectionLiveData
connectionLive?.observeForever(connectionStateObserver)
}
fun connect(): ConnectResult {
updateSocket()
val initConnectionState = LocalConnection(socket, socketAddressRepository)
return connectWithConnectionState(connectionState = initConnectionState)
}
private fun updateSocket() {
socket = Socket()
}
private fun connectWithConnectionState(connectionState: AbstractConnectionState): ConnectResult {
val connectResult = connectionState.connect()
return if (hasNextState(connectResult, connectionState)) {
val newState = updateState(connectionState)
connectWithConnectionState(newState)
} else {
connectResult
}
}
private fun hasNextState(
connectResult: ConnectResult,
state: AbstractConnectionState
): Boolean =
isNotTerminalStateWithFailStatus(connectResult, state)
private fun isNotTerminalStateWithFailStatus(
connectResult: ConnectResult,
state: AbstractConnectionState
): Boolean =
state !is TerminalState && connectResult.status == ConnectResult.Status.FAIL
private fun updateState(currentState: AbstractConnectionState): AbstractConnectionState {
updateSocket()
return when {
currentState is LocalConnection && hasNetworkConnection -> GlobalConnection(
socket,
socketAddressRepository
)
currentState is LocalConnection && !hasNetworkConnection -> NoNetworkConnection(socket)
else -> NoNetworkConnection(socket)
}
}
}
package ru.energyhouse.energyhouse.data.network.socket
import android.arch.lifecycle.Observer
import android.util.Log
import io.reactivex.Observable
import io.reactivex.subjects.PublishSubject
import io.reactivex.subjects.Subject
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import ru.energyhouse.energyhouse.data.network.connection.ConnectionLiveData
import ru.energyhouse.energyhouse.data.network.socket.managers.AuthenticationManager
import ru.energyhouse.energyhouse.data.network.socket.managers.CommandManager
import ru.energyhouse.energyhouse.data.repositories.socket.MessageSendingResult
import ru.energyhouse.energyhouse.data.repositories.user_token.UserTokenRepository
import java.io.IOException
import java.net.Socket
object SocketTransceiver {
const val START_DELIMITER = "<"
const val END_DELIMITER = ">"
private var socket: Socket? = null
private var messageWriter: MessageWriter? = null
private var authorizationManager: AuthenticationManager? = null
private var commandManager: CommandManager = CommandManager()
private val connectionStateSubject = PublishSubject.create<ConnectResult>()
private val synchObjectKey = Any()
private val socketConnection = SocketConnection()
private val connectionObserver = Observer<ConnectionLiveData.ServerConnectionType> {
when (it) {
ConnectionLiveData.ServerConnectionType.ONLINE -> openConnectionIfClosed()
else -> closeConnection()
}
}
var connectionLive: ConnectionLiveData? = null
fun openConnectionIfClosed() = GlobalScope.launch(Dispatchers.IO) {
synchronized(synchObjectKey) {
if (!isOpenConnection()) {
openConnection()
}
}
}
private fun isOpenConnection(): Boolean {
return socket != null && !socket!!.isClosed && socket!!.isConnected
}
fun openConnection() = GlobalScope.launch(Dispatchers.IO) {
connectionStateSubject.onNext(
ConnectResult(
status = ConnectResult.Status.IN_PROCESS
)
)
val connectResult = socketConnection.connect()
Log.d("AuthBug", "socket connected")
connectionStateSubject.onNext(connectResult)
if (connectResult.status == ConnectResult.Status.SUCCESS) {
Log.d("AuthBug", "saveAndApplySocket")
saveAndApplySocket(socketConnection.socket)
}
}
private fun saveAndApplySocket(socket: Socket) {
this.socket = socket
messageWriter = MessageWriter(socket)
val messageReader = MessageReader(socket)
initAuthorizationManager(messageReader)
initCommandManager(messageReader)
}
fun setConnectionStateLiveData(connectionLiveData: ConnectionLiveData) {
connectionLive = connectionLiveData
connectionLiveData.run {
removeObserver(connectionObserver)
// observeForever(connectionObserver)
}
socketConnection.setConnectionStateListener(connectionLive!!)
}
fun getConnectionStateSubject(): Subject<ConnectResult> =
connectionStateSubject
fun getMessagePool(): Observable<String> =
commandManager.getMessageObservable()
private fun initAuthorizationManager(messageReader: MessageReader) {
Log.d("AuthBug", "iintAuthorizationManager")
authorizationManager = AuthenticationManager(
messageReader, UserTokenRepository()
)
authorizationManager?.setInProcessStatus()
authorizationManager?.launchAuthListeners()
}
private fun initCommandManager(messageReader: MessageReader) =
commandManager.launchMessageListeners(messageReader)
fun sendMessageToServer(message: String): MessageSendingResult {
return if (isAuthInProcess() && isNotAuthRequestMessage(message)) {
Log.d("AuthBug", "WAIT_AUTH mode ($message)")
MessageSendingResult.WAIT_AUTH
} else {
Log.d("AuthBug", "NO WAIT_AUTH mode. Message sending... ($message)")
messageWriter?.send(message) ?: MessageSendingResult.FAIL
}
}
private fun isAuthInProcess(): Boolean =
AuthenticationManager.authStatus == AuthenticationManager.AuthStatus.IN_PROCESS
private fun isNotAuthRequestMessage(message: String): Boolean =
AuthenticationManager.isNotAuthRequestMessage(message)
fun closeConnection() {
if (isOpenConnection()) {
try {
socket?.close()
} catch (e: IOException) {
e.printStackTrace()
}
messageWriter?.closeConnection()
authorizationManager?.finishAuthListeners()
authorizationManager?.resetAuth()
socket = null
}
}
fun hasNetworkConnection(): Boolean =
connectionLive?.value == ConnectionLiveData.ServerConnectionType.ONLINE
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment