Skip to content

Instantly share code, notes, and snippets.

package com.wix.ecom
import com.netflix.conductor.common.metadata.workflow.StartWorkflowRequest
import com.netflix.conductor.sdk.workflow.task.WorkerTask
import io.orkes.conductor.client.http.{OrkesWorkflowClient}
import io.orkes.conductor.client.{ApiClient, WorkflowClient}
import java.util.UUID
import scala.jdk.CollectionConverters.MapHasAsJava
object TrackingExample extends scala.App {
/**
 * Forks `runSomePeriodicJob` as a daemon fiber on a one-second spaced schedule, prints a dump
 * of the fibers reported by `FiberTracking.fibers`, and then keeps printing "do other stuff"
 * on the same one-second schedule.
 *
 * The final `repeat` has no recurrence limit in this code, so `ExitCode(0)` is only produced
 * if that schedule ever completes.
 */
def run: URIO[zio.ZEnv with FiberTracking, ExitCode] = {
  val everySecond = Schedule.spaced(1.seconds)

  for {
    _       <- runSomePeriodicJob.repeat(everySecond).forkDaemon
    tracked <- FiberTracking.fibers
    dump    <- Fiber.dumpStr(tracked: _*)
    _       <- console.putStrLn(s">>>> tracking: $dump")
    _       <- console.putStrLn("do other stuff").repeat(everySecond)
  } yield ExitCode(0)
}
/** Syntax extensions for `zio.Runtime`. */
object RuntimeOps {

  /** Adds `withFiberTracking` to any runtime whose environment is a `Has`. */
  implicit class RuntimeEnvOps[R <: Has[_]](val runtime: zio.Runtime[R]) extends AnyVal {

    /**
     * Builds a new runtime from this one with a supervisor installed on its platform and a
     * `FiberTracking` service merged into its environment.
     *
     * The supervisor is obtained by running `Supervisor.track(weak = false)` synchronously on
     * the wrapped runtime, so calling this method executes an effect.
     *
     * @param tag type tag for the original environment `R`
     * @return a runtime whose environment is `R with FiberTracking`
     */
    def withFiberTracking(implicit tag: zio.Tag[R]): zio.Runtime[R with FiberTracking] = {
      val tracker = runtime.unsafeRunTask(Supervisor.track(weak = false))

      // Install the supervisor on the platform first, then extend the environment.
      val supervised = runtime.mapPlatform(platform => platform.withSupervisor(tracker))

      // NOTE(review): FiberTracking.make is not visible in this snippet; presumably it wraps
      // the supervisor in a FiberTracking service -- confirm against its definition.
      supervised.map(env => env.union[FiberTracking](FiberTracking.make(tracker)))
    }
  }
}
object FiberTracking {
/** Environment type carrying a [[Service]], for use in `R with FiberTracking` signatures. */
type FiberTracking = Has[FiberTracking.Service]
/** Service interface exposing a collection of runtime fibers. */
trait Service {
/** Effect that yields a chunk of runtime fibers; typed as `UIO`, so it declares no error. */
// NOTE(review): which fibers are returned depends on the implementation, which is not shown
// here -- presumably those tracked by a Supervisor; confirm against FiberTracking.make.
def fibers: UIO[Chunk[Fiber.Runtime[Any, Any]]]
}
/**
 * Accessor: reads the `Service` from the `FiberTracking` environment and returns the result
 * of its `fibers` effect.
 */
def fibers: URIO[FiberTracking, Chunk[Fiber.Runtime[Any, Any]]] =
  ZIO.accessM(env => env.get.fibers)
object InterruptExample extends App {
import zio._
import zio.duration._
import java.time.LocalTime
override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] =
for {
fiber <- runSomeLongJob
.ensuring(printWithTime("finalizing job!"))
.forkDaemon
/**
 * Forks `runSomeLongJob` (with a finalizer that prints "finalizing job!") as a daemon fiber,
 * prints "do stuff" on a one-second spaced schedule combined with `Schedule.recurs(2)`, then
 * waits up to five seconds for the forked fiber to finish.
 *
 * If that wait times out (the `timeout` result is empty) the fiber is asked to stop via
 * `interruptFork`. Afterwards "do other stuff" is printed on a one-second spaced schedule
 * that has no recurrence limit in this code.
 *
 * @param args command-line arguments (unused)
 */
override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] = {
  val everySecond = Schedule.spaced(1.seconds)

  for {
    job    <- runSomeLongJob.ensuring(printWithTime("finalizing job!")).forkDaemon
    _      <- printWithTime("do stuff").repeat(everySecond && Schedule.recurs(2))
    // NOTE(review): resurrect/ignore/disconnect presumably make the wait tolerate any job
    // outcome and keep the timeout from blocking on the join -- confirm against ZIO docs.
    joined <- job.join.resurrect.ignore.disconnect.timeout(5.seconds)
    _      <- ZIO.when(joined.isEmpty)(job.interruptFork)
    _      <- printWithTime("do other stuff").repeat(everySecond)
  } yield ExitCode(0)
}
// Periodic job forked as a daemon fiber; when a run fails, its cause is printed via tapCause.
// NOTE(review): tapCause presumably leaves the failure in place, so the repeat would end on
// the first failed run -- confirm against the ZIO documentation.
runSomePeriodicJob
  .tapCause { cause =>
    console.putStrLn(s"job failed with $cause. extra information...")
  }
  .repeat(Schedule.spaced(1.seconds))
  .forkDaemon
// Periodic job forked as a daemon fiber; a failed run is handled with catchAllCause by
// printing its cause.
// NOTE(review): because the failure is replaced by the print effect, the repeat presumably
// keeps going after failed runs -- confirm against the ZIO documentation.
runSomePeriodicJob
  .catchAllCause { failure =>
    console.putStrLn(s"job failed with $failure")
  }
  .repeat(Schedule.spaced(1.seconds))
  .forkDaemon
object DefectNotHandledExample extends App {
import zio._
import zio.duration._
override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] =
for {
_ <- runSomePeriodicJob
.catchAll(e => console.putStrLn(s"job failed with $e"))
.repeat(Schedule.spaced(1.seconds))
.forkDaemon
package com.wix.cronulla.example;
import com.wix.cronulla.example.GetTasks;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import java.util.concurrent.ExecutorService;
public class LeaderNodeCronScheduler extends LeaderSelectorListenerAdapter {