Last active
July 11, 2024 06:23
-
-
Save sids/66e3ec74aec5fa2781d96fe2b1e238b9 to your computer and use it in GitHub Desktop.
Workflow API exploration
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
typealias Input = String | |
typealias Output_1 = String | |
typealias Output_2 = String | |
typealias Output_3 = String | |
typealias Output_4 = String | |
typealias Output_5 = String | |
typealias Output_6 = String | |
class ExampleWorkflowTask : SimpleWorkflowTask<Input, Output_1>() { | |
override suspend fun execute(input: Input): Output_1 { | |
return "$0/step 1" | |
} | |
} | |
class ExampleSuspendingWorkflowTask : WorkflowTask<Output_2, String, Output_3>() { | |
override suspend fun start(input: Output_2): String { | |
// call some external service to create task for manual input | |
// get an id that can be used to check if input is ready | |
delay(1000) | |
val id = UUIDUtils.unique20() | |
return id | |
} | |
override suspend fun isDone(ref: String): Boolean { | |
// use ref to check if input is available -- it's the id that start returned | |
delay(1000) | |
// return true if input is available | |
return true | |
} | |
override suspend fun getResult(ref: String): Output_3 { | |
// use ref to fetch the input -- it's the id that start returned | |
delay(1000) | |
return "$0/step 2" | |
} | |
} | |
class ExampleWorkflow1 : WorkflowV1<Input, Output_6>() { | |
override val pipeline: WorkflowPipeline<Input, Output_6> = | |
startWorkflow<Input>() | |
.task<Output_1>(ExampleWorkflowTask() | |
.retries(2)) | |
.task<Output_2>(ExampleSuspendingWorkflowTask() | |
.timeoutMillis(Duration.ofDays(2).toMillis())) | |
.workflow<Output_5, ExampleWorkflow2>(ExampleWorkflow2::class) | |
.then<Output_6> { "$0/step 6" } | |
} | |
class ExampleWorkflow2 : WorkflowV1<Output_2, Output_5>() { | |
override val pipeline: WorkflowPipeline<Output_2, Output_5> = | |
startWorkflow<Output_2>() | |
.parallelTasks( | |
fnWorkflowTask<Output_2, Output_3> { "$0/step 3" }.retries(3), | |
fnWorkflowTask<Output_2, Output_4> { "$0/stap 4" }.timeoutMillis(5000) | |
) | |
.then<Output_5> { "$0++$1" } | |
} | |
fun main() = TelemetryScope.runBlocking { | |
val orchestrator = WorkflowOrchestrator() | |
orchestrator.triggerWorkflow(ExampleWorkflow1::class, "start") | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Base class for all workflow tasks. A task is a single step in a workflow. | |
* | |
* I: type of input | |
* R: type of reference for the task under execution | |
* O: type of output | |
*/ | |
abstract class WorkflowTask<I, R, O>() { | |
var retries: Int = 0 | |
private set | |
var timeoutMillis: Long = 60000 | |
private set | |
open var rollbackTask: WorkflowTask<Triple<I, R, O>, *, *>? = null | |
protected set | |
abstract suspend fun start(input: I): R | |
abstract suspend fun isDone(ref: R): Boolean | |
abstract suspend fun getResult(ref: R): O | |
fun retries(retries: Int): WorkflowTask<I, R, O> { | |
this.retries = retries | |
return this | |
} | |
fun timeoutMillis(timeoutMillis: Long): WorkflowTask<I, R, O> { | |
this.timeoutMillis = timeoutMillis | |
return this | |
} | |
fun rollbackTask(rollbackTask: WorkflowTask<Triple<I, R, O>, *, *>): WorkflowTask<I, R, O> { | |
this.rollbackTask = rollbackTask | |
return this | |
} | |
} | |
/** | |
* Workflow Task that is expressed as a function; the task is complete when the function returns. | |
*/ | |
abstract class SimpleWorkflowTask<I, O>: WorkflowTask<I, Unit, O>() { | |
private lateinit var resultDeferred: Deferred<O> | |
abstract suspend fun execute(input: I): O | |
open suspend fun rollback(input: I, output: O) { } | |
final override suspend fun start(input: I) { | |
resultDeferred = TelemetryScope.async(Dispatchers.IO) { | |
execute(input) | |
} | |
} | |
final override suspend fun isDone(ref: Unit): Boolean { | |
return resultDeferred.isCompleted | |
} | |
final override suspend fun getResult(ref: Unit): O { | |
return resultDeferred.await() | |
} | |
} | |
typealias RollbackFn<I, O> = suspend (input: I, output: O) -> Unit | |
/** | |
* Convenience function to define a workflow task using a simple inline function. | |
*/ | |
fun <I, O> fnWorkflowTask(fn: suspend (input: I) -> O, rollback: RollbackFn<I, O>?) = | |
object : SimpleWorkflowTask<I, O>() { | |
override suspend fun execute(input: I): O { | |
return fn(input) | |
} | |
override suspend fun rollback(input: I, output: O) { | |
if (rollback != null) { | |
rollback(input, output) | |
} else { | |
super.rollback(input, output) | |
} | |
} | |
} | |
/** | |
* Enables a workflow to be used as a step in another workflow. | |
*/ | |
fun <I, O, W : WorkflowV1<I, O>> childWorkflowTask(workflow: KClass<W>) = | |
object : WorkflowTask<I, String, O>() { | |
override suspend fun start(input: I): String { | |
TODO("trigger the workflow; return workflowId") | |
} | |
override suspend fun isDone(ref: String): Boolean { | |
TODO("check if workflow is done; ref is workflowId") | |
} | |
override suspend fun getResult(ref: String): O { | |
TODO("return result of workflow; ref is workflowId") | |
} | |
override var rollbackTask: WorkflowTask<Triple<I, String, O>, *, *>? = TODO("call rollback for the workflow") | |
} | |
/** | |
* Enables two WorkflowTasks to be called in parallel. | |
*/ | |
fun <I, R1, R2, O1, O2> parallelWorkflowTasks(w1: WorkflowTask<I, R1, O1>, w2: WorkflowTask<I, R2, O2>) | |
: WorkflowTask<I, Pair<R1, R2>, Pair<O1, O2>> = | |
object : WorkflowTask<I, Pair<R1, R2>, Pair<O1, O2>>() { | |
override suspend fun start(input: I): Pair<R1, R2> { | |
val ref1 = w1.start(input) | |
val ref2 = w2.start(input) | |
return Pair(ref1, ref2) | |
} | |
override suspend fun isDone(ref: Pair<R1, R2>): Boolean { | |
return w1.isDone(ref.first) && w2.isDone(ref.second) | |
} | |
override suspend fun getResult(ref: Pair<R1, R2>): Pair<O1, O2> { | |
val output1 = w1.getResult(ref.first) | |
val output2 = w2.getResult(ref.second) | |
return Pair(output1, output2) | |
} | |
override var rollbackTask: WorkflowTask<Triple<I, Pair<R1, R2>, Pair<O1, O2>>, *, *>? | |
= TODO("call rollback for the both the tasks") | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.udaan.common.utils.idgen.UUIDUtils | |
import com.udaan.instrumentation.TelemetryScope | |
import kotlinx.coroutines.Deferred | |
import kotlinx.coroutines.Dispatchers | |
import kotlinx.coroutines.delay | |
import java.time.Duration | |
import java.time.Instant | |
import kotlin.reflect.KClass | |
import kotlin.reflect.full.createInstance | |
/** | |
* Base class for all workflow tasks. A task is a single step in a workflow. | |
* | |
* I: type of input | |
* R: type of reference for the task under execution | |
* O: type of output | |
*/ | |
abstract class WorkflowTask<I, R, O>() { | |
var retries: Int = 0 | |
private set | |
var timeoutMillis: Long = 60000 | |
private set | |
abstract suspend fun start(input: I): R | |
abstract suspend fun isDone(ref: R): Boolean | |
abstract suspend fun getResult(ref: R): O | |
fun retries(retries: Int): WorkflowTask<I, R, O> { | |
this.retries = retries | |
return this | |
} | |
fun timeoutMillis(timeoutMillis: Long): WorkflowTask<I, R, O> { | |
this.timeoutMillis = timeoutMillis | |
return this | |
} | |
} | |
/** | |
* Workflow Task that is expressed as a function; the task is complete when the function returns. | |
*/ | |
abstract class SimpleWorkflowTask<I, O>: WorkflowTask<I, Unit, O>() { | |
private lateinit var resultDeferred: Deferred<O> | |
abstract suspend fun execute(input: I): O | |
final override suspend fun start(input: I) { | |
resultDeferred = TelemetryScope.async(Dispatchers.IO) { | |
execute(input) | |
} | |
} | |
final override suspend fun isDone(ref: Unit): Boolean { | |
return resultDeferred.isCompleted | |
} | |
final override suspend fun getResult(ref: Unit): O { | |
return resultDeferred.await() | |
} | |
} | |
/** | |
* Convenience function to define a workflow task using a simple inline function. | |
*/ | |
fun <I, O> fnWorkflowTask(fn: suspend (input: I) -> O) = | |
object : SimpleWorkflowTask<I, O>() { | |
override suspend fun execute(input: I): O { | |
return fn(input) | |
} | |
} | |
/** | |
* Enables a workflow to be used as a step in another workflow. | |
*/ | |
fun <I, O, W : WorkflowV1<I, O>> childWorkflowTask(workflow: KClass<W>) = | |
object : WorkflowTask<I, String, O>() { | |
override suspend fun start(input: I): String { | |
TODO("trigger the workflow; return workflowId") | |
} | |
override suspend fun isDone(ref: String): Boolean { | |
TODO("check if workflow is done; ref is workflowId") | |
} | |
override suspend fun getResult(ref: String): O { | |
TODO("return result of workflow; ref is workflowId") | |
} | |
} | |
/** | |
* Enables two WorkflowTasks to be called in parallel. | |
*/ | |
fun <I, R1, R2, O1, O2> parallelWorkflowTasks(w1: WorkflowTask<I, R1, O1>, w2: WorkflowTask<I, R2, O2>) | |
: WorkflowTask<I, Pair<R1, R2>, Pair<O1, O2>> = | |
object : WorkflowTask<I, Pair<R1, R2>, Pair<O1, O2>>() { | |
override suspend fun start(input: I): Pair<R1, R2> { | |
val ref1 = w1.start(input) | |
val ref2 = w2.start(input) | |
return Pair(ref1, ref2) | |
} | |
override suspend fun isDone(ref: Pair<R1, R2>): Boolean { | |
return w1.isDone(ref.first) && w2.isDone(ref.second) | |
} | |
override suspend fun getResult(ref: Pair<R1, R2>): Pair<O1, O2> { | |
val output1 = w1.getResult(ref.first) | |
val output2 = w2.getResult(ref.second) | |
return Pair(output1, output2) | |
} | |
} | |
class WorkflowPipeline<I, O1>( | |
val steps: List<WorkflowTask<*, *, *>> | |
) { | |
fun <O2> task(task: WorkflowTask<O1, *, O2>) | |
: WorkflowPipeline<I, O2> { | |
return WorkflowPipeline(steps + task) | |
} | |
fun <O2> then(fn: suspend (input: O1) -> O2) | |
: WorkflowPipeline<I, O2> { | |
return task(fnWorkflowTask(fn)) | |
} | |
fun <O2, W : WorkflowV1<O1, O2>> workflow(workflow: KClass<W>) | |
: WorkflowPipeline<I, O2> { | |
return task(childWorkflowTask(workflow)) | |
} | |
fun <R1, R2, O2, O3> parallelTasks(task1: WorkflowTask<O1, R1, O2>, task2: WorkflowTask<O1, R2, O3>) | |
: WorkflowPipeline<I, Pair<O2, O3>> { | |
return task(parallelWorkflowTasks(task1, task2)) | |
} | |
} | |
fun <I> startWorkflow() = WorkflowPipeline<I, I>(steps = emptyList()) | |
abstract class WorkflowV1<I, O> { | |
abstract val pipeline: WorkflowPipeline<I, O> | |
} | |
class WorkflowOrchestrator { | |
suspend fun <I, W : WorkflowV1<I, *>> triggerWorkflow(workflowKClass: KClass<W>, input: I) { | |
var state = WorkflowState( | |
id = UUIDUtils.unique20(), | |
currentTaskState = WorkflowTaskState( | |
id = UUIDUtils.unique20(), | |
input = input as Any | |
) | |
) | |
while (!state.status.isCompleted()) { | |
val workflow = workflowKClass.createInstance() | |
state = workflow.process(state) | |
delay(1000) | |
} | |
} | |
} | |
enum class WorkflowStatus { | |
NotStarted, InProgress, Success, Failure; | |
fun isCompleted() = this == Success || this == Failure | |
} | |
data class WorkflowState( | |
val id: String, | |
val status: WorkflowStatus = WorkflowStatus.NotStarted, | |
val currentTaskIndex: Int = -1, | |
val currentTaskState: WorkflowTaskState, | |
val startedAt: Instant? = null, | |
val completedAt: Instant? = null | |
) | |
enum class WorkflowTaskStatus { | |
NotStarted, InProgress, Success, Failure; | |
fun isCompleted() = this == Success || this == Failure | |
} | |
data class WorkflowTaskState( | |
val id: String, | |
var status: WorkflowTaskStatus = WorkflowTaskStatus.NotStarted, | |
val input: Any, | |
val ref: Any? = null, | |
val output: Any? = null, | |
var startedAt: Instant? = null, | |
var retriesLeft: Int = 0, | |
var lastRetryAt: Instant? = null, | |
var completedAt: Instant? = null | |
) | |
@Suppress("UNCHECKED_CAST") | |
suspend fun <I, O> WorkflowV1<I, O>.process(state: WorkflowState): WorkflowState { | |
when (state.status) { | |
WorkflowStatus.NotStarted -> { | |
return process(state.copy( | |
status = WorkflowStatus.InProgress, | |
currentTaskIndex = 0, | |
startedAt = Instant.now() | |
)) | |
} | |
WorkflowStatus.InProgress -> { | |
val currentTaskIndex = state.currentTaskIndex | |
assert(currentTaskIndex < pipeline.steps.count()) | |
val currentTask = pipeline.steps[currentTaskIndex] | |
val currentTaskState = state.currentTaskState | |
when (currentTaskState.status) { | |
WorkflowTaskStatus.NotStarted -> { | |
return state.copy( | |
currentTaskState = currentTask.process(currentTaskState.copy(retriesLeft = currentTask.retries)) | |
) | |
} | |
WorkflowTaskStatus.InProgress -> { | |
return state.copy( | |
currentTaskState = currentTask.process(currentTaskState) | |
) | |
} | |
WorkflowTaskStatus.Success -> { | |
if (currentTaskIndex == pipeline.steps.count()) { | |
return state.copy( | |
status = WorkflowStatus.Success, | |
completedAt = currentTaskState.completedAt | |
) | |
} else { | |
return state.copy( | |
currentTaskIndex = currentTaskIndex + 1, | |
currentTaskState = WorkflowTaskState( | |
id = UUIDUtils.unique20(), | |
input = currentTaskState.output!! | |
) | |
) | |
} | |
} | |
WorkflowTaskStatus.Failure -> { | |
return state.copy( | |
status = WorkflowStatus.Failure, | |
completedAt = currentTaskState.completedAt | |
) | |
} | |
} | |
} | |
WorkflowStatus.Success, | |
WorkflowStatus.Failure -> return state | |
} | |
} | |
@Suppress("UNCHECKED_CAST") | |
suspend fun <I, R, O> WorkflowTask<I, R, O>.process(state: WorkflowTaskState): WorkflowTaskState { | |
return when (state.status) { | |
WorkflowTaskStatus.NotStarted -> { | |
val ref = start(state.input as I) | |
state.copy( | |
status = WorkflowTaskStatus.InProgress, | |
ref = ref, | |
startedAt = Instant.now() | |
) | |
} | |
WorkflowTaskStatus.InProgress -> { | |
if (isDone(state.ref as R)) { | |
try { | |
val output = getResult(state.ref as R) | |
state.copy( | |
status = WorkflowTaskStatus.Success, | |
output = output, | |
completedAt = Instant.now() | |
) | |
} catch (e: Exception) { | |
// todo: log | |
if (state.retriesLeft > 0) { | |
val ref = start(state.input as I) | |
state.copy( | |
status = WorkflowTaskStatus.InProgress, | |
ref = ref, | |
retriesLeft = state.retriesLeft - 1, | |
lastRetryAt = Instant.now() | |
) | |
} else { | |
state.copy( | |
status = WorkflowTaskStatus.Failure, | |
completedAt = Instant.now() | |
) | |
} | |
} | |
} else { | |
state | |
} | |
} | |
WorkflowTaskStatus.Success, | |
WorkflowTaskStatus.Failure -> return state | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment