Skip to content

Instantly share code, notes, and snippets.

@jvz
Created July 19, 2017 14:58
Show Gist options
  • Save jvz/864c8f1405a384918fd1727346747982 to your computer and use it in GitHub Desktop.
Save jvz/864c8f1405a384918fd1727346747982 to your computer and use it in GitHub Desktop.
Trivial TCP server for Avro IPC
package avro.akka.ipc
import java.nio.ByteBuffer
import java.util.{ArrayList => AList, List => JList}
import akka.NotUsed
import akka.actor.{Actor, Props}
import akka.stream.Materializer
import akka.stream.scaladsl.Tcp.ServerBinding
import akka.stream.scaladsl.{BidiFlow, Flow, Framing, Tcp}
import akka.util.ByteString
import org.apache.avro.ipc.{Responder => IpcResponder}
import scala.collection.JavaConverters._
/**
 * Actor that binds a TCP listener on `port` (all interfaces) and answers every
 * framed request by handing it to an Avro IPC responder.
 *
 * Each message in either direction is delimited with
 * `Framing.simpleFramingProtocol`, with a maximum frame size of 100 MiB.
 *
 * @param port         TCP port to listen on
 * @param responder    Avro IPC responder invoked once per decoded request frame
 * @param materializer materializer used to run the connection-handling streams
 */
class TcpStreamServer(port: Int, responder: IpcResponder)(implicit materializer: Materializer) extends Actor {

  /** Per-connection handler: socket bytes in, response bytes out. */
  private val handler: Flow[ByteString, ByteString, NotUsed] = {
    // TODO: this size limit should be configurable
    // `simpleFramingProtocol` puts the encoder on its first (I1 ~> O1) direction
    // and the decoder on its second. The handler's input is bytes arriving from
    // the socket, which must be *decoded*, so the protocol is reversed before it
    // is stacked: inbound bytes -> decoder -> responder -> encoder -> outbound bytes.
    val framingProtocol = Framing.simpleFramingProtocol(1024 * 1024 * 100).reversed

    // Converts between Akka's ByteString and the java.util.List[ByteBuffer]
    // that the responder consumes and produces.
    val bufferTranslator = BidiFlow.fromFunctions(
      (bytes: ByteString) => new AList(bytes.asByteBuffers.asJavaCollection),
      // foldLeft (rather than reduce) so an empty buffer list yields an empty
      // ByteString instead of throwing.
      (buffers: JList[ByteBuffer]) => buffers.asScala.map(ByteString(_)).foldLeft(ByteString.empty)(_ ++ _)
    )

    val responseHandler = Flow.fromFunction(responder.respond)

    framingProtocol.atop(bufferTranslator).join(responseHandler)
  }

  // Populated once the bind has completed; stays None before that or if the bind fails.
  private var binding: Option[ServerBinding] = None

  /** Starts the asynchronous bind and pipes its outcome back to this actor. */
  override def preStart(): Unit = {
    import akka.pattern.pipe
    import context.dispatcher
    Tcp(context.system).bindAndHandle(handler, "0.0.0.0", port) pipeTo self
    ()
  }

  override def receive: Receive = {
    case s: ServerBinding =>
      binding = Some(s)

    // `pipeTo` delivers a failed future as Status.Failure; without this case a
    // failed bind would go unhandled and leave the actor alive with no listener.
    case akka.actor.Status.Failure(cause) =>
      context.system.log.error(cause, "Failed to bind TCP server on port {}", port)
      context.stop(self)
  }

  /**
   * Releases the listening socket if the bind had completed.
   *
   * NOTE(review): if the actor stops before the piped `ServerBinding` message
   * is processed, the binding is never recorded here and is not unbound.
   */
  override def postStop(): Unit =
    binding.foreach(_.unbind())
}
/** Companion holding the `Props` factory for [[TcpStreamServer]]. */
object TcpStreamServer {

  /**
   * Creates the `Props` used to spawn a [[TcpStreamServer]] actor.
   *
   * @param port         TCP port the server will listen on
   * @param responder    Avro IPC responder that handles each request
   * @param materializer materializer passed through to the actor
   * @return `Props` that construct a new [[TcpStreamServer]] with the given arguments
   */
  def props(port: Int, responder: IpcResponder)(implicit materializer: Materializer): Props =
    Props(new TcpStreamServer(port, responder))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment