Skip to content

Instantly share code, notes, and snippets.

@sids
Last active July 11, 2024 06:23
Show Gist options
  • Save sids/66e3ec74aec5fa2781d96fe2b1e238b9 to your computer and use it in GitHub Desktop.
Save sids/66e3ec74aec5fa2781d96fe2b1e238b9 to your computer and use it in GitHub Desktop.
Workflow API exploration
typealias Input = String
typealias Output_1 = String
typealias Output_2 = String
typealias Output_3 = String
typealias Output_4 = String
typealias Output_5 = String
typealias Output_6 = String
class ExampleWorkflowTask : SimpleWorkflowTask<Input, Output_1>() {
override suspend fun execute(input: Input): Output_1 {
return "$0/step 1"
}
}
class ExampleSuspendingWorkflowTask : WorkflowTask<Output_2, String, Output_3>() {
override suspend fun start(input: Output_2): String {
// call some external service to create task for manual input
// get an id that can be used to check if input is ready
delay(1000)
val id = UUIDUtils.unique20()
return id
}
override suspend fun isDone(ref: String): Boolean {
// use ref to check if input is available -- it's the id that start returned
delay(1000)
// return true if input is available
return true
}
override suspend fun getResult(ref: String): Output_3 {
// use ref to fetch the input -- it's the id that start returned
delay(1000)
return "$0/step 2"
}
}
class ExampleWorkflow1 : WorkflowV1<Input, Output_6>() {
override val pipeline: WorkflowPipeline<Input, Output_6> =
startWorkflow<Input>()
.task<Output_1>(ExampleWorkflowTask()
.retries(2))
.task<Output_2>(ExampleSuspendingWorkflowTask()
.timeoutMillis(Duration.ofDays(2).toMillis()))
.workflow<Output_5, ExampleWorkflow2>(ExampleWorkflow2::class)
.then<Output_6> { "$0/step 6" }
}
class ExampleWorkflow2 : WorkflowV1<Output_2, Output_5>() {
override val pipeline: WorkflowPipeline<Output_2, Output_5> =
startWorkflow<Output_2>()
.parallelTasks(
fnWorkflowTask<Output_2, Output_3> { "$0/step 3" }.retries(3),
fnWorkflowTask<Output_2, Output_4> { "$0/stap 4" }.timeoutMillis(5000)
)
.then<Output_5> { "$0++$1" }
}
fun main() = TelemetryScope.runBlocking {
val orchestrator = WorkflowOrchestrator()
orchestrator.triggerWorkflow(ExampleWorkflow1::class, "start")
}
/**
* Base class for all workflow tasks. A task is a single step in a workflow.
*
* I: type of input
* R: type of reference for the task under execution
* O: type of output
*/
abstract class WorkflowTask<I, R, O>() {
var retries: Int = 0
private set
var timeoutMillis: Long = 60000
private set
open var rollbackTask: WorkflowTask<Triple<I, R, O>, *, *>? = null
protected set
abstract suspend fun start(input: I): R
abstract suspend fun isDone(ref: R): Boolean
abstract suspend fun getResult(ref: R): O
fun retries(retries: Int): WorkflowTask<I, R, O> {
this.retries = retries
return this
}
fun timeoutMillis(timeoutMillis: Long): WorkflowTask<I, R, O> {
this.timeoutMillis = timeoutMillis
return this
}
fun rollbackTask(rollbackTask: WorkflowTask<Triple<I, R, O>, *, *>): WorkflowTask<I, R, O> {
this.rollbackTask = rollbackTask
return this
}
}
/**
* Workflow Task that is expressed as a function; the task is complete when the function returns.
*/
abstract class SimpleWorkflowTask<I, O>: WorkflowTask<I, Unit, O>() {
private lateinit var resultDeferred: Deferred<O>
abstract suspend fun execute(input: I): O
open suspend fun rollback(input: I, output: O) { }
final override suspend fun start(input: I) {
resultDeferred = TelemetryScope.async(Dispatchers.IO) {
execute(input)
}
}
final override suspend fun isDone(ref: Unit): Boolean {
return resultDeferred.isCompleted
}
final override suspend fun getResult(ref: Unit): O {
return resultDeferred.await()
}
}
typealias RollbackFn<I, O> = suspend (input: I, output: O) -> Unit
/**
* Convenience function to define a workflow task using a simple inline function.
*/
fun <I, O> fnWorkflowTask(fn: suspend (input: I) -> O, rollback: RollbackFn<I, O>?) =
object : SimpleWorkflowTask<I, O>() {
override suspend fun execute(input: I): O {
return fn(input)
}
override suspend fun rollback(input: I, output: O) {
if (rollback != null) {
rollback(input, output)
} else {
super.rollback(input, output)
}
}
}
/**
* Enables a workflow to be used as a step in another workflow.
*/
fun <I, O, W : WorkflowV1<I, O>> childWorkflowTask(workflow: KClass<W>) =
object : WorkflowTask<I, String, O>() {
override suspend fun start(input: I): String {
TODO("trigger the workflow; return workflowId")
}
override suspend fun isDone(ref: String): Boolean {
TODO("check if workflow is done; ref is workflowId")
}
override suspend fun getResult(ref: String): O {
TODO("return result of workflow; ref is workflowId")
}
override var rollbackTask: WorkflowTask<Triple<I, String, O>, *, *>? = TODO("call rollback for the workflow")
}
/**
* Enables two WorkflowTasks to be called in parallel.
*/
fun <I, R1, R2, O1, O2> parallelWorkflowTasks(w1: WorkflowTask<I, R1, O1>, w2: WorkflowTask<I, R2, O2>)
: WorkflowTask<I, Pair<R1, R2>, Pair<O1, O2>> =
object : WorkflowTask<I, Pair<R1, R2>, Pair<O1, O2>>() {
override suspend fun start(input: I): Pair<R1, R2> {
val ref1 = w1.start(input)
val ref2 = w2.start(input)
return Pair(ref1, ref2)
}
override suspend fun isDone(ref: Pair<R1, R2>): Boolean {
return w1.isDone(ref.first) && w2.isDone(ref.second)
}
override suspend fun getResult(ref: Pair<R1, R2>): Pair<O1, O2> {
val output1 = w1.getResult(ref.first)
val output2 = w2.getResult(ref.second)
return Pair(output1, output2)
}
override var rollbackTask: WorkflowTask<Triple<I, Pair<R1, R2>, Pair<O1, O2>>, *, *>?
= TODO("call rollback for the both the tasks")
}
import com.udaan.common.utils.idgen.UUIDUtils
import com.udaan.instrumentation.TelemetryScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import java.time.Duration
import java.time.Instant
import kotlin.reflect.KClass
import kotlin.reflect.full.createInstance
/**
* Base class for all workflow tasks. A task is a single step in a workflow.
*
* I: type of input
* R: type of reference for the task under execution
* O: type of output
*/
abstract class WorkflowTask<I, R, O>() {
var retries: Int = 0
private set
var timeoutMillis: Long = 60000
private set
abstract suspend fun start(input: I): R
abstract suspend fun isDone(ref: R): Boolean
abstract suspend fun getResult(ref: R): O
fun retries(retries: Int): WorkflowTask<I, R, O> {
this.retries = retries
return this
}
fun timeoutMillis(timeoutMillis: Long): WorkflowTask<I, R, O> {
this.timeoutMillis = timeoutMillis
return this
}
}
/**
* Workflow Task that is expressed as a function; the task is complete when the function returns.
*/
abstract class SimpleWorkflowTask<I, O>: WorkflowTask<I, Unit, O>() {
private lateinit var resultDeferred: Deferred<O>
abstract suspend fun execute(input: I): O
final override suspend fun start(input: I) {
resultDeferred = TelemetryScope.async(Dispatchers.IO) {
execute(input)
}
}
final override suspend fun isDone(ref: Unit): Boolean {
return resultDeferred.isCompleted
}
final override suspend fun getResult(ref: Unit): O {
return resultDeferred.await()
}
}
/**
* Convenience function to define a workflow task using a simple inline function.
*/
fun <I, O> fnWorkflowTask(fn: suspend (input: I) -> O) =
object : SimpleWorkflowTask<I, O>() {
override suspend fun execute(input: I): O {
return fn(input)
}
}
/**
* Enables a workflow to be used as a step in another workflow.
*/
fun <I, O, W : WorkflowV1<I, O>> childWorkflowTask(workflow: KClass<W>) =
object : WorkflowTask<I, String, O>() {
override suspend fun start(input: I): String {
TODO("trigger the workflow; return workflowId")
}
override suspend fun isDone(ref: String): Boolean {
TODO("check if workflow is done; ref is workflowId")
}
override suspend fun getResult(ref: String): O {
TODO("return result of workflow; ref is workflowId")
}
}
/**
* Enables two WorkflowTasks to be called in parallel.
*/
fun <I, R1, R2, O1, O2> parallelWorkflowTasks(w1: WorkflowTask<I, R1, O1>, w2: WorkflowTask<I, R2, O2>)
: WorkflowTask<I, Pair<R1, R2>, Pair<O1, O2>> =
object : WorkflowTask<I, Pair<R1, R2>, Pair<O1, O2>>() {
override suspend fun start(input: I): Pair<R1, R2> {
val ref1 = w1.start(input)
val ref2 = w2.start(input)
return Pair(ref1, ref2)
}
override suspend fun isDone(ref: Pair<R1, R2>): Boolean {
return w1.isDone(ref.first) && w2.isDone(ref.second)
}
override suspend fun getResult(ref: Pair<R1, R2>): Pair<O1, O2> {
val output1 = w1.getResult(ref.first)
val output2 = w2.getResult(ref.second)
return Pair(output1, output2)
}
}
class WorkflowPipeline<I, O1>(
val steps: List<WorkflowTask<*, *, *>>
) {
fun <O2> task(task: WorkflowTask<O1, *, O2>)
: WorkflowPipeline<I, O2> {
return WorkflowPipeline(steps + task)
}
fun <O2> then(fn: suspend (input: O1) -> O2)
: WorkflowPipeline<I, O2> {
return task(fnWorkflowTask(fn))
}
fun <O2, W : WorkflowV1<O1, O2>> workflow(workflow: KClass<W>)
: WorkflowPipeline<I, O2> {
return task(childWorkflowTask(workflow))
}
fun <R1, R2, O2, O3> parallelTasks(task1: WorkflowTask<O1, R1, O2>, task2: WorkflowTask<O1, R2, O3>)
: WorkflowPipeline<I, Pair<O2, O3>> {
return task(parallelWorkflowTasks(task1, task2))
}
}
fun <I> startWorkflow() = WorkflowPipeline<I, I>(steps = emptyList())
abstract class WorkflowV1<I, O> {
abstract val pipeline: WorkflowPipeline<I, O>
}
class WorkflowOrchestrator {
suspend fun <I, W : WorkflowV1<I, *>> triggerWorkflow(workflowKClass: KClass<W>, input: I) {
var state = WorkflowState(
id = UUIDUtils.unique20(),
currentTaskState = WorkflowTaskState(
id = UUIDUtils.unique20(),
input = input as Any
)
)
while (!state.status.isCompleted()) {
val workflow = workflowKClass.createInstance()
state = workflow.process(state)
delay(1000)
}
}
}
enum class WorkflowStatus {
NotStarted, InProgress, Success, Failure;
fun isCompleted() = this == Success || this == Failure
}
data class WorkflowState(
val id: String,
val status: WorkflowStatus = WorkflowStatus.NotStarted,
val currentTaskIndex: Int = -1,
val currentTaskState: WorkflowTaskState,
val startedAt: Instant? = null,
val completedAt: Instant? = null
)
enum class WorkflowTaskStatus {
NotStarted, InProgress, Success, Failure;
fun isCompleted() = this == Success || this == Failure
}
data class WorkflowTaskState(
val id: String,
var status: WorkflowTaskStatus = WorkflowTaskStatus.NotStarted,
val input: Any,
val ref: Any? = null,
val output: Any? = null,
var startedAt: Instant? = null,
var retriesLeft: Int = 0,
var lastRetryAt: Instant? = null,
var completedAt: Instant? = null
)
@Suppress("UNCHECKED_CAST")
suspend fun <I, O> WorkflowV1<I, O>.process(state: WorkflowState): WorkflowState {
when (state.status) {
WorkflowStatus.NotStarted -> {
return process(state.copy(
status = WorkflowStatus.InProgress,
currentTaskIndex = 0,
startedAt = Instant.now()
))
}
WorkflowStatus.InProgress -> {
val currentTaskIndex = state.currentTaskIndex
assert(currentTaskIndex < pipeline.steps.count())
val currentTask = pipeline.steps[currentTaskIndex]
val currentTaskState = state.currentTaskState
when (currentTaskState.status) {
WorkflowTaskStatus.NotStarted -> {
return state.copy(
currentTaskState = currentTask.process(currentTaskState.copy(retriesLeft = currentTask.retries))
)
}
WorkflowTaskStatus.InProgress -> {
return state.copy(
currentTaskState = currentTask.process(currentTaskState)
)
}
WorkflowTaskStatus.Success -> {
if (currentTaskIndex == pipeline.steps.count()) {
return state.copy(
status = WorkflowStatus.Success,
completedAt = currentTaskState.completedAt
)
} else {
return state.copy(
currentTaskIndex = currentTaskIndex + 1,
currentTaskState = WorkflowTaskState(
id = UUIDUtils.unique20(),
input = currentTaskState.output!!
)
)
}
}
WorkflowTaskStatus.Failure -> {
return state.copy(
status = WorkflowStatus.Failure,
completedAt = currentTaskState.completedAt
)
}
}
}
WorkflowStatus.Success,
WorkflowStatus.Failure -> return state
}
}
@Suppress("UNCHECKED_CAST")
suspend fun <I, R, O> WorkflowTask<I, R, O>.process(state: WorkflowTaskState): WorkflowTaskState {
return when (state.status) {
WorkflowTaskStatus.NotStarted -> {
val ref = start(state.input as I)
state.copy(
status = WorkflowTaskStatus.InProgress,
ref = ref,
startedAt = Instant.now()
)
}
WorkflowTaskStatus.InProgress -> {
if (isDone(state.ref as R)) {
try {
val output = getResult(state.ref as R)
state.copy(
status = WorkflowTaskStatus.Success,
output = output,
completedAt = Instant.now()
)
} catch (e: Exception) {
// todo: log
if (state.retriesLeft > 0) {
val ref = start(state.input as I)
state.copy(
status = WorkflowTaskStatus.InProgress,
ref = ref,
retriesLeft = state.retriesLeft - 1,
lastRetryAt = Instant.now()
)
} else {
state.copy(
status = WorkflowTaskStatus.Failure,
completedAt = Instant.now()
)
}
}
} else {
state
}
}
WorkflowTaskStatus.Success,
WorkflowTaskStatus.Failure -> return state
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment