Skip to content

Instantly share code, notes, and snippets.

@kakai248 kakai248/UseCase.kt

Last active May 8, 2020
Embed
What would you like to do?
/**
 * Processes, compresses and uploads files, sharing a single in-flight upload between callers.
 *
 * While an upload is running, every call to [uploadFiles] returns the same cached [Completable].
 * Once that upload terminates, the stored reference is cleared and the next call starts a new one.
 */
class UploadFilesUseCase(
    private val schedulerProvider: SchedulerProvider,
    private val httpManager: HttpManager
) {

    /** The upload currently in flight, or `null` when idle. Only accessed while holding the monitor of `this`. */
    private var operation: Completable? = null

    /**
     * Returns the upload that is currently in flight, or builds, stores and returns a new one.
     *
     * Nothing is subscribed while the lock is held; the lock only covers the check-then-set of
     * [operation].
     *
     * @return a [Completable] that terminates when the (possibly shared) upload terminates.
     */
    fun uploadFiles(): Completable = synchronized(this) {
        operation
            ?: doUploadFiles()
                .doFinally(::clearOperation)
                .cache()
                .also { operation = it }
    }

    /**
     * Forgets the finished upload so that the next [uploadFiles] call starts a fresh one.
     *
     * Invoked from the Rx chain, i.e. not from inside [uploadFiles], so it takes the same monitor
     * that guards every other access to [operation].
     */
    private fun clearOperation() {
        synchronized(this) { operation = null }
    }

    /**
     * Builds the upload pipeline: process both file types on the io scheduler, compress the two
     * results, upload the archive, then discard the result.
     *
     * [deleteTempDir] runs after a successful attempt and after every failed attempt, because both
     * hooks sit upstream of `retry`. Terminal events are observed on the ui scheduler.
     */
    private fun doUploadFiles(): Completable =
        Singles
            .zip(
                processAndMergeFilesOfTypeA().subscribeOn(schedulerProvider.io),
                processAndMergeFilesOfTypeB().subscribeOn(schedulerProvider.io)
            )
            .flatMap { (fileA, fileB) -> compressMergedFiles(fileA, fileB) }
            .flatMap(::uploadFileToServer)
            .ignoreElement()
            .doOnComplete(::deleteTempDir)
            .doOnError { deleteTempDir() }
            .retry(MAX_RETRIES)
            .observeOn(schedulerProvider.ui)

    /**
     * Uploads [archive] through [httpManager] and fails the stream with [OperationFailedException]
     * when the response code is outside 200..299.
     */
    private fun uploadFileToServer(archive: File) =
        httpManager.uploadFiles(archive)
            .map { response ->
                if (response.code !in 200..299) {
                    throw OperationFailedException()
                }
            }

    // The helpers below are placeholders: each one emits a File with an empty path.
    private fun processAndMergeFilesOfTypeA(): Single<File> = Single.just(File(""))

    private fun processAndMergeFilesOfTypeB(): Single<File> = Single.just(File(""))

    private fun compressMergedFiles(fileA: File, fileB: File): Single<File> = Single.just(File(""))

    /** Placeholder: currently does nothing. */
    private fun deleteTempDir() {}

    companion object {
        /** Count passed to `retry` for a failing upload pipeline. */
        private const val MAX_RETRIES = 3L
    }
}
/**
 * Stand-in HTTP layer used by the example.
 */
class HttpManager {

    /**
     * Pretends to upload [archive]; the argument is not read.
     *
     * @return a [Single] that emits a [Response] with code 200 and an empty body.
     */
    fun uploadFiles(archive: File): Single<Response> {
        val cannedResponse = Response(200, byteArrayOf())
        return Single.just(cannedResponse)
    }
}
/**
 * Plain holder for the result of an upload request.
 *
 * @param code numeric response code.
 * @param body raw response payload.
 */
class Response(code: Int, body: ByteArray) {

    /** Numeric response code. */
    val code: Int = code

    /** Raw response payload. */
    val body: ByteArray = body
}
/**
 * Signals that an operation did not succeed.
 *
 * Extends [Exception] rather than [Throwable] directly, so that ordinary `catch (e: Exception)`
 * handlers and logging hooks see it. The no-argument form remains available.
 *
 * @param message optional human-readable description of the failure.
 * @param cause optional underlying error.
 */
class OperationFailedException(
    message: String? = null,
    cause: Throwable? = null
) : Exception(message, cause)
@Dimezis

This comment has been minimized.

Copy link

Dimezis commented May 6, 2020

A more reactive alternative without synchronization and extra state. This is a pseudocode sketch of the idea, so forgive me if something doesn't quite work.

/**
 * Variant of the upload use case that relies on a shared stream instead of a lock and a
 * mutable field.
 *
 * The pipeline is assembled once and wrapped with `share()`; [uploadFiles] hands out a
 * [Completable] view of that shared stream.
 *
 * NOTE(review): with `share()` the upstream is presumably disposed once every subscriber has
 * disposed, and the temp-dir cleanup hooks do not run on dispose — confirm this is acceptable.
 */
class UploadFilesUseCase(
    private val schedulerProvider: SchedulerProvider,
    private val httpManager: HttpManager
) {

    /** The upload pipeline, assembled once and shared between subscribers. */
    private val sharedStream = uploadStream().share()

    /**
     * @return a [Completable] that terminates when the shared upload stream terminates.
     */
    fun uploadFiles(): Completable = sharedStream.ignoreElements()

    /**
     * Builds the upload pipeline: process both file types on the io scheduler, compress the two
     * results and upload the archive.
     *
     * [deleteTempDir] runs after a successful attempt and after every failed attempt, because both
     * hooks sit upstream of `retry`. Events are observed on the ui scheduler.
     */
    private fun uploadStream(): Observable<Unit> =
        Singles
            .zip(
                processAndMergeFilesOfTypeA().subscribeOn(schedulerProvider.io),
                processAndMergeFilesOfTypeB().subscribeOn(schedulerProvider.io)
            )
            .flatMap { (fileA, fileB) -> compressMergedFiles(fileA, fileB) }
            .flatMap(::uploadFileToServer)
            .toObservable()
            .doOnComplete(::deleteTempDir)
            .doOnError { deleteTempDir() }
            .retry(MAX_RETRIES)
            .observeOn(schedulerProvider.ui)

    /**
     * Uploads [archive] through [httpManager] and fails the stream with [OperationFailedException]
     * when the response code is outside 200..299.
     */
    private fun uploadFileToServer(archive: File) =
        httpManager.uploadFiles(archive)
            .map { response ->
                // `code` is a property on the gist's Response class, not a function.
                if (response.code !in 200..299) {
                    throw OperationFailedException()
                }
            }

    // The helpers below are placeholders: each one emits a File with an empty path.
    private fun processAndMergeFilesOfTypeA(): Single<File> = Single.just(File(""))

    private fun processAndMergeFilesOfTypeB(): Single<File> = Single.just(File(""))

    private fun compressMergedFiles(fileA: File, fileB: File): Single<File> = Single.just(File(""))

    /** Placeholder: currently does nothing. */
    private fun deleteTempDir() {}

    companion object {
        /** Count passed to `retry` for a failing upload pipeline. */
        private const val MAX_RETRIES = 3L
    }
}
@techyourchance

This comment has been minimized.

Copy link

techyourchance commented May 6, 2020

Looks really dope, even though I don't know enough RxJava to make sure it implements the same requirements.
Could you also add hooks for logging of exceptions (potentially thrown during processing) and of concurrent invocation attempts?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.