Skip to content

Instantly share code, notes, and snippets.

View calvinlfer's full-sized avatar
🥳

Calvin Lee Fernandes calvinlfer

🥳
View GitHub Profile
@calvinlfer
calvinlfer / README.md
Last active April 24, 2024 13:03
Developing an understanding of the FS2 Pull datatype

Developing an understanding of the FS2 Pull data type

In Pull[F, O, R], R is the return type. Pull represents a computation that emits some values on the Stream (of type O) and returns a new thing (R). In order to convert a Pull to a Stream, R must be Unit. This is because an FS2 Stream does not have the ability to terminate with a return value.

image

See here for the conversation

  • Stream[F, O] is monadic over O which are the output values emitted
  • Pull[F, O, R] is monadic over R which is used for stateful transformations
package io.kaizensolutions.virgil.nextlevel
import io.kaizensolutions.virgil.nextlevel.nextlevel.peopleTable
object nextlevel {
sealed trait Table {
type TableTag
}
object Table {
@calvinlfer
calvinlfer / build-native-image.sh
Created May 8, 2023 13:06
ZIO HTTP (Graal Native Image & Scala CLI)
#!/bin/sh
scala-cli --power package --native-image -f *.scala -o ziohttp.bin -- --no-fallback --enable-url-protocols=http,https -Djdk.http.auth.tunneling.disabledSchemes= --install-exit-handlers --enable-http --initialize-at-run-time=io.netty.channel.DefaultFileRegion --initialize-at-run-time=io.netty.channel.epoll.Native --initialize-at-run-time=io.netty.channel.epoll.Epoll --initialize-at-run-time=io.netty.channel.epoll.EpollEventLoop --initialize-at-run-time=io.netty.channel.epoll.EpollEventArray --initialize-at-run-time=io.netty.channel.kqueue.KQueue --initialize-at-run-time=io.netty.channel.kqueue.KQueueEventLoop --initialize-at-run-time=io.netty.channel.kqueue.KQueueEventArray --initialize-at-run-time=io.netty.channel.kqueue.Native --initialize-at-run-time=io.netty.channel.unix.Limits --initialize-at-run-time=io.netty.channel.unix.Errors --initialize-at-run-time=io.netty.channel.unix.IovArray --initialize-at-run-time=io.netty.handler.codec.compression.ZstdOptions --initialize-at-run-time=io.netty.incub
@calvinlfer
calvinlfer / ZioHttpServerTracer.scala
Created February 17, 2023 19:42
getting tracing to work with zio-http 0.0.4's new Handler abstraction
package io.kaizensolutions.trace4cats.zio.extras.ziohttp.server
import trace4cats.ErrorHandler
import trace4cats.model.AttributeValue.{LongValue, StringValue}
import trace4cats.model.SemanticAttributeKeys.*
import trace4cats.model.{AttributeValue, SpanKind, SpanStatus}
import io.kaizensolutions.trace4cats.zio.extras.ZTracer
import io.kaizensolutions.trace4cats.zio.extras.ziohttp.{extractTraceHeaders, toSpanStatus}
import zio.*
import zio.http.*
@calvinlfer
calvinlfer / PrimeExample.scala
Created March 6, 2021 20:07
Generating prime numbers in pure functional Scala using FS2 Streams
object PrimeExample extends IOApp {
override def run(args: List[String]): IO[ExitCode] = {
val simplePrimes: Stream[Pure, Int] =
Stream
.iterate(1)(_ + 1)
.flatMap(candidate =>
Stream
.range(2, candidate - 1) // can further optimize using sqrt(candidate)
.exists(i => candidate % i == 0)
@calvinlfer
calvinlfer / OneToManyJoin.scala
Created November 25, 2022 17:49
In memory one to many join
def joinLManyR[Err, LeftElem, LeftKey, MiddleElem, MiddleKey, RightElem, RightKey, RightProjection](left: Iterable[LeftKey])(
middle: Iterable[LeftKey] => Stream[Err, MiddleElem]
)(middleKey: MiddleElem => MiddleKey)(middleToLeftKey: MiddleElem => LeftKey)(middleToRightKey: MiddleElem => RightKey)(
right: Iterable[RightKey] => Stream[Err, RightElem]
)(rightKey: RightElem => RightKey)(rightProjection: RightElem => RightProjection): IO[Err, Map[LeftKey, Set[RightProjection]]] =
val extractMiddle: IO[Err, Chunk[MiddleElem]] = middle(left).runCollect
def rightElems(in: Iterable[RightKey]): IO[Err, Chunk[RightElem]] =
right(in).runCollect
@calvinlfer
calvinlfer / ZonedDateTimeRange.scala
Last active October 31, 2022 20:16
Preliminary support for a specific range in Skunk
import skunk.Codec
import skunk.data.Type
import zio.ZIOAppDefault
import java.time.{OffsetDateTime, ZonedDateTime}
import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder, DateTimeParseException}
import java.time.temporal.ChronoField.*
import java.util.Locale
// TODO: account for closed and open intervals so like [x,y), (x,y), etc.
@calvinlfer
calvinlfer / ExampleApp.scala
Created March 23, 2019 17:31
Spark Direct Streaming Kafka
package com.experiments.calvin
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.{Seconds, StreamingContext}
object ExampleApp extends App {
@calvinlfer
calvinlfer / docker-compose.yaml
Created May 7, 2022 20:21
Single node Kafka docker-compose compatible with Docker on M1 Macs
version: "3.9"
services:
zookeeper:
restart: unless-stopped
image: ubuntu/zookeeper:latest
ports:
- "2181:2181"
kafka:
@calvinlfer
calvinlfer / pipeline.scala
Created May 3, 2022 14:12
Understanding ZChannels (zio-streams 2.x)
object Experiment extends ZIOAppDefault {
/**
Collect elements and emit them in single chunks of 100
*/
val aggregate: ZPipeline[Any, Nothing, Int, Long] = {
def go[Err](acc: Long): ZChannel[Any, Err, Chunk[Int], Any, Err, Chunk[Long], Any] =
ZChannel.readWith[Any, Err, Chunk[Int], Any, Err, Chunk[Long], Any](
in = { inChunk =>
val next = acc + inChunk.sum
if (next > 100L) {