Skip to content

Instantly share code, notes, and snippets.

View vpavkin's full-sized avatar

Vladimir Pavkin vpavkin

View GitHub Profile
@vpavkin
vpavkin / akka.md
Last active March 25, 2020 13:30
Akka migration notes

There are some minor breaking changes to akka-http adapter API. With this release AkkaHttpAdapter doesn't lock you up with circe, but lets you choose a json backend between circe and play-json. The simplest way to keep previous behavior is to:

  • mixin AkkaHttpCirceAdapter trait to your endpoint definition wrapper
  • add "de.heikoseeberger" %% "akka-http-circe" % "1.31.0" dependency to your build.sbt, because now it's optional for caliban-akka-http. Luckily, compiler will detect if it's missing.
  • Use adapter instead of AkkaHttpAdapter to create routes.

You can see an example of this simple migration here.

A more composable and recommended way to create caliban adapter would be this (this time it's play-json):

class BookingWireProtocolClient {
def place(client: ClientId, concert: ConcertId, seats: NonEmptyList[Seat]): (BitVector, Decoder[Unit])
def confirm(tickets: NonEmptyList[Ticket]): (BitVector, Decoder[Unit])
// ...
def status: (BitVector, Decoder[BookingStatus])
def tickets: (BitVector, Decoder[Option[NonEmptyList[Ticket]]])
}
type Encoded[A] = (BitVector, Decoder[A])
trait WireProtocol[M[_[_]]] {
def server: Decoder[PairE[Invocation[M, ?], Encoder]]
def client: M[Encoded]
}
//
// def place(client: ClientId, concert: ConcertId, seats: NonEmptyList[Seat]): F[Unit]
//
val placeInvocation = new Invocation[Booking, Unit] {
def run[F[_]](behavior: Booking[F]): F[Unit] =
behavior.place(
client = ClientId("1"),
concert = ConcertId("a"),
seats = NonEmptyList.of(Seat(Row(1), 10))
)
import scodec.bits.BitVector
import scodec.{ Decoder, Encoder }
trait WireProtocol[M[_[_]]] {
def decoder: Decoder[PairE[Invocation[M, ?], Encoder]]
def encoder: M[Encoded]
}
type Encoded[A] = (BitVector, Decoder[A])
@vpavkin
vpavkin / BookingExpirationProcessWiring.scala
Last active February 9, 2019 14:07
BookingExpirationProcessWiring
class BookingExpirationProcessWiring[F[_]: ConcurrentEffect: Timer](
clock: Clock[F],
frequency: FiniteDuration,
process: Instant => F[Unit]) {
val processStream: fs2.Stream[F, Unit] =
fs2.Stream
.fixedDelay[F](frequency)
.evalMap(_ => clock.realTime(TimeUnit.MILLISECONDS).map(Instant.ofEpochMilli))
.evalMap(process)
@vpavkin
vpavkin / BookingViewRepository.scala
Created February 9, 2019 14:03
BookingViewRepository
trait BookingViewRepository[F[_]] {
def get(bookingId: BookingKey): F[Option[BookingView]]
def set(view: BookingView): F[Unit]
def byClient(clientId: ClientId): F[List[BookingView]]
def expired(now: Instant): fs2.Stream[F, BookingKey]
}
@vpavkin
vpavkin / BookingExpirationProcess.scala
Created February 9, 2019 13:57
Booking Expiration Process Step
class BookingExpirationProcess[F[_]: Sync](bookings: Bookings[F],
bookingView: BookingViewRepository[F])
extends (Instant => F[Unit]) {
def apply(now: Instant): F[Unit] =
bookingView
.expired(now)
.evalMap(k => bookings(k).expire.void)
.compile
.drain
val bookingConfirmationProcess =
new BookingConfirmationProcess(
bookings,
confirmationService,
Slf4jLogger.unsafeFromName("BookingConfirmationProcess")
)
val bookingConfirmationProcessWiring =
new BookingConfirmationProcessWiring(
(tag, consumer) =>
@vpavkin
vpavkin / BookingConfirmationProcessWiring.scala
Created February 5, 2019 08:21
Booking Confirmation Process Wiring
type EventSource[F[_]] = fs2.Stream[F, Committable[F, EntityEvent[BookingKey, BookingEvent]]]
class BookingConfirmationProcessWiring[F[_]: ConcurrentEffect](
eventSource: (EventTag, ConsumerId) => EventSource[F],
tagging: Tagging[BookingKey],
process: (BookingKey, BookingPlaced) => F[Unit]
) {
val consumerId = ConsumerId("BookingConfirmationProcess")