Last active
August 29, 2015 13:57
-
-
Save mandubian/9365064 to your computer and use it in GitHub Desktop.
Look at the awakeEvery on client... only first element of the process is emitted and no other...
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Process._ | |
import java.net.InetSocketAddress | |
import scala.Some | |
import scala.concurrent.SyncVar | |
import scala.util.Random | |
import scalaz.-\/ | |
import scalaz.\/ | |
import scalaz.\/- | |
import scalaz.concurrent.Task | |
import scala.concurrent.duration._ | |
import scalaz.stream.Process.Halt | |
import scalaz.stream.ReceiveY.HaltL | |
import scalaz.stream.ReceiveY.HaltR | |
import scalaz.stream.ReceiveY.ReceiveL | |
import scalaz.stream.ReceiveY.ReceiveR | |
import scalaz.stream._ | |
object NioServer { | |
import Process._ | |
def apply(address: InetSocketAddress, w: Writer1[Bytes, Bytes, Bytes]): Process[Task, Bytes] = { | |
val srv = | |
nio.server(address).map { | |
_.flatMap { | |
ex => ex.readThrough(w).runReceive | |
} | |
} | |
merge.mergeN(srv) | |
} | |
def echo(address: InetSocketAddress): Process[Task, Bytes] = { | |
def echoAll: Writer1[Bytes, Bytes, Bytes] = { | |
receive1[Bytes, Bytes \/ Bytes]({ | |
i => emitSeq(Seq(\/-(i), -\/(i))) fby echoAll | |
}) | |
} | |
apply(address, echoAll) | |
} | |
def limit(address: InetSocketAddress, size:Int) : Process[Task, Bytes] = { | |
def remaining(sz:Int): Writer1[Bytes, Bytes, Bytes] = { | |
receive1[Bytes, Bytes \/ Bytes]({ | |
i => | |
val toEcho = i.take(sz) | |
if (sz - toEcho.size <= 0) emitSeq(Seq(\/-(toEcho), -\/(toEcho))) fby halt | |
else emitSeq(Seq(\/-(toEcho), -\/(toEcho))) fby remaining(sz -toEcho.size) | |
}) | |
} | |
apply(address,remaining(size)) | |
} | |
} | |
object NioClient { | |
import Process._ | |
def echo(address: InetSocketAddress, data: Process[Task, Bytes]): Process[Task, Bytes] = { | |
def echoSent: WyeW[Bytes, Bytes, Bytes, Bytes] = { | |
def go(collected: Int, expected: Int): WyeW[Bytes, Bytes, Bytes, Bytes] = { | |
receiveBoth { | |
case ReceiveL(rcvd) => | |
emitO(rcvd) fby | |
(if (collected + rcvd.size >= expected) halt | |
else go(collected + rcvd.size, expected)) | |
case ReceiveR(data) => tell(data) fby go(collected, expected + data.size) | |
case HaltL(rsn) => Halt(rsn) | |
case HaltR(_) => go(collected, expected) | |
} | |
} | |
go(0, 0) | |
} | |
for { | |
ex <- nio.connect(address) | |
rslt <- ex.wye(echoSent).run(data) | |
} yield { | |
rslt | |
} | |
} | |
} | |
def localAddress(port:Int) = new InetSocketAddress("127.0.0.1", port) | |
import scalaz._ | |
import Scalaz._ | |
//////////////////////////////////////////////////////////// | |
// SERVER1 | |
val stop = async.signal[Boolean] | |
stop.set(false).run | |
val local1 = localAddress(11100) | |
val recv1 = new SyncVar[Vector[String]] | |
val server1 = | |
( stop.discrete wye NioServer.echo(local1) )(wye.interrupt) map { bytes => | |
new String(bytes.toArray) | |
} | |
val data1 = Seq( | |
"1,1", | |
"1,2", | |
"1,3", | |
"1,4", | |
"1,5", | |
"1,6" | |
) | |
val clientProcess1 = Process.emitAll(data1.map { s => Bytes.of((s+"\n").getBytes) }) | |
////////////////////////////////////////////////// | |
////////////////////////////////////////////////// | |
////////////////////////////////////////////////// | |
// THIS IS THE IMPORTANT CODE | |
val cp = (clientProcess1 zipWith Process.awakeEvery(100 milliseconds)) { (i, b) => i } | |
////////////////////////////////////////////////// | |
////////////////////////////////////////////////// | |
////////////////////////////////////////////////// | |
val client1 = NioClient.echo(local1, cp) | |
server1.runLog.runAsync { | |
case -\/(e) => println("e:"+e.getMessage); throw e | |
case \/-(td) => recv1.put(td.toVector) | |
} | |
Thread.sleep(300) | |
client1.runLog.timed(3000).run | |
stop.set(true).run | |
println("RECVDATA1:"+recv1.get(5000)) | |
// RESULT | |
scala> println("RECVDATA1:"+recv1.get(5000)) | |
RECVDATA1:Some(Vector(1,1 | |
)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment