Skip to content

Instantly share code, notes, and snippets.

class OrderService()(implicit logger: Logger[IO]) {
def consumeOrder(msg: OrderEvent): IO[Boolean] = {
logger.info(s"Consuming order: $msg") *> IO.pure(true)
}
}
import cats.effect.IO
import cats.implicits.catsSyntaxApply
import com.permutive.pubsub.consumer.decoder.MessageDecoder
import config.PubSubConfiguration
import org.http4s.client.Client
import org.typelevel.log4cats.Logger
class PubSubConsumer[T: MessageDecoder] private[pubsub] (
pubSubSubscriber: PubSubSubscriber[T],
subscriptionName: String
def subscribe(subscriptionName: String): fs2.Stream[IO, ConsumerRecord[IO, T]] =
PubsubHttpConsumer.subscribe[IO, T](
projectId = ProjectId(config.projectId),
subscription = Subscription(subscriptionName),
serviceAccountPath = Some(config.keyFileLocation),
config = PubsubHttpConsumerConfig[IO](
host = config.host,
port = config.port,
isEmulator = false,
onTokenRefreshError = e =>
import cats.effect.IO
import cats.effect.kernel.Resource
import cats.implicits.catsSyntaxOptionId
import com.permutive.pubsub.producer.{Model, PubsubProducer}
import com.permutive.pubsub.producer.encoder.MessageEncoder
import com.permutive.pubsub.producer.http.{HttpPubsubProducer, PubsubHttpProducerConfig}
import config.PubSubConfiguration
import org.http4s.client.Client
import org.typelevel.log4cats.Logger
import com.permutive.pubsub.consumer.decoder.MessageDecoder
import com.permutive.pubsub.producer.encoder.MessageEncoder
import core.pubsub.CirceImplicits.{pubSubDecoder, pubSubEncoder}
import io.circe.{Decoder, Encoder}
import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
case class OrderEvent(id: String, name: String, price: Double)
object OrderEvent {
implicit val orderDecoder: Decoder[OrderEvent] =
@softberries
softberries / application.conf
Created December 23, 2022 13:42
pubsub-example-scala application.conf
http {
host = "localhost"
host = ${?HOSTNAME}
port = 8089
port = ${?PORT}
}
pub-sub {
project-id = "some-project-id"
project-id = ${?PROJECT_ID}
recovering from roll different than 4
recovering from roll different than 4
finished with: 4
Process finished with exit code 0
object CatsRetryOnSomeErrors extends IOApp.Simple {
val loadedDie: LoadedDie = LoadedDie(2, 5, 4, 1, 3, 2, 6)
def unsafeFunction(): IO[Int] = {
val res = loadedDie.roll()
if(res != 4) {
IO.raiseError(new IllegalArgumentException("roll different than 4"))
} else {
IO.pure(res)
Rolled a 2, retrying ... WillDelayAndRetry(0 days,0,0 days)
Rolled a 5, retrying ... WillDelayAndRetry(0 days,1,0 days)
Rolled a 4, retrying ... WillDelayAndRetry(0 days,2,0 days)
Rolled a 1, retrying ... GivingUp(3,0 days)
finished with: 1
Process finished with exit code 0
sealed trait RetryDetails {
def retriesSoFar: Int
def cumulativeDelay: FiniteDuration
def givingUp: Boolean
def upcomingDelay: Option[FiniteDuration]
}