Skip to content

Instantly share code, notes, and snippets.

@vitaliihonta
Last active October 3, 2023 16:45
Show Gist options
  • Save vitaliihonta/3b1d6ea96422caef5ea11af03281ad77 to your computer and use it in GitHub Desktop.
Save vitaliihonta/3b1d6ea96422caef5ea11af03281ad77 to your computer and use it in GitHub Desktop.
ZIO Temporal: Working with Workflows and Activities

ZIO Temporal: Working with Workflows and Activities

Explore the essential components of Temporal—Workflows and Activities—and unlock the potential of orchestration with Temporal!

This is the source code used in my article about Temporal Workflows and Activities. I hope you enjoy playing with it!

Before you start

The example requires Scala CLI. To install it, follow the official documentation: https://scala-cli.virtuslab.org/install.
To set up your IDE for the example, refer to the Scala CLI IDE support documentation: https://scala-cli.virtuslab.org/docs/guides/ide.
Once Scala CLI is installed, read through the code and use Scala CLI to run it!

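Running the examples

Both applications talk to a Temporal server, so one must already be running (the SDK's default target is localhost:7233). For local development, you can start one with the Temporal CLI: `temporal server start-dev`. The Client only submits the workflow; the Worker must be running for it to actually be processed.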

How to run the Client application (ClientMain):

scala-cli run --main-class ClientMain workflow-activities.scala

How to run the Worker application (WorkerMain):

scala-cli run --main-class WorkerMain workflow-activities.scala
//> using scala 2.13
//> using dep dev.vhonta::zio-temporal-core:0.6.0
//> using dep dev.zio::zio:2.0.18
//> using dep dev.zio::zio-streams:2.0.18
//> using dep dev.zio::zio-nio:2.0.2
//> using dep dev.zio::zio-json:0.6.2
//> using dep dev.zio::zio-logging:2.1.14
//> using dep dev.zio::zio-logging-slf4j-bridge:2.1.14
//> using dep com.beachape::enumeratum:1.7.3
//> using dep com.google.api-client:google-api-client:2.2.0
//> using dep com.google.apis:google-api-services-youtube:v3-rev20230502-2.0.0
import enumeratum.{Enum, EnumEntry}
import java.time.LocalDateTime
import zio.json._
/** Kind of content handled by the Content Sync. */
sealed trait ContentType extends EnumEntry

case object ContentType extends Enum[ContentType] {
  // Only text and video content are supported for now
  case object Text  extends ContentType
  case object Video extends ContentType

  override val values = findValues

  /** JSON codec that (de)serializes a [[ContentType]] as its enumeratum entry name
    * (e.g. `"Video"`); unknown names fail decoding with enumeratum's error message.
    */
  implicit val jsonCodec: JsonCodec[ContentType] = {
    val encoder: JsonEncoder[ContentType] =
      JsonEncoder.string.contramap[ContentType](entry => entry.entryName)
    val decoder: JsonDecoder[ContentType] =
      JsonDecoder.string.mapOrFail { name =>
        ContentType.withNameEither(name).left.map(error => error.getMessage)
      }
    JsonCodec(encoder, decoder)
  }
}
// The domain model for Content we pull and process.
// JSON member names are rendered in snake_case (e.g. `publishedAt` -> `published_at`).
@jsonMemberNames(SnakeCase)
case class ContentFeedItem(
  title: String,
  description: Option[String],
  url: String,
  publishedAt: LocalDateTime,
  contentType: ContentType
)

object ContentFeedItem {
  // Derived JSON codec for a single content item
  implicit val jsonCodec: JsonCodec[ContentFeedItem] = DeriveJsonCodec.gen[ContentFeedItem]
}
// A lot of YouTube Java API imports
import com.google.api.services.youtube.model.{
ResourceId,
SearchResult,
SearchResultSnippet,
Subscription,
SubscriptionContentDetails,
SubscriptionSnippet
}
// ZIO imports
import zio._
import zio.stream._
// Other
import java.time.{LocalDateTime, Instant, ZoneOffset}
object YoutubeClient {

  /** Layer providing the stub [[YoutubeClient]]; it requires nothing from the environment. */
  val make: ULayer[YoutubeClient] = ZLayer.succeed(new YoutubeClient())
}
// Dependencies don't matter ATM
/** Stub YouTube client: returns randomly generated data instead of calling the real API.
  * The access token and `minDate` arguments are accepted but ignored.
  */
case class YoutubeClient(/**/) {

  /** Emits randomly generated subscriptions (one per element of `ZStream.range(1, 10)`). */
  def listSubscriptions(accessToken: String): Stream[Throwable, Subscription] =
    ZStream.range(1, 10).mapZIO(_ => makeRandomSubscription)

  /** Emits randomly generated videos belonging to `channelId`. */
  def channelVideos(
    accessToken: String,
    channelId: String,
    minDate: LocalDateTime
  ): Stream[Throwable, SearchResult] =
    ZStream.range(1, 10).mapZIO(_ => makeRandomVideo(channelId))

  // Builds a subscription with random ids, a random title and a random item count in [10, 100)
  private def makeRandomSubscription: UIO[Subscription] =
    (Random.nextUUID <*> Random.nextUUID <*> Random.nextString(10) <*> Random.nextIntBetween(10, 100)).map {
      case (id, channelId, title, itemsCount) =>
        val snippet = new SubscriptionSnippet()
          .setTitle(title)
          .setResourceId(new ResourceId().setChannelId(channelId.toString))
        // widen explicitly to Long for the boxed item-count setter
        val details = new SubscriptionContentDetails().setTotalItemCount(itemsCount.toLong)
        new Subscription()
          .setId(id.toString)
          .setSnippet(snippet)
          .setContentDetails(details)
    }

  // Builds a video with a random id and title; every video has the same fixed publish time
  private def makeRandomVideo(channelId: String): UIO[SearchResult] =
    (Random.nextUUID <*> Random.nextString(10)).map { case (videoId, title) =>
      // 2022-11-12T23:00:00Z in epoch milliseconds
      val publishedAtMillis = 1668294000000L
      val resourceId = new ResourceId()
        .setVideoId(videoId.toString)
        .setChannelId(channelId)
      val snippet = new SearchResultSnippet()
        .setTitle(title)
        .setDescription(s"Some description for video $videoId")
        .setPublishedAt(new com.google.api.client.util.DateTime(publishedAtMillis))
      new SearchResult()
        .setId(resourceId)
        .setSnippet(snippet)
    }
}
// YouTube Activities
/** Input of `YoutubeActivities.fetchVideos`.
  *
  * @param integrationId integration the pull belongs to (used in log messages)
  * @param minDate       minimum publish date forwarded to the YouTube client
  */
case class FetchVideosParams(integrationId: Long, minDate: LocalDateTime)

/** Output of `YoutubeActivities.fetchVideos`: every video fetched during the pull. */
case class FetchVideosResult(values: List[YoutubeSearchResult])

/** One fetched video, converted from the YouTube API model into our own type. */
case class YoutubeSearchResult(
  videoId: String,
  title: String,
  description: Option[String],
  publishedAt: LocalDateTime
)
import zio._
import zio.temporal._
import zio.temporal.activity._
/** Temporal activity that pulls videos from YouTube. */
@activityInterface
trait YoutubeActivities {

  /** Fetches videos for the integration described by `params`. */
  def fetchVideos(params: FetchVideosParams): FetchVideosResult
}
// Activity implementation
// Step 1: define the internal state threaded through the loop
// over the subscriptions.

/** Loop state: subscriptions still to pull plus the videos collected so far. */
case class FetchVideosState(
  subscriptionsLeft: List[YoutubeSubscription],
  accumulator: FetchVideosResult
)

/** The subset of a YouTube subscription the activity needs. */
case class YoutubeSubscription(channelId: String, channelName: String)
// Step 2: define the activity implementation
case class YoutubeActivitiesImpl(
  // Pass dependencies (such as YouTube client) into the class constructor
  youtubeClient: YoutubeClient
  // This is required to run ZIO inside activities
)(implicit options: ZActivityRunOptions[Any])
    extends YoutubeActivities {

  // simple string for demonstration purposes
  private val youtubeAccessToken = "Hey, let me in"

  // We have to make pauses to avoid being rate-limited by YouTube API
  private val pollInterval = 5.seconds

  // Only the subscriptions with the most items are pulled, to reduce quota usage
  private val maxSubscriptions = 5

  /** Fetches videos from the most active subscribed channels.
    *
    * Lists the subscriptions and keeps the top `maxSubscriptions` by total item count.
    * It then pulls each channel's videos one after another, passing `params.minDate`
    * to the client and pausing `pollInterval` between channels.
    *
    * @param params integration id (for logging) and minimum publish date
    * @return all fetched videos, in the order the channels were pulled
    */
  override def fetchVideos(params: FetchVideosParams): FetchVideosResult = {
    // ZActivity.run "extracts" the value from ZIO
    ZActivity.run {
      for {
        _ <- ZIO.logInfo(s"Fetching videos integration=${params.integrationId}")
        // Fetching the subscriptions for the initial state
        subscriptions <- youtubeClient
                           .listSubscriptions(youtubeAccessToken)
                           .runCollect
        // Populate the initial state
        initialState = createInitialState(subscriptions)
        // Start the loop
        result <- process(params)(initialState)
      } yield result
    }
  }

  // Keeps the `maxSubscriptions` subscriptions with the highest item count
  // and converts them into the initial loop state.
  private def createInitialState(
    subscriptions: Chunk[Subscription]
  ): FetchVideosState = {
    val desiredSubscriptions = subscriptions
      .sortBy(subscription => totalItemCount(subscription))(Ordering[Long].reverse)
      .take(maxSubscriptions)
      .toList

    FetchVideosState(
      subscriptionsLeft = desiredSubscriptions.map { subscription =>
        YoutubeSubscription(
          channelId = subscription.getSnippet.getResourceId.getChannelId,
          channelName = subscription.getSnippet.getTitle
        )
      },
      accumulator = FetchVideosResult(values = Nil)
    )
  }

  // Total item count of a subscription; missing content details or count count as 0.
  // The Java model may return null here, so wrap in Option BEFORE unboxing
  // (unboxing a null boxed Long would throw a NullPointerException).
  private def totalItemCount(subscription: Subscription): Long =
    Option(subscription.getContentDetails)
      .flatMap(details => Option(details.getTotalItemCount))
      .map(count => count.longValue())
      .getOrElse(0L)

  // Loop over subscriptions: pull one channel, accumulate its videos, recurse on the rest.
  private def process(params: FetchVideosParams)(
    state: FetchVideosState
  ): Task[FetchVideosResult] =
    state.subscriptionsLeft match {
      // Finish if no more subscriptions are left
      case Nil =>
        ZIO.succeed(state.accumulator)

      case subscription :: rest =>
        val channelId = subscription.channelId
        for {
          _ <- ZIO.logInfo(
                 s"Pulling channel=$channelId name=${subscription.channelName} (channels left: ${rest.size})"
               )
          // Fetch videos via API
          videos <- youtubeClient
                      .channelVideos(youtubeAccessToken, channelId, params.minDate)
                      .runCollect
          // Convert videos into our domain classes
          convertedVideos = videos.map(result => toSearchResult(result))
          // Update the state for the next iteration
          updatedState = state.copy(
                           subscriptionsLeft = rest,
                           accumulator = state.accumulator.copy(
                             values = state.accumulator.values ++ convertedVideos
                           )
                         )
          // Take a break to avoid being rate-limited by YouTube API,
          // but only when another channel is still to be pulled
          _ <- ZIO.when(rest.nonEmpty) {
                 ZIO.logInfo(s"Sleep for $pollInterval") *> ZIO.sleep(pollInterval)
               }
          // Next iteration
          result <- process(params)(updatedState)
        } yield result
    }

  // Converts a YouTube API search result into our domain class (publish time interpreted as UTC)
  private def toSearchResult(result: SearchResult): YoutubeSearchResult =
    YoutubeSearchResult(
      videoId = result.getId.getVideoId,
      title = result.getSnippet.getTitle,
      description = Option(result.getSnippet.getDescription),
      publishedAt = Instant
        .ofEpochMilli(result.getSnippet.getPublishedAt.getValue)
        .atOffset(ZoneOffset.UTC)
        .toLocalDateTime
    )
}
object YoutubeActivitiesImpl {

  /** Builds the activity from the YouTube client and the ZIO run options in the environment. */
  val make: URLayer[YoutubeClient with ZActivityRunOptions[Any], YoutubeActivities] =
    ZLayer.fromFunction { (client: YoutubeClient, runOptions: ZActivityRunOptions[Any]) =>
      YoutubeActivitiesImpl(client)(runOptions)
    }
}
// Data Lake activities
/** Input of `DatalakeActivities.storeVideos`: the videos to persist. */
case class YoutubeVideosList(values: List[YoutubeSearchResult])

/** Input of `DatalakeActivities.storeVideos`: where to write the data.
  *
  * @param integrationId     integration the videos belong to (becomes part of the output path)
  * @param datalakeOutputDir root directory of the data lake
  */
case class StoreVideosParameters(integrationId: Long, datalakeOutputDir: String)
/** Temporal activity that persists fetched videos into the data lake. */
@activityInterface
trait DatalakeActivities {

  /** Stores `videos` according to `params`. */
  def storeVideos(videos: YoutubeVideosList, params: StoreVideosParameters): Unit
}
// For configuration
import java.net.URI
// For file writes
import java.io.IOException
import zio._
import zio.stream._
import zio.json._
import zio.nio.file.Files
import zio.nio.file.Path
// List all the dependencies and configuration
/** Data lake activity writing videos as JSON-lines files.
  *
  * @param youtubeBaseUri prefix the video id is appended to in order to build each item's URL
  */
case class DatalakeActivitiesImpl(
  youtubeBaseUri: URI
  // ZActivityRunOptions to run ZIO
)(implicit options: ZActivityRunOptions[Any])
    extends DatalakeActivities {

  /** Converts the videos into [[ContentFeedItem]]s and writes them under `params.datalakeOutputDir`. */
  override def storeVideos(
    videos: YoutubeVideosList,
    params: StoreVideosParameters
  ): Unit =
    ZActivity.run {
      // Wrapping the list into a ZStream simplifies chunked writing
      val items: UStream[ContentFeedItem] =
        ZStream.fromIterable(videos.values).map(video => toContentFeedItem(video))

      for {
        _ <- ZIO.logInfo("Storing videos")
        written <- writeStreamToJson(
                     contentFeedItemsStream = items,
                     datalakeOutputDir = params.datalakeOutputDir,
                     integrationId = params.integrationId
                   )
        _ <- ZIO.logInfo(s"Written $written videos")
      } yield ()
    }

  // Maps a fetched video to the data lake record format
  private def toContentFeedItem(video: YoutubeSearchResult): ContentFeedItem =
    ContentFeedItem(
      title = video.title,
      description = video.description,
      url = s"$youtubeBaseUri${video.videoId}",
      publishedAt = video.publishedAt,
      contentType = ContentType.Video
    )

  /** Writes the stream as JSON-lines files, 100 items per file, under
    * `<datalakeOutputDir>/pulledDate=<now>/integration=<integrationId>/pull-<uuid>.jsonl`.
    *
    * @return total number of items written
    */
  private def writeStreamToJson(
    contentFeedItemsStream: UStream[ContentFeedItem],
    datalakeOutputDir: String,
    integrationId: Long
  ): IO[IOException, Long] = {
    // Writes one group of items into its own uniquely named file; returns the item count
    def writeChunk(pulledAt: LocalDateTime)(
      batch: Chunk[ContentFeedItem]
    ): IO[IOException, Long] =
      ZIO.scoped {
        for {
          fileId <- ZIO.randomWith(_.nextUUID)
          targetDir = Path(datalakeOutputDir) / s"pulledDate=$pulledAt" / s"integration=$integrationId"
          // Create the pull directory if it does not exist yet
          _ <- Files.createDirectories(targetDir)
          _ <- Files.writeLines(targetDir / s"pull-$fileId.jsonl", lines = batch.map(_.toJson))
        } yield batch.size
      }

    // One timestamp for the whole pull so all files land in the same directory
    ZIO.clockWith(_.localDateTime).flatMap { pulledAt =>
      contentFeedItemsStream
        .grouped(100)
        .mapZIO(writeChunk(pulledAt))
        .runSum
    }
  }
}
// For dependency injection
object DatalakeActivitiesImpl {

  /** Builds the activity with the public YouTube watch URL as the base for item URLs.
    * NOTE: it's better to read URI from a configuration file using ZIO Config capabilities
    */
  val make: URLayer[ZActivityRunOptions[Any], DatalakeActivities] =
    ZLayer.fromFunction { (runOptions: ZActivityRunOptions[Any]) =>
      DatalakeActivitiesImpl(new URI("https://www.youtube.com/watch?v="))(runOptions)
    }
}
// YouTube Puller Workflow
/** Input of the pull workflow: what data to fetch and where to store it.
  *
  * @param integrationId     integration to pull for
  * @param minDate           minimum publish date of the videos to fetch
  * @param datalakeOutputDir root directory the results are written to
  */
case class YoutubePullerParameters(
  integrationId: Long,
  minDate: LocalDateTime,
  datalakeOutputDir: String
)

/** Output of the pull workflow: how many items were processed. */
case class PullingResult(processed: Long)
// Workflow Interface
import zio._
import zio.temporal._
import zio.temporal.workflow._
/** Temporal workflow that pulls YouTube videos and stores them in the data lake. */
@workflowInterface
trait YoutubePullWorkflow {

  /** Runs one pull described by `params`; returns how many videos were processed. */
  @workflowMethod
  def pull(params: YoutubePullerParameters): PullingResult
}
// Workflow Implementation
// Just extend the workflow interface
class YoutubePullWorkflowImpl extends YoutubePullWorkflow {

  private val logger = ZWorkflow.makeLogger

  // Fetching may take long: 30 minutes per attempt, up to 5 attempts,
  // with a bigger backoff coefficient (x3) due to rate limiting on the YouTube side
  private val youtubeActivities: ZActivityStub.Of[YoutubeActivities] =
    ZWorkflow.newActivityStub[YoutubeActivities](
      ZActivityOptions
        .withStartToCloseTimeout(30.minutes)
        .withRetryOptions(
          ZRetryOptions.default
            .withMaximumAttempts(5)
            .withBackoffCoefficient(3)
        )
    )

  // Storing: 1 minute per attempt, up to 5 attempts
  private val datalakeActivities: ZActivityStub.Of[DatalakeActivities] =
    ZWorkflow.newActivityStub[DatalakeActivities](
      ZActivityOptions
        .withStartToCloseTimeout(1.minute)
        .withRetryOptions(ZRetryOptions.default.withMaximumAttempts(5))
    )

  /** Fetches the videos, then stores them unless none were found.
    *
    * @return number of videos handed to the data lake activity (0 when nothing was fetched)
    */
  override def pull(params: YoutubePullerParameters): PullingResult = {
    logger.info(
      s"Getting videos integrationId=${params.integrationId} minDate=${params.minDate}"
    )

    // Activity invocations are wrapped into ZActivityStub.execute
    val videos = ZActivityStub.execute(
      youtubeActivities.fetchVideos(
        FetchVideosParams(
          integrationId = params.integrationId,
          minDate = params.minDate
        )
      )
    )

    videos.values match {
      case Nil =>
        // No need to produce empty files if there is no input data
        logger.info("No new videos found")
        PullingResult(0)

      case fetched =>
        val videosCount = fetched.size
        logger.info(s"Going to store $videosCount videos...")
        ZActivityStub.execute(
          datalakeActivities.storeVideos(
            videos = YoutubeVideosList(fetched),
            params = StoreVideosParameters(
              integrationId = params.integrationId,
              datalakeOutputDir = params.datalakeOutputDir
            )
          )
        )
        PullingResult(videosCount)
    }
  }
}
// Client application
import zio.logging.slf4j.bridge.Slf4jBridge
// scala-cli run --main-class ClientMain workflow-activities.scala
/** Client application: starts one YouTube pull workflow and exits without awaiting its result. */
object ClientMain extends ZIOAppDefault {
  // Route SLF4J logging through ZIO logging
  override val bootstrap: ZLayer[ZIOAppArgs, Any, Any] =
    Slf4jBridge.initialize

  def run = {
    val retryOptions = ZRetryOptions.default
      // maximum retry attempts
      .withMaximumAttempts(5)
      // initial backoff interval
      .withInitialInterval(1.second)
      // exponential backoff coefficient
      .withBackoffCoefficient(1.2)
      // do not retry certain errors
      .withDoNotRetry(nameOf[IllegalArgumentException])

    def workflowOptions(workflowId: String) = ZWorkflowOptions
      .withWorkflowId(workflowId)
      .withTaskQueue("youtube-pulling-queue")
      .withWorkflowRunTimeout(20.minutes)
      .withRetryOptions(retryOptions)

    // Step 1: create a workflow stub; a random UUID makes the workflow ID unique
    val createWorkflowStubZIO: URIO[ZWorkflowClient, ZWorkflowStub.Of[YoutubePullWorkflow]] =
      ZIO.serviceWithZIO[ZWorkflowClient] { workflowClient =>
        Random.nextUUID.flatMap { uuid =>
          workflowClient.newWorkflowStub[YoutubePullWorkflow](
            workflowOptions(workflowId = s"youtube/$uuid")
          )
        }
      }

    val startWorkflow: RIO[ZWorkflowClient, Unit] =
      for {
        youtubePullWorkflow <- createWorkflowStubZIO
        // Step 2: start the workflow (its result is not awaited here)
        _ <- ZWorkflowStub.start(
               youtubePullWorkflow.pull(
                 // Provide input parameters
                 YoutubePullerParameters(
                   integrationId = 1,
                   minDate = LocalDateTime.of(2023, 1, 1, 0, 0),
                   datalakeOutputDir = "./datalake"
                 )
               )
             )
        _ <- ZIO.logInfo("YouTube pull result workflow started!")
      } yield ()

    val clientProgram: Task[Unit] =
      startWorkflow.provide(
        // The client itself
        ZWorkflowClient.make,
        // Client's direct dependencies:
        // 1. Client configuration
        ZWorkflowClientOptions.make,
        // 2. Workflow service stubs, responsible for all the RPC
        ZWorkflowServiceStubs.make,
        ZWorkflowServiceStubsOptions.make
      )

    clientProgram
  }
}
// Worker application
import zio.temporal.worker._
// scala-cli run --main-class WorkerMain workflow-activities.scala
/** Worker application: serves the pull workflow and its activities on "youtube-pulling-queue". */
object WorkerMain extends ZIOAppDefault {
  // Route SLF4J logging through ZIO logging
  override val bootstrap: ZLayer[ZIOAppArgs, Any, Any] =
    Slf4jBridge.initialize

  def run = {
    // Worker with the workflow and both activity implementations registered.
    // The activity layers take their dependencies (YoutubeClient, ZActivityRunOptions)
    // from the environment, which is why they show up in this effect's requirements.
    val worker: URIO[ZWorkerFactory with YoutubeClient with ZActivityRunOptions[Any] with Scope, ZWorker] =
      ZWorkerFactory.newWorker("youtube-pulling-queue") @@
        ZWorker.addWorkflow[YoutubePullWorkflow].from(new YoutubePullWorkflowImpl) @@
        ZWorker.addActivityImplementationLayer(YoutubeActivitiesImpl.make) @@
        ZWorker.addActivityImplementationLayer(DatalakeActivitiesImpl.make)

    // Register the worker, set up the internal transport, then block serving for as long as the program is alive
    val serveWorker =
      (worker *> ZWorkflowServiceStubs.setup() *> ZWorkerFactory.serve).unit

    val workerProgram: RIO[Scope, Unit] =
      serveWorker.provideSome[Scope](
        // Worker factory and its configuration
        ZWorkerFactory.make,
        ZWorkerFactoryOptions.make,
        // The factory needs a workflow client...
        ZWorkflowClient.make,
        ZWorkflowClientOptions.make,
        // ...which in turn needs the workflow service stubs
        ZWorkflowServiceStubs.make,
        ZWorkflowServiceStubsOptions.make,
        // Dependencies of the activity implementations
        YoutubeClient.make,
        ZActivityRunOptions.default
      )

    workerProgram
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment