Skip to content

Instantly share code, notes, and snippets.

@glureau-betclic
Last active May 6, 2020 21:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save glureau-betclic/ec1e86a3206112459ac659b226d817c1 to your computer and use it in GitHub Desktop.
Save glureau-betclic/ec1e86a3206112459ac659b226d817c1 to your computer and use it in GitHub Desktop.
TechYourChance / Concurrency Frameworks in Android are overrated / Kotlin+RxJava
// Original code: https://www.techyourchance.com/concurrency-frameworks-overrated-android/
import io.reactivex.Single
import io.reactivex.disposables.Disposable
import io.reactivex.functions.BiFunction
import io.reactivex.functions.Consumer
import io.reactivex.schedulers.Schedulers
import java.io.File
import java.io.IOException
import java.util.concurrent.CountDownLatch
import kotlin.random.Random
fun main() {
val latch = CountDownLatch(1)
UploadFilesUseCase()
.uploadFiles(Consumer { state ->
println("Resulting state $state received on " + Thread.currentThread().name)
latch.countDown()
})
latch.await()
}
class UploadFilesUseCase {
enum class OperationState { UPLOADED, FAILED }
private var disposable: Disposable? = null
fun uploadFiles(listenerOnUiThread: Consumer<OperationState>) {
if (disposable?.isRunning == true) {
// Log attempt
return
}
disposable = Single.zip(
ioSingleFrom(::processAndMergeFilesOfTypeA),
ioSingleFrom(::processAndMergeFilesOfTypeB),
BiFunction<File, File, File> { aResult: File, bResult: File ->
compressMergedFiles(aResult, bResult)
})
.flatMap(::uploadResultSingle)
.doOnEvent { _, _ -> deleteTempDir() } // Success or Error
.retry(2)
.map { OperationState.UPLOADED }
.onErrorReturnItem(OperationState.FAILED)
.observeOn(Schedulers.computation()) // Or .observeOn(AndroidSchedulers.mainThread()) when on Android
.subscribe(listenerOnUiThread)
}
}
private val Disposable.isRunning
get() = isDisposed.not()
// Code mapping a non Rx library to a Rx application. So not required if using Rx wrappers with your libs.
private fun <T> ioSingleFrom(method: () -> T) = Single.fromCallable(method).subscribeOn(Schedulers.io())
private fun processAndMergeFilesOfTypeA() = workAndReturnFile("A")
private fun processAndMergeFilesOfTypeB() = workAndReturnFile("B")
private fun compressMergedFiles(fileA: File, fileB: File) = workAndReturnFile("C")
private fun uploadResultSingle(compressedFile: File) = ioSingleFrom { uploadResult(compressedFile) }
// -------------------------------------------------------------------------------------
// Dummy implementations for you to be able to understand the threading
// -------------------------------------------------------------------------------------
fun workAndReturnFile(path: String): File {
println("Thread: " + Thread.currentThread().name + " start working on $path")
Thread.sleep(1000)
randomizedErrors(path)
println("Thread: " + Thread.currentThread().name + " $path DONE")
return File(path)
}
private fun uploadResult(compressedFile: File) {
println("Uploading on: " + Thread.currentThread().name + " ($compressedFile)")
Thread.sleep(2000)
randomizedErrors(compressedFile.path)
println("Uploaded from: " + Thread.currentThread().name + " ($compressedFile)")
}
private fun randomizedErrors(path: String) {
if (Random.nextInt(0, 3) == 0) {
println("-------- Unlucky crash appears while processing $path")
throw IOException("random error")
}
}
private fun deleteTempDir() {
println("Deleting files...")
}
/**
* Example of output:
*
Thread: RxCachedThreadScheduler-2 start working on B
Thread: RxCachedThreadScheduler-1 start working on A
Thread: RxCachedThreadScheduler-1 A DONE
-------- Unlucky crash appears while processing B
Deleting files...
Thread: RxCachedThreadScheduler-1 start working on A
Thread: RxCachedThreadScheduler-2 start working on B
Thread: RxCachedThreadScheduler-1 A DONE
Thread: RxCachedThreadScheduler-2 B DONE
Thread: RxCachedThreadScheduler-2 start working on C
Thread: RxCachedThreadScheduler-2 C DONE
Uploading on: RxCachedThreadScheduler-1 (C)
Uploaded from: RxCachedThreadScheduler-1 (C)
Deleting files...
Resulting state UPLOADED received on RxComputationThreadPool-1
*
* And if 3 unlucky errors:
*
Thread: RxCachedThreadScheduler-1 start working on A
Thread: RxCachedThreadScheduler-2 start working on B
-------- Unlucky crash appears while processing A
Thread: RxCachedThreadScheduler-2 B DONE
Deleting files...
Thread: RxCachedThreadScheduler-2 start working on A
Thread: RxCachedThreadScheduler-3 start working on B
-------- Unlucky crash appears while processing B
Thread: RxCachedThreadScheduler-2 A DONE
Deleting files...
Thread: RxCachedThreadScheduler-2 start working on B
Thread: RxCachedThreadScheduler-1 start working on A
Thread: RxCachedThreadScheduler-2 B DONE
Thread: RxCachedThreadScheduler-1 A DONE
Thread: RxCachedThreadScheduler-1 start working on C
Thread: RxCachedThreadScheduler-1 C DONE
Uploading on: RxCachedThreadScheduler-3 (C)
-------- Unlucky crash appears while processing C
Deleting files...
Resulting state FAILED received on RxComputationThreadPool-1
*/
@Zhuinden
Copy link

Zhuinden commented May 6, 2020

private fun <T> ioSingleFrom(method: () -> T) = Single.create<T> { emitter ->
    try {
        emitter.onSuccess(method())
    } catch (throwable: Throwable) {
        emitter.onError(throwable)
    }
}.subscribeOn(Schedulers.io())

this is equivalent to

private fun <T> ioSingleFrom(method: () -> T) = Single.fromCallable { method() }.subscribeOn(Schedulers.io())

You might even be able to replace it with just

private fun <T> ioSingleFrom(method: () -> T) = Single.fromCallable(method).subscribeOn(Schedulers.io())

@Zhuinden
Copy link

Zhuinden commented May 6, 2020

 if (disposable?.isDisposed == true) {

I don't think this is safe. Once you successfully run this method, it will forever be silently no-op.

I think the no-op should occur only while disposable is available, and otherwise set disposable to null in doFinally {.

@glureau-betclic
Copy link
Author

Damn, you're right, thanks! Next time I'll write some unit tests before posting a public gist 🤣

I had some issues with setting the nullability on disposable when I provide streams that can be consumed in multiple places. Sometimes a stream is completed (end of stream, RxAndroidLifecycle = terminal event), sometimes finishes with an error (terminal event), and sometimes it's manually disposed (dispose() -> no terminal event), and if the 2 methods doOnTerminate+doOnDispose are not defined to null the disposable, then it depends of the type of cancellation.

To avoid that, I usually prefer an extension function like

private val Disposable.isRunning
    get() = isDisposed.not()

And a condition like this:

    if (disposable?.isRunning == true) {
        // Log attempt
        return
    }

I find it more explicit on the intent (if it's already running, then stop), and I don't have to care about releasing a Disposable object (low memory footprint anyway).

What do you think?

Gist is now fixed, but for future readers, this is nothing more than a support for a Twitter conversation. 😉

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment