Skip to content

Instantly share code, notes, and snippets.

View Algiras's full-sized avatar

Algimantas Krasauskas Algiras

  • Vilnius, Lithuania
View GitHub Profile
@Algiras
Algiras / SquaringSortedArray.scala
Created November 4, 2021 20:58
SquaringSortedArray
import scala.annotation.tailrec
object SquaringSortedArray {
/**
* Given a sorted array, create a new array containing squares of all the numbers of the input array in the sorted order.
*
* Solution should have O(N) time complexity
*
* @param input sorted array
package com.wixpress.slidingwindow
object FruitsIntoBaskets {
/**
*
* Given an array of characters where each character represents a fruit tree,
* you are given two baskets, and your goal is to put maximum number of fruits in each basket.
* The only restriction is that each basket can have only one type of fruit.
*
* You can start with any tree, but you can’t skip a tree once you have started.
cronService.stream.flatMap(
job => ZStream.fromEffect(actionLogTopic.record(ActionRequest(job.id)))
).runDrain.fork.toManaged_
def make(topic: Topic[CronulaEvent, CronulaEvent]) = for {
// create inMemory state
state <- Ref.make(CronulaSate.empty).toManaged_
// subscribe to records to update the state
stream <- topic.records
_ <- stream.flatMap(
event => ZStream.fromEffect(state.update(CronulaSate.process(event, _)))
).runDrain.fork.toManaged(_.interruptFork)
} yield new KafkaCron {
override def getAll = state.get.map(state => state.jobs.values.toSet)
trait Cron[F[_]] {
def getAll: F[Set[CronJob]]
def get(id: UUID): F[Either[NotFoundException.type, CronJob]]
def create(cronString: String): F[Either[ParseException.type, UUID]]
def update(id: UUID, cronString: String): F[Either[CronException, Unit]]
def delete(id: UUID): F[Unit]
}
// Cron Topic
(kafkaConfig: KafkaConfig) =>
Topic.makeKafkaTopic[CronulaEvent, CronulaEvent](kafkaConfig, _.cron, cronEventSerde, (_, event) => event)
// Action Topic
(kafkaConfig: KafkaConfig) =>
Topic.makeKafkaTopic[ActionRequest, Action](kafkaConfig, _.action, actionSerde, (id, req) => Action(id, req.issuerId))
def makeKafkaTopic[VReq, V](kafkaConfig: KafkaConfig,
getTopicName: Topics => String,
valueSerde: Serde[V],
fromRequest: (UUID, VReq) => V): ZManaged[ZEnv with GreyhoundMetrics, Throwable, Topic[VReq, V]] = {
val kafkaConfigStr = s"${kafkaConfig.host}:${kafkaConfig.port}"
// extracting topic name from kafka config
val topic = getTopicName(kafkaConfig.topics)
for {
// register producer to record kafka messages
def circeSerde[T](implicit decoder: Decoder[T], encoder: Encoder[T]): Serde[T] = new Serde[T] {
override def deserialize(topic: core.Topic, headers: Headers, data: Chunk[Byte]): Task[T] =
Serdes.StringSerde.deserialize(topic, headers, data).flatMap(str => ZIO.fromEither(decode[T](str)))
override def serialize(topic: String, value: T): Task[Chunk[Byte]] =
Serdes.StringSerde.serialize(topic, value.asJson.noSpaces)
}
// and now all Serdes can be expressed as
val keySerde: Serde[Key] = circeSerde[Key]
val cronEventSerde: Serde[CronulaEvent] = circeSerde[CronulaEvent]
// we also add concept of tenant for easier use of multiple instances running at the same time
case class KafkaConfig(host: String, port: Int, topics: Topics,
group: Option[String], tenantId: Option[UUID])
case class Key(tenantId: UUID, id: UUID)
case class Topics(cron: String, action: String)