@Daenyth
Daenyth / parEvalMap.md
Created November 28, 2023 16:44
fs2 parEvalMap vs parEvalMapUnordered

Unordered means that we don't care about the order of results from the evalMap action.

The unordered variant allows for higher throughput because it never waits to start a new job while a permit is free. It holds N job permits (in a semaphore), and the moment one job finishes, it requests the next element from the stream and starts running it.

When you use parEvalMap, results are emitted in input order, which means it only begins the next job once the oldest in-flight input's job is ready to emit.

This matters when individual elements can take a variable amount of time to complete - and that's the case here, because backfill can take more or less time depending on how many transactions are present within the time window.

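Suppose we have 4 jobs we want to run with up to 2 at a time. Job 1 takes 60 seconds to complete, and all the rest take 10 seconds. Using parEvalMap, jobs 1 and 2 start together and job 2 finishes after 10 seconds, but job 3 cannot start until job 1 is ready to emit at 60 seconds, so the entire set of inputs would take ~70 seconds to complete. Using parEvalMapUnordered, jobs 3 and 4 run in job 2's freed permit while job 1 is still going, so the whole set takes ~60 seconds.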
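
The arithmetic in this example can be checked with a small simulation. This is only a sketch of the simplified permit model described above, not fs2's actual implementation: `PermitModel` and `totalTime` are hypothetical names, durations are plain integers (seconds), and the ordered case assumes a permit is freed only when the oldest in-flight job finishes.

```scala
object PermitModel {

  /** Total time to run all jobs under the simplified model (hypothetical helper).
    *
    * @param durations job durations in input order
    * @param permits   maximum number of jobs in flight
    * @param ordered   true: a permit frees only when the oldest in-flight job finishes;
    *                  false: a permit frees as soon as any in-flight job finishes
    */
  def totalTime(durations: List[Int], permits: Int, ordered: Boolean): Int = {
    require(permits > 0, "permits must be positive")
    // State: finish times of in-flight jobs (input order), current time, latest finish seen
    val (_, _, end) = durations.foldLeft((Vector.empty[Int], 0, 0)) {
      case ((inFlight, now, latest), d) =>
        val (running, start) =
          if (inFlight.size < permits) (inFlight, now)
          else {
            // Pick which in-flight job has to finish before the next one may start
            val freed = if (ordered) inFlight.head else inFlight.min
            val idx = inFlight.indexOf(freed)
            (inFlight.patch(idx, Nil, 1), math.max(now, freed))
          }
        val finish = start + d
        (running :+ finish, start, math.max(latest, finish))
    }
    end
  }

  def main(args: Array[String]): Unit = {
    val jobs = List(60, 10, 10, 10)
    println(totalTime(jobs, permits = 2, ordered = true))  // 70
    println(totalTime(jobs, permits = 2, ordered = false)) // 60
  }
}
```

In this model the gap between the two grows with the number of short jobs queued behind a slow one, since in the ordered case each slow job holds back every job behind it.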

@Daenyth
Daenyth / k8s-port.bash
Created August 29, 2023 15:03
Get exposed port for a kubernetes service
kubectl get svc SERVICENAME --namespace NAMESPACE -o jsonpath='{.spec.ports[?(@.nodePort)].nodePort}'
@Daenyth
Daenyth / resetTimeout.scala
Created August 12, 2022 13:49
resetTimeout fs2
/** Execute `onTimeout` every time the stream goes `timeout` duration with no
  * elements
  */
def resetTimeout[F[_]: Temporal, A](
    timeout: FiniteDuration,
    onTimeout: F[Unit]
): fs2.Pipe[F, A, A] = {
  def go(timedPull: Pull.Timed[F, A]): Pull[F, A, Unit] =
    timedPull.timeout(timeout) >>
      timedPull.uncons.flatMap {
@Daenyth
Daenyth / doobie.scala
Created June 9, 2022 14:47
Doobie code structure example
package myapp.users
// This is the shape of code I've found to work best for doobie in a cats-effect application using capability traits
trait UserOnboarding[F[_]] {
  // The API expresses the business-level workflow, not the implementation of inserting to a database
  // Keep the interface expressing the high level concern; database is only *one* implementation and it can change later
  def registerUser(userInfo: UserInfo): F[UserId]
}
import cats.{Contravariant, Functor, Semigroupal}
import slick.jdbc.{
  GetResult,
  PositionedParameters,
  PositionedResult,
  SetParameter
}
trait RawSqlInstances {
@Daenyth
Daenyth / ssh-config
Last active April 21, 2022 12:08
Use a different ssh key for a specific github organization
# ~/.ssh/config
Match host github.com exec "[[ $(git remote get-url origin | sed 's/^.*://' | sed 's/\/.*//') = TheGithubOrganization ]]"
  # Don't forget to ssh-add this in addition to other keys, or git will pick one that's already in the agent first
  IdentityFile ~/.ssh/specific_rsa
@Daenyth
Daenyth / munit-cats-laws.scala
Created March 1, 2022 14:41
munit-discipline cats law testing example
import cats.Semigroup
import cats.kernel.laws.discipline.SemigroupTests
import cats.syntax.all._
import munit.DisciplineSuite
import org.scalacheck.Arbitrary
case class MyInt(value: Int)
object MyInt {
  implicit val semigroupMyInt: Semigroup[MyInt] =
    Semigroup.instance((x1, x2) => MyInt(x1.value + x2.value))
}
@Daenyth
Daenyth / toki-pona-words.txt
Last active May 13, 2023 22:03
lines in /usr/share/dict/words that match toki pona phonotactics
a
ajaja
ajowan
aka
akala
akali
akan
akasa
ake
akeki
@Daenyth
Daenyth / Prelude.scala
Created January 14, 2022 14:47
cats-effect prelude
package com.myproject.prelude
import cats.syntax.{AllSyntaxBinCompat => CatsSyntax}
import cats.effect.syntax.{AllSyntax => CESyntax}
import cats.effect.instances.{AllInstances => CEInstances}
/** Custom prelude for importing with -Yimport
  *
  * This means we never need to import cats syntax or stream explicitly
  */
@Daenyth
Daenyth / VulcanHelper.scala
Created November 22, 2021 14:26
fs2-kafka vulcan => avro Serde helper
import cats.effect.Sync
import cats.syntax.all._
import cats.effect.std.Dispatcher
import fs2.kafka.vulcan.{AvroSettings, avroDeserializer, avroSerializer}
import fs2.kafka.{Deserializer, Headers, Serializer}
import org.apache.kafka.common.serialization.{Serde, Serdes}
import vulcan.Codec
// Credit to Fede Fernández for the original CE2 version
// Note: this isn't under an open source license; it's for educational purposes only