This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class OrderService()(implicit logger: Logger[IO]) { | |
def consumeOrder(msg: OrderEvent): IO[Boolean] = { | |
logger.info(s"Consuming order: $msg") *> IO.pure(true) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cats.effect.IO | |
import cats.implicits.catsSyntaxApply | |
import com.permutive.pubsub.consumer.decoder.MessageDecoder | |
import config.PubSubConfiguration | |
import org.http4s.client.Client | |
import org.typelevel.log4cats.Logger | |
class PubSubConsumer[T: MessageDecoder] private[pubsub] ( | |
pubSubSubscriber: PubSubSubscriber[T], | |
subscriptionName: String |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def subscribe(subscriptionName: String): fs2.Stream[IO, ConsumerRecord[IO, T]] = | |
PubsubHttpConsumer.subscribe[IO, T]( | |
projectId = ProjectId(config.projectId), | |
subscription = Subscription(subscriptionName), | |
serviceAccountPath = Some(config.keyFileLocation), | |
config = PubsubHttpConsumerConfig[IO]( | |
host = config.host, | |
port = config.port, | |
isEmulator = false, | |
onTokenRefreshError = e => |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cats.effect.IO | |
import cats.effect.kernel.Resource | |
import cats.implicits.catsSyntaxOptionId | |
import com.permutive.pubsub.producer.{Model, PubsubProducer} | |
import com.permutive.pubsub.producer.encoder.MessageEncoder | |
import com.permutive.pubsub.producer.http.{HttpPubsubProducer, PubsubHttpProducerConfig} | |
import config.PubSubConfiguration | |
import org.http4s.client.Client | |
import org.typelevel.log4cats.Logger |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.permutive.pubsub.consumer.decoder.MessageDecoder | |
import com.permutive.pubsub.producer.encoder.MessageEncoder | |
import core.pubsub.CirceImplicits.{pubSubDecoder, pubSubEncoder} | |
import io.circe.{Decoder, Encoder} | |
import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} | |
case class OrderEvent(id: String, name: String, price: Double) | |
object OrderEvent { | |
implicit val orderDecoder: Decoder[OrderEvent] = |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
http { | |
host = "localhost" | |
host = ${?HOSTNAME} | |
port = 8089 | |
port = ${?PORT} | |
} | |
pub-sub { | |
project-id = "some-project-id" | |
project-id = ${?PROJECT_ID} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
recovering from roll different than 4 | |
recovering from roll different than 4 | |
finished with: 4 | |
Process finished with exit code 0 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
object CatsRetryOnSomeErrors extends IOApp.Simple { | |
val loadedDie: LoadedDie = LoadedDie(2, 5, 4, 1, 3, 2, 6) | |
def unsafeFunction(): IO[Int] = { | |
val res = loadedDie.roll() | |
if(res != 4) { | |
IO.raiseError(new IllegalArgumentException("roll different than 4")) | |
} else { | |
IO.pure(res) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Rolled a 2, retrying ... WillDelayAndRetry(0 days,0,0 days) | |
Rolled a 5, retrying ... WillDelayAndRetry(0 days,1,0 days) | |
Rolled a 4, retrying ... WillDelayAndRetry(0 days,2,0 days) | |
Rolled a 1, retrying ... GivingUp(3,0 days) | |
finished with: 1 | |
Process finished with exit code 0 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
sealed trait RetryDetails { | |
def retriesSoFar: Int | |
def cumulativeDelay: FiniteDuration | |
def givingUp: Boolean | |
def upcomingDelay: Option[FiniteDuration] | |
} |