Skip to content

Instantly share code, notes, and snippets.

@mandubian
Last active August 29, 2015 13:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mandubian/9365064 to your computer and use it in GitHub Desktop.
Save mandubian/9365064 to your computer and use it in GitHub Desktop.
Look at the awakeEvery on client... only first element of the process is emitted and no other...
import Process._
import java.net.InetSocketAddress
import scala.Some
import scala.concurrent.SyncVar
import scala.util.Random
import scalaz.-\/
import scalaz.\/
import scalaz.\/-
import scalaz.concurrent.Task
import scala.concurrent.duration._
import scalaz.stream.Process.Halt
import scalaz.stream.ReceiveY.HaltL
import scalaz.stream.ReceiveY.HaltR
import scalaz.stream.ReceiveY.ReceiveL
import scalaz.stream.ReceiveY.ReceiveR
import scalaz.stream._
object NioServer {
import Process._
def apply(address: InetSocketAddress, w: Writer1[Bytes, Bytes, Bytes]): Process[Task, Bytes] = {
val srv =
nio.server(address).map {
_.flatMap {
ex => ex.readThrough(w).runReceive
}
}
merge.mergeN(srv)
}
def echo(address: InetSocketAddress): Process[Task, Bytes] = {
def echoAll: Writer1[Bytes, Bytes, Bytes] = {
receive1[Bytes, Bytes \/ Bytes]({
i => emitSeq(Seq(\/-(i), -\/(i))) fby echoAll
})
}
apply(address, echoAll)
}
def limit(address: InetSocketAddress, size:Int) : Process[Task, Bytes] = {
def remaining(sz:Int): Writer1[Bytes, Bytes, Bytes] = {
receive1[Bytes, Bytes \/ Bytes]({
i =>
val toEcho = i.take(sz)
if (sz - toEcho.size <= 0) emitSeq(Seq(\/-(toEcho), -\/(toEcho))) fby halt
else emitSeq(Seq(\/-(toEcho), -\/(toEcho))) fby remaining(sz -toEcho.size)
})
}
apply(address,remaining(size))
}
}
object NioClient {
import Process._
def echo(address: InetSocketAddress, data: Process[Task, Bytes]): Process[Task, Bytes] = {
def echoSent: WyeW[Bytes, Bytes, Bytes, Bytes] = {
def go(collected: Int, expected: Int): WyeW[Bytes, Bytes, Bytes, Bytes] = {
receiveBoth {
case ReceiveL(rcvd) =>
emitO(rcvd) fby
(if (collected + rcvd.size >= expected) halt
else go(collected + rcvd.size, expected))
case ReceiveR(data) => tell(data) fby go(collected, expected + data.size)
case HaltL(rsn) => Halt(rsn)
case HaltR(_) => go(collected, expected)
}
}
go(0, 0)
}
for {
ex <- nio.connect(address)
rslt <- ex.wye(echoSent).run(data)
} yield {
rslt
}
}
}
def localAddress(port:Int) = new InetSocketAddress("127.0.0.1", port)
import scalaz._
import Scalaz._
////////////////////////////////////////////////////////////
// SERVER1
val stop = async.signal[Boolean]
stop.set(false).run
val local1 = localAddress(11100)
val recv1 = new SyncVar[Vector[String]]
val server1 =
( stop.discrete wye NioServer.echo(local1) )(wye.interrupt) map { bytes =>
new String(bytes.toArray)
}
val data1 = Seq(
"1,1",
"1,2",
"1,3",
"1,4",
"1,5",
"1,6"
)
val clientProcess1 = Process.emitAll(data1.map { s => Bytes.of((s+"\n").getBytes) })
//////////////////////////////////////////////////
//////////////////////////////////////////////////
//////////////////////////////////////////////////
// THIS IS THE IMPORTANT CODE
val cp = (clientProcess1 zipWith Process.awakeEvery(100 milliseconds)) { (i, b) => i }
//////////////////////////////////////////////////
//////////////////////////////////////////////////
//////////////////////////////////////////////////
val client1 = NioClient.echo(local1, cp)
server1.runLog.runAsync {
case -\/(e) => println("e:"+e.getMessage); throw e
case \/-(td) => recv1.put(td.toVector)
}
Thread.sleep(300)
client1.runLog.timed(3000).run
stop.set(true).run
println("RECVDATA1:"+recv1.get(5000))
// RESULT
scala> println("RECVDATA1:"+recv1.get(5000))
RECVDATA1:Some(Vector(1,1
))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment