Skip to content

Instantly share code, notes, and snippets.

@japgolly
Created March 30, 2014 10:01
Show Gist options
  • Save japgolly/9870478 to your computer and use it in GitHub Desktop.
Save japgolly/9870478 to your computer and use it in GitHub Desktop.
JunctionExperiment
package scalaz.stream.merge
import scalaz.\/
import scalaz.stream.merge.Junction._
import scalaz.stream.Process._
object JunctionExperiment {
/** When true, `junction` prints every incoming `JunctionSignal` before handing it to the strategy function. */
val debug: Boolean = false
/**
 * Typed constructor helper to create a `JunctionStrategy`.
 *
 * Awaits exactly one `JunctionSignal` and continues with the strategy that `f` returns for it.
 * When `debug` is on, the signal is printed first.
 *
 * @param f maps the received signal to the strategy to continue with
 */
def junction[W, I, O](f: JunctionSignal[W, I, O] => JunctionStrategy[W, I, O]): JunctionStrategy[W, I, O] = {
  // Prints the signal, one constructor argument per line, flushing Console before and after.
  def trace(signal: JunctionSignal[W, I, O]): Unit = {
    Console.flush()
    println(s"--> JunctionSignal: ${signal.toString.replace("),", ")\n ,")}")
    Console.flush()
  }

  receive1[JunctionSignal[W, I, O], JunctionAction[W, O]](signal => {
    if (debug) trace(signal)
    f(signal)
  })
}
// ==================================================================================================================
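/**
* Builds a junction strategy that queues incoming work (`A`) in `Q` and hands it to ready downstream
* consumers only while `allowDownstream` accepts the current `P`.
* P = external state that gates emission (e.g. a worker pool); it is changed by `onPoolInput` and `onSendWork`.
* PI = data that can be sent to the junction (the left side of `PI \/ A`) to trigger a change in P.
*/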
def haha[P, Q, PI, A, W]
(
allowDownstream: P => Boolean
, onPoolInput: (P, PI) => P
, onSendWork: P => P
, recv: (Q, Seq[A]) => Q
, pop: Q => Option[(Q, A)]
, query: (P, Q) => W
)(initialP: P, initialQ: Q): JunctionStrategy[W, PI \/ A, A] = {
// Local shorthands for the junction types used throughout this strategy.
// type S = (P,Q)
// Input: either a pool message (left) or a unit of work (right).
type I = PI \/ A
// Output: work is passed downstream unchanged.
type O = A
// The strategy type returned by every state below.
type JS = JunctionStrategy[W, I, O]
// A single action emitted by the strategy.
type JA = JunctionAction[W, A]
// The junction handle carried by each signal.
type JXX = JX[W, PI \/ A, A]
/**
 * Splits a sequence of disjunctions into its left values and its right values,
 * keeping the original relative order on each side.
 *
 * Done in a single pass (plus one reversal per side) and without partial `.get` calls.
 * The type parameters are named `L`/`R` so they do not shadow the enclosing method's `A`.
 *
 * @param is the mixed input
 * @return `(lefts, rights)`
 */
def partition_\/[L, R](is: Seq[L \/ R]): (List[L], List[R]) = {
  // Accumulate by prepending (O(1)), then restore input order at the end.
  val (lefts, rights) = is.foldLeft((List.empty[L], List.empty[R])) {
    case ((ls, rs), e) => e.fold(l => (l :: ls, rs), r => (ls, r :: rs))
  }
  (lefts.reverse, rights.reverse)
}
/**
 * Pops work from the queue and pairs each item with the next ref from `refs`, for as long as
 * `allowDownstream` accepts the current pool state, refs remain, and `pop` yields work.
 *
 * @param p0   pool state before any work is handed out
 * @param q0   queue before any work is popped
 * @param refs downstream refs; each receives at most one item
 * @return the pool state after `onSendWork` was applied once per item sent, the remaining queue,
 *         and one `WriteO` per item sent (most recently created first)
 */
def genActionsO(p0: P, q0: Q, refs: Seq[DownRefO]): (P, Q, List[WriteO[W, A]]) = {
  val targets = refs.iterator

  @scala.annotation.tailrec
  def loop(pool: P, queue: Q, acc: List[WriteO[W, A]]): (P, Q, List[WriteO[W, A]]) =
    if (allowDownstream(pool) && targets.hasNext)
      pop(queue) match {
        case Some((rest, work)) =>
          loop(onSendWork(pool), rest, WriteO[W, A](work :: Nil, targets.next()) :: acc)
        case None =>
          (pool, queue, acc)
      }
    else
      (pool, queue, acc)

  loop(p0, q0, Nil)
}
/**
 * Emits the `WriteW` actions produced by `genActionsW(p, q, jx)` followed by the given `WriteO` actions.
 */
def emitOW(p: P, q: Q, jx: JXX, actionsO: List[WriteO[W, A]]): JS =
  // The ascription widens the element type so both lists combine as List[JA].
  emitAll((genActionsW(p, q, jx): List[JA]) ::: actionsO)
/**
 * Builds one `WriteW` per ref in `jx.downW`, all carrying the same single `query(p, q)` value.
 * `query` is not evaluated when there are no such refs.
 * The resulting list is in reverse order of `jx.downW`.
 */
def genActionsW(p: P, q: Q, jx: JXX): List[WriteW[W, A]] =
  if (jx.downW.nonEmpty) {
    val payload = query(p, q) :: Nil
    jx.downW.foldLeft(List.empty[WriteW[W, A]])((out, ref) => WriteW[W, A](payload, ref) :: out)
  } else
    Nil
/**
 * Runs `genActionsO` for `refs`, then wraps its writes (plus the W writes for the updated state) with `emitOW`.
 *
 * @return the updated pool state, the updated queue, and the strategy that emits the actions
 */
def genActionsO_emitOW(jx: JXX, p: P, q: Q, refs: Seq[DownRefO]): (P, Q, JS) =
  genActionsO(p, q, refs) match {
    case (nextP, nextQ, writes) => (nextP, nextQ, emitOW(nextP, nextQ, jx, writes))
  }
/**
* Shutdown state of the strategy, entered from `go` on `DoneDown` when `jx.downO` is non-empty.
* The state `p`/`q` only changes when a `Ready` signal finds work in the queue.
*
* @param p   current pool state
* @param q   queue of work not yet written downstream
* @param rsn reason passed to `jx.close` and to the final `Halt`
*/
def drain(p: P, q: Q, rsn: Throwable): JS = {
lazy val go: JS =
junction[W, PI \/ A, A] {
// An upstream that opens while draining is closed with `rsn`.
case Open(jx, ref: UpRef) => jx.close(ref, rsn) fby go
// A newly opened W-downstream is written the current query(p, q) value.
case Open(jx, ref: DownRefW) => jx.writeW(query(p, q), ref) fby go
// Received input is discarded and the sending upstream is closed with `rsn`.
case Receive(jx, _, ref) => jx.close(ref, rsn) fby go
case Ready(jx, ref: DownRefO) =>
// Only continue draining while the queue still yields work.
if (pop(q).isDefined) {
// TODO what about when !allowDownstream
// NOTE(review): in that case genActionsO produces no WriteO for `ref`, yet draining continues.
val (p2, q2, actions) = genActionsO_emitOW(jx, p, q, ref :: Nil)
actions fby drain(p2, q2, rsn)
} else
// Queue is empty: terminate with `rsn`.
// next fby (if (nq.size > 0) drain(nq, rsn) else Halt(rsn))
Halt(rsn)
// Every other signal is ignored; the state is unchanged.
case o =>
go
}
go
}
/**
* Running state of the strategy for pool state `p` and queue `q`.
* Signals that do not change the state continue with `nop`; the others continue with `go(p2, q2)`.
*/
def go(p: P, q: Q): JS = {
lazy val nop: JS =
junction[W, PI \/ A, A] {
case Open(jx, ref: UpRef) =>
//if (bounded && q.size >= max) nop else
// Always calls jx.more for a newly opened upstream (presumably requesting input — confirm against JX).
jx.more(ref) fby nop
case Open(jx, ref: DownRefW) =>
// A newly opened W-downstream is written the current query(p, q) value.
jx.writeW(query(p, q), ref) fby nop
case Receive(jx, input, ref) =>
// Split the input into pool messages (left) and work (right).
val (poolMsgs, work) = partition_\/(input)
// Fold every pool message into the pool state, in order.
val p1 = (p /: poolMsgs)(onPoolInput)
// Add the received work to the queue.
val q1 = recv(q, work)
// Try to hand queued work to every downstream in jx.downReadyO.
val (p2, q2, actions) = genActionsO_emitOW(jx, p1, q1, jx.downReadyO)
actions fby jx.more(ref) fby go(p2, q2)
case Ready(jx, ref: DownRefO) =>
// Try to hand one queued item to the downstream that just became ready.
val (p2, q2, actions) = genActionsO_emitOW(jx, p, q, ref :: Nil)
actions fby jx.moreAll fby go(p2, q2)
case DoneDown(jx, rsn) =>
// With O-downstreams still present: close all upstreams and switch to drain. Otherwise halt.
if (jx.downO.nonEmpty) // && pop(q).isDefined)
jx.closeAllUp(rsn) fby drain(p, q, rsn)
else
Halt(rsn)
// case Done(_,_,_) | DoneDown(_,_) | DoneUp(_,_) | Open(_, _:DownRefO) => nop
// Every other signal is ignored; the state is unchanged.
case other =>
//println(s" --> Ignoring JunctionSignal: $other")
nop
}
nop
}
go(initialP, initialQ)
}
// ==================================================================================================================
// ==================================================================================================================
// ==================================================================================================================
// ==================================================================================================================
// ==================================================================================================================
// ==================================================================================================================
// ==================================================================================================================
/**
* When downstream processes are available and ready for data, pops data off the queue and sends it on.
* When there are no downstream processes ready for data, accepts data from upstream and puts it on the queue.
* Queue and its details are entirely externalised.
*/
def haha2[Q, A, W]
(
recv: (Q, Seq[A]) => Q
, pop: Q => Option[(Q, A)]
, queueFull: Q => Boolean
, query: Q => W
)(initialQ: Q): JunctionStrategy[W, A, A] = {
// Local shorthands for the junction types used throughout this strategy.
// Input and output are both plain work items.
type I = A
type O = A
// The strategy type returned by every state below.
type JS = JunctionStrategy[W, I, O]
// A single action emitted by the strategy.
type JA = JunctionAction[W, A]
// The junction handle carried by each signal.
type JXX = JX[W, A, A]
/**
 * Pops work from the queue and pairs each item with the next ref from `refs`,
 * stopping when either the refs or the queue run out.
 *
 * @param q0   queue before any work is popped
 * @param refs downstream refs; each receives at most one item
 * @return the remaining queue and one `WriteO` per item sent (most recently created first)
 */
def genActionsO(q0: Q, refs: Seq[DownRefO]): (Q, List[WriteO[W, A]]) = {
  val targets = refs.iterator

  @scala.annotation.tailrec
  def loop(queue: Q, acc: List[WriteO[W, A]]): (Q, List[WriteO[W, A]]) =
    if (!targets.hasNext)
      (queue, acc)
    else
      pop(queue) match {
        case Some((rest, work)) => loop(rest, WriteO[W, A](work :: Nil, targets.next()) :: acc)
        case None               => (queue, acc)
      }

  loop(q0, Nil)
}
// def genActionsO_1(q: Q, ref: DownRefO): (Q, List[WriteO[W, A]]) = {
// pop(q) match {
// case None => (q, Nil)
// case Some((q2, work)) => (q2, WriteO[W, A](work :: Nil, ref) :: Nil)
// }
/**
 * Emits the `WriteW` actions produced by `genActionsW(q, jx)` followed by the given `WriteO` actions.
 */
def emitOW(q: Q, jx: JXX, actionsO: List[WriteO[W, A]]): JS =
  // The ascription widens the element type so both lists combine as List[JA].
  emitAll((genActionsW(q, jx): List[JA]) ::: actionsO)
/**
 * Builds one `WriteW` per ref in `jx.downW`, all carrying the same single `query(q)` value.
 * `query` is not evaluated when there are no such refs.
 * The resulting list is in reverse order of `jx.downW`.
 */
def genActionsW(q: Q, jx: JXX): List[WriteW[W, A]] =
  if (jx.downW.nonEmpty) {
    val payload = query(q) :: Nil
    jx.downW.foldLeft(List.empty[WriteW[W, A]])((out, ref) => WriteW[W, A](payload, ref) :: out)
  } else
    Nil
/**
 * Runs `genActionsO` for `refs`, then wraps its writes (plus the W writes for the updated queue) with `emitOW`.
 *
 * @return the updated queue and the strategy that emits the actions
 */
def genActionsO_emitOW(jx: JXX, q: Q, refs: Seq[DownRefO]): (Q, JS) =
  genActionsO(q, refs) match {
    case (nextQ, writes) => (nextQ, emitOW(nextQ, jx, writes))
  }
/**
* Shutdown state of the strategy, entered from `go` on `DoneDown` when `jx.downO` is non-empty.
* The queue only changes when a `Ready` signal finds work in it.
*
* @param q   queue of work not yet written downstream
* @param rsn reason passed to `jx.close` and to the final `Halt`
*/
def drain(q: Q, rsn: Throwable): JS = {
lazy val go: JS =
junction[W, A, A] {
// An upstream that opens while draining is closed with `rsn`.
case Open(jx, ref: UpRef) => jx.close(ref, rsn) fby go
// A newly opened W-downstream is written the current query(q) value.
case Open(jx, ref: DownRefW) => jx.writeW(query(q), ref) fby go
// Received input is discarded and the sending upstream is closed with `rsn`.
case Receive(jx, _, ref) => jx.close(ref, rsn) fby go
case Ready(jx, ref: DownRefO) =>
// Hand the next queued item to the ready downstream, or halt once the queue is empty.
if (pop(q).isDefined) {
val (q2, actions) = genActionsO_emitOW(jx, q, ref :: Nil)
actions fby drain(q2, rsn)
} else
Halt(rsn)
// Every other signal is ignored; the state is unchanged.
case o =>
go
}
go
}
/**
 * Prepends `more` to `next` only when `cond2` holds and `queueFull(q)` is false; otherwise returns `next` alone.
 *
 * @param q     queue checked with `queueFull` (not checked when `cond2` is false)
 * @param cond2 additional precondition for emitting `more`
 * @param more  strategy to run first; evaluated only when it is actually used
 * @param next  strategy to continue with in either case
 */
def recvMoreThen(q: Q, cond2: Boolean = true)(more: => JS, next: JS): JS =
  if (!cond2 || queueFull(q)) next
  else more fby next
/**
* Running state of the strategy for queue `q`.
* Signals that do not change the queue continue with `nop`; the others continue with `go(q2)`.
*/
def go(q: Q): JS = {
lazy val nop: JS =
junction[W, A, A] {
case Open(jx, ref: UpRef) =>
// Call jx.more for the new upstream unless queueFull(q).
recvMoreThen(q)(jx.more(ref), nop)
case Open(jx, ref: DownRefW) =>
// A newly opened W-downstream is written the current query(q) value.
jx.writeW(query(q), ref) fby nop
case Receive(jx, input, ref) =>
// Add the received work to the queue, then try to hand work to every downstream in jx.downReadyO.
val q1 = recv(q, input)
val (q2, actions) = genActionsO_emitOW(jx, q1, jx.downReadyO)
//println(s"STATE CHANGE: $q2 <~~ $q")
// Call jx.more for the sender again only if the updated queue is not full.
actions fby recvMoreThen(q2)(jx.more(ref), go(q2))
case Ready(jx, ref: DownRefO) =>
// Try to hand one queued item to the downstream that just became ready.
val (q2, actions) = genActionsO_emitOW(jx, q, ref :: Nil)
//println(s"STATE CHANGE: $q2 <~~ $q")
// jx.moreAll is emitted only when jx.upReady is non-empty and the updated queue is not full.
actions fby recvMoreThen(q2, jx.upReady.nonEmpty)(jx.moreAll, go(q2))
case DoneDown(jx, rsn) =>
// With O-downstreams still present: close all upstreams and switch to drain. Otherwise halt.
if (jx.downO.nonEmpty)
jx.closeAllUp(rsn) fby drain(q, rsn)
else
Halt(rsn)
// case Done(_,_,_) | DoneDown(_,_) | DoneUp(_,_) | Open(_, _:DownRefO) => nop
// Every other signal is ignored; the state is unchanged.
case other =>
// println(s" --> Ignoring JunctionSignal: $other")
nop
}
nop
}
go(initialQ)
}
}
package scalaz.stream.merge
import java.util.concurrent.{TimeUnit, CountDownLatch, ScheduledExecutorService, ExecutorService}
import org.specs2.mutable.Specification
import scala.concurrent.SyncVar
import scalaz.concurrent.{Strategy, Task}
import scalaz.stream.Process.End
import scalaz.stream._
import scalaz.stream.{Process => P}
import scalaz.syntax.bind._
import scalaz.{\/-, \/}
class JunctionExperimentTest extends Specification {
// Implicit "traps": each one throws a RuntimeException naming its type as soon as it is evaluated.
// NOTE(review): presumably here so a test fails loudly if one of these is resolved implicitly — confirm.
implicit def SSS: Strategy = scala.sys.error("Strategy")
implicit def pool: ExecutorService = scala.sys.error("ExecutorService")
implicit def scheduler: ScheduledExecutorService = scala.sys.error("ScheduledExecutorService")
/** Task that feeds the single value `a` into `sink`. */
def emitOne[A](a: A, sink: Sink[Task, A]): Task[Unit] = {
  val single = P.eval(Task.now(a))
  single.to(sink).run
}
/** Task that feeds every element of `as` into `sink`, one at a time, running each `emitOne` to completion. */
def emitAll[A](as: Seq[A], sink: Sink[Task, A]): Task[Unit] =
  Task.delay {
    for (a <- as) emitOne(a, sink).run
  }
/** Starts `t` via `Task.runAsync` and discards its outcome, whether success or failure. */
def runAsync(t: Task[Unit]): Unit = {
  val ignoreOutcome: (Throwable \/ Unit) => Unit = _ => ()
  t.runAsync(ignoreOutcome)
}
/**
 * Feeds a fixed list of ints (mapped through `fi`) into `up`, then closes the junction's downstream,
 * and checks that `j.downstreamO` yielded exactly those ints in the same order.
 *
 * @param up sink the inputs are written to
 * @param j  junction whose O-downstream is collected
 * @param fi converts each int into the junction's input type
 */
def testPassThrough[I](up: Sink[Task, I], j: Junction[_, _, Int])(fi: Int => I) = {
  val expected = List(19,17,15,13,11)
  val feedThenClose = emitAll(expected.map(fi), up) >> j.downstreamClose(End)
  val outcome = new SyncVar[Throwable \/ IndexedSeq[Int]]
  // Start collecting before the producer is started.
  j.downstreamO.runLog.runAsync(outcome.put)
  runAsync(feedThenClose)
  // Waits up to 8000 ms for the collected result.
  outcome.get(8000) must be_==(Some(\/-(expected.toVector)))
}
// Pass-through check against the library's own `boundedQ` strategy.
"boundedQueue" in {
  val junction = Junction(JunctionStrategies.boundedQ[Int](0), Process.halt)(Strategy.DefaultStrategy)
  testPassThrough(junction.upstreamSink, junction)(identity)
}
// Pass-through check for `haha2` with a List-backed queue: incoming items are prepended,
// `pop` removes from the end, and the queue is never reported full.
"myJS" in {
  val js = JunctionExperiment.haha2[List[Int], Int, String](
    (queue, incoming) => incoming.toList ::: queue,                // recv: (Q, Seq[A]) => Q
    queue => queue.lastOption.map(item => (queue.init, item)),     // pop: Q => Option[(Q, A)]
    _ => false,                                                    // queueFull: Q => Boolean
    queue => s"Size is = ${queue.size}"                            // query: Q => W
  )(Nil)
  val j: Junction[_, Int, Int] = Junction.apply(js, Process.halt)()
  testPassThrough(j.upstreamSink, j)(identity)
}
// scalaz `Order[Int]` derived from the standard Scala `Ordering[Int]`.
// The type is declared explicitly so this implicit's type is stated rather than inferred.
implicit val intOrder: scalaz.Order[Int] = scalaz.Order.fromScalaOrdering[Int]
// Checks that work queued while all workers are busy is handed out in priority order (largest first).
// The queue is kept sorted ascending by `recv`, and `pop` takes the last (largest) element.
"myJS: pri test" in {
// [Q, A, W]
val js = JunctionExperiment.haha2[List[Int], Int, Int](
(q, as) => (as.toList ::: q).sorted // recv: (Q, Seq[A]) => Q — keeps the queue sorted ascending
, q => if (q.isEmpty) None else Some((q.init, q.last)) // pop: Q => Option[(Q, A)] — takes the largest
, _ => false // queueFull: Q => Boolean — never full
, q => q.size // query: Q => W — reports the queue size
)(List.empty[Int])
val initialWorkerCount = 4
val workBatch2 = List(1, 50, 3, 52, 2, 51)
// latch1 / latch2: one count per worker plus one for the test thread.
val latch1 = new CountDownLatch(initialWorkerCount + 1)
val latch2 = new CountDownLatch(initialWorkerCount + 1)
// latch3: one count per sink invocation (first batch + second batch) plus one for the test thread.
val latch3 = new CountDownLatch(initialWorkerCount + 1 + workBatch2.size)
@volatile var got = Vector.empty[Int] // No sync because it will be single-threaded when it matters
// Sink that records each job, then blocks on latch1 and latch2 (up to 2s each) before counting down latch3.
// Once a latch has reached zero, its countDown is a no-op and its await returns immediately.
val captureS: Sink[Task, Int] =
P.constant((i: Int) => Task.delay {
got :+= i
latch1.countDown()
latch1.await(2, TimeUnit.SECONDS)
latch2.countDown()
latch2.await(2, TimeUnit.SECONDS)
latch3.countDown()
})
val j: Junction[_, Int, Int] = Junction.apply(js, Process.halt)()
try {
val workSink: Sink[Task, Int] = j.upstreamSink
val captureP = j.downstreamO.to(captureS)
// Start initialWorkerCount workers
// The first worker runs without a limit; the remaining ones stop after one job (take(1)).
runAsync(captureP.run)
for (i <- 2 to initialWorkerCount)
runAsync(captureP.take(1).run)
// Give all workers a job but don't let them finish
runAsync(emitAll((1 to initialWorkerCount).toList, workSink))
// The test thread's own count: latch1 opens once every worker has also counted down.
latch1.countDown()
latch1.await(2, TimeUnit.SECONDS) aka "All workers should have a job" must beTrue
// Queue up new jobs while all workers are busy
runAsync(emitAll(workBatch2, workSink))
// The first value read from downstreamW must be 6, the size of workBatch2.
j.downstreamW.take(1).runLog.run.toList must_== List(6)
// Discard what the first batch recorded; only the second batch's order is asserted below.
got = Vector.empty
// Only the test thread's count is still outstanding on latch2.
latch2.getCount ==== 1
// Release workers
latch2.countDown()
latch2.await(2, TimeUnit.SECONDS) aka "All workers should finish" must beTrue
// Wait for second batch of jobs to finish
latch3.countDown()
latch3.await(2, TimeUnit.SECONDS)
latch3.getCount ==== 0
// Confirm jobs were prioritised before doled out
got must be_==(workBatch2.sorted.reverse)
} finally {
// Always close the junction's downstream, even when an assertion above failed.
runAsync(j.downstreamClose(End))
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment