Skip to content

Instantly share code, notes, and snippets.

@pchlupacek
Last active August 29, 2015 14:01
Show Gist options
  • Save pchlupacek/b2fbd11beb02b1e3b76b to your computer and use it in GitHub Desktop.
Save pchlupacek/b2fbd11beb02b1e3b76b to your computer and use it in GitHub Desktop.
issues with object
package scalaz.stream2
import Process._
import org.scalacheck.Prop._
import org.scalacheck.Properties
import scalaz.concurrent.{Strategy, Task}
import scalaz.{\/, stream2, \/-, -\/}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.SyncVar
import scala.concurrent.duration._
/**
 * Created by pach on 07/03/14.
 *
 * Experimental spec that merges ten inner processes with `merge.mergeN` and
 * checks that the `onComplete` cleanups attached to every inner process and
 * to the outer source have run once the merged stream was cut short by
 * `takeWhile`.
 */
object ExperimentSpec extends Properties("Experiments") {

  // Implicits picked up by the stream combinators used below
  // (presumably by `awakeEvery` and `mergeN` -- confirm against their signatures).
  implicit val S = TestUtil.DefaultSpecExecutorService
  implicit val scheduler = scalaz.stream2.DefaultScheduler

  property("here") = secure {
    try {
      // Incremented by the cleanup of each inner process.
      val cleanups = new AtomicInteger(0)
      // Set to 99 by the cleanup of the outer source.
      val srcCleanup = new AtomicInteger(0)

      //just change this here between val (passing) and def (deadlocking)
      // `10.seconds` rather than the postfix form `10 seconds`, which needs
      // `scala.language.postfixOps` to compile without a feature warning.
      def delayEach10 = Process.awakeEvery(10.seconds)

      // Inner process: emits `index` once, then maps every element of the
      // `delayEach10` ticker to `index`. Its cleanup bumps `cleanups`, sleeps
      // for 100 ms and emits the new counter value.
      def oneUp(index: Int) =
        (emit(index).toSource ++ delayEach10.map(_ => index)).onComplete(
          eval(Task.delay { val i = cleanups.incrementAndGet(); Thread.sleep(100); i })
        )

      // Outer source: the ten inner processes followed by a drained ticker;
      // its cleanup records the sentinel value 99 in `srcCleanup`.
      val ps =
        (emitAll(for (i <- 0 until 10) yield oneUp(i)).toSource ++ delayEach10.drain).onComplete(
          eval_(Task.delay(srcCleanup.set(99)))
        )

      // Run the merged stream until the first element that is not < 9.
      // `timed(3000)` is presumably a timeout in milliseconds -- confirm against Task.
      merge.mergeN(ps).takeWhile(_ < 9).runLog.timed(3000).run

      ((cleanups.get == 10) :| s"Cleanups were called on upstreams: ${cleanups.get}") &&
        ((srcCleanup.get == 99) :| "Cleanup on source was called")
    } catch {
      // Diagnostic aid only: print the failure, then rethrow it unchanged.
      case t: Throwable => t.printStackTrace(); throw t
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment