Skip to content

Instantly share code, notes, and snippets.

@Alien2150
Created December 19, 2014 16:50
Show Gist options
  • Save Alien2150/d0c74d99c19df59109ff to your computer and use it in GitHub Desktop.
Save Alien2150/d0c74d99c19df59109ff to your computer and use it in GitHub Desktop.
TLS Example with Reactive Streams
package connection
import java.net.InetSocketAddress
import java.security.{SecureRandom, KeyStore}
import javax.net.ssl.{TrustManagerFactory, KeyManagerFactory, SSLContext}
import akka.actor.Actor.Receive
import akka.stream.FlowMaterializer
import akka.stream.actor.ActorPublisherMessage.Request
import akka.stream.actor.ActorSubscriberMessage.{OnComplete, OnNext}
import akka.stream.actor.{ActorPublisher, OneByOneRequestStrategy, RequestStrategy, ActorSubscriber}
import akka.stream.io.StreamTcp.IncomingConnection
import akka.util.ByteString
// import actor.ClientConnectionActor
import scala.concurrent.{ promise }
import akka.actor.{Actor, ActorLogging, Props}
import akka.stream.io.SslTlsCipher.{InboundSession, SessionNegotiation}
import akka.stream.io.{SslTlsCipherActor, SslTlsCipher, StreamTcp}
import akka.stream.scaladsl._
import scala.concurrent.duration._
/**
* Do some logging here (listener)
*/
class SomeLoggingActor extends ActorSubscriber with ActorLogging {
override protected def requestStrategy: RequestStrategy = OneByOneRequestStrategy
override def receive: Receive = {
case OnNext(data: ByteString) =>
log.info(s"Got some binary data ${data.toArray.length} bytes")
case OnNext(s: InboundSession) =>
// Log everything coming for the session here
log.info(s"subscribing to data")
s.data.subscribe(ActorSubscriber(context.system.actorOf(Props[SomeLoggingActor], "log_data")))
log.info(s"I can haz session")
case OnComplete =>
log.info("Seems like the stream has been closed (Client shutdown)")
case x =>
log.info(s"foo: ${x.getClass}")
}
}
class SomeRandomBytesGeneratorActor extends ActorPublisher[ByteString] with ActorLogging {
implicit val ec = context.system.dispatcher
case class EmitRandomBytes(data: ByteString)
context.system.scheduler.schedule(
1 second,
10 seconds,
self,
EmitRandomBytes(ByteString.fromString("hello world"))
)
override def receive: Actor.Receive = {
case EmitRandomBytes(d) =>
log.info(s"Emitting .... ${d}")
// Emit message
onNext(d)
case Request(n) =>
log.info(s"Got a request for ${n} data")
case x =>
log.info(s"Received: ${x} ${x.getClass}")
}
}
class Server extends Actor with ActorLogging {
implicit val system = context.system
implicit val ec = system.dispatcher
implicit val materializer = FlowMaterializer()
var i = 0
def initSslContext(): SSLContext = {
val password = "changeme"
val keyStore = KeyStore.getInstance(KeyStore.getDefaultType)
keyStore.load(getClass.getResourceAsStream("/keystore"), password.toCharArray)
val trustStore = KeyStore.getInstance(KeyStore.getDefaultType)
trustStore.load(getClass.getResourceAsStream("/truststore"), password.toCharArray)
log.info(s"Using key manager algorithm: ${KeyManagerFactory.getDefaultAlgorithm}")
val keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
keyManagerFactory.init(keyStore, password.toCharArray)
val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
trustManagerFactory.init(trustStore)
val context = SSLContext.getInstance("TLS") // How to get v1.2
context.init(keyManagerFactory.getKeyManagers, trustManagerFactory.getTrustManagers, new SecureRandom)
for (protocol <- context.getSupportedSSLParameters.getProtocols) {
log.info(s"Supported protocol: $protocol")
}
context
}
val sslEngine = initSslContext().createSSLEngine()
sslEngine.setEnabledCipherSuites(Array("TLS_RSA_WITH_AES_128_CBC_SHA"))
sslEngine.setUseClientMode(false)
// TODO:: how to deal with multiple connections?
val p = promise[SslTlsCipher]
val f = p.future
val connectionHandler = ForeachSink[StreamTcp.IncomingConnection] { conn => handleClientFlow(conn) }
def handleClientFlow(conn: IncomingConnection) = {
log.info(s"Handling connection: ${conn.remoteAddress}")
i += 1
context.system.actorOf(Props(classOf[SslTlsCipherActor], self, SessionNegotiation(sslEngine), true), s"session_negotiation_${i}")
f onSuccess {
case cipher: SslTlsCipher =>
// Defining tls flow
log.info("Define TLS Flow")
// 1. subscribe all incoming data to cipher.cipherTextInbound
// 2. publish all outgoing data to cipher.cipherTextOutbound
conn.flow.join(
Flow(
Sink(cipher.cipherTextInbound), // Incoming data (From the client)
Source(cipher.cipherTextOutbound) // Outgoing data (To the client)
)
).run()
// Define source (that emits the data to the client (in this case the "plainTextOutbound" subscriber)
Source(ActorPublisher(context.system.actorOf(Props[SomeRandomBytesGeneratorActor], "generator"))).to(Sink(cipher.plainTextOutbound)).run()
// Define workflow to read the plain data
// Logging (and also listener)
// cipher.cipherTextOutbound.subscribe(ActorSubscriber(context.system.actorOf(Props[SomeLoggingActor], "foo")))
cipher.sessionInbound.subscribe(ActorSubscriber(context.system.actorOf(Props[SomeLoggingActor], "session_inbound_actor")))
}
}
val serverBinding = StreamTcp().bind(new InetSocketAddress("0.0.0.0", 16500))
val materializedServer = serverBinding.connections.runWith(connectionHandler)
// Define receive method
def receive = {
case c: SslTlsCipher => p.success(c)
case x => log.info(s"Something different: ${x}")
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment