Skip to content

Instantly share code, notes, and snippets.

@kamilkloch
kamilkloch / StreamMergeOverhead2.scala
Last active December 7, 2023 12:08
Performance of merging fs2 streams
object StreamMergeOverhead2 extends IOApp.Simple {
def run: IO[Unit] = {
IO.println("=== Stream(1).covary[IO].repeatN(10_000) ===") >>
benchmark(Stream(1).covary[IO].repeatN(10_000), numStreams = 2, m = 10) >>
IO.println("=== Stream.emits(List.fill(10_000)(1)).covary[IO].repeatN(10) ===") >>
benchmark(Stream.emits(List.fill(10_000)(1)).covary[IO].repeatN(10), numStreams = 2, m = 10)
}
def benchmark[O](s: Stream[IO, O], numStreams: Int, m: Int): IO[Unit] = {
@kamilkloch
kamilkloch / StreamMergePerformanceTests.scala
Created November 30, 2023 18:01
fs2 Stream merge performance vs raw IOs
import cats.effect.std.Queue
import cats.effect.{IO, IOApp}
import fs2._
object StreamMergePerformanceTests extends IOApp.Simple {
def run: IO[Unit] = {
case class Payload(x: Integer)
@kamilkloch
kamilkloch / StreamEvalTapChunk.scala
Created November 29, 2023 10:13
Stream#evalTapChunk benchmark
package fs2
import cats.Applicative
import cats.effect.{IO, IOApp}
import cats.syntax.all._
/** https://github.com/typelevel/fs2/pull/3350 */
object StreamEvalTapChunk extends IOApp.Simple {
implicit class StreamOps[F[_], O](s: Stream[F, O]) {
def evalTapChunkNew[F2[x] >: F[x] : Applicative, O2](f: O => F2[O2]): Stream[F2, O] = {
time=2023-10-25T08:58:40Z level=info msg=started handling request http_request=map[headers:map[accept:text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8 accept-encoding:gzip, deflate accept-language:en-US,en;q=0.5 connection:keep-alive cookie:Value is sensitive and has been redacted. To see the value set config key "log.leak_sensitive_values = true" or environment variable "LOG_LEAK_SENSITIVE_VALUES=true". upgrade-insecure-requests:1 user-agent:Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/118.0] host:kamilk:4433 method:GET path:/self-service/login/browser query:Value is sensitive and has been redacted. To see the value set config key "log.leak_sensitive_values = true" or environment variable "LOG_LEAK_SENSITIVE_VALUES=true". remote:192.168.93.61:56224 scheme:http]
time=2023-10-25T08:58:40Z level=info msg=[DEBUG] GET http://hydra:4445/admin/oauth2/auth/requests/login audience=application service_name=Ory Kratos service_version=v1.0.0
time=2023-10-2
time=2023-10-25T08:58:40Z level=info msg=started handling request http_request=map[headers:map[accept:text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8 accept-encoding:gzip, deflate accept-language:en-US,en;q=0.5 connection:keep-alive cookie:[csrf_token_6b16b8682aa116061b6c75d86bad2905ac02180a129ba1ceea0298b8238ad734=9BsW+mG43dOVENEN3TXA8N+P/HT83E/hIzrYJYIimnw=; ory_hydra_session_dev=MTY5ODE1MjEwNHxKMHVBUFhqd2ZuZU5HYVhGdjhfODJsY2l2Vi1BOURvaHdZS3ZxekVLc1ZMN0xOTTJMSndSaG5vQzJtSHRuWGI2VS1HeXZEdnNXbXNzOWxadmZ0LURCcU0xSW9BX3ZoOGI0ekxVUXJkSldMNkd6Uy12Sm1ZQ3FJbmU2LXJ0WDNyNkhRPT18pM9eOQDyLO0r3hl51n8GcGYrgC2chE7vvbCeArdNKVQ=; _csrf=4V648HisocMp0vY_5Gl5QPyM; ory_hydra_login_csrf_dev_2327924913=MTY5ODIyMzMxNnxHU0pEbzdmT3VFMjdfX010RXpwbDNkVDZLbFRPYlFYNnUyMXotcXkwc1RQMVZpSDVJLWtjY1ZOWnpRQWFTdzN4UkFIZ0oyZkU0QmNKYUdvN0lGdWJkQ3paM1hvZlBhUjJEbzV6R2FEM3J3Q3FaMC1Bc3FRY0NrTzVMSTZqdWc9PXyVvGuOzo3daRe4g-sdCcldnT4b4ItSEJjm-OOTpaXqbw==; ory_hydra_consent_csrf_dev_2327924913=MTY5ODIyMzMxNnxuNHBRdnh
@kamilkloch
kamilkloch / KKResearchDispatcherSequentialVsIO.scala
Created September 15, 2023 10:26
DispatcherSequential vs IO
object KKResearchDispatcherSequentialVsIO extends IOApp.Simple {
val n = 1000_000
val tasks = ArraySeq.fill(n)(IO.unit)
def run: IO[Unit] = {
val viaDispatcher = Dispatcher.sequential[IO](await = true).use { dispatcher =>
IO(tasks.foreach(dispatcher.unsafeRunAndForget))
}
@kamilkloch
kamilkloch / DeferredAsyncPerf.scala
Created August 14, 2023 11:07
Benchmark IO.async vs manual Deferred
import cats.effect.std.Dispatcher
import cats.effect.{IO, IOApp}
import cats.syntax.all._
import java.util.concurrent.atomic.AtomicInteger
object DeferredAsyncPerf extends IOApp.Simple {
trait Model {
def addListener(onResponse: Either[Throwable, Int] => Unit): Unit
import cats.effect.std.Supervisor
import cats.effect.{IO, IOApp}
import cats.syntax.all._
import org.HdrHistogram.{ConcurrentHistogram, Histogram}
import scala.concurrent.duration.DurationInt
import scala.util.chaining.scalaUtilChainingOps
object RefProducer extends IOApp.Simple {
val n = 20_000
import zio.Clock.ClockLive
import zio.Console.ConsoleLive
import zio._
import zio.stream.ZStream
import java.io.IOException
import java.util.concurrent.TimeUnit
object ZioHub extends ZIOAppDefault {
override def run: ZIO[Any with ZIOAppArgs with Scope, Any, Any] = {
import cats.effect.std.Supervisor
import cats.effect.{IO, IOApp}
import fs2.Stream
import fs2.concurrent.Topic
import cats.syntax.all._
import scala.concurrent.duration.DurationInt
object Topics extends IOApp.Simple {