Created
March 30, 2014 10:01
-
-
Save japgolly/9870478 to your computer and use it in GitHub Desktop.
JunctionExperiment
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package scalaz.stream.merge | |
import scalaz.\/ | |
import scalaz.stream.merge.Junction._ | |
import scalaz.stream.Process._ | |
object JunctionExperiment { | |
val debug = !true | |
/** Typed constructor helper to create Junction.Strategy */ | |
def junction[W, I, O](f: JunctionSignal[W, I, O] => JunctionStrategy[W, I, O]): JunctionStrategy[W, I, O] = { | |
receive1[JunctionSignal[W, I, O], JunctionAction[W, O]](s => { | |
if (debug) { | |
Console.flush() | |
println(s"--> JunctionSignal: ${s.toString.replace("),", ")\n ,")}") | |
Console.flush() | |
val a = f(s) | |
// println(s"<-- JunctionAction: $a") | |
// Console.flush() | |
a | |
} else f(s) | |
}) | |
} | |
// ================================================================================================================== | |
/** | |
* P = external state representing the worker pool (or something else?) | |
* PI = data that can be sent to the junction to indicate/trigger a change in P. | |
*/ | |
def haha[P, Q, PI, A, W] | |
( | |
allowDownstream: P => Boolean | |
, onPoolInput: (P, PI) => P | |
, onSendWork: P => P | |
, recv: (Q, Seq[A]) => Q | |
, pop: Q => Option[(Q, A)] | |
, query: (P, Q) => W | |
)(initialP: P, initialQ: Q): JunctionStrategy[W, PI \/ A, A] = { | |
// type S = (P,Q) | |
type I = PI \/ A | |
type O = A | |
type JS = JunctionStrategy[W, I, O] | |
type JA = JunctionAction[W, A] | |
type JXX = JX[W, PI \/ A, A] | |
def partition_\/[A, B](is: Seq[A \/ B]): (List[A], List[B]) = | |
// shithouse perf | |
(is.toList.filter(_.isLeft).map(_.swap.toOption.get) | |
, is.toList.filter(_.isRight).map(_.toOption.get) | |
) | |
def genActionsO(p0: P, q0: Q, refs: Seq[DownRefO]): (P, Q, List[WriteO[W, A]]) = { | |
// vars for performance for now, this fn is still RT | |
var p = p0 | |
var q = q0 | |
var jas = List.empty[WriteO[W, A]] | |
val i = refs.iterator | |
var goodToEmit = allowDownstream(p) | |
while (goodToEmit && i.hasNext) | |
pop(q) match { | |
case None => | |
goodToEmit = false | |
case Some((q2, work)) => | |
q = q2 | |
p = onSendWork(p) | |
jas ::= WriteO[W, A](work :: Nil, i.next()) | |
goodToEmit = allowDownstream(p) | |
} | |
(p, q, jas) | |
} | |
def emitOW(p: P, q: Q, jx: JXX, actionsO: List[WriteO[W, A]]): JS = { | |
val actionsW: List[JA] = genActionsW(p, q, jx) | |
emitAll(actionsW ::: actionsO) | |
} | |
def genActionsW(p: P, q: Q, jx: JXX): List[WriteW[W, A]] = | |
if (jx.downW.isEmpty) | |
Nil | |
else { | |
val w: W = query(p, q) | |
val ws = w :: Nil | |
(List.empty[WriteW[W, A]] /: jx.downW)((acc, ref) => WriteW[W, A](ws, ref) :: acc) | |
} | |
def genActionsO_emitOW(jx: JXX, p: P, q: Q, refs: Seq[DownRefO]): (P, Q, JS) = { | |
val (p2, q2, ao) = genActionsO(p, q, refs) | |
val js = emitOW(p2, q2, jx, ao) | |
(p2, q2, js) | |
} | |
def drain(p: P, q: Q, rsn: Throwable): JS = { | |
lazy val go: JS = | |
junction[W, PI \/ A, A] { | |
case Open(jx, ref: UpRef) => jx.close(ref, rsn) fby go | |
case Open(jx, ref: DownRefW) => jx.writeW(query(p, q), ref) fby go | |
case Receive(jx, _, ref) => jx.close(ref, rsn) fby go | |
case Ready(jx, ref: DownRefO) => | |
if (pop(q).isDefined) { | |
// TODO what about when !allowDownstream | |
val (p2, q2, actions) = genActionsO_emitOW(jx, p, q, ref :: Nil) | |
actions fby drain(p2, q2, rsn) | |
} else | |
// next fby (if (nq.size > 0) drain(nq, rsn) else Halt(rsn)) | |
Halt(rsn) | |
case o => | |
go | |
} | |
go | |
} | |
def go(p: P, q: Q): JS = { | |
lazy val nop: JS = | |
junction[W, PI \/ A, A] { | |
case Open(jx, ref: UpRef) => | |
//if (bounded && q.size >= max) nop else | |
jx.more(ref) fby nop | |
case Open(jx, ref: DownRefW) => | |
jx.writeW(query(p, q), ref) fby nop | |
case Receive(jx, input, ref) => | |
val (poolMsgs, work) = partition_\/(input) | |
val p1 = (p /: poolMsgs)(onPoolInput) | |
val q1 = recv(q, work) | |
val (p2, q2, actions) = genActionsO_emitOW(jx, p1, q1, jx.downReadyO) | |
actions fby jx.more(ref) fby go(p2, q2) | |
case Ready(jx, ref: DownRefO) => | |
val (p2, q2, actions) = genActionsO_emitOW(jx, p, q, ref :: Nil) | |
actions fby jx.moreAll fby go(p2, q2) | |
case DoneDown(jx, rsn) => | |
if (jx.downO.nonEmpty) // && pop(q).isDefined) | |
jx.closeAllUp(rsn) fby drain(p, q, rsn) | |
else | |
Halt(rsn) | |
// case Done(_,_,_) | DoneDown(_,_) | DoneUp(_,_) | Open(_, _:DownRefO) => nop | |
case other => | |
//println(s" --> Ignoring JunctionSignal: $other") | |
nop | |
} | |
nop | |
} | |
go(initialP, initialQ) | |
} | |
// ================================================================================================================== | |
// ================================================================================================================== | |
// ================================================================================================================== | |
// ================================================================================================================== | |
// ================================================================================================================== | |
// ================================================================================================================== | |
// ================================================================================================================== | |
/** | |
* When downstream processes are available and ready for data, pops data off the queue and sends it on. | |
* When there are no downstream processes ready for data, accepts data from upstream and puts it on the queue. | |
* Queue and its details are entirely externalised. | |
*/ | |
def haha2[Q, A, W] | |
( | |
recv: (Q, Seq[A]) => Q | |
, pop: Q => Option[(Q, A)] | |
, queueFull: Q => Boolean | |
, query: Q => W | |
)(initialQ: Q): JunctionStrategy[W, A, A] = { | |
type I = A | |
type O = A | |
type JS = JunctionStrategy[W, I, O] | |
type JA = JunctionAction[W, A] | |
type JXX = JX[W, A, A] | |
def genActionsO(q0: Q, refs: Seq[DownRefO]): (Q, List[WriteO[W, A]]) = { | |
// vars for performance for now, this fn is still RT | |
var q = q0 | |
var jas = List.empty[WriteO[W, A]] | |
val i = refs.iterator | |
var goodToEmit = true // allowDownstream(p) | |
while (goodToEmit && i.hasNext) | |
pop(q) match { | |
case None => | |
goodToEmit = false | |
case Some((q2, work)) => | |
q = q2 | |
jas ::= WriteO[W, A](work :: Nil, i.next()) | |
// goodToEmit = allowDownstream(p) | |
} | |
(q, jas) | |
} | |
// def genActionsO_1(q: Q, ref: DownRefO): (Q, List[WriteO[W, A]]) = { | |
// pop(q) match { | |
// case None => (q, Nil) | |
// case Some((q2, work)) => (q2, WriteO[W, A](work :: Nil, ref) :: Nil) | |
// } | |
def emitOW(q: Q, jx: JXX, actionsO: List[WriteO[W, A]]): JS = { | |
val actionsW: List[JA] = genActionsW(q, jx) | |
emitAll(actionsW ::: actionsO) | |
} | |
def genActionsW(q: Q, jx: JXX): List[WriteW[W, A]] = | |
if (jx.downW.isEmpty) | |
Nil | |
else { | |
val w: W = query(q) | |
val ws = w :: Nil | |
(List.empty[WriteW[W, A]] /: jx.downW)((acc, ref) => WriteW[W, A](ws, ref) :: acc) | |
} | |
def genActionsO_emitOW(jx: JXX, q: Q, refs: Seq[DownRefO]): (Q, JS) = { | |
val (q2, ao) = genActionsO(q, refs) | |
val js = emitOW(q2, jx, ao) | |
(q2, js) | |
} | |
def drain(q: Q, rsn: Throwable): JS = { | |
lazy val go: JS = | |
junction[W, A, A] { | |
case Open(jx, ref: UpRef) => jx.close(ref, rsn) fby go | |
case Open(jx, ref: DownRefW) => jx.writeW(query(q), ref) fby go | |
case Receive(jx, _, ref) => jx.close(ref, rsn) fby go | |
case Ready(jx, ref: DownRefO) => | |
if (pop(q).isDefined) { | |
val (q2, actions) = genActionsO_emitOW(jx, q, ref :: Nil) | |
actions fby drain(q2, rsn) | |
} else | |
Halt(rsn) | |
case o => | |
go | |
} | |
go | |
} | |
def recvMoreThen(q: Q, cond2: Boolean = true)(more: => JS, next: JS): JS = | |
if (cond2 && !queueFull(q)) more fby next else next | |
def go(q: Q): JS = { | |
lazy val nop: JS = | |
junction[W, A, A] { | |
case Open(jx, ref: UpRef) => | |
recvMoreThen(q)(jx.more(ref), nop) | |
case Open(jx, ref: DownRefW) => | |
jx.writeW(query(q), ref) fby nop | |
case Receive(jx, input, ref) => | |
val q1 = recv(q, input) | |
val (q2, actions) = genActionsO_emitOW(jx, q1, jx.downReadyO) | |
//println(s"STATE CHANGE: $q2 <~~ $q") | |
actions fby recvMoreThen(q2)(jx.more(ref), go(q2)) | |
case Ready(jx, ref: DownRefO) => | |
val (q2, actions) = genActionsO_emitOW(jx, q, ref :: Nil) | |
//println(s"STATE CHANGE: $q2 <~~ $q2")` | |
actions fby recvMoreThen(q2, jx.upReady.nonEmpty)(jx.moreAll, go(q2)) | |
case DoneDown(jx, rsn) => | |
if (jx.downO.nonEmpty) | |
jx.closeAllUp(rsn) fby drain(q, rsn) | |
else | |
Halt(rsn) | |
// case Done(_,_,_) | DoneDown(_,_) | DoneUp(_,_) | Open(_, _:DownRefO) => nop | |
case other => | |
// println(s" --> Ignoring JunctionSignal: $other") | |
nop | |
} | |
nop | |
} | |
go(initialQ) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package scalaz.stream.merge | |
import java.util.concurrent.{TimeUnit, CountDownLatch, ScheduledExecutorService, ExecutorService} | |
import org.specs2.mutable.Specification | |
import scala.concurrent.SyncVar | |
import scalaz.concurrent.{Strategy, Task} | |
import scalaz.stream.Process.End | |
import scalaz.stream._ | |
import scalaz.stream.{Process => P} | |
import scalaz.syntax.bind._ | |
import scalaz.{\/-, \/} | |
class JunctionExperimentTest extends Specification { | |
implicit def SSS: Strategy = throw new RuntimeException("Strategy") | |
implicit def pool: ExecutorService = throw new RuntimeException("ExecutorService") | |
implicit def scheduler:ScheduledExecutorService = throw new RuntimeException("ScheduledExecutorService") | |
def emitOne[A](a: A, sink: Sink[Task, A]): Task[Unit] = | |
P.eval(Task.now(a)).to(sink).run | |
def emitAll[A](as: Seq[A], sink: Sink[Task, A]): Task[Unit] = | |
Task.delay (as.foreach(a => emitOne(a, sink).run)) | |
def runAsync(t: Task[Unit]): Unit = t.runAsync(_ => ()) | |
def testPassThrough[I](up: Sink[Task, I], j: Junction[_, _, Int])(fi: Int => I) = { | |
val l = List(19,17,15,13,11) | |
val t1 = emitAll(l.map(fi), up) >> j.downstreamClose(End) | |
val collected = new SyncVar[Throwable\/IndexedSeq[Int]] | |
j.downstreamO.runLog.runAsync(collected.put) | |
runAsync(t1) | |
collected.get(8000) must be_==(Some(\/-(l.toVector))) | |
} | |
"boundedQueue" in { | |
val j = Junction(JunctionStrategies.boundedQ[Int](0), Process.halt)(Strategy.DefaultStrategy) | |
testPassThrough(j.upstreamSink, j)(identity) | |
} | |
"myJS" in { | |
val js = JunctionExperiment.haha2[List[Int], Int, String]( | |
(q,as) => as.toList ::: q //, recv: (Q, Seq[A]) => Q | |
,q => if (q.isEmpty) None else Some((q.init, q.last)) //, pop: Q => Option[(Q, A)] | |
,_ => false // , queueFull: Q => Boolean | |
,q => s"Size is = " + q.size //, query: (P, Q) => W | |
)(Nil) | |
def newJ: Junction[_, Int, Int] = Junction.apply(js, Process.halt)() | |
val j = newJ | |
testPassThrough(j.upstreamSink, j)(identity) | |
} | |
implicit val intOrder = scalaz.Order.fromScalaOrdering[Int] | |
"myJS: pri test" in { | |
// [Q, A, W] | |
val js = JunctionExperiment.haha2[List[Int], Int, Int]( | |
(q, as) => (as.toList ::: q).sorted //, recv: (Q, Seq[A]) => Q | |
, q => if (q.isEmpty) None else Some((q.init, q.last)) //, pop: Q => Option[(Q, A)] | |
, _ => false // , queueFull: Q => Boolean | |
, q => q.size //, query: (P, Q) => W | |
)(List.empty[Int]) | |
val initialWorkerCount = 4 | |
val workBatch2 = List(1, 50, 3, 52, 2, 51) | |
val latch1 = new CountDownLatch(initialWorkerCount + 1) | |
val latch2 = new CountDownLatch(initialWorkerCount + 1) | |
val latch3 = new CountDownLatch(initialWorkerCount + 1 + workBatch2.size) | |
@volatile var got = Vector.empty[Int] // No sync because it will be single-threaded when it matters | |
val captureS: Sink[Task, Int] = | |
P.constant((i: Int) => Task.delay { | |
got :+= i | |
latch1.countDown() | |
latch1.await(2, TimeUnit.SECONDS) | |
latch2.countDown() | |
latch2.await(2, TimeUnit.SECONDS) | |
latch3.countDown() | |
}) | |
val j: Junction[_, Int, Int] = Junction.apply(js, Process.halt)() | |
try { | |
val workSink: Sink[Task, Int] = j.upstreamSink | |
val captureP = j.downstreamO.to(captureS) | |
// Start initialWorkerCount workers | |
runAsync(captureP.run) | |
for (i <- 2 to initialWorkerCount) | |
runAsync(captureP.take(1).run) | |
// Give all workers a job but don't let them finish | |
runAsync(emitAll((1 to initialWorkerCount).toList, workSink)) | |
latch1.countDown() | |
latch1.await(2, TimeUnit.SECONDS) aka "All workers should have a job" must beTrue | |
// Queue up new jobs while all workers are busy | |
runAsync(emitAll(workBatch2, workSink)) | |
j.downstreamW.take(1).runLog.run.toList must_== List(6) | |
got = Vector.empty | |
latch2.getCount ==== 1 | |
// Release workers | |
latch2.countDown() | |
latch2.await(2, TimeUnit.SECONDS) aka "All workers should finish" must beTrue | |
// Wait for second batch of jobs to finish | |
latch3.countDown() | |
latch3.await(2, TimeUnit.SECONDS) | |
latch3.getCount ==== 0 | |
// Confirm jobs were prioritised before doled out | |
got must be_==(workBatch2.sorted.reverse) | |
} finally { | |
runAsync(j.downstreamClose(End)) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment