Skip to content

Instantly share code, notes, and snippets.

@jilen
Last active December 15, 2015 12:47
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jilen/10a664cd588af10b7d09 to your computer and use it in GitHub Desktop.
Save jilen/10a664cd588af10b7d09 to your computer and use it in GitHub Desktop.
import scalaz.stream._
import scalaz.stream.tcp.Connection
import scodec.bits.ByteVector
import java.net.InetSocketAddress
import java.io._
object Foo {
implicit val S = scalaz.concurrent.Strategy.DefaultStrategy
implicit val AG = tcp.DefaultAsynchronousChannelGroup
val Delimiter = ByteVector("\u0000".getBytes)
val addr = new InetSocketAddress("localhost", 3333)
def logged( prefix: String): Process1[String, String] = {
process1.id[String].map { str =>
println(s"$prefix-$str")
str
}
}
def serverLog(prefix: String) = logged(prefix)
def clientLog(prefix: String) = logged(prefix)
val readStr = {
def readUnitl(delimiter: ByteVector): Process[Connection, ByteVector] = {
tcp.reads(1024).flatMap { bytes =>
if(bytes.endsWith(delimiter)) {
Process.emit(bytes)
} else {
Process.emit(bytes) ++ readUnitl(delimiter)
}
}
}
readUnitl(Delimiter).map { bytes =>
new String(bytes.toArray)
}
}
def runServer() {
def writeStr(str: String) = tcp.write(ByteVector(str.getBytes))
val echoServer = (readStr |> serverLog("[Server] Receiving")).flatMap(writeStr)
val server = tcp.server(addr, concurrentRequests = 1)(echoServer.repeat)
server.flatMap(_.drain).run.run
}
def runClient() {
val topic = async.topic[String]()
val echoClient = (topic.subscribe |> clientLog("[Client] Inputing")).map { str =>
tcp.write(ByteVector(str.getBytes) ++ Delimiter) ++ (readStr |> clientLog("[Client] Receiving"))
}
val client = tcp.connect(addr)(tcp.lift(echoClient))
client.run.runAsync(println)
io.stdInLines.to(topic.publish).run.run
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment