@karlroberts
Last active January 8, 2020 04:18
Show how to get some tasks in a cats.effect.IO to run concurrently with the other IOs in a for comprehension
It is confusing to many (including me), so here is a demo.
I think the main confusion comes from understanding the differences between blocking, non-blocking concurrency, asynchronicity, unsafeRunSync and unsafeRunAsync.
Concurrent means tasks run, or appear to run, at the same time, either by running on different processors or machines, or by timeslicing threads.
Asynchronous means we don't wait for the result of a call. This is easy if the function is a procedure that is a pure side effect, i.e.
returns Unit (), but if it does return something then we need to provide a callback handler to deal with the result when it arrives. That callback may interrupt the running thread or run on another one.
All synchronous calls running locally block: you have to wait for the result of one call before the next one proceeds.
If that local function takes a long time, you will notice it!
But people usually talk about blocking calls when calling remote services or doing any other kind of IO, e.g. writing to a disk.
This is because these calls are orders of magnitude slower than running code on the same CPU or thread, and so
appear to block the calling process, but this is not conceptually different from calling a slow, CPU-intensive local
function.
Note that if you can dispatch a callback handler to handle the response eventually, then you can jump past
that blocking call and carry on doing useful work (for other contexts or client calls) while waiting for the slow call
to complete.
This is what people mean by non-blocking: you don't waste a thread that just sits there doing nothing for seconds at
a time.
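As a rough sketch of that callback style (using plain scala.concurrent.Future rather than anything from this gist; slowRemoteCall is a hypothetical stand-in for a slow remote call):

import scala.concurrent.{ExecutionContext, Future}

def slowRemoteCall(): Int = { Thread.sleep(2000); 42 } // pretend network call

// Non-blocking style: hand over a callback and return immediately;
// the calling thread is free to do other useful work in the meantime.
def callNonBlocking(onDone: Int => Unit)(implicit ec: ExecutionContext): Unit =
  Future(slowRemoteCall()).foreach(onDone)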
cats.effect.IO is a mechanism for running anything that needs IO, i.e. a side effect, such as writing to a disk,
calling a service or printing to the console, in a pure manner.
Side effects (functions that have no result, i.e. return Unit) are simply CPU-spinning electricity wasters as far as a
functional program is concerned. Functional programs are effectively calculations that return a result value.
Of course, side effects are how real work in the outside world, outside the calculation, is performed, e.g. writing
the result of a pure calculation to the console.
Side effects are not referentially transparent and so are hard to reason about in a pure functional programming style.
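A quick illustration of what "not referentially transparent" means (a minimal sketch, not part of the demo): replacing an expression with its value should not change the program, but with side effects it does.

val x = println("hi") // prints "hi" once
val pair1 = (x, x)    // nothing more is printed; pair1 is ((), ())

val pair2 = (println("hi"), println("hi")) // "substituting" x back in prints "hi" twice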
cats.effect.IO is a Monad that wraps side effects in pure calculations. It does this by suspending the side effect in an IO.
The IO is a result (and so pure; no side effect is executed). It is really a description of how to run the side effect when you
get around to running it.
In this way many side effects can be composed together in a functional style and then run when convenient.
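For example (a minimal sketch, assuming the same cats-effect 2 IO used in the demo below):

import cats.effect.IO
import cats.syntax.all._

// `program` is just a description; nothing prints when it is defined.
val program: IO[Unit] =
  IO(println("first")) *> IO(println("second")) // composed in sequence

program.unsafeRunSync() // only now do "first" and "second" print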
This is done with IO's unsafeRunSync and unsafeRunAsync methods and their cousins.
Confusion can arise here with respect to concurrent and async programming.
See https://typelevel.org/cats-effect/datatypes/io.html#unsafe-operations
unsafeRunSync :- will "read" the IO "program" description and run the side effect synchronously on the current thread... you will
have to wait for it, i.e. the thread is blocked, and you will have to handle the result, success or failure.
unsafeRunAsync :- will "read" the IO "program" description and run the side effect asynchronously. The calling thread is not blocked
at this point, as the result of unsafeRunAsync is Unit, so your code carries on; the result of the IO is handled by a callback
function that you provide to handle the result, success or failure.
The callback executes on the current thread when the IO returns a value.
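In code (a minimal sketch against the cats-effect 2 API):

import cats.effect.IO

val io = IO { Thread.sleep(1000); "done" }

// Synchronous: blocks the current thread until the result (or error) is ready.
val result: String = io.unsafeRunSync()

// Asynchronous: returns Unit immediately; the callback receives an
// Either[Throwable, String] when the IO completes.
io.unsafeRunAsync {
  case Right(value) => println(s"got $value")
  case Left(error)  => println(s"failed: $error")
}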
Of course, you may want the callback to run on another thread, concurrently with the main program, when it is called, e.g. to update a
log or database record. cats.effect.IO gives you the ability to thread-shift execution using ContextShift.
ContextShift lets you specify an ExecutionContext to run the IO on, usually backed by a thread pool.
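A minimal sketch of evalOn, assuming cats-effect 2 (the pool here is a throwaway example, not the gist's):

import java.util.concurrent.Executors
import cats.effect.{ContextShift, IO}
import scala.concurrent.ExecutionContext

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

val blockingPool = ExecutionContext.fromExecutor(Executors.newCachedThreadPool())

// evalOn runs the IO on blockingPool, then shifts back to the default context.
val onBlocker: IO[Unit] =
  cs.evalOn(blockingPool)(IO(println(Thread.currentThread().getName)))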
Now, the thing that many people, including myself, want is to program with IO but control how and where code runs,
so we can dispatch an async call whose callback is handled by a thread pool other than the main (global)
fork-join work-stealing pool.
You may have a chain of IOs in a for comprehension doing busy work on the efficient global fork-join pool, but one of the IOs may be
a blocking call to a remote system... rather than block the fork-join pool, which would stop other busy work from happening,
you can use ContextShift to evalOn a blocking thread pool. The for comprehension of IO monads will still enforce the sequencing of events
(sequencing is the main thing a Monad does), but some tasks will happen on another pool.
However, you may have an IO in the sequence that you don't want to wait for; it can happen concurrently as you progress to the next IO,
e.g. logging statements or fire-and-forget procedures such as closing down and tidying up resources, which don't really contribute to the
"calculation" you are doing but need to be scheduled after certain tasks are done.
IO gives you access to a "green thread" mechanism called Fibers, which are very lightweight and allow co-operative concurrency.
IO.start returns an IO[Fiber[IO, A]] (where A is the return type of the IO).
Of course this is still an IO, so nothing happens yet, but the "description of the side-effect program" can be interpreted as
"when you see IO.start, kick off a running Fiber to run this IO side effect".
Now this will generate work, but to see the result (remember, it happened concurrently) you need to run Fiber.join to pass the result back to the thread of execution.
Fibers operate concurrently because you have to implicitly provide a ContextShift for them to run on. Of course this means that you can explicitly
provide one too, to specify a different thread pool to run on, useful if the work will be blocking.
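A minimal sketch of start and join, assuming cats-effect 2 and an implicit ContextShift on the global pool:

import cats.effect.{ContextShift, IO}
import scala.concurrent.ExecutionContext

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

val task: IO[Int] = IO { Thread.sleep(1000); 42 }

val program: IO[Int] =
  for {
    fiber  <- task.start                                // task begins running concurrently
    _      <- IO(println("other work while task runs")) // proceeds without waiting
    result <- fiber.join                                // wait for (and return) task's result
  } yield result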
/*
The code below has io1, io2 and io3, which sleep for 2 seconds, 3 seconds and 2 seconds respectively.
I sequence them in order in the for comprehension but show how they can be context shifted onto other pools or
run concurrently in Fibers. The results are attached to this gist.
Note I use IOApp rather than App; IOApp automatically runs the run function to get the IO and then runs it unsafely for you.
*/
package kfoo
import java.util.concurrent.Executors
import cats.effect._
import cats.syntax.all._
import org.apache.logging.log4j.scala.Logging
import scala.concurrent.ExecutionContext
object ThreadPools {
  import java.util.concurrent.ThreadFactory
  import java.util.concurrent.atomic.AtomicLong

  // A ThreadFactory so I can name threads (and spot them in the log output)
  def threadFactory(prefix: String, initialVal: Long = 0): ThreadFactory =
    new ThreadFactory() {
      final private val threadIndex = new AtomicLong(initialVal)
      override def newThread(runnable: Runnable): Thread = {
        val thread = new Thread(runnable)
        thread.setName(prefix + "-" + threadIndex.getAndIncrement)
        thread
      }
    }

  // A pool with a single thread in it, for when we know there is just one thing to do
  val singleAsyncExecutor =
    Executors.newSingleThreadExecutor(threadFactory("single-async-1"))
  val async1 = ExecutionContext.fromExecutor(singleAsyncExecutor)

  // A cached pool adds more threads when none are available and shrinks back to its
  // minimum when done. Useful for blocking remote operations: it is no problem to have
  // thousands of threads (other than the memory overhead) if they are blocked, and so
  // in the waiting state and not bothering the CPU.
  val blocker1Executor =
    Executors.newCachedThreadPool(threadFactory("blocker-1"))
  val blocker1 = ExecutionContext.fromExecutor(blocker1Executor)
}
object KarlDemoConcurrent extends IOApp with Logging {
  import ThreadPools._

  def run(args: List[String]): IO[ExitCode] =
    (
      IO { logger.info("starting doIOS_AllSynchronousSamePool") } *> doIOS_AllSynchronousSamePool() *>
        IO { logger.info("\n\nstarting doIOS_AllSynchronousDiffPool") } *> doIOS_AllSynchronousDiffPool() *>
        IO { logger.info("\n\nstarting doIOS_io2IsConcurrentSamePool") } *> doIOS_io2IsConcurrentSamePool() *>
        IO { logger.info("\n\nstarting doIOS_io2IsConcurrentDiffPool") } *> doIOS_io2IsConcurrentDiffPool() *>
        IO { logger.info("\n\nstarting doIOS_AllConcurrent") } *> doIOS_AllConcurrent()
    ).as(ExitCode.Success)

  def io1 = IO { Thread.sleep(2000) } *> IO { logger.info("This is io1") }
  def io2 = IO { Thread.sleep(3000) } *> IO { logger.info("This is io2") }.handleErrorWith(t => IO { logger.error("WTF", t) })
  def io3 = IO { Thread.sleep(2000) } *> IO { logger.info("This is io3") }

  // All three IOs run sequentially on the default IOApp pool.
  def doIOS_AllSynchronousSamePool() =
    for {
      _ <- io1
      _ <- io2
      _ <- io3
    } yield ()

  // Still sequential, but io2 is evaluated on the single-thread async1 pool.
  def doIOS_AllSynchronousDiffPool() =
    for {
      _ <- io1
      _ <- contextShift.evalOn(async1)(io2)
      _ <- io3
    } yield ()

  // io2 is started as a Fiber, so io3 can proceed concurrently with it.
  def doIOS_io2IsConcurrentSamePool() =
    for {
      _      <- io1
      fiber2 <- io2.start
      _      <- io3
      _      <- fiber2.join
    } yield ()

  // As above, but the Fiber runs on the async1 pool via an explicit ContextShift.
  def doIOS_io2IsConcurrentDiffPool() =
    for {
      _      <- io1
      fiber2 <- io2.start(IO.contextShift(async1))
      _      <- io3
      _      <- fiber2.join
    } yield ()

  // All three IOs run concurrently; we join all the Fibers before finishing.
  def doIOS_AllConcurrent() =
    for {
      fiber1 <- io1.start
      fiber2 <- io2.start(IO.contextShift(async1))
      fiber3 <- io3.start
      _      <- fiber2.join
      _      <- fiber3.join
      _      <- fiber1.join
    } yield ()
}
15:13:05.901 [ioapp-compute-0] INFO kfoo.KarlDemoAsync$ - starting doIOS_AllSynchronousSamePool
15:13:07.904 [ioapp-compute-0] INFO kfoo.KarlDemoAsync$ - This is io1
15:13:10.909 [ioapp-compute-0] INFO kfoo.KarlDemoAsync$ - This is io2
15:13:12.914 [ioapp-compute-0] INFO kfoo.KarlDemoAsync$ - This is io3
15:13:12.915 [ioapp-compute-0] INFO kfoo.KarlDemoAsync$ -
starting doIOS_AllSynchronousDiffPool
15:13:14.915 [ioapp-compute-0] INFO kfoo.KarlDemoAsync$ - This is io1
15:13:17.952 [single-async-1-0] INFO kfoo.KarlDemoAsync$ - This is io2
15:13:19.963 [ioapp-compute-1] INFO kfoo.KarlDemoAsync$ - This is io3
15:13:19.964 [ioapp-compute-1] INFO kfoo.KarlDemoAsync$ -
starting doIOS_io2IsConcurrentSamePool
15:13:21.964 [ioapp-compute-1] INFO kfoo.KarlDemoAsync$ - This is io1
15:13:23.968 [ioapp-compute-1] INFO kfoo.KarlDemoAsync$ - This is io3
15:13:24.967 [ioapp-compute-2] INFO kfoo.KarlDemoAsync$ - This is io2
15:13:24.973 [ioapp-compute-2] INFO kfoo.KarlDemoAsync$ -
starting doIOS_io2IsConcurrentDiffPool
15:13:26.973 [ioapp-compute-2] INFO kfoo.KarlDemoAsync$ - This is io1
15:13:28.977 [ioapp-compute-2] INFO kfoo.KarlDemoAsync$ - This is io3
15:13:29.976 [single-async-1-0] INFO kfoo.KarlDemoAsync$ - This is io2
15:13:29.977 [single-async-1-0] INFO kfoo.KarlDemoAsync$ -
starting doIOS_AllConcurrent
15:13:31.978 [ioapp-compute-3] INFO kfoo.KarlDemoAsync$ - This is io1
15:13:31.983 [ioapp-compute-4] INFO kfoo.KarlDemoAsync$ - This is io3
15:13:32.984 [single-async-1-0] INFO kfoo.KarlDemoAsync$ - This is io2
Process finished with exit code 0