Skip to content

Instantly share code, notes, and snippets.

@catron
Created March 31, 2020 15:00
Show Gist options
  • Save catron/c3844ad4ea9581bd12fd0190d7fced1e to your computer and use it in GitHub Desktop.
Save catron/c3844ad4ea9581bd12fd0190d7fced1e to your computer and use it in GitHub Desktop.
Kotlin flow
Asynchronous Data Loading With New Kotlin Flow
Yet another amazing operator, Flow, from coroutines in Kotlin
Siva Ganesh Kantamani
Siva Ganesh Kantamani
Jan 3 · 6 min read
Coroutines
When we call an asynchronous function like a service call, retrieving data from the database, reading files or anything, we need a callback so that we know that the operation is finished and we can resume the actual work like in mobiles updating the UI after receiving data from your servers.
Yup, that’s fine, but when in real-time, it won’t be that easy.
For example, after receiving the data from the server, if you need to filter the data based on some metrics for which you need to do some local database operations and again you need another callback, so the list goes on for complex projects which results in an explosion of callbacks.
That’s where the real power of coroutines comes into play, with coroutines you no longer need to write callbacks for asynchronous functions anymore.
There is an amazing concept called suspend functions in coroutines which does all the work you need and returns the data in expected datatype.
Simple Coroutine sample
This will save the developers from callback hell and concentrate on their business logic in the normal format they’re working. Isn’t that great, this is just the overview of coroutines.
With coroutines, you can execute asynchronously, yet sequentially.
Please refer to the following links to know about coroutines in Kotlin, before going forward in this article.
Coroutines on Android (part I): Getting the background
What problems do coroutines solve?
medium.com
Synchronous and Asynchronous execution in Kotlin Coroutines with error and exception handling
This article is not about how to use coroutines in kotlin(although I’ll briefly touch upon the concept of coroutines we…
medium.com
Suspend Functions
A suspending function is simply a function that can be paused and resumed at a later time. They can execute a long-running operation and wait for it to complete without blocking service calls, database operations, or reading a long file.
The syntax of a suspending function is similar to that of a regular function except for the addition of the suspend keyword at the beginning.
The wonderful thing about a suspending function is that it can return any number of responses.
suspend fun getItem() : Response
suspend fun getItems() : List<Response>
Let’s see how this works.
For example, we have a suspend function foo which returns a set of responses as shown below.
suspend fun foo() : List<Responses> = buildlist{
add(Execute("A"))
add(Execute("B"))
add(Execute("C"))
}
Then, you call it from your flow as shown below.
fun getData() {
val list = withContext(Dispatchers.IO) { foo() }
for (x in list) println(x)
}
What happens here is that when you call the suspend function foo, it starts executing one-by-one and after completing all the executions, it’ll return the list of responses.
Executing list of request with suspend function
So, here we have to wait until all the requested executions are finished which is not an optimized solution.
We could do better, so the next solution the Kotlin team found was Channel.
Channel
Channels are pipes, structured, where you send data on one side and receive data on the other side, as shown below.
Overview of how channels work
To use channels, you need to change a bit of code, instead of using List<Response> as return type, we use ReceiveChannel<Response>, and instead of buildlist use produce, and instead of addTo replace it with send.
Have a look:
suspend fun foo() : ReceiveChannel<Response> = produce{
send(execute("A"))
send(xecute("B"))
send(Execute("C"))
}
And then, while using it in our flow, we’ll receive channels instead of a list of responses. Then, you iterate and print, have a look:
fun getData() {
val chaannel = withContext(Dispatchers.IO) { foo() }
for (x in channel) println(x)
}
What difference does it make? Let’s see.
When you call the foo function it will create a channel and return it immediately but won’t start the execution. Now we have two coroutines that are running. One to emit the data and others to observe it.
When you call the channel while iterating, the execution starts and it’ll execute the first one and return the response and then the second and others similarly, Have a look to have a better idea:
channels work flow
So, by using channels, you no longer need to wait to complete all the executions. But there is a problem here, Channels are hot.
Remember that I said before that two coroutines are running, one to observe and another to emit. What if there is no observer, either by mistake or any exception?
You know Channels are like opening a network connection or reading a file that uses expensive resources and if there is no observer, the connection will stay open, looking for the observer.
We can resolve this kind of thing but in the long run, it won’t be optimistic. It’ll cause serious problems in debugging and testing in the long run.
We can do better, right, and that’s how the Kotlin concept Flow came to light.
Kotlin Flow
The stable version of Flow was released a few days back. Flow not only solves the pain points of Channels but also offers many new features which you’ll see in this article.
To use Flow in the above example, we just need to change a few things like the return type to Flow and user flow instead of produce. And inside flow, you have to use emit, have a look:
fun foo() : Flow<Response> = flow{
emit(execute("A"))
emit(execute("B"))
emit(execute("C"))
}
On the other hand, in your general function, you have to use collect on the result of foo. We are collecting all the elements emitted by foo through the flow.
fun getData() {
val flowData = foo()
withContext(Dispatchers.IO) { flowdata.collect{ println(x) }
}
What happens here is when you call foo, it immediately returns the flow object but won’t start executing.
When you start collecting flow then it starts executing and a request is executed. It returns the result and then starts the next one until there is no more left.
Flow Execution
It is similar to Channel, it emits data and receives until there is no more, but the big difference is that flow is cold. Even if there is no observer, either by mistake or intentionally, the coroutine won’t hold to it because it didn’t start anything.
Flow gives a reactive behavior to your functionality by emitting the result after completing the present execution, unlike waiting to complete the entire request list.
Flow Is Declarative
When we call the foo function in the flow sample, what happens is that it create a flow and returns it, so we can use some operators like map to make flow more declarative, as shown below.
fun foo() : Flow<Response> =
flowOf("A","B","C").map{ name->
execute(name)
}
}
Here, if you observe, the function foo is not a suspend function. Why?
As I said earlier, it only defines the flow object then emits it immediately and only computes or runs when it has started to collect.
Flow Is Reactive
By reading this headline, many of the developers think of RxJava. Yes, RxJava is a kind of start to a reactive program in JVM where Kotlin runs mainly.
Why Flow?
Let’s take a close look, for example, if you want to transform any object of type A to type B in RxJava, we have an operator called map.
Yes, it works fine but what if the transformation should be done asynchronously, then we’ve to use another operator called flatmapSingle.
But in Kotlin, we have an operator called map, the transformation lambda of this operator is the suspend modifier which makes map usable in both synchronous and asynchronous jobs.
We can avoid hundreds of operators like this by using Kotlin because Kotlin has suspend functions where RxJava and others don’t, because they are not developed in Kotlin.
Before we wrap up, every concept that mentioned and explained in this article is from a talk by Roman Elizarov in KotlinConfig 2019. IF you’re interested watch from below link.
Thank you for reading.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment