Gavin Bisesi Daenyth
Daenyth /
Created November 28, 2023 16:44
fs2 parEvalMap vs parEvalMapUnordered

Unordered means that we don't care about the order of results from the evalMap action.

It allows for higher throughput because it never waits to start a new job. It holds N job permits (in a semaphore), and the moment any job finishes, it requests the next element from the stream and starts processing it.

parEvalMap, by contrast, emits results in input order, so it only begins the next job once the oldest in-flight input's job is ready to emit.

This matters when individual elements take a variable amount of time to complete, which is the case here: a backfill can take more or less time depending on how many transactions fall within the time window.

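Suppose we have 4 jobs we want to run with up to 2 at a time. Job 1 takes 60 seconds to complete, and all the rest take 10 seconds. Using parEvalMap would mean the entire set of inputs would take ~70 seconds to complete: jobs 1 and 2 start together and job 2 finishes after 10 seconds, but jobs 3 and 4 cannot start until job 1 is ready to emit at 60 seconds.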
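
The arithmetic in this example can be checked with a small simulation. This is only a sketch of the scheduling rules as described above, not fs2's actual implementation; the object and method names (`ParEvalMapTiming`, `orderedTotal`, `unorderedTotal`) are made up for illustration, and it uses plain Scala collections rather than fs2 streams.

```scala
import scala.collection.mutable

// Hypothetical model of the two scheduling policies described in the text.
// All jobs are assumed to be available at time 0; durations are in seconds.
object ParEvalMapTiming {

  /** Ordered model: results are emitted in input order, and job i may start
    * only once job (i - maxConcurrent) has been emitted.
    */
  def orderedTotal(durations: Vector[Int], maxConcurrent: Int): Int = {
    val emitTimes = mutable.ArrayBuffer.empty[Int]
    durations.zipWithIndex.foreach { case (d, i) =>
      val start = if (i < maxConcurrent) 0 else emitTimes(i - maxConcurrent)
      val finish = start + d
      // A result cannot be emitted before the one ahead of it in input order
      val previousEmit = if (i == 0) 0 else emitTimes(i - 1)
      emitTimes += math.max(finish, previousEmit)
    }
    emitTimes.lastOption.getOrElse(0)
  }

  /** Unordered model: a new job starts as soon as any permit is free. */
  def unorderedTotal(durations: Vector[Int], maxConcurrent: Int): Int = {
    // Min-heap of finish times for the jobs currently holding a permit
    val inFlight = mutable.PriorityQueue.empty[Int](Ordering[Int].reverse)
    var total = 0
    durations.foreach { d =>
      val start = if (inFlight.size < maxConcurrent) 0 else inFlight.dequeue()
      val finish = start + d
      inFlight.enqueue(finish)
      total = math.max(total, finish)
    }
    total
  }

  def main(args: Array[String]): Unit = {
    val durations = Vector(60, 10, 10, 10)
    println(s"ordered:   ${orderedTotal(durations, 2)}s")   // ordered:   70s
    println(s"unordered: ${unorderedTotal(durations, 2)}s") // unordered: 60s
  }
}
```

Under this model the unordered variant finishes in about 60 seconds, because jobs 3 and 4 run on the second permit while job 1 is still in progress.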

Daenyth / k8s-port.bash
Created August 29, 2023 15:03
Get exposed port for a kubernetes service
kubectl get svc SERVICENAME --namespace NAMESPACE -o jsonpath='{.spec.ports[?(@.nodePort)].nodePort}'
Daenyth / resetTimeout.scala
Created August 12, 2022 13:49
resetTimeout fs2
/** Execute `onTimeout` every time the stream goes `timeout` duration with no
* elements
*/
def resetTimeout[F[_]: Temporal, A](
timeout: FiniteDuration,
onTimeout: F[Unit]
): fs2.Pipe[F, A, A] = {
def go(timedPull: Pull.Timed[F, A]): Pull[F, A, Unit] =
timedPull.timeout(timeout) >>
timedPull.uncons.flatMap {
Daenyth / doobie.scala
Created June 9, 2022 14:47
Doobie code structure example
package myapp.users
// This is the shape of code I've found to work best for doobie in a cats-effect application using capability traits
trait UserOnboarding[F[_]] {
// The api expresses the business-level workflow, not the implementation of inserting to a database
// Keep the interface expressing the high level concern; database is only *one* implementation and it can change later
def registerUser(userInfo: UserInfo): F[UserId]
View RawSqlInstances.scala
import cats.{Contravariant, Functor, Semigroupal}
import slick.jdbc.{
trait RawSqlInstances {
Daenyth / ssh-config
Last active April 21, 2022 12:08
Use a different ssh key for a specific github organization
# ~/.ssh/config
Match host github.com exec "[[ $(git remote get-url origin | sed 's/^.*://' | sed 's/\/.*//') = TheGithubOrganization ]]"
# Don't forget to ssh-add this in addition to other keys, or git will pick one that's already in the agent first
IdentityFile ~/.ssh/specific_rsa
Daenyth / munit-cats-laws.scala
Created March 1, 2022 14:41
munit-discipline cats law testing example
import cats.Semigroup
import cats.kernel.laws.discipline.SemigroupTests
import cats.syntax.all._
import munit.DisciplineSuite
import org.scalacheck.Arbitrary
case class MyInt(value: Int)
object MyInt {
implicit val semigroupMyInt: Semigroup[MyInt] = Semigroup.instance((x1, x2) => MyInt(x1.value + x2.value))
Daenyth / toki-pona-words.txt
Last active May 13, 2023 22:03
lines in /usr/share/dict/words that match toki pona phonotactics
Daenyth / Prelude.scala
Created January 14, 2022 14:47
cats-effect prelude
package com.myproject.prelude
import cats.syntax.{AllSyntaxBinCompat => CatsSyntax}
import cats.effect.syntax.{AllSyntax => CESyntax}
import cats.effect.instances.{AllInstances => CEInstances}
/** Custom prelude for importing with -Yimport
* This means we never need to import cats syntax or stream explicitly
Daenyth / VulcanHelper.scala
Created November 22, 2021 14:26
fs2-kafka vulcan => avro Serde helper
import cats.effect.Sync
import cats.syntax.all._
import cats.effect.std.Dispatcher
import fs2.kafka.vulcan.{AvroSettings, avroDeserializer, avroSerializer}
import fs2.kafka.{Deserializer, Headers, Serializer}
import org.apache.kafka.common.serialization.{Serde, Serdes}
import vulcan.Codec
// Credit to Fede Fernández for the original CE2 version
// Note: this isn't under an open source license; it's for educational purposes only