Skip to content

Instantly share code, notes, and snippets.

@Alien2150
Created January 7, 2015 13:22
Show Gist options
  • Save Alien2150/9468c871135fd94869a2 to your computer and use it in GitHub Desktop.
Save Alien2150/9468c871135fd94869a2 to your computer and use it in GitHub Desktop.
SSL Example 2
package connection
import java.net.InetSocketAddress
import java.security.{KeyStore, SecureRandom}
import javax.net.ssl.{SSLEngine, KeyManagerFactory, SSLContext, TrustManagerFactory}
import akka.stream.FlowMaterializer
import akka.stream.actor.ActorPublisherMessage.Request
import akka.stream.actor.ActorSubscriberMessage.{OnComplete, OnNext}
import akka.stream.actor._
import akka.stream.ssl.SslTlsCipher.{InboundSession, SessionNegotiation}
import akka.stream.ssl.{SslTlsCipher, SslTlsCipherActor}
import akka.util.ByteString
// import actor.ClientConnectionActor
import akka.actor.{ActorRef, Actor, ActorLogging, Props}
import akka.stream.scaladsl._
import scala.concurrent.duration._
/**
 * Manages one client session.
 *
 * On start it creates three helper actors: an `SslTlsCipherActor` (given `self`
 * and a `SessionNegotiation` for the supplied engine), an [[OutgoingDataActor]]
 * and an [[IncomingClientData]] subscriber. When an `SslTlsCipher` message
 * arrives it joins the connection's flow with the cipher-text side of the
 * cipher, subscribes itself to the cipher's inbound sessions and connects the
 * outgoing-data publisher to the plain-text outbound side.
 *
 * @param conn      the accepted TCP connection carrying the cipher text
 * @param sslEngine the SSL engine handed to the session negotiation
 */
class ClientSessionActor(conn: StreamTcp.IncomingConnection, sslEngine: SSLEngine)
  extends ActorSubscriber with ActorLogging {

  override protected def requestStrategy: RequestStrategy = OneByOneRequestStrategy

  implicit val system = context.system
  implicit val ec = system.dispatcher
  implicit val materializer = FlowMaterializer()

  // Populated in preStart; kept as Options so the fields stay readable before start.
  var tlsCipherActor: Option[ActorRef] = None
  var outgoingDataActor: Option[ActorRef] = None
  var incomingDataActor: Option[ActorRef] = None

  override def preStart(): Unit = {
    log.info(s"Init connection for client: ${conn.remoteAddress}")
    // Create the helpers as children (context.actorOf) rather than top-level
    // actors (system.actorOf): children are stopped together with this actor,
    // so a finished session does not leave orphaned actors behind.
    tlsCipherActor = Some(
      context.actorOf(Props(classOf[SslTlsCipherActor], self, SessionNegotiation(sslEngine), true))
    )
    outgoingDataActor = Some(context.actorOf(Props[OutgoingDataActor]))
    incomingDataActor = Some(context.actorOf(Props[IncomingClientData]))
  }

  def receive = {
    case cipher: SslTlsCipher =>
      log.info("Received cipher from SessionNegotiation")
      // Wire the TCP connection to the cipher-text side of the cipher.
      conn.flow.join(
        Flow(
          Sink(cipher.cipherTextInbound), // Incoming data (client -> sslActor -> server)
          Source(cipher.cipherTextOutbound) // Outgoing data (server -> sslActor -> client)
        )
      ).run()
      // Subscribe to session events; they arrive here as OnNext(InboundSession).
      cipher.sessionInbound.subscribe(ActorSubscriber(self))
      // Message generator -> plain-text outbound side of the cipher.
      outgoingDataActor match {
        case Some(publisher) =>
          Source(ActorPublisher(publisher)).to(Sink(cipher.plainTextOutbound)).run()
        case None =>
          log.warning("Outgoing data actor not initialised; plain-text outbound left unconnected")
      }

    // A TLS session has been negotiated: route its decrypted data to the subscriber.
    case OnNext(session: InboundSession) =>
      log.info("Established ssl-session")
      incomingDataActor match {
        case Some(subscriber) =>
          session.data.subscribe(ActorSubscriber(subscriber))
        case None =>
          log.warning("Incoming data actor not initialised; session data left unsubscribed")
      }

    // The session stream completed.
    // NOTE(review): this actor (and therefore its children) is not stopped here;
    // confirm whether it should be once remaining inbound data has been consumed.
    case OnComplete =>
      log.info("Seems like the stream has been closed")

    case x =>
      log.info(s"received class ${x.getClass}")
  }
}
/**
 * Stream subscriber for incoming client data.
 *
 * Currently it only logs the size of each received `ByteString`; every other
 * message (including stream completion and errors) is logged by class name.
 */
class IncomingClientData extends ActorSubscriber with ActorLogging {

  // Demand is controlled by a watermark strategy; the arguments are the high
  // (1024) and low (10) watermarks for outstanding requested elements.
  override protected def requestStrategy: RequestStrategy = WatermarkRequestStrategy(1024, 10)

  override def receive: Actor.Receive = {
    case OnNext(data: ByteString) =>
      // ByteString knows its own length; no need to copy it into an array first.
      log.info(s"Got some binary data ${data.length} bytes")
    // TODO :: Parse header + message
    case x =>
      log.info(s"foo: ${x.getClass}")
  }
}
/**
 * Stream publisher that transmits a fixed "hello world" payload to the client.
 *
 * A scheduler sends this actor an `EmitRandomBytes` tick 1 second after
 * creation and every 10 seconds afterwards. Each tick emits one element when
 * the subscriber has outstanding demand and is dropped otherwise.
 */
class OutgoingDataActor extends ActorPublisher[ByteString] with ActorLogging {
  implicit val ec = context.system.dispatcher

  case class EmitRandomBytes(data: ByteString)

  /**
   * True while the stream is active and the subscriber still has unfulfilled
   * demand. Derived from the publisher's own `totalDemand` bookkeeping, so a
   * single `Request(n)` with n > 1 keeps allowing emissions until all n
   * elements were sent (a hand-maintained flag reset after every emission
   * would stall after the first element).
   */
  def hasDemand: Boolean = isActive && totalDemand > 0

  // Keep the handle so the periodic tick can be cancelled when the actor stops;
  // otherwise the scheduler keeps sending messages to a dead actor.
  private val emitTask = context.system.scheduler.schedule(
    1.second,
    10.seconds,
    self,
    EmitRandomBytes(ByteString.fromString("hello world"))
  )

  override def postStop(): Unit = {
    emitTask.cancel()
    super.postStop()
  }

  override def receive: Actor.Receive = {
    case EmitRandomBytes(d) =>
      if (hasDemand) {
        log.info(s"Emitting bytes")
        // Emit message
        onNext(d)
      }
    case Request(n) =>
      // Demand itself is tracked by ActorPublisher (totalDemand); just report it.
      log.info(s"Got a request for ${n} data")
    case ActorPublisherMessage.Cancel =>
      // The subscriber went away: nothing left to publish to, so shut down.
      log.info("Subscriber cancelled, stopping")
      context.stop(self)
    case x =>
      log.info(s"Received: ${x} ${x.getClass}")
  }
}
/**
 * Accepts incoming TCP connections on port 16500 and starts one
 * [[ClientSessionActor]] per connection, each with its own server-mode
 * `SSLEngine` created from a shared `SSLContext`.
 */
class Server extends Actor with ActorLogging {
  implicit val system = context.system
  implicit val ec = system.dispatcher
  implicit val materializer = FlowMaterializer()

  // Built once; a failure here fails actor initialisation.
  private val sslContext: SSLContext = initSslContext()

  /**
   * Loads a key store from a classpath resource.
   *
   * @param resource classpath location of the store
   * @param password password protecting the store
   * @return the loaded key store
   * @throws IllegalStateException if the resource does not exist. Without this
   *         check `KeyStore.load` would receive a null stream and silently
   *         produce an empty store.
   */
  private def loadKeyStore(resource: String, password: Array[Char]): KeyStore = {
    val stream = Option(getClass.getResourceAsStream(resource)).getOrElse(
      throw new IllegalStateException(s"Key store resource $resource not found on classpath")
    )
    try {
      val store = KeyStore.getInstance(KeyStore.getDefaultType)
      store.load(stream, password)
      store
    } finally {
      stream.close()
    }
  }

  /**
   * Builds the SSL context from the `/te_keystore` and `/te_truststore`
   * classpath resources.
   *
   * @return an initialised "TLS" context
   * @throws IllegalStateException if a store is missing or does not contain
   *         the alias `myAlias`
   */
  def initSslContext(): SSLContext = {
    // NOTE(review): hard-coded password; consider reading it from configuration.
    val password = "changeme".toCharArray

    val keyStore = loadKeyStore("/te_keystore", password)
    // Check if alias is available
    if (!keyStore.containsAlias("myAlias")) {
      throw new IllegalStateException("Alias myAlias not found")
    }

    val trustStore = loadKeyStore("/te_truststore", password)
    // Check if alias is available
    if (!trustStore.containsAlias("myAlias")) {
      throw new IllegalStateException("Alias myAlias not found")
    }

    log.info(s"Using key manager algorithm: ${KeyManagerFactory.getDefaultAlgorithm}")
    val keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
    keyManagerFactory.init(keyStore, password)

    val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
    trustManagerFactory.init(trustStore)

    // To pin TLS 1.2, request "TLSv1.2" here and/or restrict the engine with
    // SSLEngine.setEnabledProtocols; the protocols available are logged below.
    val tlsContext = SSLContext.getInstance("TLS")
    tlsContext.init(keyManagerFactory.getKeyManagers, trustManagerFactory.getTrustManagers, new SecureRandom)
    tlsContext.getSupportedSSLParameters.getProtocols.foreach { protocol =>
      log.info(s"Supported protocol: $protocol")
    }
    tlsContext
  }

  override def preStart(): Unit = {
    // The sink callback runs outside this actor's message processing, and
    // ActorContext must not be used from there. Forward every connection to
    // this actor instead and create the child while handling the message.
    val server = self
    val connectionHandler = ForeachSink[StreamTcp.IncomingConnection] { conn =>
      server ! conn
    }
    val backlog = 100 // This specifies the number of unaccepted connections the O/S kernel will hold for this port before refusing connections
    val serverBinding = StreamTcp().bind(new InetSocketAddress("0.0.0.0", 16500), backlog)
    log.info(s"Server now listening on port 16500")
    serverBinding.connections.runWith(connectionHandler)
  }

  def receive = {
    // A new client connected: give it a dedicated server-mode engine and session actor.
    case conn: StreamTcp.IncomingConnection =>
      val sslEngine = sslContext.createSSLEngine()
      sslEngine.setEnabledCipherSuites(Array("TLS_RSA_WITH_AES_128_CBC_SHA"))
      sslEngine.setUseClientMode(false)
      // Init connection actor
      context.actorOf(Props(classOf[ClientSessionActor], conn, sslEngine))
    case x => log.info(s"Received ${x.getClass}")
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment