Skip to content

Instantly share code, notes, and snippets.

@otobrglez
Created January 30, 2023 11:32
Show Gist options
  • Save otobrglez/03708ea50e811faabe80da4a0edd5931 to your computer and use it in GitHub Desktop.
Save otobrglez/03708ea50e811faabe80da4a0edd5931 to your computer and use it in GitHub Desktop.
Delayed ZIO tasks with Jesque / Sidekiq and Redis
// Oto Brglez - <otobrglez@gmail.com>
import net.greghaines.jesque.client.{Client as JesqueClient, ClientImpl}
import net.greghaines.jesque.worker.{JobFactory, MapBasedJobFactory, WorkerImpl}
import net.greghaines.jesque.{Config as JesqueConfig, ConfigBuilder, Job as JesqueJob}
import zio.*
import zio.Clock.currentTime
import zio.Console.printLine
import zio.ZIO.{acquireRelease, attempt, blocking, fail, logDebug, logError, logInfo, service, succeed}
import zio.stream.ZStream.fromQueue
import java.net.URI
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.*
import scala.jdk.CollectionConverters.*
trait DelayedTask:
self =>
def run: ZIO[DelayedClient, Throwable, Unit]
def enqueue(
queueName: String = "default",
delay: Option[FiniteDuration] = None
): ZIO[DelayedClient, Throwable, Unit] =
ZIO.service[DelayedClient].flatMap(_.enqueue(self, queueName, delay))
final class DelayedClient(private val client: JesqueClient):
def enqueue[T <: DelayedTask](
task: T,
queueName: String = "default",
delay: Option[FiniteDuration] = None
): Task[Unit] =
attempt(new JesqueJob(task.getClass.getName, readVars(task).asJava))
.map(job => (job, delay))
.tap((job, delay) => logDebug(s"Encoded $job with delay $delay for \"$queueName\""))
.flatMap {
case (job, Some(delay)) =>
currentTime(TimeUnit.MILLISECONDS)
.map(_ + delay.toSeconds * 1000)
.flatMap(future => attempt(client.delayedEnqueue(queueName, job, future)))
case (job, _) => attempt(client.enqueue(queueName, job))
}
private def readVars[T <: DelayedTask](task: T): Map[String, Any] =
task.getClass.getDeclaredFields
.tapEach(_.setAccessible(true))
.foldLeft(Map.empty)((a, f) => a + (f.getName -> f.get(task)))
object DelayedClient:
def mkClient: ZIO[zio.Scope, Throwable, DelayedClient] =
configFromEnv.flatMap { config =>
acquireRelease(attempt(ClientImpl(config)) <* logInfo("Client booted."))(client => attempt(client.end()).orDie)
.map(c => DelayedClient(c))
}
val live: ZLayer[Scope, Throwable, DelayedClient] = ZLayer.fromZIO(mkClient)
def mkWorker(
queueNames: String*
)(received: JesqueJob => Unit): ZIO[Scope, Throwable, WorkerImpl] =
configFromEnv.flatMap { config =>
acquireRelease(
attempt(
WorkerImpl(
config,
queueNames.asJava,
new JobFactory { override def materializeJob(job: JesqueJob): Unit = received(job); }
)
) <* logInfo("Worker booted.")
)(w => attempt(w.end(true)).orDie)
}
private def mkListenerThread(worker: WorkerImpl): ZIO[Scope, Throwable, Thread] =
acquireRelease(blocking(attempt(new Thread(worker))))(t => attempt(t.join()).orDie)
def workOn(queueNames: String*): ZIO[Scope with DelayedClient, Throwable, Unit] =
for
jobs <- Queue.unbounded[JesqueJob]
workFib <- fromQueue(jobs)
.mapZIO(taskFromJob)
.mapZIOParUnordered(4)(_.run.catchAll(th => logError(s"Worker crashed with ${th}")))
.runDrain
.forever
.fork
worker <- mkWorker(queueNames: _*) { job =>
// TODO: This is not nice. ZIO.async could be explored.
Unsafe.unsafe { (unsafe: Unsafe) =>
given u: Unsafe = unsafe
Runtime.default.unsafe.run(jobs.offer(job))
}
}
listenerFib <- mkListenerThread(worker).map(_.run()).fork
_ <- workFib.join
_ <- listenerFib.join
yield ()
private def taskFromJob(job: JesqueJob): ZIO[Any, Throwable, DelayedTask] =
for
constructor <- attempt(Class.forName(job.getClassName).getConstructors.head)
values <- attempt(job.getVars.values.asScala.toList)
instance <- attempt(constructor.newInstance(values: _*).asInstanceOf[DelayedTask])
yield instance
private def configFromEnv: Task[JesqueConfig] =
System.env("REDIS_URL").orDie.flatMap {
case None => fail(new RuntimeException("Missing REDIS_URL"))
case Some(url) =>
for
uri <- attempt(new URI(url))
config <- attempt {
val cb = new ConfigBuilder()
if (uri.getHost != null) cb.withHost(uri.getHost)
if (uri.getPort > -1) cb.withPort(uri.getPort)
val redisUserInfo = uri.getUserInfo
if (redisUserInfo != null) cb.withPassword(redisUserInfo.split(":", 2)(1))
cb.build()
}
yield config
}
case class ReverseString(raw: String) extends DelayedTask:
def run = printLine(s"Reversing \"$raw\" to \"${raw.reverse}\"")
case class CountToZero(n: Int) extends DelayedTask:
def run =
for
_ <- printLine(s"n: $n")
_ <- if n - 1 >= 0 then CountToZero(n - 1).enqueue(delay = Some(1.second)) else ZIO.unit
yield ()
object DelayedTaskApp extends ZIOAppDefault:
def clientProgram: ZIO[DelayedClient, Throwable, Unit] =
for
_ <- logInfo("Booting program. 👋")
_ <- ReverseString("Hello world from delayed task").enqueue()
_ <- CountToZero(10).enqueue()
yield ()
def workerProgram: ZIO[Scope with DelayedClient, Throwable, Unit] =
DelayedClient.workOn("default", "delayed", "recurring")
def program = clientProgram <&> workerProgram
def run = program.provide(Scope.default, DelayedClient.live)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment