Skip to content

Instantly share code, notes, and snippets.

@remeniuk
Created September 27, 2013 08:44
Show Gist options
  • Save remeniuk/6725792 to your computer and use it in GitHub Desktop.
Save remeniuk/6725792 to your computer and use it in GitHub Desktop.
package com.viaden.crm.server
import java.net.InetSocketAddress
import akka.actor._
import akka.io._
import akka.io.Tcp._
import akka.io.IO
import akka.io.Tcp.Connected
import akka.actor.Terminated
import akka.io.Tcp.Bind
import akka.io.Tcp.Bound
import akka.actor.SupervisorStrategy.Restart
import com.viaden.crm.config.Configuration
import scala.concurrent.{ExecutionContext, Future}
import akka.dispatch.MessageDispatcher
import java.util.concurrent.{Callable, ThreadPoolExecutor, ExecutorService}
import akka.dispatch.Futures.future;
/**
 * User: Maxim Korolyov
 *
 * TCP endpoint actor that binds to `local` and, for every accepted connection,
 * puts an SSL/TLS pipeline in front of a `ProtobufRequestHandler`.
 *
 * Only the creation of the `SSLEngine` is offloaded to the dispatcher configured
 * under `Configuration.SSL_INIT_THREAD_POOL_DISPATCHER`. Everything that touches
 * actor state (`sender`, `context.actorOf`) runs on the actor's own thread: the
 * off-thread result is sent back to `self` as a private message.
 *
 * `sslContext` is not defined in this class; presumably it is supplied by the
 * `SslConfiguration` mixin.
 *
 * @param local address the TCP listener is bound to
 */
class SslProtobufEndpoint(local: InetSocketAddress) extends Actor with ActorLogging with SslConfiguration {

  import scala.util.{Failure, Success}

  /** Internal message: the SSL engine for `connection` was created successfully. */
  private case class SslEngineReady(engine: javax.net.ssl.SSLEngine, connection: ActorRef)

  /** Internal message: creating the SSL engine for `connection` failed with `cause`. */
  private case class SslEngineFailed(cause: Throwable, connection: ActorRef)

  /** Restarts a failing child regardless of the cause of the failure. */
  override val supervisorStrategy =
    OneForOneStrategy() {
      case _ => Restart
    }

  /** Actor system of this actor; made implicit because `IO(Tcp)` requires one. */
  implicit def system: ActorSystem = context.system

  // Ask the TCP manager to bind; the reply (Bound or CommandFailed) is handled in `receive`.
  IO(Tcp) ! Bind(self, local)
  log.debug(s"BindTCP connection to ${local}")

  /** Initial behavior: waits for the outcome of the `Bind` request. */
  def receive: Receive = {
    case _: Bound =>
      log.debug("Successfully bound TCP connection")
      // The sender of Bound is the listener; it is needed later to Unbind.
      context.become(bound(sender))

    case CommandFailed(_: Bind) =>
      // Without a listener this actor can never do any work, so report and stop.
      log.error(s"Failed to bind TCP connection to ${local}")
      context stop self
  }

  /**
   * Behavior once the listener is bound.
   *
   * @param listener the actor that replied with `Bound`; receives `Unbind` on shutdown
   */
  def bound(listener: ActorRef): Receive = {
    case Connected(remote, _) =>
      // Capture the connection actor NOW, on the actor thread. Reading `sender`
      // from inside the Future would race with subsequent messages.
      val connection = sender
      val dispatcher = system.dispatchers.lookup(Configuration.SSL_INIT_THREAD_POOL_DISPATCHER)
      // Only the engine creation runs off-thread; `self` is an immutable ActorRef
      // and therefore safe to close over.
      Future(sslEngine(remote, client = false))(dispatcher).onComplete {
        case Success(engine) => self ! SslEngineReady(engine, connection)
        case Failure(cause)  => self ! SslEngineFailed(cause, connection)
      }(dispatcher)

    case SslEngineReady(engine, connection) =>
      // Back on the actor thread: it is now safe to use `context` to create children.
      val init = TcpPipelineHandler.withLogger(log,
        new TcpReadWriteAdapter >>
          new SslTlsSupport(engine) >>
          new BackpressureBuffer(lowBytes = 100, highBytes = 1000, maxBytes = 1000000))
      val handler = context.actorOf(
        Props(new ProtobufRequestHandler(init, connection))
          .withDispatcher(Configuration.PROTOBUF_HANDLER_DISPATCHER))
      val pipeline = context.actorOf(TcpPipelineHandler.props(init, connection, handler))
      connection ! Tcp.Register(pipeline)

    case SslEngineFailed(cause, connection) =>
      // Do not leave the accepted connection dangling until the register timeout.
      log.error(cause, "Failed to initialize SSL engine for incoming connection")
      connection ! Close

    case _: Terminated =>
      listener ! Unbind
      // From here on only the Unbound confirmation is handled; then the actor stops.
      context.become {
        case Unbound => context stop self
      }
  }

  /**
   * Creates an SSL engine from `sslContext`, using the host name and port of
   * `address` as the peer information.
   *
   * @param address peer address handed to `createSSLEngine`
   * @param client  `true` for client mode, `false` for server mode
   * @return the configured engine
   */
  def sslEngine(address: InetSocketAddress, client: Boolean) = {
    val engine = sslContext.createSSLEngine(address.getHostName, address.getPort)
    engine.setUseClientMode(client)
    engine
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment