|
//> using scala 2.13 |
|
//> using dep dev.vhonta::zio-temporal-core:0.6.0 |
|
//> using dep dev.zio::zio:2.0.18 |
|
//> using dep dev.zio::zio-streams:2.0.18 |
|
//> using dep dev.zio::zio-nio:2.0.2 |
|
//> using dep dev.zio::zio-json:0.6.2 |
|
//> using dep dev.zio::zio-logging:2.1.14 |
|
//> using dep dev.zio::zio-logging-slf4j-bridge:2.1.14 |
|
//> using dep com.beachape::enumeratum:1.7.3 |
|
//> using dep com.google.api-client:google-api-client:2.2.0 |
|
//> using dep com.google.apis:google-api-services-youtube:v3-rev20230502-2.0.0 |
|
|
|
import enumeratum.{Enum, EnumEntry} |
|
import java.time.LocalDateTime |
|
import zio.json._ |
|
|
|
/** Kind of content handled by the sync, modelled as an enumeratum enum. */
sealed trait ContentType extends EnumEntry

case object ContentType extends Enum[ContentType] {

  // Text and Video are the only kinds the Content Sync supports right now
  case object Text  extends ContentType
  case object Video extends ContentType

  // Gathered by the enumeratum macro from the entries declared in this object
  override val values = findValues

  /** zio-json codec for the enum.
    *
    * A value is written as its `entryName`; reading looks the string up via
    * `withNameEither` and fails decoding with the lookup error's message when
    * no entry matches.
    */
  implicit val jsonCodec: JsonCodec[ContentType] = {
    val encodeAsName: JsonEncoder[ContentType] =
      JsonEncoder.string.contramap[ContentType](_.entryName)

    val decodeFromName: JsonDecoder[ContentType] =
      JsonDecoder.string.mapOrFail { name =>
        ContentType.withNameEither(name).left.map(_.getMessage)
      }

    JsonCodec(encodeAsName, decodeFromName)
  }
}
|
|
|
|
|
// The domain model for Content we pull and process |
|
/** One content entry produced by a pull.
  *
  * The `@jsonMemberNames(SnakeCase)` annotation makes the derived codec use
  * snake_case field names in JSON.
  *
  * @param title       title of the content
  * @param description optional description
  * @param url         link to the content
  * @param publishedAt publication timestamp
  * @param contentType kind of the content, see [[ContentType]]
  */
@jsonMemberNames(SnakeCase)
case class ContentFeedItem(
  title: String,
  description: Option[String],
  url: String,
  publishedAt: LocalDateTime,
  contentType: ContentType
)

object ContentFeedItem {

  /** JSON codec derived by zio-json from the case class definition. */
  implicit val jsonCodec: JsonCodec[ContentFeedItem] = DeriveJsonCodec.gen[ContentFeedItem]
}
|
|
|
// A lot of YouTube Java API imports |
|
import com.google.api.services.youtube.model.{ |
|
ResourceId, |
|
SearchResult, |
|
SearchResultSnippet, |
|
Subscription, |
|
SubscriptionContentDetails, |
|
SubscriptionSnippet |
|
} |
|
// ZIO imports |
|
import zio._ |
|
import zio.stream._ |
|
// Other |
|
import java.time.{LocalDateTime, Instant, ZoneOffset} |
|
|
|
object YoutubeClient {

  /** Layer providing a [[YoutubeClient]]; it has no inputs and cannot fail. */
  val make: ULayer[YoutubeClient] = ZLayer.succeed(new YoutubeClient())
}
|
|
|
// Dependencies don't matter at the moment
|
/** Stand-in YouTube client: every call answers with randomly generated data
  * instead of contacting a remote API.
  */
case class YoutubeClient() {

  /** Streams nine randomly generated subscriptions (one per integer in `[1, 10)`).
    *
    * @param accessToken not read by this implementation
    */
  def listSubscriptions(accessToken: String): Stream[Throwable, Subscription] =
    ZStream.range(1, 10).mapZIO(_ => makeRandomSubscription)

  /** Streams nine randomly generated search results for the given channel.
    *
    * @param accessToken not read by this implementation
    * @param channelId   channel id stamped onto every generated result
    * @param minDate     not read by this implementation
    */
  def channelVideos(
    accessToken: String,
    channelId: String,
    minDate: LocalDateTime
  ): Stream[Throwable, SearchResult] =
    ZStream.range(1, 10).mapZIO(_ => makeRandomVideo(channelId))

  // Builds a subscription with a random id, channel id, title and item count in [10, 100)
  private def makeRandomSubscription: UIO[Subscription] =
    for {
      subscriptionId <- Random.nextUUID
      channelUuid    <- Random.nextUUID
      channelTitle   <- Random.nextString(10)
      totalItems     <- Random.nextIntBetween(10, 100)
    } yield {
      val channelRef = new ResourceId().setChannelId(channelUuid.toString)
      val snippet = new SubscriptionSnippet()
        .setTitle(channelTitle)
        .setResourceId(channelRef)
      val details = new SubscriptionContentDetails().setTotalItemCount(totalItems)

      new Subscription()
        .setId(subscriptionId.toString)
        .setSnippet(snippet)
        .setContentDetails(details)
    }

  // Builds a search result with a random video id and title; the publication date is a fixed timestamp
  private def makeRandomVideo(channelId: String): UIO[SearchResult] =
    for {
      videoId <- Random.nextUUID
      title   <- Random.nextString(10)
    } yield {
      val resourceId = new ResourceId()
        .setVideoId(videoId.toString)
        .setChannelId(channelId)
      val publishedAt = new com.google.api.client.util.DateTime(1668294000000L)
      val snippet = new SearchResultSnippet()
        .setTitle(title)
        .setDescription(s"Some description for video $videoId")
        .setPublishedAt(publishedAt)

      new SearchResult()
        .setId(resourceId)
        .setSnippet(snippet)
    }
}
|
|
|
// YouTube Activities |
|
// Input: how to fetch the videos |
|
/** Input of the fetch-videos activity.
  *
  * @param integrationId identifier of the integration being pulled
  * @param minDate       lower date bound handed over to the YouTube client
  */
case class FetchVideosParams(integrationId: Long, minDate: LocalDateTime)

/** Output of the fetch-videos activity: all videos that were fetched. */
case class FetchVideosResult(values: List[YoutubeSearchResult])

/** A single fetched video.
  *
  * @param videoId     YouTube video identifier
  * @param title       video title
  * @param description optional video description
  * @param publishedAt publication timestamp
  */
case class YoutubeSearchResult(
  videoId: String,
  title: String,
  description: Option[String],
  publishedAt: LocalDateTime
)
|
|
|
import zio._ |
|
import zio.temporal._ |
|
import zio.temporal.activity._ |
|
|
|
/** Temporal activity interface for retrieving videos from YouTube. */
@activityInterface
trait YoutubeActivities {

  /** Fetches videos for the integration described by `params`.
    *
    * @param params integration id and lower date bound
    * @return the fetched videos
    */
  def fetchVideos(params: FetchVideosParams): FetchVideosResult
}
|
|
|
// Activity implementation |
|
// Step 1: define the internal state, because the activity
// loops over the subscriptions one at a time
|
/** Loop state of the fetch-videos activity.
  *
  * @param subscriptionsLeft subscriptions that still have to be processed
  * @param accumulator       videos collected so far
  */
case class FetchVideosState(
  subscriptionsLeft: List[YoutubeSubscription],
  accumulator: FetchVideosResult
)

/** Minimal view of a YouTube subscription: the channel's id and display name. */
case class YoutubeSubscription(channelId: String, channelName: String)
|
|
|
// Step 2: define the activity implementation |
|
/** [[YoutubeActivities]] implementation backed by a [[YoutubeClient]].
  *
  * @param youtubeClient client used to list subscriptions and channel videos
  * @param options       run options needed by `ZActivity.run` to execute ZIO
  *                      effects inside an activity method
  */
case class YoutubeActivitiesImpl(
  youtubeClient: YoutubeClient
)(implicit options: ZActivityRunOptions[Any])
    extends YoutubeActivities {

  // Simple string for demonstration purposes
  private val youtubeAccessToken = "Hey, let me in"

  // Pause between channels to avoid being rate-limited by the YouTube API
  private val pollInterval = 5.seconds

  /** Lists the subscriptions, keeps the five with the highest item count and
    * collects the videos of each of them, sleeping `pollInterval` after every
    * channel.
    *
    * @param params integration id (used for logging) and lower date bound
    *               forwarded to the client
    * @return all videos collected across the selected channels
    */
  override def fetchVideos(params: FetchVideosParams): FetchVideosResult =
    // ZActivity.run "extracts" the value from the ZIO effect
    ZActivity.run {
      for {
        _ <- ZIO.logInfo(s"Fetching videos integration=${params.integrationId}")
        // The subscriptions seed the initial loop state
        subscriptions <- youtubeClient
                           .listSubscriptions(youtubeAccessToken)
                           .runCollect
        initialState = createInitialState(subscriptions)
        // Start the loop
        result <- process(params)(initialState)
      } yield result
    }

  /** Builds the initial loop state from the five subscriptions with the
    * largest total item count (limiting the amount of API calls made later).
    */
  private def createInitialState(
    subscriptions: Chunk[Subscription]
  ): FetchVideosState = {
    // Guard the nullable references themselves: wrapping the already unboxed
    // count in Option can never yield None, and a subscription without
    // content details would otherwise fail with a NullPointerException.
    def totalItemCount(subscription: Subscription): Long =
      Option(subscription.getContentDetails)
        .flatMap(details => Option(details.getTotalItemCount))
        .fold(0L)(_.longValue)

    val desiredSubscriptions = subscriptions
      .sortBy(subscription => totalItemCount(subscription))(Ordering[Long].reverse)
      // Get top 5
      .take(5)
      .toList

    // NOTE(review): getSnippet / getResourceId are dereferenced without null
    // checks — confirm the API always returns them for the requested parts.
    FetchVideosState(
      subscriptionsLeft = desiredSubscriptions.map { subscription =>
        YoutubeSubscription(
          channelId = subscription.getSnippet.getResourceId.getChannelId,
          channelName = subscription.getSnippet.getTitle
        )
      },
      accumulator = FetchVideosResult(values = Nil)
    )
  }

  // Converts an API search result into the domain class; the publication
  // timestamp is read as epoch milliseconds and expressed in UTC.
  private def toSearchResult(result: SearchResult): YoutubeSearchResult =
    YoutubeSearchResult(
      videoId = result.getId.getVideoId,
      title = result.getSnippet.getTitle,
      description = Option(result.getSnippet.getDescription),
      publishedAt = Instant
        .ofEpochMilli(result.getSnippet.getPublishedAt.getValue)
        .atOffset(ZoneOffset.UTC)
        .toLocalDateTime
    )

  /** Processes the remaining subscriptions one by one.
    *
    * Returns the accumulator once no subscriptions are left; otherwise pulls
    * the next channel's videos, appends them to the accumulator, sleeps for
    * `pollInterval` and recurses with the updated state.
    */
  private def process(params: FetchVideosParams)(
    state: FetchVideosState
  ): Task[FetchVideosResult] =
    state.subscriptionsLeft match {
      // Finish if no more subscriptions are left
      case Nil =>
        ZIO.succeed(state.accumulator)

      case subscription :: rest =>
        val channelId = subscription.channelId

        for {
          _ <- ZIO.logInfo(
                 s"Pulling channel=$channelId name=${subscription.channelName} (channels left: ${rest.size})"
               )
          // Fetch videos via API
          videos <- youtubeClient
                      .channelVideos(
                        youtubeAccessToken,
                        channelId,
                        params.minDate
                      )
                      .runCollect

          // Convert videos into our domain classes
          convertedVideos = videos.map(toSearchResult)

          // Update the state for the next iteration
          updatedState = state.copy(
                           subscriptionsLeft = rest,
                           accumulator = state.accumulator.copy(
                             values = state.accumulator.values ++ convertedVideos
                           )
                         )
          // Take a break to avoid being rate-limited by YouTube API
          _ <- ZIO.logInfo(s"Sleep for $pollInterval")
          _ <- ZIO.sleep(pollInterval)
          // Next iteration
          result <- process(params)(updatedState)
        } yield result
    }
}
|
|
|
object YoutubeActivitiesImpl {

  /** Layer that builds the activities from a [[YoutubeClient]] and the
    * activity run options found in the environment.
    */
  val make: URLayer[YoutubeClient with ZActivityRunOptions[Any], YoutubeActivities] =
    ZLayer.fromFunction { (client: YoutubeClient, runOptions: ZActivityRunOptions[Any]) =>
      YoutubeActivitiesImpl(client)(runOptions)
    }
}
|
|
|
// Data Lake activities |
|
// Input: list of videos to store |
|
/** Activity input: the videos that should be stored. */
case class YoutubeVideosList(values: List[YoutubeSearchResult])

/** Activity input: where the data should be stored.
  *
  * @param integrationId     identifier of the integration the videos belong to
  * @param datalakeOutputDir root directory of the data lake output
  */
case class StoreVideosParameters(integrationId: Long, datalakeOutputDir: String)
|
|
|
// activity interface |
|
/** Temporal activity interface for persisting pulled videos into the data lake. */
@activityInterface
trait DatalakeActivities {

  /** Stores the given videos.
    *
    * @param videos videos to store
    * @param params integration id and output directory
    */
  def storeVideos(videos: YoutubeVideosList, params: StoreVideosParameters): Unit
}
|
|
|
// For configuration |
|
import java.net.URI |
|
// For file writes |
|
import java.io.IOException |
|
import zio._ |
|
import zio.stream._ |
|
import zio.json._ |
|
import zio.nio.file.Files |
|
import zio.nio.file.Path |
|
|
|
|
|
// List all the dependencies and configuration |
|
/** [[DatalakeActivities]] implementation that writes videos as JSON lines files.
  *
  * @param youtubeBaseUri prefix the video id is appended to when building a video URL
  * @param options        run options needed by `ZActivity.run` to execute ZIO effects
  */
case class DatalakeActivitiesImpl(
  youtubeBaseUri: URI
)(implicit options: ZActivityRunOptions[Any])
    extends DatalakeActivities {

  /** Converts the videos into [[ContentFeedItem]]s and writes them under
    * `params.datalakeOutputDir`, logging how many items were written.
    */
  override def storeVideos(
    videos: YoutubeVideosList,
    params: StoreVideosParameters
  ): Unit =
    // Run ZIO code
    ZActivity.run {
      // Wrapping the list into a ZStream keeps the writing code simple
      val contentFeedItemsStream = ZStream.fromIterable(videos.values).map(toFeedItem)

      ZIO.logInfo("Storing videos") *>
        writeStreamToJson(
          contentFeedItemsStream = contentFeedItemsStream,
          datalakeOutputDir = params.datalakeOutputDir,
          integrationId = params.integrationId
        ).flatMap(written => ZIO.logInfo(s"Written $written videos"))
    }

  // Maps a fetched video onto the feed item model; the URL is base URI + video id
  private def toFeedItem(video: YoutubeSearchResult): ContentFeedItem =
    ContentFeedItem(
      title = video.title,
      description = video.description,
      url = youtubeBaseUri.toString + video.videoId,
      publishedAt = video.publishedAt,
      contentType = ContentType.Video
    )

  /** Writes the stream as JSON lines files and returns the number of items written.
    *
    * Items are grouped in batches of up to 100; every batch goes into its own
    * `pull-<uuid>.jsonl` file below
    * `<datalakeOutputDir>/pulledDate=<now>/integration=<integrationId>`,
    * where `<now>` is the local date-time read once before writing starts.
    */
  private def writeStreamToJson(
    contentFeedItemsStream: UStream[ContentFeedItem],
    datalakeOutputDir: String,
    integrationId: Long
  ): IO[IOException, Long] = {

    // Writes one batch into a fresh file and reports the batch size
    def writeChunk(now: LocalDateTime)(
      items: Chunk[ContentFeedItem]
    ): IO[IOException, Long] =
      ZIO.scoped {
        for {
          fileId <- ZIO.randomWith(_.nextUUID)
          // Make sure the pull directory exists
          targetDir = Path(datalakeOutputDir) /
                        s"pulledDate=$now" /
                        s"integration=$integrationId"
          _ <- Files.createDirectories(targetDir)

          // One JSON document per line
          targetFile = targetDir / s"pull-$fileId.jsonl"
          _ <- Files.writeLines(targetFile, lines = items.map(_.toJson))
        } yield items.size.toLong
      }

    ZIO.clockWith(_.localDateTime).flatMap { pulledAt =>
      contentFeedItemsStream
        // Write small chunks of data
        .grouped(100)
        .mapZIO(writeChunk(pulledAt))
        // Total number of objects written
        .runSum
    }
  }
}
|
|
|
// For dependency injection |
|
object DatalakeActivitiesImpl {

  /** Layer that builds the data lake activities from the activity run options
    * in the environment, using a hard-coded YouTube watch URL prefix.
    */
  val make: URLayer[ZActivityRunOptions[Any], DatalakeActivities] =
    // NOTE: it's better to read URI from a configuration file using ZIO Config capabilities
    ZLayer.fromFunction { (runOptions: ZActivityRunOptions[Any]) =>
      DatalakeActivitiesImpl(new URI("https://www.youtube.com/watch?v="))(runOptions)
    }
}
|
|
|
// YouTube Puller Workflow |
|
// Input: what data to fetch and where to store |
|
/** Workflow input: what to fetch and where to store it.
  *
  * @param integrationId     identifier of the integration to pull
  * @param minDate           lower date bound passed on to the fetch activity
  * @param datalakeOutputDir root directory of the data lake output
  */
case class YoutubePullerParameters(
  integrationId: Long,
  minDate: LocalDateTime,
  datalakeOutputDir: String
)

/** Workflow output: how many items were processed. */
case class PullingResult(processed: Long)
|
|
|
// Workflow Interface |
|
import zio._ |
|
import zio.temporal._ |
|
import zio.temporal.workflow._ |
|
|
|
/** Temporal workflow interface for pulling YouTube videos into the data lake. */
@workflowInterface
trait YoutubePullWorkflow {

  /** Workflow entry point.
    *
    * @param params what to fetch and where to store it
    * @return the number of processed items
    */
  @workflowMethod
  def pull(params: YoutubePullerParameters): PullingResult
}
|
|
|
// Workflow Implementation |
|
// Just extend the workflow interface |
|
/** Workflow implementation: fetches the videos through [[YoutubeActivities]]
  * and, when any were found, stores them through [[DatalakeActivities]].
  */
class YoutubePullWorkflowImpl extends YoutubePullWorkflow {

  // Workflow logger
  private val logger = ZWorkflow.makeLogger

  // Step 1: stub for the YouTube activities
  private val youtubeActivities: ZActivityStub.Of[YoutubeActivities] = {
    val retryPolicy = ZRetryOptions.default
      .withMaximumAttempts(5)
      // bigger coefficient due to rate limiting on the YouTube side
      .withBackoffCoefficient(3)

    ZWorkflow.newActivityStub[YoutubeActivities](
      ZActivityOptions
        // fetching may take a long time
        .withStartToCloseTimeout(30.minutes)
        .withRetryOptions(retryPolicy)
    )
  }

  // Step 2: stub for the data lake activities
  private val datalakeActivities = {
    val retryPolicy = ZRetryOptions.default.withMaximumAttempts(5)

    ZWorkflow.newActivityStub[DatalakeActivities](
      ZActivityOptions
        .withStartToCloseTimeout(1.minute)
        .withRetryOptions(retryPolicy)
    )
  }

  /** Runs the pull: fetch first, then store unless nothing was fetched.
    *
    * @param params what to fetch and where to store it
    * @return `PullingResult(0)` when no videos were found, otherwise the
    *         number of videos handed to the store activity
    */
  override def pull(params: YoutubePullerParameters): PullingResult = {
    logger.info(
      s"Getting videos integrationId=${params.integrationId} minDate=${params.minDate}"
    )

    // Step 3: execute YoutubeActivities.fetchVideos.
    // The method invocation has to be wrapped into ZActivityStub.execute
    val fetchParams = FetchVideosParams(
      integrationId = params.integrationId,
      minDate = params.minDate
    )
    val videos = ZActivityStub.execute(
      youtubeActivities.fetchVideos(fetchParams)
    )

    videos.values match {
      case Nil =>
        // Nothing was fetched, so no empty files get produced
        logger.info("No new videos found")
        PullingResult(0)

      case fetched =>
        val videosCount = fetched.size
        logger.info(s"Going to store $videosCount videos...")

        // Step 4: execute DatalakeActivities.storeVideos.
        val storeParams = StoreVideosParameters(
          integrationId = params.integrationId,
          datalakeOutputDir = params.datalakeOutputDir
        )
        ZActivityStub.execute(
          datalakeActivities.storeVideos(
            videos = YoutubeVideosList(fetched),
            params = storeParams
          )
        )
        PullingResult(videosCount)
    }
  }
}
|
|
|
// Client application |
|
import zio.logging.slf4j.bridge.Slf4jBridge |
|
|
|
// scala-cli run --main-class ClientMain workflow-activities.scala |
|
/** Client application: starts a single YouTube pull workflow.
  *
  * The workflow id is `youtube/<random uuid>` and the workflow is submitted to
  * the `youtube-pulling-queue` task queue.
  */
object ClientMain extends ZIOAppDefault {

  // Route SLF4J logging through ZIO logging
  override val bootstrap: ZLayer[ZIOAppArgs, Any, Any] =
    Slf4jBridge.initialize

