Skip to content

Instantly share code, notes, and snippets.

@mandubian
Created March 3, 2014 08:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mandubian/9320732 to your computer and use it in GitHub Desktop.
Save mandubian/9320732 to your computer and use it in GitHub Desktop.
Chaining 2 server/client pairs gives random results...
import java.net.InetSocketAddress
import scalaz.{-\/, \/, \/-}
import scalaz.concurrent.Task
import scalaz.stream._
import scala.concurrent.duration._
import ReceiveY._
import scala.concurrent.SyncVar
/** Helpers that build a TCP server as a stream of the bytes it produces. */
object Server {

  /**
   * Listens on `address` and processes every accepted client with the writer `w`.
   *
   * Each client yielded by `nio.server` is flat-mapped to
   * `readThrough(w).runReceive` on its exchange, and the per-client streams
   * are combined into one stream with `merge.mergeN`.
   *
   * @param address address the server listens on
   * @param w       writer applied to the bytes read from each client
   * @return the merged output of all client streams
   */
  def apply(address: InetSocketAddress, w: Writer1[Bytes, Bytes, Bytes]): Process[Task, Bytes] = {
    val perClient = nio.server(address).map(_.flatMap(_.readThrough(w).runReceive))
    merge.mergeN(perClient)
  }

  /**
   * Server that, for every chunk it receives, emits the chunk itself on the
   * output side and a single zero byte on the write side.
   *
   * NOTE(review): presumably the write side is what `readThrough` sends back
   * to the client as an acknowledgement — confirm against the library.
   *
   * @param address address the server listens on
   * @return the stream of chunks received from all clients
   */
  def ack(address: InetSocketAddress): Process[Task, Bytes] = {
    // Recursive writer: handle one chunk, then start over for the next one.
    def ackAll: Writer1[Bytes, Bytes, Bytes] =
      Process.receive1[Bytes, Bytes \/ Bytes] { chunk =>
        println("server received" + new String(chunk.toArray))
        val received = \/-(chunk)
        val zeroByte = -\/(Bytes.of(Array(0x00: Byte)))
        Process.emitSeq(Seq(received, zeroByte)) fby ackAll
      }

    Server(address, ackAll)
  }
}
/** Helper that connects to a server and pushes a stream of bytes to it. */
object Client {
  import Process._

  /**
   * Connects to `address`, sends `data` through the connection and emits what
   * the server sends back.
   *
   * NOTE(review): the wye halts right after emitting the first chunk received
   * from the server, which may happen before all of `data` has been sent —
   * confirm this is the intended behaviour.
   *
   * @param address address of the server to connect to
   * @param data    chunks to send to the server
   * @return the stream of chunks received from the server
   */
  def send(address: InetSocketAddress, data: Process[Task, Bytes]): Process[Task, Bytes] = {
    // Left side: chunks coming from the server; right side: chunks to send.
    def exchangeLoop: WyeW[Bytes, Bytes, Bytes, Bytes] =
      receiveBoth {
        case ReceiveL(reply) =>
          println("client received " + reply)
          // Emit the reply as output, then stop.
          emitO(reply) fby halt
        case ReceiveR(outgoing) =>
          println("client sending " + new String(outgoing.toArray))
          // Write the chunk out and keep going.
          tell(outgoing) fby exchangeLoop
        case HaltL(cause) =>
          Halt(cause)
        case HaltR(_) =>
          // Nothing left to send: keep waiting on the server side.
          exchangeLoop
      }

    nio.connect(address).flatMap(_.wye(exchangeLoop).run(data))
  }
}
/**
 * Builds a socket address on the IPv4 loopback interface.
 *
 * @param port port number of the address
 * @return an `InetSocketAddress` for host "127.0.0.1" and the given port
 */
def localAddress(port: Int): InetSocketAddress = new InetSocketAddress("127.0.0.1", port)
import scalaz._
import Scalaz._
////////////////////////////////////////////////////////////
// SERVER1
// Scenario 1: start an acknowledging server on port 11100, send it six lines
// from one client, then stop the server and print what it collected.

// Stop signal combined with the server through `wye.interrupt`; it starts as
// `false` and is set to `true` once the client is done.
val stop = async.signal[Boolean]
stop.set(false).run
val local1 = localAddress(11100)
// Filled by the server's completion callback with everything it produced.
val recv1 = new SyncVar[Vector[String]]
val server1 =
  ( stop.discrete wye Server.ack(local1) )(wye.interrupt) map { bytes =>
    // Decode with an explicit charset instead of the platform default.
    new String(bytes.toArray, java.nio.charset.StandardCharsets.UTF_8)
  }
val data1 = Seq(
  "1,1",
  "1,2",
  "1,3",
  "1,4",
  "1,5",
  "1,6"
)
// Every entry is newline-terminated and encoded with the same explicit charset
// that is used for decoding above.
val clientProcess1 = Process.emitAll(data1.map { s =>
  Bytes.of((s+"\n").getBytes(java.nio.charset.StandardCharsets.UTF_8))
})
val client1 = Client.send(local1, clientProcess1)
server1.runLog.runAsync {
  case -\/(e) => println("e:"+e.getMessage); throw e
  case \/-(td) => recv1.put(td.toVector)
}
// Presumably gives the server time to start listening before the client connects.
Thread.sleep(300)
try {
  client1.runLog.timed(3000).run
} finally {
  // Always raise the stop signal, even when the client fails or times out,
  // so the server is not left running.
  stop.set(true).run
}
// Waits up to 5000 ms for the server's result; prints None if none arrived.
println("RECVDATA1:"+recv1.get(5000))
////////////////////////////////////////////////////////////
// SERVER2
// Scenario 2: same flow as the first scenario, on port 11101 with its own
// stop signal, result holder and data set.

// Stop signal combined with the server through `wye.interrupt`; it starts as
// `false` and is set to `true` once the client is done.
val stop2 = async.signal[Boolean]
stop2.set(false).run
val local2 = localAddress(11101)
// Filled by the server's completion callback with everything it produced.
val recv2 = new SyncVar[Vector[String]]
val server2 =
  ( stop2.discrete wye Server.ack(local2) )(wye.interrupt) map { bytes =>
    // Decode with an explicit charset instead of the platform default.
    new String(bytes.toArray, java.nio.charset.StandardCharsets.UTF_8)
  }
val data2 = Seq(
  "2,1",
  "2,2",
  "2,3",
  "2,4",
  "2,5",
  "2,6"
)
// Every entry is newline-terminated and encoded with the same explicit charset
// that is used for decoding above.
val clientProcess2 = Process.emitAll(data2.map { s =>
  Bytes.of((s+"\n").getBytes(java.nio.charset.StandardCharsets.UTF_8))
})
val client2 = Client.send(local2, clientProcess2)
server2.runLog.runAsync {
  // Report the failure before rethrowing, as the first scenario does.
  case -\/(e) => println("e:"+e.getMessage); throw e
  case \/-(td) => recv2.put(td.toVector)
}
// Presumably gives the server time to start listening before the client connects.
Thread.sleep(300)
try {
  client2.runLog.timed(3000).run
} finally {
  // Always raise the stop signal, even when the client fails or times out,
  // so the server is not left running.
  stop2.set(true).run
}
// Waits up to 5000 ms for the server's result; prints None if none arrived.
println("RECVDATA2:"+recv2.get(5000))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment