Created
January 30, 2023 11:32
-
-
Save otobrglez/03708ea50e811faabe80da4a0edd5931 to your computer and use it in GitHub Desktop.
Delayed ZIO tasks with Jesque / Sidekiq and Redis
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Oto Brglez - <otobrglez@gmail.com>
import net.greghaines.jesque.client.{Client as JesqueClient, ClientImpl}
import net.greghaines.jesque.worker.{JobFactory, MapBasedJobFactory, WorkerImpl}
import net.greghaines.jesque.{Config as JesqueConfig, ConfigBuilder, Job as JesqueJob}
import zio.*
import zio.Clock.currentTime
import zio.Console.printLine
import zio.ZIO.{acquireRelease, attempt, blocking, fail, logDebug, logError, logInfo, service, succeed}
import zio.stream.ZStream.fromQueue

import java.net.URI
import java.util.concurrent.TimeUnit

import scala.collection.immutable.ListMap
import scala.concurrent.duration.*
import scala.jdk.CollectionConverters.*
/** A unit of work that can be pushed onto a queue through a [[DelayedClient]]. */
trait DelayedTask:
  self =>

  /** The effect this task performs; it has access to a [[DelayedClient]] so it can enqueue follow-up work. */
  def run: ZIO[DelayedClient, Throwable, Unit]

  /** Submits this task via the [[DelayedClient]] found in the environment.
    *
    * @param queueName name of the target queue
    * @param delay     optional delay, forwarded unchanged to the client
    */
  def enqueue(
    queueName: String = "default",
    delay: Option[FiniteDuration] = None
  ): ZIO[DelayedClient, Throwable, Unit] =
    for
      client <- ZIO.service[DelayedClient]
      _      <- client.enqueue(self, queueName, delay)
    yield ()
/** ZIO-facing wrapper around a Jesque client that turns [[DelayedTask]]s into Jesque jobs. */
final class DelayedClient(private val client: JesqueClient):

  /** Encodes `task` as a Jesque job and pushes it onto `queueName`.
    *
    * The job's class name is the task's runtime class and its vars are the task's declared fields.
    *
    * @param task      the task to serialise
    * @param queueName name of the target queue
    * @param delay     when defined, the job is scheduled for "now + delay" (millisecond precision);
    *                  otherwise it is enqueued immediately
    * @return an effect that fails with whatever the reflection or the Jesque call throws
    */
  def enqueue[T <: DelayedTask](
    task: T,
    queueName: String = "default",
    delay: Option[FiniteDuration] = None
  ): Task[Unit] =
    attempt(new JesqueJob(task.getClass.getName, readVars(task).asJava))
      .tap(job => logDebug(s"Encoded $job with delay $delay for \"$queueName\""))
      .flatMap { job =>
        delay match
          case Some(wait) =>
            currentTime(TimeUnit.MILLISECONDS)
              // toMillis keeps sub-second delays; toSeconds * 1000 would round them down to zero.
              .map(_ + wait.toMillis)
              .flatMap(runAt => attempt(client.delayedEnqueue(queueName, job, runAt)))
          case None =>
            attempt(client.enqueue(queueName, job))
      }

  /** Reads every declared field of `task` reflectively into a name -> value map.
    *
    * A `ListMap` is used so entries keep the order in which reflection reported the fields:
    * `DelayedClient.taskFromJob` passes the values positionally to the task's constructor, and
    * the default immutable `Map` gives no ordering guarantee.
    */
  private def readVars[T <: DelayedTask](task: T): Map[String, Any] =
    task.getClass.getDeclaredFields
      .tapEach(_.setAccessible(true))
      .foldLeft(ListMap.empty[String, Any])((vars, field) => vars + (field.getName -> field.get(task)))
