Skip to content

Instantly share code, notes, and snippets.

@vicety
Created June 17, 2022 23:13
Show Gist options
  • Save vicety/d8feef411e3bd5894110b90fc9f30fc3 to your computer and use it in GitHub Desktop.
Save vicety/d8feef411e3bd5894110b90fc9f30fc3 to your computer and use it in GitHub Desktop.
package pods.workflows.examples
/** Multiple Producer Multiple Subscriber Event Count
*
* This example counts the number of events received from multiple upstream
* producers, which concurrently emit events to multiple downstream subscribers.
* It can also be used as a test to show that event handling within each
* processor is serial in our runtime.
*/
@main def MultiProducerExample() =
  import pods.workflows.*

  // Number of events each producer thread submits to its input stream.
  val eventsPerProducer = 10000
  // A progress line is printed whenever an event value is a multiple of this.
  val reportEvery = 5000

  val builder = Workflows
    .builder()
    .withName("wf")

  val source1 = builder
    .source[Int]()
    .withName("input1")

  val source2 = builder
    .source[Int]()
    .withName("input2")

  // Event handler that ignores the event payload, increments the counter kept
  // in the task state under a fixed key, and emits the updated count.
  def eventCount: TaskContext[Int, Int] ?=> Int => Unit = ctx ?=>
    _ => {
      val key = "k"
      // Compute the new count once, then store and emit that same value
      // instead of reading it back from the state after the write.
      val next = ctx.state.get(key) match
        case Some(n) => n.asInstanceOf[Int] + 1
        case None    => 1
      ctx.state.set(key, next)
      ctx.emit(next)
    }

  // Event handler that forwards every event unchanged and prints
  // `prefix` followed by the event value for every `reportEvery`-th value.
  def printInterval(prefix: String): TaskContext[Int, Int] ?=> Int => Unit = ctx ?=>
    i => {
      if i % reportEvery == 0 then println(s"$prefix$i")
      ctx.emit(i)
    }

  val mergedSource = builder
    .merge(source1, source2)
  val mid = mergedSource.id()

  // Two branches are attached to the merged stream; each one counts the events
  // it sees, reports progress, and ends in its own named sink.
  val flow = mergedSource
    .processor(eventCount)
    .processor(printInterval("output1 "))
    .sink()
    .withName("output1")
    // Re-select the merged stream by id before building the second branch.
    // `.from(mergedSource)` is not used here: it starts building the graph
    // from the sink, rather than the merged position.
    .select(mid)
    .processor(eventCount)
    .processor(printInterval("output2 "))
    .sink()
    .withName("output2")

  val wf = builder.build()
  val system = Systems.local()
  system.launch(wf)

  val iref1: IStreamRef[Int] = system.registry("wf/input1").resolve()
  val iref2: IStreamRef[Int] = system.registry("wf/input2").resolve()

  // Builds (but does not start) a thread that submits 1..eventsPerProducer to `ref`.
  def producer(ref: IStreamRef[Int]): Thread =
    val task: Runnable = () => for i <- 1 to eventsPerProducer do ref.submit(i)
    new Thread(task)

  // Start every producer before joining any of them so they submit concurrently.
  val producers = List(producer(iref1), producer(iref2))
  producers.foreach(_.start())
  producers.foreach(_.join())

  // NOTE(review): shutdown is requested as soon as both producers have finished
  // submitting; whether already-submitted events are drained first depends on
  // `system.shutdown()` — confirm.
  system.shutdown()
@vicety
Copy link
Author

vicety commented Jun 17, 2022

Implementation of the `select` function:

  /** Selects the stream with the given id as the builder's `latest` position.
    *
    * Stores `id` in `latest` and returns this same instance, viewed as an
    * `AtomicStream[Nothing, O]`.
    *
    * NOTE(review): presumably subsequent builder calls attach to the stream
    * recorded in `latest` — confirm against the enclosing class.
    * NOTE(review): the cast does not verify `O`; the caller chooses the output
    * type, so a wrong choice would only surface later. The type parameter `I`
    * is not used in the visible signature or body.
    *
    * @param id identifier of the stream to continue building from
    * @tparam O output element type the returned stream is viewed as
    * @return this instance, re-typed as `AtomicStream[Nothing, O]`
    */
  private[pods] def select[I, O](id: String): AtomicStream[Nothing, O] =
    // Record the selected stream id; no check that `id` names an existing stream is visible here.
    latest = Some(id)
    // Same object, different static type (unchecked cast).
    this.asInstanceOf[AtomicStream[Nothing, O]]

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment