Skip to content

Instantly share code, notes, and snippets.

@ATizik
Last active May 6, 2020 20:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ATizik/af0f19f2182d0b2b96a24afbebd154f1 to your computer and use it in GitHub Desktop.
Save ATizik/af0f19f2182d0b2b96a24afbebd154f1 to your computer and use it in GitHub Desktop.
Coroutines with Flow example
class UploadFilesUsecase() {
private val MAX_RETRIES = 3L
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
private val atomic = atomic<Flow<File>?>(null)
suspend fun uploadFiles(): Flow<File> {
atomic.value?.let { return it }
val channel = ConflatedBroadcastChannel<File>()
atomic.value = channel.asFlow()
val fileA = flow { emit(processAndMergeFilesOfTypeA()) }
.retry(MAX_RETRIES)
.cacheConflate(scope)
val fileB = flow { emit(processAndMergeFilesOfTypeB()) }
.retry(MAX_RETRIES)
.cacheConflate(scope)
val result = fileA.zip(fileB) { a, b -> listOf(a, b) }
.map { compressMergedFiles(it) }.retry(MAX_RETRIES).cacheConflate(scope)
.onEach {
uploadFileToServer(it)
channel.send(it)
}.retry(MAX_RETRIES)
.onCompletion {
atomic.value = null
deleteTempDir()
}
return result
}
private suspend fun uploadFileToServer(archive: File) = suspendCoroutine<Int> { cont ->
HttpManager.uploadFiles(archive,
onDone = { code: Int, body: ByteArray ->
if (code / 100 == 2) {
cont.resume(code)
} else {
cont.resumeWithException(Throwable())
}
},
onFailure = {
cont.resumeWithException(Throwable())
}
)
}
private suspend fun processAndMergeFilesOfTypeA(): File = TODO()
private suspend fun processAndMergeFilesOfTypeB(): File = TODO()
private suspend fun compressMergedFiles(files: List<File>): File = TODO()
fun deleteTempDir(): Unit = TODO()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment