Created
December 19, 2014 16:50
-
-
Save Alien2150/d0c74d99c19df59109ff to your computer and use it in GitHub Desktop.
TLS Example with Reactive Streams
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package connection | |
import java.net.InetSocketAddress | |
import java.security.{SecureRandom, KeyStore} | |
import javax.net.ssl.{TrustManagerFactory, KeyManagerFactory, SSLContext} | |
import akka.actor.Actor.Receive | |
import akka.stream.FlowMaterializer | |
import akka.stream.actor.ActorPublisherMessage.Request | |
import akka.stream.actor.ActorSubscriberMessage.{OnComplete, OnNext} | |
import akka.stream.actor.{ActorPublisher, OneByOneRequestStrategy, RequestStrategy, ActorSubscriber} | |
import akka.stream.io.StreamTcp.IncomingConnection | |
import akka.util.ByteString | |
// import actor.ClientConnectionActor | |
import scala.concurrent.{ promise } | |
import akka.actor.{Actor, ActorLogging, Props} | |
import akka.stream.io.SslTlsCipher.{InboundSession, SessionNegotiation} | |
import akka.stream.io.{SslTlsCipherActor, SslTlsCipher, StreamTcp} | |
import akka.stream.scaladsl._ | |
import scala.concurrent.duration._ | |
/**
 * Subscriber that logs whatever is delivered to it.
 *
 * Two kinds of stream elements are handled explicitly:
 *  - a `ByteString`: its size in bytes is logged;
 *  - an `InboundSession`: a further [[SomeLoggingActor]] is started and
 *    subscribed to the session's `data` publisher, so the session's payload
 *    gets logged as well.
 *
 * Stream completion and any other message are logged too. Elements are
 * requested one at a time (`OneByOneRequestStrategy`).
 */
class SomeLoggingActor extends ActorSubscriber with ActorLogging {

  // Number of sessions handed to this actor so far. Only touched inside
  // `receive`; used to give every data logger a distinct name.
  private var sessionCount = 0

  override protected def requestStrategy: RequestStrategy = OneByOneRequestStrategy

  override def receive: Receive = {
    case OnNext(data: ByteString) =>
      // `length` reads the size directly instead of copying the payload into
      // a fresh array just to count its elements.
      log.info(s"Got some binary data ${data.length} bytes")

    case OnNext(s: InboundSession) =>
      // Log everything coming in for the session.
      log.info(s"subscribing to data")
      sessionCount += 1
      // A fixed top-level name ("log_data") makes the second actorOf call fail
      // with InvalidActorNameException, so the name is derived from this
      // actor's own name plus a running counter.
      val dataLogger = context.system.actorOf(
        Props[SomeLoggingActor],
        s"log_data_${self.path.name}_$sessionCount")
      s.data.subscribe(ActorSubscriber(dataLogger))
      log.info(s"I can haz session")

    case OnComplete =>
      log.info("Seems like the stream has been closed (Client shutdown)")

    case x =>
      log.info(s"foo: ${x.getClass}")
  }
}
/**
 * Publisher that periodically offers a fixed "hello world" payload to its
 * subscriber.
 *
 * A scheduler tick (first after one second, then every ten seconds) sends an
 * `EmitRandomBytes` message to this actor. The payload is only pushed
 * downstream when the stream is active and the subscriber has signalled
 * demand; otherwise the tick is dropped and logged.
 */
class SomeRandomBytesGeneratorActor extends ActorPublisher[ByteString] with ActorLogging {
  implicit val ec = context.system.dispatcher

  case class EmitRandomBytes(data: ByteString)

  // Keep the handle so the timer can be cancelled when the actor stops;
  // otherwise it keeps sending ticks to a dead actor.
  private val tick = context.system.scheduler.schedule(
    1.second,
    10.seconds,
    self,
    EmitRandomBytes(ByteString.fromString("hello world"))
  )

  override def postStop(): Unit = {
    tick.cancel()
    super.postStop()
  }

  override def receive: Actor.Receive = {
    case EmitRandomBytes(d) =>
      // onNext may only be called while the stream is active and there is
      // outstanding demand; calling it otherwise is rejected by ActorPublisher.
      if (isActive && totalDemand > 0) {
        log.info(s"Emitting .... ${d}")
        onNext(d)
      } else {
        log.info(s"Not emitting ${d}: stream inactive or no demand")
      }

    case Request(n) =>
      log.info(s"Got a request for ${n} data")

    case x =>
      log.info(s"Received: ${x} ${x.getClass}")
  }
}
/**
 * Requester for a single TLS negotiation: completes `negotiated` with the
 * `SslTlsCipher` it receives.
 *
 * One instance is created per incoming connection so that every connection
 * gets its own cipher instead of all connections sharing one promise.
 *
 * NOTE(review): the actor is left running after delivering the cipher because
 * it is not visible here whether the cipher actor depends on its requester
 * staying alive; stop it after delivery once that is confirmed.
 */
private[connection] class SslTlsCipherReceiver(negotiated: scala.concurrent.Promise[SslTlsCipher])
  extends Actor {

  def receive = {
    // trySuccess: a duplicate delivery must not throw on a completed promise.
    case cipher: SslTlsCipher => negotiated.trySuccess(cipher)
  }
}

/**
 * TLS server example: binds a TCP listener on 0.0.0.0:16500 and, for every
 * incoming connection, starts a TLS cipher, wires the connection's byte flow
 * to the cipher-text side, feeds the plain-text side from a
 * [[SomeRandomBytesGeneratorActor]] and logs inbound sessions through a
 * [[SomeLoggingActor]].
 *
 * Each connection gets its own `SSLEngine`, negotiation promise and set of
 * uniquely named helper actors.
 */
class Server extends Actor with ActorLogging {
  implicit val system = context.system
  implicit val ec = system.dispatcher
  implicit val materializer = FlowMaterializer()

  // Running connection number, used to build unique actor names.
  // NOTE(review): incremented from the connection-handling sink rather than
  // from `receive`; assumes connections are delivered to the sink one at a time.
  var i = 0

  /**
   * Loads a key store of the default type from a classpath resource.
   *
   * Fails fast when the resource is missing (a null stream would otherwise
   * silently produce an empty store) and always closes the stream.
   */
  private def loadStore(resource: String, password: Array[Char]): KeyStore = {
    val stream = Option(getClass.getResourceAsStream(resource)).getOrElse(
      throw new IllegalStateException(s"Classpath resource not found: $resource"))
    try {
      val store = KeyStore.getInstance(KeyStore.getDefaultType)
      store.load(stream, password)
      store
    } finally {
      stream.close()
    }
  }

  /**
   * Builds the server-side `SSLContext` from the `/keystore` and `/truststore`
   * classpath resources and logs the protocols it supports.
   *
   * @return an initialised "TLS" context
   */
  def initSslContext(): SSLContext = {
    // NOTE(review): hard-coded password; move to configuration for real use.
    val password = "changeme"

    val keyStore = loadStore("/keystore", password.toCharArray)
    val trustStore = loadStore("/truststore", password.toCharArray)

    log.info(s"Using key manager algorithm: ${KeyManagerFactory.getDefaultAlgorithm}")

    val keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
    keyManagerFactory.init(keyStore, password.toCharArray)

    val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
    trustManagerFactory.init(trustStore)

    // Named sslContext so it does not shadow the actor's `context`.
    val sslContext = SSLContext.getInstance("TLS") // How to get v1.2
    sslContext.init(keyManagerFactory.getKeyManagers, trustManagerFactory.getTrustManagers, new SecureRandom)

    sslContext.getSupportedSSLParameters.getProtocols.foreach { protocol =>
      log.info(s"Supported protocol: $protocol")
    }

    sslContext
  }

  // Built once; engines are created from it per connection.
  val sslContext = initSslContext()

  /** Creates a fresh server-mode engine; an engine carries per-connection handshake state. */
  private def newServerEngine(): javax.net.ssl.SSLEngine = {
    val engine = sslContext.createSSLEngine()
    engine.setEnabledCipherSuites(Array("TLS_RSA_WITH_AES_128_CBC_SHA"))
    engine.setUseClientMode(false)
    engine
  }

  val connectionHandler = ForeachSink[StreamTcp.IncomingConnection] { conn => handleClientFlow(conn) }

  /**
   * Sets up TLS for one incoming connection.
   *
   * Starts a cipher actor with a dedicated requester and, once that requester
   * has been handed the `SslTlsCipher`, connects the TCP flow, the outgoing
   * plain-text source and the inbound session logger.
   *
   * @param conn the accepted TCP connection
   */
  def handleClientFlow(conn: IncomingConnection) = {
    log.info(s"Handling connection: ${conn.remoteAddress}")

    i += 1
    val connectionId = i

    // Per-connection promise: a shared one would hand the first connection's
    // cipher to every later connection.
    val negotiated = promise[SslTlsCipher]
    val receiver = system.actorOf(
      Props(classOf[SslTlsCipherReceiver], negotiated),
      s"cipher_receiver_${connectionId}")

    system.actorOf(
      Props(classOf[SslTlsCipherActor], receiver, SessionNegotiation(newServerEngine()), true),
      s"session_negotiation_${connectionId}")

    negotiated.future onSuccess {
      case cipher: SslTlsCipher =>
        // Defining tls flow
        log.info("Define TLS Flow")

        // 1. subscribe all incoming data to cipher.cipherTextInbound
        // 2. publish all outgoing data to cipher.cipherTextOutbound
        conn.flow.join(
          Flow(
            Sink(cipher.cipherTextInbound), // Incoming data (From the client)
            Source(cipher.cipherTextOutbound) // Outgoing data (To the client)
          )
        ).run()

        // Source that emits the data sent to the client (via the
        // "plainTextOutbound" subscriber). Names carry the connection number
        // so a second connection does not hit InvalidActorNameException.
        val generator = system.actorOf(Props[SomeRandomBytesGeneratorActor], s"generator_${connectionId}")
        Source(ActorPublisher(generator)).to(Sink(cipher.plainTextOutbound)).run()

        // Logging (and also listener) for the decrypted inbound sessions.
        val sessionLogger = system.actorOf(Props[SomeLoggingActor], s"session_inbound_actor_${connectionId}")
        cipher.sessionInbound.subscribe(ActorSubscriber(sessionLogger))
    }
  }

  val serverBinding = StreamTcp().bind(new InetSocketAddress("0.0.0.0", 16500))
  val materializedServer = serverBinding.connections.runWith(connectionHandler)

  // Ciphers are delivered to the per-connection receivers, so this actor only
  // logs whatever it happens to receive.
  def receive = {
    case x => log.info(s"Something different: ${x}")
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment