Skip to content

Instantly share code, notes, and snippets.

@sphrak
Last active January 6, 2023 14:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sphrak/d3a60f33b3ba8f51a77be5fb77bf0e27 to your computer and use it in GitHub Desktop.
Save sphrak/d3a60f33b3ba8f51a77be5fb77bf0e27 to your computer and use it in GitHub Desktop.
ktor websocket flow retry mechanism
@Serialized
data class MessageDto(
@SerialName(value = "id")
val id: String,
@SerialName(value = "message")
val message: String
)
class MessageService constructor(
private val httpClient: HttpClient
) {
private val sendMessageFlow: Flow<MessageDto> = MutableSharedFlow()
fun sendMessage(messageDto) {
sendMessageFlow.emit(messageDto)
}
fun messages(): Flow<MessageDto> = flow {
httpClient.wss(
method = HttpMethod.Get,
host = getHostname(),
port = getPort(),
path = "/v1/messages"
) {
coroutineScope {
sendMessageFlow
.conflate()
.map(::sendSerialized)
.launchIn(this)
while (true) {
emit(receiveDeserialized<MessageDto>())
}
}
}
}.retryWithBackoff()
.flowOn(coroutineDispatcher)
}
inline val defaultExceptions: Set<Class<out Exception>>
get() = setOf(
ClosedReceiveChannelException::class.java,
ConnectException::class.java,
UnresolvedAddressException::class.java
)
/**
* Retries a `Flow<T>` indefinitely with exponential backoff
*
* @param delay -- delay in seconds between retries
* @param delayAtMost -- maximum delay in seconds after which this value is used to delay
*/
public fun <T> Flow<T>.retryWithBackoff(
delay: Int = 1,
delayAtMost: Int = 10,
exceptions: Set<Class<out Exception>> = defaultExceptions
): Flow<T> {
var _delay = delay
return retryWhen { cause, attempt ->
val shouldRetry = exceptions.any { (cause::class.java == it) }
if (shouldRetry) {
delay(_delay.seconds.inWholeMilliseconds)
val next = _delay * 2
_delay = if (next < delayAtMost) {
next
} else delayAtMost
true
} else false
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment