Skip to content

Instantly share code, notes, and snippets.

@kakai248
Last active August 22, 2020 08:21
Embed
What would you like to do?
/**
 * Merges, compresses and uploads files as a single shareable operation.
 *
 * While an operation is pending, every call to [uploadFiles] returns the same cached
 * [Completable], so concurrent callers observe one upload instead of starting another.
 */
class UploadFilesUseCase(
    private val schedulerProvider: SchedulerProvider,
    private val httpManager: HttpManager
) {

    /** Monitor guarding every read and write of [operation]. */
    private val lock = Any()

    /** The operation currently handed out to callers, or `null` when there is none. */
    private var operation: Completable? = null

    /**
     * Returns the pending upload operation, creating it first when there is none.
     *
     * @return a cached [Completable]; it terminates when the underlying upload chain
     * completes or fails after its retries.
     */
    fun uploadFiles(): Completable = synchronized(lock) {
        operation
            ?: (doUploadFiles()
                // The reset runs on whichever thread terminates the chain, so it has to take
                // the same lock as the read above; otherwise the write is a data race.
                // NOTE(review): doFinally fires after the terminal event was passed downstream,
                // so a subscriber calling uploadFiles() from its own callback may still receive
                // the finished operation - confirm whether that matters to callers.
                .doFinally { synchronized(lock) { operation = null } }
                .cache()
                .also { operation = it })
    }

    /**
     * Builds the upload chain: both merge steps are subscribed on the io scheduler and zipped,
     * the two results are compressed into one archive, and the archive is uploaded.
     *
     * The temp-dir cleanup hooks sit upstream of `retry`, so they run for every attempt,
     * not only the last one. The terminal event is delivered on the ui scheduler.
     */
    private fun doUploadFiles(): Completable =
        Singles
            .zip(
                processAndMergeFilesOfTypeA().subscribeOn(schedulerProvider.io),
                processAndMergeFilesOfTypeB().subscribeOn(schedulerProvider.io)
            )
            .flatMap { (fileA, fileB) -> compressMergedFiles(fileA, fileB) }
            .flatMap(::uploadFileToServer)
            .ignoreElement()
            .doOnComplete(::deleteTempDir)
            .doOnError { deleteTempDir() }
            .retry(MAX_RETRIES)
            .observeOn(schedulerProvider.ui)

    /**
     * Uploads [archive] through [httpManager] and fails the stream with
     * [OperationFailedException] when the response code is outside 200..299.
     */
    private fun uploadFileToServer(archive: File) =
        httpManager.uploadFiles(archive)
            .map { response ->
                if (response.code !in 200..299) {
                    throw OperationFailedException()
                }
            }

    /** Placeholder: emits a [File] with an empty path. */
    private fun processAndMergeFilesOfTypeA(): Single<File> = Single.just(File(""))

    /** Placeholder: emits a [File] with an empty path. */
    private fun processAndMergeFilesOfTypeB(): Single<File> = Single.just(File(""))

    /** Placeholder: ignores both inputs and emits a [File] with an empty path. */
    private fun compressMergedFiles(fileA: File, fileB: File): Single<File> = Single.just(File(""))

    /** Placeholder: currently does nothing. */
    private fun deleteTempDir() {}

    companion object {
        /** Number of re-subscriptions `retry` may perform after a failed attempt. */
        private const val MAX_RETRIES = 3L
    }
}
/** Stand-in HTTP client whose upload always answers with an empty 200 response. */
class HttpManager {

    /**
     * Pretends to upload [archive]; the argument is not inspected.
     *
     * @return a [Single] emitting a [Response] with code 200 and an empty body.
     */
    fun uploadFiles(archive: File): Single<Response> {
        val okResponse = Response(200, byteArrayOf())
        return Single.just(okResponse)
    }
}
/**
 * Result of an HTTP call.
 *
 * @param code value exposed as [Response.code]; callers in this file treat it as the HTTP status.
 * @param body value exposed as [Response.body]; stored by reference, not copied.
 */
class Response(code: Int, body: ByteArray) {

    /** Status code passed to the constructor. */
    val code: Int = code

    /** Body bytes passed to the constructor. */
    val body: ByteArray = body
}
/**
 * Signals that an operation did not succeed.
 *
 * Extends [Exception] rather than [Throwable] directly, so handlers written as
 * `catch (e: Exception)` also see it; it is still a [Throwable] for existing callers.
 */
class OperationFailedException : Exception()
@Dimezis
Copy link

Dimezis commented May 6, 2020

A more reactive alternative without synchronization or extra state. This is a pseudocode sketch of the idea, so forgive me if something doesn't quite work.

/**
 * Alternative upload use case that shares one upstream subscription through
 * `Observable.share()` instead of keeping explicit mutable state behind a lock.
 */
class UploadFilesUseCase(
    private val schedulerProvider: SchedulerProvider,
    private val httpManager: HttpManager
) {

    // Built once per instance; nothing runs until the first subscriber arrives.
    private val sharedStream = uploadStream().share()

    /**
     * Returns a [Completable] view of the shared upload stream.
     *
     * @return a [Completable] that terminates with the shared stream's completion or error.
     */
    fun uploadFiles(): Completable = sharedStream.ignoreElements()

    /**
     * Builds the upload chain: both merge steps are subscribed on the io scheduler and zipped,
     * the two results are compressed into one archive, and the archive is uploaded.
     *
     * The temp-dir cleanup hooks sit upstream of `retry`, so they run for every attempt.
     * Events are delivered on the ui scheduler.
     */
    private fun uploadStream(): Observable<Unit> =
        Singles
            .zip(
                processAndMergeFilesOfTypeA().subscribeOn(schedulerProvider.io),
                processAndMergeFilesOfTypeB().subscribeOn(schedulerProvider.io)
            )
            .flatMap { (fileA, fileB) -> compressMergedFiles(fileA, fileB) }
            .flatMap(::uploadFileToServer)
            .toObservable()
            .doOnComplete(::deleteTempDir)
            .doOnError { deleteTempDir() }
            .retry(MAX_RETRIES)
            .observeOn(schedulerProvider.ui)

    /**
     * Uploads [archive] through [httpManager] and fails the stream when the response
     * code is outside 200..299.
     */
    private fun uploadFileToServer(archive: File) =
        httpManager.uploadFiles(archive)
            .map { response ->
                // `code` is a property on this file's Response class, not a method.
                if (response.code !in 200..299) {
                    throw Exception("Upload failed with HTTP status ${response.code}")
                }
            }

    /** Placeholder: emits a [File] with an empty path. */
    private fun processAndMergeFilesOfTypeA(): Single<File> = Single.just(File(""))

    /** Placeholder: emits a [File] with an empty path. */
    private fun processAndMergeFilesOfTypeB(): Single<File> = Single.just(File(""))

    /** Placeholder: ignores both inputs and emits a [File] with an empty path. */
    private fun compressMergedFiles(fileA: File, fileB: File): Single<File> = Single.just(File(""))

    /** Placeholder: currently does nothing. */
    private fun deleteTempDir() {}

    companion object {
        /** Number of re-subscriptions `retry` may perform after a failed attempt. */
        private const val MAX_RETRIES = 3L
    }
}

@techyourchance
Copy link

Looks really dope, even though I don't know enough RxJava to make sure it implements the same requirements.
Could you also add hooks for logging exceptions (potentially thrown during processing) and concurrent invocation attempts?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment