Jonas Chapuis jchapuis

@jchapuis
jchapuis / prepare-transfer.scala
Last active March 12, 2024 16:22
Prepare transfer
def prepare(transferID: TransferID, transfer: Transfer): F[Branch.Vote[TransferFailure]] = {
  if (accountID === transfer.origin)
    Logger[F].debug(show"Preparing outgoing transfer $transferID: $transfer for account $accountID") >>
      account.prepareOutgoingTransfer(transferID, transfer)
        .onErrorRetryWithBackoff(Logger[F].warn(_)(show"Error preparing outgoing transfer $transferID, retrying in a bit"))
        .onLeftRetryWithBackoff { case Account.PendingOutgoingTransfer =>
          Logger[F].warn(show"Account $accountID has a pending outgoing transfer, retrying in a bit")
        }(retryParameters.onPendingTransfer)
        .flatMap {
          case Left(Account.Unknown) => Branch.Vote.Abort(TransferFailure.AccountNotFound(accountID)).pure[F]
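The preview above retries the prepare step with a backoff while the account still has a pending outgoing transfer. A minimal, synchronous sketch of that retry pattern follows, using only the Scala standard library. The names `RetrySketch`, `Backoff` and `retryOnLeft` are hypothetical, and this is not the gist's `onLeftRetryWithBackoff`, whose implementation and parameters are not shown.

```scala
import scala.annotation.tailrec
import scala.concurrent.duration._

object RetrySketch {
  // Hypothetical parameters; the gist's retryParameters type is not shown.
  final case class Backoff(initialDelay: FiniteDuration, factor: Int, maxRetries: Int)

  // Re-runs `operation` while it returns a retryable Left, multiplying the
  // delay by `factor` after each attempt. `sleep` is injected so that callers
  // (and tests) decide how waiting is performed.
  def retryOnLeft[E, A](operation: => Either[E, A])(
      backoff: Backoff,
      sleep: FiniteDuration => Unit
  )(isRetryable: E => Boolean): Either[E, A] = {
    @tailrec
    def loop(retriesLeft: Int, delay: FiniteDuration): Either[E, A] =
      operation match {
        case Left(error) if isRetryable(error) && retriesLeft > 0 =>
          sleep(delay)
          loop(retriesLeft - 1, delay * backoff.factor.toLong)
        case result => result // success, non-retryable error, or retries exhausted
      }
    loop(backoff.maxRetries, backoff.initialDelay)
  }
}
```

Injecting `sleep` keeps the sketch free of blocking calls in tests; an effectful version would instead sleep inside `F`.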
def transfer(from: AccountID, to: AccountID, amount: PosAmount): F[TransferFailure \/ Unit] =
  coordinator
    .create(TransferID.random, Transfer(from, to, amount), from, to)
    .use(_.pollForFinalStatus())
    .flatMap {
      case Status.Committed => ().asRight[TransferFailure].pure
      case Status.Aborted(reason) =>
        reason match {
          case AbortReason.Timeout =>
            EitherT.leftT(TransferFailure.Timeout: TransferFailure).value
jchapuis / transfer-coordinator.scala
Created March 7, 2024 11:06
Transfer coordinator
transactor.coordinator[TransferID, AccountID, Transfer, TransferFailure](
  "transfer",
  { accountID =>
    val account = sharding.entityFor(accountID)
    new TransferBranch(accountID, account)
  },
  Some(transferParameters.timeout)
)
trait Branch[F[_], TID, Q, R] {
  // Prepare phase: the branch votes on the transaction identified by `id`.
  def prepare(id: TID, query: Q): F[Vote[R]]
  def commit(id: TID): F[Unit]
  def abort(id: TID): F[Unit]
}
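To illustrate how a coordinator might drive branches with this shape, here is a minimal two-phase commit sketch. It is deliberately simplified: the effect type `F[_]` is dropped, everything runs synchronously, and `Vote`, `SimpleBranch` and `TwoPhaseCommit` are hypothetical names rather than the library's API. Timeouts, retries and persistence are out of scope.

```scala
// Simplified, synchronous sketch: the effect type F[_] is dropped on purpose.
sealed trait Vote[+R]
object Vote {
  case object Commit extends Vote[Nothing]
  final case class Abort[R](reason: R) extends Vote[R]
}

// Hypothetical effect-free counterpart of the Branch trait above.
trait SimpleBranch[TID, Q, R] {
  def prepare(id: TID, query: Q): Vote[R]
  def commit(id: TID): Unit
  def abort(id: TID): Unit
}

object TwoPhaseCommit {
  // Phase 1: collect a vote from every branch.
  // Phase 2: commit all branches if every vote is Commit, otherwise abort all
  // branches and return the reason of the first abort vote.
  def run[TID, Q, R](id: TID, query: Q, branches: List[SimpleBranch[TID, Q, R]]): Either[R, Unit] = {
    val votes = branches.map(_.prepare(id, query))
    votes.collectFirst { case Vote.Abort(reason) => reason } match {
      case Some(reason) =>
        branches.foreach(_.abort(id))
        Left(reason)
      case None =>
        branches.foreach(_.commit(id))
        Right(())
    }
  }
}
```

Note that this sketch asks every branch to prepare even after an abort vote has been seen; a real coordinator may choose to stop early.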
jchapuis / prepare-phase.csv
Last active March 5, 2024 16:10
prepare-phase
Example: orchestration of the booking process for a journey - prepare phase | Data store | Transaction branch operation
Create cancelable hotel & flight reservations | External API | Make a reversible synchronous HTTP POST/PUT request
Request the billing backend for the credit card guarantee charge and await confirmation | Internal service | Send a message or call an endpoint and wait for an event to occur
Add details to the customer row in the database | Database | Acquire an exclusive lock on a database row or use built-in XA support
Grab a semaphore to update the recent bookings cache | In-memory resource | Lock an in-memory resource
Schedule traveller reminder notifications | Actor cluster | Send a command to an actor
Add an entry in a bookings log | File | Persist a rollback-ready change in a file
jchapuis / Differ.scala
Created May 25, 2021 12:02
Generic LCS diffing in Scala (dynamic programming with memoization)
import cats.{ Eq, Show }
import cats.syntax.eq._
import cats.syntax.show._
import cats.instances.int._
import FunctionHelpers._
object Differ {
  sealed trait Diff[T]
  object Diff {
    final case class Insert[T](revision: T) extends Diff[T]
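Since the preview stops before the algorithm itself, the following is an independent minimal sketch of LCS-based diffing with memoization, not the gist's implementation. It uses plain `==` instead of a cats `Eq` instance, and the names `LcsDiffSketch`, `Edit`, `Keep` and `Delete` are hypothetical. The recursion depth grows with the input length, so it is only suitable for small inputs.

```scala
import scala.collection.mutable

object LcsDiffSketch {
  sealed trait Edit[+T]
  final case class Keep[T](value: T) extends Edit[T]
  final case class Insert[T](revision: T) extends Edit[T]
  final case class Delete[T](original: T) extends Edit[T]

  def diff[T](original: Vector[T], revision: Vector[T]): List[Edit[T]] = {
    // memo((i, j)) = LCS length of original.drop(i) and revision.drop(j)
    val memo = mutable.Map.empty[(Int, Int), Int]

    def lcs(i: Int, j: Int): Int =
      if (i == original.length || j == revision.length) 0
      else memo.get((i, j)) match {
        case Some(length) => length
        case None =>
          val length =
            if (original(i) == revision(j)) 1 + lcs(i + 1, j + 1)
            else math.max(lcs(i + 1, j), lcs(i, j + 1))
          memo.update((i, j), length)
          length
      }

    // Walk both sequences from the start, using the memoized LCS lengths to
    // decide whether to keep, delete or insert at each step.
    @annotation.tailrec
    def walk(i: Int, j: Int, acc: List[Edit[T]]): List[Edit[T]] =
      if (i == original.length && j == revision.length) acc.reverse
      else if (i == original.length) walk(i, j + 1, Insert(revision(j)) :: acc)
      else if (j == revision.length) walk(i + 1, j, Delete(original(i)) :: acc)
      else if (original(i) == revision(j)) walk(i + 1, j + 1, Keep(original(i)) :: acc)
      else if (lcs(i + 1, j) >= lcs(i, j + 1)) walk(i + 1, j, Delete(original(i)) :: acc)
      else walk(i, j + 1, Insert(revision(j)) :: acc)

    walk(0, 0, Nil)
  }
}
```

For example, `LcsDiffSketch.diff("abc".toVector, "acd".toVector)` keeps `a`, deletes `b`, keeps `c` and inserts `d`.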
jchapuis / reader-compatibility-types.csv
Last active March 29, 2021 18:54
reader-compatibility-types
Reader compatibility type | Intuition | Writer changes allowed | Writer changes forbidden
'BACKWARD' | Clients can keep decoding the responses for their requests | Add fields & Delete optional fields & Shrink types | Delete fields & Widen types
jchapuis / diagrams.scala
Created March 22, 2021 21:43
Parsable diagrams
"DateTimeInterval.relate" should {
"return Disjoint" in new Fixture {
val a = "+---+     "
val b = "      +--+"
a.relateTo(b) shouldBe Disjoint
b.relateTo(a) shouldBe Disjoint
}
"return IncludedBy and Includes" in new Fixture {
val a = " +-------+ "
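The test above relies on interval diagrams being parsed from strings. A minimal sketch of that idea follows, assuming the interval spans the columns between the first and last `+` of the diagram and that the diagrams being compared are aligned column by column. `DiagramSketch`, `Interval`, `parse` and the `Overlaps` case are hypothetical; the gist's `Fixture` and its `DateTimeInterval` type are not shown.

```scala
object DiagramSketch {
  sealed trait Relation
  case object Disjoint extends Relation
  case object Includes extends Relation
  case object IncludedBy extends Relation
  case object Overlaps extends Relation

  // Inclusive column range between the first and last '+' of a diagram.
  final case class Interval(start: Int, end: Int) {
    def relateTo(other: Interval): Relation =
      if (end < other.start || other.end < start) Disjoint
      else if (start <= other.start && other.end <= end) Includes // equal intervals also land here
      else if (other.start <= start && end <= other.end) IncludedBy
      else Overlaps
  }

  // Parses a diagram such as "  +--+   " into the columns of its two '+' markers.
  def parse(diagram: String): Interval = {
    val start = diagram.indexOf('+')
    val end = diagram.lastIndexOf('+')
    require(start >= 0 && end > start, s"not an interval diagram: '$diagram'")
    Interval(start, end)
  }
}
```

Because positions are read from column indices, leading whitespace is significant: two diagrams only relate correctly when they share the same left margin.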
jchapuis / dsl.scala
Created March 22, 2021 21:31
akka-dsl
builder
  .startSystemDependent(SomeEtcdService())
  .thenStartHttp(
    HttpServiceDefinition(
      new AkkaControllers().routes,
      serverConfig.host,
      serverConfig.port,
      serverConfig.hardDeadline,
      serverConfig.unbindingDelay
    )
def getBlocksForResource[F[_, _]: BifunctorMonadError: Bilogger]
  (resourceID: ResourceID, timeFilter: TimeFilter)
  (implicit resourceRepo: ResourceRepo[F], blockRepo: BlockRepo[F]): F[ResourceBlockError, List[Block]] =
  (for {
    resource <- resourceRepo.get(resourceID)
    blocks <- blockRepo.blocksForResource(resource, timeFilter)
    _ <- F.info(show"Successfully generated ${blocks.size} blocks for resource $resourceID in interval $timeFilter")
  } yield blocks).onLeft {
    case repositoryError: RepositoryError => F.error(repositoryError.message)
    case blockError: BlockError => F.error(blockError.message)