val LOG: Logger = LoggerFactory.getLogger("ImportLogger")

sealed class Errors : Throwable() {
    object NotFound : Errors()
}

fun get(): IO<List<User>> {
    return dataSource1
        .openConnection()
        .bracketCase(
            release = { handle, exitCase -> handle.closeConnection(exitCase) },
            use = { handle -> userRepository.getUsers(handle) }
        )
        .flatMap { list ->
            when {
                list.isEmpty() -> IO.raiseError(Errors.NotFound)
                else -> IO.just(list)
            }
        }
}

fun insert(user: User): IO<Profile> {
    return dataSource2
        .beginTransaction()
        .bracketCase(
            release = { handle, exitCase -> handle.closeTransaction(exitCase) },
            use = { handle -> profileRepository.insert(user).raiseIfNull { Errors.NotFound } }
        )
}

fun import() {
    unsafe {
        val allInsertedUsers: List<List<Profile>> = runBlocking {
            IO
                .fx {
                    val userBatches: List<List<User>> =
                        get()
                            .bind()
                            .sortedBy { user -> user.id }
                            .chunked(1000)
                    val insertedUsers: List<List<User>> = userBatches
                        .parTraverse { userBatch ->
                            LOG.info("Inserting users from ${userBatch.first().id} to ${userBatch.last().id}")
                            userBatch.parTraverse { user -> insert(user) }
                        }
                        .bind()
                    insertedUsers
                }
        }
        allInsertedUsers
    }
}
- On line #38, bind's implementation takes me to arrow.typeclasses.MonadContinuation, whose field context defaults to EmptyCoroutineContext. Should I assume that this executed with Dispatchers.Default and that is not on the current thread?
No, that assumption is incorrect. `bind` runs on the current thread; if you want to switch threads, use `continueOn`.
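The "stays on the current thread until you explicitly hop" behaviour can be illustrated without Arrow. The sketch below is only an analogy built on a plain JDK executor, not Arrow's `continueOn`; `threadNames` and the `io-worker` pool are hypothetical names chosen for illustration.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Returns the thread name seen before and after an explicit hop to another executor.
fun threadNames(): Pair<String, String> {
    // Hypothetical single-threaded pool standing in for an IO dispatcher.
    val ioPool = Executors.newSingleThreadExecutor { runnable -> Thread(runnable, "io-worker") }
    try {
        // Nothing has switched threads yet: this is still the caller's thread.
        val before = Thread.currentThread().name
        // Only an explicit hand-off moves the work onto the other thread.
        val after = ioPool.submit(Callable { Thread.currentThread().name }).get()
        return before to after
    } finally {
        ioPool.shutdown()
    }
}

fun main() {
    val (before, after) = threadNames()
    println("before hop: $before, after hop: $after")
}
```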
- On line #43, I am traversing over a list of lists, which I assume is asynchronous, and then I traverse over each list, which I assume is also asynchronous (separately); but I assume each chunk will finish before the next chunk is processed. Is this true?
val insertedUsers: List<List<User>> = userBatches.parTraverse { userBatch ->
    userBatch.parTraverse { user -> insert(user) }
}.bind()
`parTraverse` gathers all its operations and composes them using `parMapN`. Or, in this case, it folds over the list and combines all the `IO`s using `parMap(ioa, iob) { a: Profile, b: List<Profile> -> a + b }`. If you want only the inserts within a batch to run in parallel, with the batches themselves processed one after another, you should use `traverse` for the chunked list, which is the sequential variant of `parTraverse`.
val insertedUsers: List<List<User>> = userBatches.traverse { userBatch ->
    userBatch.parTraverse { user -> insert(user) }
}.bind()
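The ordering that `traverse` over the batches combined with `parTraverse` inside each batch is meant to give can be illustrated without Arrow. The sketch below is only an analogy using a plain JDK thread pool, not Arrow's implementation; `processBatches`, `insertStub` and the pool size of 4 are hypothetical choices for illustration. Each batch is submitted in full and awaited before the next batch starts.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Hypothetical stand-in for insert(user): returns a label for the inserted id.
fun insertStub(id: Int): String = "profile-$id"

// Batches run one after another; items inside a batch run in parallel on the pool.
fun processBatches(ids: List<Int>, batchSize: Int): List<List<String>> {
    val pool = Executors.newFixedThreadPool(4)
    try {
        return ids.sorted()
            .chunked(batchSize)
            .map { batch ->
                // invokeAll blocks until every task of this batch has completed,
                // so the next batch cannot start earlier.
                pool.invokeAll(batch.map { id -> Callable { insertStub(id) } })
                    .map { future -> future.get() }
            }
    } finally {
        pool.shutdown()
    }
}

fun main() {
    // prints [[profile-1, profile-2], [profile-3, profile-4], [profile-5]]
    println(processBatches(listOf(5, 3, 1, 4, 2), batchSize = 2))
}
```

Submitting every item of every batch to the pool at once would correspond to the nested `parTraverse` version, where chunking no longer limits how much runs concurrently.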
- Is it the same if I don't make any chunks and process one big list with just one parTraverse ?
In the current example yes, see answer to question 2.
- I assume Log at line #44 should somehow be bound with the operation to print when the chunk actually starts to process. I couldn't figure out how. Can you help me ?
If you want things to be pure, wrap them in `IO` ;) Reverse the order if you want to print when finished inserting.
val insertedUsers: List<List<User>> = userBatches
    .parTraverse { userBatch ->
        IO.effect { LOG.info("Inserting users from ${userBatch.first().id} to ${userBatch.last().id}") }
            .followedBy(userBatch.parTraverse { user -> insert(user) })
    }
    .bind()
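Why a bare `LOG.info` call fires when the program is being assembled rather than when the batch runs can be shown with a tiny hand-rolled wrapper. `Task` below is a hypothetical, minimal stand-in for `IO` (not Arrow's type), and `eagerLog` / `deferredLog` are illustrative names; the events list plays the role of the logger.

```kotlin
// Hypothetical minimal stand-in for IO: a description of work that runs only when executed.
class Task<A>(val execute: () -> A) {
    // Sequence two tasks, keeping the result of the second one.
    fun <B> followedBy(next: Task<B>): Task<B> = Task { execute(); next.execute() }
}

// The "log" happens while the task is being built, before anything is executed.
fun eagerLog(batch: List<Int>, events: MutableList<String>): Task<Int> {
    events += "log"
    return Task { events += "insert"; batch.size }
}

// The "log" is itself wrapped, so it happens only when the task is executed.
fun deferredLog(batch: List<Int>, events: MutableList<String>): Task<Int> =
    Task<Unit> { events += "log" }
        .followedBy(Task { events += "insert"; batch.size })

fun main() {
    val eagerEvents = mutableListOf<String>()
    eagerLog(listOf(1, 2, 3), eagerEvents)   // built, never executed
    println(eagerEvents)                     // [log]

    val deferredEvents = mutableListOf<String>()
    val task = deferredLog(listOf(1, 2, 3), deferredEvents)
    println(deferredEvents)                  // []
    task.execute()
    println(deferredEvents)                  // [log, insert]
}
```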
- fx's NonBlocking uses coroutine's Dispatchers.Default in the Arrow implementation. How can I run this operation on coroutine's Dispatchers.IO?
That's actually incorrect. Fx's `NonBlocking` is a custom wrapper of `ForkJoinPool`, just like `Dispatchers.Default`. We currently do not have a `Dispatchers.IO` alternative, but we will include one soon. It should be relatively easy to write your own dispatcher, and I'd gladly help you with it. Anyhow, to run any `IO` (or `F`) on any `CoroutineContext`, do `IO.unit.continueOn(Dispatchers.IO).followedBy(myIO)`.
IO.fx {
    val userBatches: List<List<User>> =
        get()
            .bind()
            .sortedBy { user -> user.id }
            .chunked(1000)
    val insertedUsers: List<List<User>> = userBatches
        .parTraverse { userBatch ->
            LOG.info("Inserting users from ${userBatch.first().id} to ${userBatch.last().id}")
            userBatch.parTraverse { user -> IO.unit.continueOn(Dispatchers.IO).followedBy(insert(user)) }
            // userBatch.parTraverse { user -> Dispatchers.IO.shift().followedBy(insert(user)) } // Alias
        }
        .bind()
    insertedUsers
}
- After I call the function in my main() method, main() does not exit. I was expecting it to exit once the computation has finished. Can you help me understand why this does not happen?
Sounds like something is hanging on a dispatcher. I'd need to dive deeper into the code to check this, can you try playing with different dispatchers?
- I want to convert the unsafe and runBlocking blocks into a function chain. Is it possible to achieve that? I tried many things by reading their implementation, like using arrow.fx.IO.unsafeRun().run { ... }, but I could not make it work. Can you please help me?
Suggestions welcome... For `IO` you should be able to call `fix().unsafeRunSync()` or `fix().unsafeRunAsync { }`. You can make such an alias yourself like this: `fun <A> IOOf<A>.unsafeRunBlocking(): A = IO.unsafeRun().run { runBlocking { fix() } }`
- Do you see anything wrong with how I have used arrow here ?
The bracket code looks good 👍 I prefer expression bodies tho :p Only remark would be the `Log`: I'd wrap that in `IO.effect`, and that would've probably spared you the problems.
I'm 👌 with all of Simon's suggestions.
userBatch.parTraverse { user -> IO.unit.continueOn(Dispatchers.IO).followedBy(insert(user)) }
Try `IO.delay(Dispatchers.IO) { insert(user) }`, IIRC we kept it.
unsafe {
    val result: List<IO<List<Profile>>> = IO
        .fx {
            val userBatches: List<suspend () -> List<User>> = IO
                .unit
                .continueOn(Dispatchers.IO)
                .followedBy(get())
                .bind()
                .sortedBy { user -> user.id }
                .chunked(1000)
                .map { userBatch -> suspend { userBatch } }
            val result: List<IO<List<Profile>>> = userBatches
                .traverse { userBatch ->
                    IO
                        .unit
                        .continueOn(Dispatchers.IO)
                        .followedBy(IO.effect { LOG.info("Inserting users from ${userBatch.first().id} to ${userBatch.last().id}") })
                        .followedBy(userBatch.parTraverse { user -> insert(user) })
                }
                .bind()
            result
        }
        .fix()
        .unsafeRunSync()
    result
}
`IO.delay`: I could not find it. I am using `0.10.0-SNAPSHOT`. I tried searching the Arrow GitHub repository and got only one hit, from which it looks like it could be in `io.arrow-kt:arrow-fx-rx2`/`reactor`?
To use `.traverse { .. }` I had to wrap my chunks in a suspend block to match the function's signature, which then gives me a `List<IO<List<Profile>>>`. `userBatches.traverse { .. }` processes the first chunk and then waits, because I cannot bind the response from `parTraverse`: I get a compilation error if I call `.bind()` on it, saying it is not in the coroutine scope.
- How can I fix this?
- Is it possible to remove the suspend wrapping? (Most probably I am missing something here, please help :) )
- About the program not exiting: I took a thread dump and it says it is waiting on `unsafeRunSync()`. `.unsafeRunSync()` uses `unsafeRunTimed(Duration.INFINITE)`. Could this be the reason?
- It's generated from the typeclass. Use `arrow-fx`, it should be there: `IO.defer` as an extension function, otherwise `IO.async().defer`: https://github.com/arrow-kt/arrow/blob/master/modules/fx/arrow-fx/src/main/kotlin/arrow/fx/typeclasses/Async.kt#L242
2 and 3.
IO
    .fx {
        continueOn(Dispatchers.IO) // Top-level thread jump, available inside fx blocks
        val users: List<User> = get()
            .bind() // One step at a time. Get users first.
        val usersLambda: (List<User>) -> IO<List<Profile>> = { userBatch -> // We'll be using this below
            IO.effect { LOG.info("Inserting users from ${userBatch.first().id} to ${userBatch.last().id}") } // We're already on Dispatchers.IO
                .followedBy(userBatch.parTraverse(Dispatchers.IO) { user -> insert(user) }) // parTraverse defaults to other scheduler, enforce IO
        } // not executed yet
        val result: IO<List<List<Profile>>> =
            users.sortedBy { user -> user.id }
                .chunked(1000) // it's all sync in Dispatchers.IO up to here
                .map(usersLambda) // create the IOs to execute sequentially with the lambda above
                .sequence(IO.applicative()) // List<IO<List<Profile>>> -> IO<List<List<Profile>>>
        // None of the inserts have happened yet. Everything so far has happened on Dispatchers.IO too
        val profiles: List<List<Profile>> = result.bind() // execute inserts NOW
        val profilesFlat: List<Profile> = profiles.value().flatMap { it } // Remove one layer of List
        profilesFlat
    }
    .fix() // Shouldn't be necessary with fx now
    .unsafeRunSync() // Execute BLOCKING in this thread. There's no need to use unsafe if you're using the member method.
If `insert` is a suspend fun instead of returning IO, then use `IO.effect { insert(user) }`.
Replace `unsafeRunSync` with `unsafeRunAsync` and see what happens. May be a deadlock.
After playing and refactoring a lot, I was able to make this program, which works as it should.
But there are a few things that I don't understand about why it works (or how it even works).
1. On line #38, `bind`'s implementation takes me to `arrow.typeclasses.MonadContinuation`, whose field `context` defaults to `EmptyCoroutineContext`. Should I assume that this is executed with `Dispatchers.Default` and that it is not on the current thread?
2. On line #43, I am traversing over a list of lists, which I assume is asynchronous, and then I traverse over each list, which I assume is also asynchronous (separately); but I assume each chunk will finish before the next chunk is processed. Is this true?
3. Is it the same if I don't make any chunks and process one big list with just one `parTraverse`?
The next question is based on the assumption that each chunk is processed after the previous one finishes.
4. I assume the log call at line #44 should somehow be bound to the operation, so that it prints when the chunk actually starts to process. I couldn't figure out how. Can you help me?
5. `fx`'s `NonBlocking` uses coroutine's `Dispatchers.Default` in the Arrow implementation. How can I run this operation on coroutine's `Dispatchers.IO`?
6. After I call the function in my `main()` method, `main()` does not exit. I was expecting it to exit once the computation has finished. Can you help me understand why this does not happen?
7. I want to convert the `unsafe` and `runBlocking` blocks into a function chain. Is it possible to achieve that? I tried many things by reading their implementation, like using `arrow.fx.IO.unsafeRun().run { ... }`, but I could not make it work. Can you please help me?
8. Do you see anything wrong with how I have used Arrow here?