  /** Builds the workflow client from its layers and starts the workflow. */
  def run: Task[Unit] = {
    val retryOptions = ZRetryOptions.default
      // maximum retry attempts
      .withMaximumAttempts(5)
      // initial backoff interval
      .withInitialInterval(1.second)
      // exponential backoff coefficient
      .withBackoffCoefficient(1.2)
      // do not retry certain errors
      .withDoNotRetry(nameOf[IllegalArgumentException])

    def workflowOptions(workflowId: String) = ZWorkflowOptions
      .withWorkflowId(workflowId)
      .withTaskQueue("youtube-pulling-queue")
      .withWorkflowRunTimeout(20.minutes)
      .withRetryOptions(retryOptions)

    // Step 1: create a workflow stub; a random UUID is used to generate the workflow ID
    val createWorkflowStubZIO: URIO[ZWorkflowClient, ZWorkflowStub.Of[YoutubePullWorkflow]] =
      ZIO.serviceWithZIO[ZWorkflowClient] { workflowClient =>
        Random.nextUUID.flatMap { uuid =>
          workflowClient.newWorkflowStub[YoutubePullWorkflow](
            workflowOptions(workflowId = s"youtube/$uuid")
          )
        }
      }

    // Reuses createWorkflowStubZIO instead of repeating the UUID + stub creation inline
    val startWorkflow: RIO[ZWorkflowClient, Unit] =
      for {
        youtubePullWorkflow <- createWorkflowStubZIO
        // Step 2: start the workflow
        _ <- ZWorkflowStub.start(
               youtubePullWorkflow.pull(
                 // Provide input parameters
                 YoutubePullerParameters(
                   integrationId = 1,
                   minDate = LocalDateTime.of(2023, 1, 1, 0, 0),
                   datalakeOutputDir = "./datalake"
                 )
               )
             )

        _ <- ZIO.logInfo("YouTube pull result workflow started!")
      } yield ()

    val clientProgram: Task[Unit] =
      startWorkflow.provide(
        // The client itself
        ZWorkflowClient.make,
        // Client's direct dependencies:
        // 1. Client configuration
        ZWorkflowClientOptions.make,
        // 2. Workflow service stubs, responsible for all the RPC
        ZWorkflowServiceStubs.make,
        ZWorkflowServiceStubsOptions.make
      )

    clientProgram
  }
}
|
|
|
// Worker application |
|
import zio.temporal.worker._ |
|
|
|
// scala-cli run --main-class WorkerMain workflow-activities.scala |
|
/** Worker application: registers the workflow and both activity
  * implementations on the `youtube-pulling-queue` task queue and serves them.
  */
object WorkerMain extends ZIOAppDefault {

  // Route SLF4J logging through ZIO logging
  override val bootstrap: ZLayer[ZIOAppArgs, Any, Any] =
    Slf4jBridge.initialize

  /** Wires the worker factory with its layers and runs the worker. */
  def run = {
    // The activities' dependencies show up in the environment of this effect
    val registerWorker: URIO[ZWorkerFactory with YoutubeClient with ZActivityRunOptions[Any] with Scope, ZWorker] =
      ZWorkerFactory.newWorker("youtube-pulling-queue") @@
        // Workflow registration
        ZWorker.addWorkflow[YoutubePullWorkflow].from(new YoutubePullWorkflowImpl) @@
        // Activity implementations
        ZWorker.addActivityImplementationLayer(YoutubeActivitiesImpl.make) @@
        ZWorker.addActivityImplementationLayer(DatalakeActivitiesImpl.make)

    // Register the worker, set up the internal transport, then serve
    // (serving blocks for as long as the program is alive)
    val workerProcess =
      registerWorker *>
        ZWorkflowServiceStubs.setup() *>
        ZWorkerFactory.serve.unit

    val workerProgram: RIO[Scope, Unit] =
      workerProcess.provideSome[Scope](
        // Worker factory and its configuration
        ZWorkerFactory.make,
        ZWorkerFactoryOptions.make,
        // The factory requires the workflow client
        ZWorkflowClient.make,
        ZWorkflowClientOptions.make,
        // ...as well as the workflow service stubs
        ZWorkflowServiceStubs.make,
        ZWorkflowServiceStubsOptions.make,
        // Dependencies of the activities
        YoutubeClient.make,
        ZActivityRunOptions.default
      )

    workerProgram
  }
}