/** Constructors and worker-side plumbing for [[DelayedClient]]; the Redis connection is read from `REDIS_URL`. */
object DelayedClient:

  /** Builds a [[DelayedClient]] in the current scope; the underlying Jesque client is ended when the scope closes. */
  def mkClient: ZIO[zio.Scope, Throwable, DelayedClient] =
    configFromEnv.flatMap { config =>
      acquireRelease(attempt(ClientImpl(config)) <* logInfo("Client booted."))(client => attempt(client.end()).orDie)
        .map(c => DelayedClient(c))
    }

  /** Layer form of [[mkClient]]; the caller supplies the `Scope`. */
  val live: ZLayer[Scope, Throwable, DelayedClient] = ZLayer.fromZIO(mkClient)

  /** Builds a Jesque worker for `queueNames` that hands every received job to `received`.
    *
    * The worker is ended with `end(true)` when the scope closes.
    *
    * @param queueNames queues the worker listens on
    * @param received   callback invoked once per job received by the worker
    */
  def mkWorker(
    queueNames: String*
  )(received: JesqueJob => Unit): ZIO[Scope, Throwable, WorkerImpl] =
    configFromEnv.flatMap { config =>
      // NOTE(review): Jesque's WorkerImpl appears to execute the object returned by materializeJob
      // and to accept only Runnable/Callable instances (anything else is recorded as a failed job).
      // Returning a Runnable instead of Unit keeps processed jobs out of the failed queue -
      // confirm against the Jesque version in use.
      val factory = new JobFactory {
        override def materializeJob(job: JesqueJob): AnyRef =
          new Runnable { override def run(): Unit = received(job) }
      }
      acquireRelease(
        attempt(WorkerImpl(config, queueNames.asJava, factory)) <* logInfo("Worker booted.")
      )(w => attempt(w.end(true)).orDie)
    }

  /** Wraps `worker` in a (never started) thread object; the finalizer joins it when the scope closes. */
  private def mkListenerThread(worker: WorkerImpl): ZIO[Scope, Throwable, Thread] =
    acquireRelease(blocking(attempt(new Thread(worker))))(t => attempt(t.join()).orDie)

  /** Runs a worker on `queueNames`: received jobs are pushed onto an in-memory queue and executed
    * as [[DelayedTask]]s, up to four at a time.
    *
    * A job that cannot be turned into a task, or whose `run` fails, is logged and skipped, so a
    * single bad job does not stop the worker.
    */
  def workOn(queueNames: String*): ZIO[Scope with DelayedClient, Throwable, Unit] =
    for
      jobs <- Queue.unbounded[JesqueJob]
      workFib <- fromQueue(jobs)
        .mapZIOParUnordered(4) { job =>
          // Materialisation errors are handled per job, exactly like failures of `run`.
          taskFromJob(job)
            .flatMap(_.run)
            .catchAll(th => logError(s"Worker crashed with ${th}"))
        }
        .runDrain
        .forever
        .fork
      worker <- mkWorker(queueNames: _*) { job =>
        // TODO: This is not nice. ZIO.async could be explored.
        Unsafe.unsafe { (unsafe: Unsafe) =>
          given u: Unsafe = unsafe
          Runtime.default.unsafe.run(jobs.offer(job))
        }
      }
      // `Thread.run()` executes the worker loop on the calling thread, so it is shifted to the
      // blocking executor instead of occupying a thread of the default one.
      listenerFib <- mkListenerThread(worker).flatMap(t => blocking(attempt(t.run()))).fork
      _ <- workFib.join
      _ <- listenerFib.join
    yield ()

  /** Re-creates a [[DelayedTask]] from a job: loads the class named by the job and calls its first
    * public constructor with the job's vars, in map iteration order.
    */
  private def taskFromJob(job: JesqueJob): ZIO[Any, Throwable, DelayedTask] =
    for
      constructor <- attempt {
        Class
          .forName(job.getClassName)
          .getConstructors
          .headOption
          .getOrElse(throw new NoSuchMethodException(s"No public constructor found on ${job.getClassName}"))
      }
      values   <- attempt(job.getVars.values.asScala.toList)
      instance <- attempt(constructor.newInstance(values: _*).asInstanceOf[DelayedTask])
    yield instance

  /** Builds the Jesque configuration from the `REDIS_URL` environment variable.
    *
    * Host, port and password are applied only when present in the URL. The password is the part
    * of the user-info after the first `:`; user-info without a `:` configures no password.
    * Fails with a `RuntimeException` when `REDIS_URL` is not set.
    */
  private def configFromEnv: Task[JesqueConfig] =
    System.env("REDIS_URL").orDie.flatMap {
      case None => fail(new RuntimeException("Missing REDIS_URL"))
      case Some(url) =>
        attempt {
          val uri = new URI(url)
          val cb  = new ConfigBuilder()
          Option(uri.getHost).foreach(host => cb.withHost(host))
          if uri.getPort > -1 then cb.withPort(uri.getPort)
          Option(uri.getUserInfo)
            .map(_.split(":", 2))
            .collect { case Array(_, password) => password }
            .foreach(password => cb.withPassword(password))
          cb.build()
        }
    }
/** Demo task that prints `raw` together with its reversed form. */
case class ReverseString(raw: String) extends DelayedTask:
  def run =
    val reversed = raw.reverse
    printLine(s"Reversing \"$raw\" to \"$reversed\"")
/** Demo task that prints `n` and, while `n - 1` is still non-negative, re-enqueues itself
  * with `n - 1` and a one second delay.
  */
case class CountToZero(n: Int) extends DelayedTask:
  def run =
    val next = n - 1
    val followUp =
      if next >= 0 then CountToZero(next).enqueue(delay = Some(1.second))
      else ZIO.unit
    printLine(s"n: $n").flatMap(_ => followUp)
/** Demo application: enqueues two sample tasks and, in parallel, runs a worker that processes them. */
object DelayedTaskApp extends ZIOAppDefault:

  /** Logs a greeting, then enqueues one `ReverseString` and one `CountToZero(10)` task. */
  def clientProgram: ZIO[DelayedClient, Throwable, Unit] =
    logInfo("Booting program. 👋") *>
      ReverseString("Hello world from delayed task").enqueue() *>
      CountToZero(10).enqueue()

  /** Runs the worker on the "default", "delayed" and "recurring" queues. */
  def workerProgram: ZIO[Scope with DelayedClient, Throwable, Unit] =
    DelayedClient.workOn("default", "delayed", "recurring")

  /** Client and worker side by side. */
  def program = clientProgram.zipPar(workerProgram)

  def run = program.provide(Scope.default, DelayedClient.live)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment