map :: F[A] => (A => B) => F[B]
flatMap :: F[A] => (A => F[B]) => F[B]
traverse :: G[A] => (A => F[B]) => F[G[B]]
flatTraverse :: G[A] => (A => F[G[B]]) => F[G[B]]
traverse_ :: G[A] => (A => F[B]) => F[Unit]
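The signatures above can be made concrete with a small hand-written sketch. It assumes `G` is `List` (the container) and `F` is `Option` (the effect), and it uses only the Scala standard library (2.13+ for `toIntOption`) rather than cats. The object name, `traverseVoid` (standing in for `traverse_`), and `parsePositive` are hypothetical names chosen for illustration.

```scala
object TraverseSketch {
  // traverse :: G[A] => (A => F[B]) => F[G[B]], with G = List and F = Option
  def traverse[A, B](ga: List[A])(f: A => Option[B]): Option[List[B]] =
    ga.foldRight(Option(List.empty[B])) { (a, acc) =>
      // Both the element's result and the accumulated tail must be defined
      for { b <- f(a); bs <- acc } yield b :: bs
    }

  // flatTraverse :: G[A] => (A => F[G[B]]) => F[G[B]]
  def flatTraverse[A, B](ga: List[A])(f: A => Option[List[B]]): Option[List[B]] =
    traverse(ga)(f).map(_.flatten)

  // traverse_ :: G[A] => (A => F[B]) => F[Unit]; runs the effects and discards the results
  def traverseVoid[A, B](ga: List[A])(f: A => Option[B]): Option[Unit] =
    traverse(ga)(f).map(_ => ())

  // Hypothetical effectful function: fails (None) on anything but a positive integer
  def parsePositive(s: String): Option[Int] =
    s.toIntOption.filter(_ > 0)
}
```

With these definitions, `TraverseSketch.traverse(List("1", "2"))(TraverseSketch.parsePositive)` yields `Some(List(1, 2))`, while a single bad input such as `"x"` makes the whole result `None`.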
cats-effect Resource is extremely handy for managing the lifecycle of stateful resources, for example database or queue connections. Its main interface is:
trait Resource[F[_], A] {
/** - Acquire resource
* - Run f
* - guarantee that if acquire ran, release will run, even if `use` is cancelled or `f` fails
import requests
import logging
import httplib
# Debug logging
httplib.HTTPConnection.debuglevel = 1
logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)
req_log = logging.getLogger('requests.packages.urllib3')
req_log.setLevel(logging.DEBUG)
import cats.Eq
import cats.effect.{ContextShift, IO, Timer}
import org.scalactic.Prettifier
import org.scalactic.source.Position
import org.scalatest.exceptions.TestFailedException
import org.scalatest.{Assertion, AsyncTestSuite}
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.reflect.ClassTag
Unordered means that we don't care about the order of results from the evalMap action.
It allows for higher throughput because it never waits to start a new job. It holds N job permits (via a semaphore), and the moment one job finishes, it requests the next element from the stream and starts work on it.
When you use parEvalMap with ordered results, it only begins the next job once the oldest input's job is ready to emit.
This matters when individual elements can take a variable amount of time to complete, and that's the case here, because backfill can take more or less time depending on how many transactions are present within the time window.
Suppose we have 4 jobs we want to run with up to 2 at a time. Job 1 takes 60 seconds to complete, and all the rest take 10 seconds.
Using parEvalMap would mean the entire set of inputs would take ~70 seconds to complete.
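The arithmetic in this example can be checked with a small timing model. This is only a sketch of the scheduling behaviour as described in the text, not fs2's actual implementation: it assumes jobs are started in input order, that the unordered variant frees a permit as soon as any job finishes, and that the ordered variant frees a slot only when the oldest in-flight job has been emitted. The object and method names are hypothetical.

```scala
object ParEvalMapModel {
  // Unordered model: each job takes whichever permit frees up first.
  def unorderedTotal(durations: List[Int], maxConcurrent: Int): Int = {
    val slots = durations.foldLeft(Vector.fill(maxConcurrent)(0)) { (free, d) =>
      val i = free.indexOf(free.min) // earliest available permit
      free.updated(i, free(i) + d)
    }
    slots.max
  }

  // Ordered model: job i starts only once job (i - maxConcurrent) has been
  // emitted, and results are emitted strictly in input order.
  def orderedTotal(durations: List[Int], maxConcurrent: Int): Int = {
    val emitTimes = durations.zipWithIndex.foldLeft(Vector.empty[Int]) {
      case (emits, (d, i)) =>
        val start = if (i < maxConcurrent) 0 else emits(i - maxConcurrent)
        val finish = start + d
        val previousEmit = emits.lastOption.getOrElse(0)
        emits :+ math.max(finish, previousEmit) // cannot emit before the job ahead of it
    }
    emitTimes.lastOption.getOrElse(0)
  }
}
```

Under this model, `ParEvalMapModel.orderedTotal(List(60, 10, 10, 10), 2)` gives 70, matching the figure above: jobs 3 and 4 cannot start until job 1 emits at 60 seconds. The unordered model gives 60 for the same input, because the three short jobs all run on the second permit while job 1 is still in progress.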
I have some data which has adjacent entries that I want to group together and perform actions on.
I know roughly that fs2.Pull can be used to "step" through a stream and do more complicated logic than the built-in combinators allow. I don't know how to write one though!
In the end we should have something like
def combineAdjacent[F[_], A](
shouldCombine: (A, A) => Boolean,
#!/usr/bin/env bash
set -o errexit
## git refresh
#
# Update all git remotes
# Update local main or master branch from the remote
# Delete local branches tracking remotely-deleted branches
# Delete local branches that have been merged
package com.myproject.prelude
import cats.syntax.{AllSyntaxBinCompat => CatsSyntax}
import cats.effect.syntax.{AllSyntax => CESyntax}
import cats.effect.instances.{AllInstances => CEInstances}
/** Custom prelude for importing with -Yimport
*
* This means we never need to import cats syntax or stream explicitly
*/
kubectl get svc SERVICENAME --namespace NAMESPACE -o jsonpath='{.spec.ports[?(@.nodePort)].nodePort}'
Operation | Input | Result | Notes |
---|---|---|---|
map | F[A], A => B | F[B] | Functor |
apply | F[A], F[A => B] | F[B] | Applicative |
(fa, fb, ...).mapN | (F[A], F[B], ...), (A, B, ...) => C | F[C] | Applicative |
(fa, fb, ...).tupled | (F[A], F[B], ...) | F[(A, B, ...)] | Applicative |
flatMap | F[A], A => F[B] | F[B] | Monad |
traverse | F[A], A => G[B] | G[F[B]] | Traversable; fa.traverse(f) == fa.map(f).sequence; "foreach with effects" |
sequence | F[G[A]] | G[F[A]] | Same as fga.traverse(identity) |
attempt | F[A] | F[Either[E, A]] | Given ApplicativeError[F, E] |
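A few rows of the table can be illustrated without any library. The sketch below is hand-written against the standard library only and makes these assumptions: `Option` plays the applicative for `mapN`/`tupled` (written here as a two-argument `map2`), `List` is the traversed structure, and `Try` stands in for an `F` with `E = Throwable` in `attempt`. None of these definitions are the cats implementations, and the object name is hypothetical.

```scala
import scala.util.{Failure, Success, Try}

object TableSketch {
  // (fa, fb).mapN(f): combine two independent Option values
  def map2[A, B, C](fa: Option[A], fb: Option[B])(f: (A, B) => C): Option[C] =
    for { a <- fa; b <- fb } yield f(a, b)

  // (fa, fb).tupled: mapN with the tuple constructor
  def tupled[A, B](fa: Option[A], fb: Option[B]): Option[(A, B)] =
    map2(fa, fb)((a, b) => (a, b))

  // sequence: List[Option[A]] => Option[List[A]]
  def sequence[A](fga: List[Option[A]]): Option[List[A]] =
    fga.foldRight(Option(List.empty[A]))((oa, acc) => map2(oa, acc)(_ :: _))

  // traverse defined through the identity noted in the table:
  // fa.traverse(f) == fa.map(f).sequence
  def traverse[A, B](fa: List[A])(f: A => Option[B]): Option[List[B]] =
    sequence(fa.map(f))

  // attempt: surface the error as a value, so the outer Try always succeeds
  def attempt[A](fa: Try[A]): Try[Either[Throwable, A]] =
    fa match {
      case Success(a) => Success(Right(a))
      case Failure(e) => Success(Left(e))
    }
}
```

For example, `TableSketch.map2(Option(2), Option(3))(_ + _)` is `Some(5)`, and `TableSketch.attempt(Try("x".toInt))` is a `Success` wrapping a `Left` that holds the `NumberFormatException`.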