@satyamagarwal
Created July 29, 2019 15:53
val LOG: Logger = LoggerFactory.getLogger("ImportLogger")

sealed class Errors : Throwable() {
    object NotFound : Errors()
}

fun get(): IO<List<User>> {
    return dataSource1
        .openConnection()
        .bracketCase(
            release = { handle, exitCase -> handle.closeConnection(exitCase) },
            use = { handle -> userRepository.getUsers(handle) }
        )
        .flatMap { list ->
            when {
                list.isEmpty() -> IO.raiseError(Errors.NotFound)
                else -> IO.just(list)
            }
        }
}

fun insert(user: User): IO<Profile> {
    return dataSource2
        .beginTransaction()
        .bracketCase(
            release = { handle, exitCase -> handle.closeTransaction(exitCase) },
            use = { handle -> profileRepository.insert(user).raiseIfNull { Errors.NotFound } }
        )
}

fun import() {
    unsafe {
        val allInsertedUsers: List<List<Profile>> = runBlocking {
            IO
                .fx {
                    val userBatches: List<List<User>> =
                        get()
                            .bind()
                            .sortedBy { user -> user.id }
                            .chunked(1000)

                    val insertedUsers: List<List<Profile>> = userBatches
                        .parTraverse { userBatch ->
                            LOG.info("Inserting users from ${userBatch.first().id} to ${userBatch.last().id}")

                            userBatch.parTraverse { user -> insert(user) }
                        }
                        .bind()

                    insertedUsers
                }
        }

        allInsertedUsers
    }
}


satyamagarwal commented Jul 29, 2019

After a lot of experimenting and refactoring, I was able to write this program, and it works as it should.
But there are a few things I don't understand about why it works (or how it works at all).

  1. On line #38, bind's implementation takes me to arrow.typeclasses.MonadContinuation, whose context field defaults to EmptyCoroutineContext. Should I assume that this is executed with Dispatchers.Default, and therefore not on the current thread?

  2. On line #43, I am traversing a list of lists, which I assume is asynchronous, and then I traverse each inner list, which I assume is also asynchronous (separately). I assume each chunk finishes before the next chunk is processed. Is this true?

  3. Is it the same if I don't make any chunks and process one big list with just one parTraverse?

The next question is based on the assumption that each chunk is processed after the previous one finishes.

  4. I assume the log call on line #44 should somehow be bound to the operation, so that it prints when the chunk actually starts processing. I couldn't figure out how. Can you help me?

  5. fx's NonBlocking uses the coroutine Dispatchers.Default in the Arrow implementation. How can I run this operation on Dispatchers.IO?

  6. After I call the function in my main() method, main() does not exit. I expected it to exit once the computation finished. Can you help me understand why this does not happen?

  7. I want to convert the unsafe and runBlocking blocks into a function chain. Is that possible? I tried many things after reading the implementation, such as arrow.fx.IO.unsafeRun().run { ... }, but I could not make it work. Can you please help me?

  8. Do you see anything wrong with how I have used Arrow here?


nomisRev commented Jul 29, 2019

  1. On line #38, bind's implementation takes me to arrow.typeclasses.MonadContinuation, whose context field defaults to EmptyCoroutineContext. Should I assume that this is executed with Dispatchers.Default, and therefore not on the current thread?

No, that assumption is incorrect. bind runs on the current thread. If you want to switch threads, use continueOn.
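
A minimal sketch of the underlying behaviour, using only the standard library's kotlin.coroutines package rather than Arrow's MonadContinuation: a coroutine started with EmptyCoroutineContext carries no dispatcher, so its body runs on whichever thread starts it. The helper name threadOfBareCoroutine is made up for this illustration.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical helper: starts a coroutine with an empty context and
// reports the thread its body ran on.
fun threadOfBareCoroutine(): Thread {
    var seen: Thread? = null
    val block: suspend () -> Thread = { Thread.currentThread() }
    // EmptyCoroutineContext has no dispatcher, so nothing moves the work elsewhere.
    block.startCoroutine(Continuation(EmptyCoroutineContext) { result ->
        seen = result.getOrThrow()
    })
    // The block never suspends, so it has already completed at this point.
    return seen!!
}

fun main() {
    println(threadOfBareCoroutine() === Thread.currentThread())
}
```

An empty context is not the same thing as Dispatchers.Default; a thread switch only happens when something explicitly asks for one.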

  2. On line #43, I am traversing a list of lists, which I assume is asynchronous, and then I traverse each inner list, which I assume is also asynchronous (separately). I assume each chunk finishes before the next chunk is processed. Is this true?

val insertedUsers: List<List<Profile>> = userBatches.parTraverse { userBatch ->
  userBatch.parTraverse { user -> insert(user) }
}.bind()

parTraverse gathers all its operations and composes them using parMapN. In this case it folds over the list and combines all the IOs using parMap(ioa, iob) { a: Profile, b: List<Profile> -> a + b }, so every insert in every batch can run in parallel. If you want the batches to run one after another, with only the inserts inside each batch running in parallel, use traverse for the chunked list; it is the sequential variant of parTraverse.

val insertedUsers: List<List<Profile>> = userBatches.traverse { userBatch ->
  userBatch.parTraverse { user -> insert(user) }
}.bind()
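
The same shape (batches one after another, items inside a batch concurrently) can be sketched with plain kotlinx.coroutines instead of Arrow's traverse and parTraverse. This is an analogue under assumptions, not the Arrow implementation: insertStub and importInBatches are hypothetical names, and insertStub only tags its input instead of touching a database.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the gist's insert(user).
suspend fun insertStub(id: Int): String = "profile-$id"

// The outer map is sequential: a batch starts only after the previous one finished.
// Inside a batch every insert is started with async, so they run concurrently.
suspend fun importInBatches(ids: List<Int>, batchSize: Int): List<List<String>> =
    ids.sorted()
        .chunked(batchSize)
        .map { batch ->
            coroutineScope {
                batch.map { id -> async(Dispatchers.Default) { insertStub(id) } }
                    .awaitAll()
            }
        }

fun main() = runBlocking {
    println(importInBatches(listOf(5, 3, 1, 4, 2), batchSize = 2))
}
```

awaitAll keeps the results in input order, so the output groups stay sorted even though the inserts inside a batch may finish in any order.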

  3. Is it the same if I don't make any chunks and process one big list with just one parTraverse?

In the current example yes, see answer to question 2.

  4. I assume the log call on line #44 should somehow be bound to the operation, so that it prints when the chunk actually starts processing. I couldn't figure out how. Can you help me?

If you want things to be pure, wrap them in IO ;) Reverse the order if you want to print when the inserts have finished.

val insertedUsers: List<List<Profile>> = userBatches
    .parTraverse { userBatch ->
        IO.effect { LOG.info("Inserting users from ${userBatch.first().id} to ${userBatch.last().id}") }
            .followedBy(userBatch.parTraverse { user -> insert(user) })
    }
    .bind()
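
Why the wrapping matters can be shown with a toy lazy type. This is a minimal sketch, not Arrow's IO: Task, eagerLog, deferredLog and the events list are all hypothetical names invented for the illustration.

```kotlin
// Hypothetical minimal lazy effect: nothing runs until execute() is called.
class Task<A>(private val body: () -> A) {
    fun execute(): A = body()

    // Run this task first, discard its result, then run `next`.
    fun <B> followedBy(next: Task<B>): Task<B> = Task { execute(); next.execute() }
}

// Records side effects in the order they actually happen.
val events = mutableListOf<String>()

// The log entry is written while the description is being built.
fun eagerLog(batch: List<Int>): Task<Int> {
    events += "log ${batch.first()}..${batch.last()}"
    return Task { events += "insert"; batch.size }
}

// The log entry is itself a Task, so it is written only when the chain runs.
fun deferredLog(batch: List<Int>): Task<Int> =
    Task { events += "log ${batch.first()}..${batch.last()}"; Unit }
        .followedBy(Task { events += "insert"; batch.size })

fun main() {
    val eager = eagerLog(listOf(1, 2, 3))
    println(events) // the log entry is already there, before anything was executed
    eager.execute()

    events.clear()
    val deferred = deferredLog(listOf(1, 2, 3))
    println(events) // still empty: only a description has been built
    deferred.execute()
    println(events) // log entry first, then the insert
}
```

In the eager version the message is tied to building the batch description; in the deferred version it is tied to running it, which is the behaviour the question asks for.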

  5. fx's NonBlocking uses the coroutine Dispatchers.Default in the Arrow implementation. How can I run this operation on Dispatchers.IO?

That's actually incorrect. Fx's NonBlocking is a custom wrapper of ForkJoinPool, just like Dispatchers.Default. We currently do not have a Dispatchers.IO alternative, but we will include one soon. It should be relatively easy to write your own dispatcher, and I'd gladly help you with it. Anyhow, to run any IO (or F) on any CoroutineContext, use IO.unit.continueOn(Dispatchers.IO).followedBy(myIO).

IO.fx {
    val userBatches: List<List<User>> =
        get()
            .bind()
            .sortedBy { user -> user.id }
            .chunked(1000)

    val insertedUsers: List<List<Profile>> = userBatches
        .parTraverse { userBatch ->
            LOG.info("Inserting users from ${userBatch.first().id} to ${userBatch.last().id}")

            userBatch.parTraverse { user -> IO.unit.continueOn(Dispatchers.IO).followedBy(insert(user)) }
            // userBatch.parTraverse { user -> Dispatchers.IO.shift().followedBy(insert(user)) } // Alias
        }
        .bind()

    insertedUsers
}

  6. After I call the function in my main() method, main() does not exit. I expected it to exit once the computation finished. Can you help me understand why this does not happen?

Sounds like something is hanging on a dispatcher. I'd need to dive deeper into the code to check this, can you try playing with different dispatchers?

  7. I want to convert the unsafe and runBlocking blocks into a function chain. Is that possible? I tried many things after reading the implementation, such as arrow.fx.IO.unsafeRun().run { ... }, but I could not make it work. Can you please help me?

Suggestions welcome... For IO you should be able to call fix().unsafeRunSync() or fix().unsafeRunAsync { }. You can also write such an alias yourself, like this:

fun <A> IOOf<A>.unsafeRunBlocking(): A = IO.unsafeRun().run { runBlocking { fix() } }

  8. Do you see anything wrong with how I have used Arrow here?

The bracket code looks good 👍 I prefer expression bodies tho :p Only remark would be the Log, I'd wrap that in IO.effect and that would've probably spared you the problems.


pakoito commented Jul 29, 2019

I'm 👌with all of Simon's suggestions.

userBatch.parTraverse { user -> IO.unit.continueOn(Dispatchers.IO).followedBy(insert(user)) }

Try IO.delay(Dispatchers.IO) { insert(user) }, IIRC we kept it.


satyamagarwal commented Jul 29, 2019

unsafe {
    val result: List<IO<List<Profile>>> = IO
        .fx {
            val userBatches: List<suspend () -> List<User>> = IO
                .unit
                .continueOn(Dispatchers.IO)
                .followedBy(get())
                .bind()
                .sortedBy { user -> user.id }
                .chunked(1000)
                .map { userBatch -> suspend { userBatch } }

            val result: List<IO<List<Profile>>> = userBatches
                .traverse { userBatch ->
                    IO
                        .unit
                        .continueOn(Dispatchers.IO)
                        .followedBy(IO.effect { LOG.info("Inserting users from ${userBatch.first().id} to ${userBatch.last().id}") })
                        .followedBy(userBatch.parTraverse { user -> insert(user) })
                }
                .bind()

            result
        }
        .fix()
        .unsafeRunSync()

    result
}

  1. I could not find IO.delay. I am using 0.10.0-SNAPSHOT. I tried searching the Arrow GitHub repository and got only one hit, which suggests it could be in io.arrow-kt:arrow-fx-rx2/reactor?

To use .traverse { .. } I had to wrap my chunks in a suspend block to match the function's signature, which then gives me a List<IO<List<Profile>>>. userBatches.traverse { .. } processes the first chunk and then waits, because I cannot bind the result of parTraverse: calling .bind() on it gives a compilation error saying it is not in a coroutine scope.

  2. How can I fix this?
  3. Is it possible to remove the suspend wrapping? (Most probably I am missing something here, please help :) )
  4. About the program not exiting: I took a thread dump and it says it is waiting on unsafeRunSync(). unsafeRunSync() uses unsafeRunTimed(Duration.INFINITE). Could this be the reason?


pakoito commented Jul 29, 2019

  1. It's generated from the typeclass. Use arrow-fx, it should be there. IO.defer as an extension function, otherwise IO.async().defer: https://github.com/arrow-kt/arrow/blob/master/modules/fx/arrow-fx/src/main/kotlin/arrow/fx/typeclasses/Async.kt#L242

2 and 3.

IO
    .fx {
        continueOn(Dispatchers.IO) // Top-level thread jump, available inside fx blocks
        val users: List<User> = get()
            .bind() // One step at a time. Get users first.

        val usersLambda: (List<User>) -> IO<List<Profile>> = { userBatch -> // We'll be using this below
            IO.effect { LOG.info("Inserting users from ${userBatch.first().id} to ${userBatch.last().id}") }  // We're already on Dispatchers.IO
                .followedBy(userBatch.parTraverse(Dispatchers.IO) { user -> insert(user) }) // parTraverse defaults to other scheduler, enforce IO
        } // not executed yet

        val result: IO<List<List<Profile>>> =
            users.sortedBy { user -> user.id }
                .chunked(1000) // it's all sync in Dispatchers.IO up to here
                .map(usersLambda) // create the IOs to execute sequentially with the lambda above
                .sequence(IO.applicative()) // List<IO<List<Profile>>> -> IO<List<List<Profile>>>

        // None of the inserts have happened yet. Everything so far has happened on Dispatchers.IO too

        val profiles: List<List<Profile>> = result.bind() // execute inserts NOW
        val profilesFlat: List<Profile> = profiles.value().flatMap { it } // Remove one layer of List

        profilesFlat
    }
    .fix()  // Shouldn't be necessary with fx now
    .unsafeRunSync() // Execute BLOCKING in this thread. There's no need to use unsafe if you're using the member method.

If insert is a suspend fun instead of returning IO, then use IO.effect { insert(user) }.

Replace unsafeRunSync with unsafeRunAsync and see what happens. It may be a deadlock.
