Skip to content

Instantly share code, notes, and snippets.

View gvolpe's full-sized avatar
🤓
https://leanpub.com/u/gvolpe

Gabriel Volpe gvolpe

🤓
https://leanpub.com/u/gvolpe
View GitHub Profile
@gvolpe
gvolpe / ES_Scala_Refactor.scala
Last active August 29, 2015 14:16
ES_Scala_Refactor
object TheApp extends App {
EventProcessor("BR").process
}
trait ESClient {
def country: String
def settings: String = "settings-" + country
def client: String
}
@gvolpe
gvolpe / fs2-queue-behavior.scala
Created January 31, 2017 04:37
Different behavior between Q.enqueue1 and Q.enqueue when calling Q.dequeue
package com.gvolpe.fs2queue
import fs2.{Stream, Task, async}
object FS2QueueBehavior extends App {
implicit val S = fs2.Strategy.fromFixedDaemonPool(2, "fs2-queue")
val p1 = for {
simpleQ <- Stream.eval(async.boundedQueue[Task, String](10))
@gvolpe
gvolpe / liftToPF.scala
Last active December 12, 2017 19:22
Function to PartialFunction
private def liftToPF[X <: Y, W, Y](f: Function[X, W])(implicit ct: ClassTag[X]): PartialFunction[Y, W] =
new PartialFunction[Y, W] {
override def isDefinedAt(x: Y): Boolean = ct.runtimeClass.isInstance(x)
override def apply(v1: Y): W = f(v1.asInstanceOf[X])
}
sealed trait MyError
case class ErrorOne(msg: String) extends MyError
case class ErrorTwo(msg: String) extends MyError
case class ErrorThree(msg: String) extends MyError
@gvolpe
gvolpe / TypedActor.scala
Last active December 20, 2017 07:37
Simple Typed Actor
import akka.actor.Actor
/**
* Simple [[TypedActor]] that gives any class implementing it the power to have typed messages making
* proper use of the compiler for type check exhaustiveness by just using a typed [[Function1]].
*
* For convenience use this trait instead of using directly [[Actor]] unless you have a good reason.
* */
trait TypedActor[A] extends Actor {
type TypedReceive = A => Unit
@gvolpe
gvolpe / concurrent-bracketing.scala
Last active March 20, 2018 07:25
Stream Bracket (fs2)
import cats.effect.IO
import cats.syntax.apply._
import fs2.{Scheduler, Stream, StreamApp}
import fs2.StreamApp.ExitCode
import scala.concurrent.ExecutionContext.Implicits.global
object Streaming extends StreamApp[IO] {
override def stream(args: List[String], requestShutdown: IO[Unit]): fs2.Stream[IO, ExitCode] =
@gvolpe
gvolpe / Bracketing.scala
Last active April 18, 2018 05:34
IO, Bracket and Cancelation
import java.io.FileOutputStream
import cats.effect.ExitCase.{Canceled, Completed, Error}
import cats.effect._
import cats.syntax.apply._
import cats.syntax.functor._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
package com.github.gvolpe.fs2rabbit.examples
import cats.effect.Effect
import com.github.gvolpe.fs2rabbit.config.QueueConfig
import com.github.gvolpe.fs2rabbit.interpreter.Fs2Rabbit
import com.github.gvolpe.fs2rabbit.json.Fs2JsonEncoder
import com.github.gvolpe.fs2rabbit.model._
import com.github.gvolpe.fs2rabbit.typeclasses.StreamEval
import fs2.{Pipe, Stream}
type Arguments = Args[A] forSome { type A }
final class Args[A: SafeArgument](val underlying: Map[String, A])
object Arguments {
def empty: ArgumentsAlt[String] = new ArgumentsAlt(Map.empty)
def apply[V: SafeArgument](kv: (String, V)*): ArgumentsAlt[V] =
new ArgumentsAlt(kv.toMap)
}
def scanEval[F[_]: Sync, S, A](p: Stream[F, A])(start: F[S])(f: (S, A) => F[S]): Stream[F, S] = {
def zipper(ref: Ref[F, S]): Stream[F, S] =
p.zip(Stream.eval(ref.get).repeat).evalMap { case (a, s) =>
for {
ns <- f(s, a)
_ <- ref.set(ns)
} yield ns
}
for {
import cats.effect.{ExitCode, IO, IOApp}
import cats.instances.list._
import cats.syntax.all._
import fs2._
import scala.concurrent.duration._
object jobs extends IOApp {
val largeStream: Stream[IO, Int] = Stream.range(0, 100).covary[IO]