Created
January 7, 2015 13:22
-
-
Save Alien2150/9468c871135fd94869a2 to your computer and use it in GitHub Desktop.
SSL Example 2
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package connection | |
import java.net.InetSocketAddress | |
import java.security.{KeyStore, SecureRandom} | |
import javax.net.ssl.{SSLEngine, KeyManagerFactory, SSLContext, TrustManagerFactory} | |
import akka.stream.FlowMaterializer | |
import akka.stream.actor.ActorPublisherMessage.Request | |
import akka.stream.actor.ActorSubscriberMessage.{OnComplete, OnNext} | |
import akka.stream.actor._ | |
import akka.stream.ssl.SslTlsCipher.{InboundSession, SessionNegotiation} | |
import akka.stream.ssl.{SslTlsCipher, SslTlsCipherActor} | |
import akka.util.ByteString | |
// import actor.ClientConnectionActor | |
import akka.actor.{ActorRef, Actor, ActorLogging, Props} | |
import akka.stream.scaladsl._ | |
import scala.concurrent.duration._ | |
/**
 * Manages a single client session and is also the place where connection issues surface.
 *
 * On start it spawns three helpers as children of this actor: the TLS cipher actor,
 * the publisher of outgoing plain text, and the subscriber for incoming plain text.
 * Once the cipher actor answers with an `SslTlsCipher`, the TCP connection flow is joined
 * with the cipher-text side and the plain-text side is wired to the helper actors.
 *
 * @param conn      the accepted TCP connection whose flow is joined with the cipher
 * @param sslEngine the SSL engine handed to the session negotiation
 */
class ClientSessionActor(conn: StreamTcp.IncomingConnection, sslEngine: SSLEngine)
  extends ActorSubscriber with ActorLogging {

  override protected def requestStrategy: RequestStrategy = OneByOneRequestStrategy

  implicit val system = context.system
  implicit val ec = system.dispatcher
  implicit val materializer = FlowMaterializer()

  var tlsCipherActor: Option[ActorRef] = None
  var outgoingDataActor: Option[ActorRef] = None
  var incomingDataActor: Option[ActorRef] = None

  override def preStart(): Unit = {
    log.info(s"Init connection for client: ${conn.remoteAddress}")

    // The helpers are created through `context` (not `system`) so that they are children of
    // this actor and are stopped together with it instead of leaking as top-level actors.
    tlsCipherActor = Some(
      context.actorOf(Props(classOf[SslTlsCipherActor], self, SessionNegotiation(sslEngine), true))
    )
    outgoingDataActor = Some(context.actorOf(Props[OutgoingDataActor]))
    incomingDataActor = Some(context.actorOf(Props[IncomingClientData]))
  }

  def receive = {
    case cipher: SslTlsCipher =>
      log.info("Received cipher from SessionNegotiation")

      // Setup flow
      conn.flow.join(
        Flow(
          Sink(cipher.cipherTextInbound),   // Incoming data (client -> sslActor -> server)
          Source(cipher.cipherTextOutbound) // Outgoing data (server -> sslActor -> client)
        )
      ).run()

      // Subscribe to session events; they arrive here as OnNext(InboundSession).
      cipher.sessionInbound.subscribe(ActorSubscriber(self))

      // Message generator -> plainTextOutbound (which itself is a flow)
      outgoingDataActor match {
        case Some(publisher) =>
          Source(ActorPublisher(publisher)).to(Sink(cipher.plainTextOutbound)).run()
        case None =>
          log.warning("No outgoing data actor available, outbound plain text is not wired")
      }

    // Session received
    case OnNext(session: InboundSession) =>
      log.info("Established ssl-session")
      incomingDataActor match {
        case Some(subscriber) =>
          session.data.subscribe(ActorSubscriber(subscriber))
        case None =>
          log.warning("No incoming data actor available, inbound plain text is not consumed")
      }

    // Complete message
    // NOTE(review): the actor keeps running after completion; consider stopping it here
    // once it is confirmed that completion means the connection is gone.
    case OnComplete =>
      log.info("Seems like the stream has been closed")

    case x =>
      log.info(s"received class ${x.getClass}")
  }
}
/**
 * Processes the decrypted data coming from the client.
 *
 * Currently it only logs the size of every received chunk; all other messages are
 * logged by class name.
 */
class IncomingClientData extends ActorSubscriber with ActorLogging {

  // Demand is managed by a watermark strategy constructed with the values (1024, 10).
  override protected def requestStrategy: RequestStrategy = WatermarkRequestStrategy(1024, 10)

  override def receive: Actor.Receive = {
    case OnNext(data: ByteString) =>
      // `length` reports the same size without first copying the bytes into an array.
      log.info(s"Got some binary data ${data.length} bytes")
      // TODO :: Parse header + message

    case x =>
      log.info(s"foo: ${x.getClass}")
  }
}
/**
 * Publishes data towards the client.
 *
 * A timer sends this actor an `EmitRandomBytes` message one second after start and then
 * every ten seconds. The payload is emitted downstream only while the stream is active and
 * the subscriber has outstanding demand; otherwise the tick is dropped.
 */
class OutgoingDataActor extends ActorPublisher[ByteString] with ActorLogging {
  implicit val ec = context.system.dispatcher

  case class EmitRandomBytes(data: ByteString)

  // Keep the handle of the periodic timer so it can be cancelled when the actor stops;
  // otherwise the scheduler would keep sending ticks to a dead actor.
  private val tick = context.system.scheduler.schedule(
    1.second,
    10.seconds,
    self,
    EmitRandomBytes(ByteString.fromString("hello world"))
  )

  override def postStop(): Unit = {
    tick.cancel()
    super.postStop()
  }

  override def receive: Actor.Receive = {
    case EmitRandomBytes(d) =>
      // `totalDemand` is maintained by ActorPublisher from the Request messages, so a single
      // Request(n) with n > 1 correctly allows n emissions instead of just one.
      if (isActive && totalDemand > 0) {
        log.info(s"Emitting bytes")
        // Emit message
        onNext(d)
      }

    case Request(n) =>
      log.info(s"Got a request for ${n} data")

    case ActorPublisherMessage.Cancel =>
      // Downstream is gone: stop, which also cancels the timer in postStop.
      context.stop(self)

    case x =>
      log.info(s"Received: ${x} ${x.getClass}")
  }
}
/**
 * Accepts incoming TCP connections on port 16500 and starts one [[ClientSessionActor]]
 * child per connection, each with its own server-mode SSL engine.
 */
class Server extends Actor with ActorLogging {
  implicit val system = context.system
  implicit val ec = system.dispatcher
  implicit val materializer = FlowMaterializer()

  // Built once per actor instance; every connection derives its own SSLEngine from it.
  private val sslContext: SSLContext = initSslContext()

  /**
   * Loads a key store from a classpath resource and verifies that it contains `myAlias`.
   *
   * @param resource classpath location of the store
   * @param password password protecting the store
   * @return the loaded key store
   * @throws IllegalStateException if the resource is missing or the alias is not present
   */
  private def loadStore(resource: String, password: Array[Char]): KeyStore = {
    val store = KeyStore.getInstance(KeyStore.getDefaultType)

    // getResourceAsStream returns null for a missing resource, and KeyStore.load(null, ...)
    // would silently create an empty store, so a missing file is reported explicitly.
    val in = Option(getClass.getResourceAsStream(resource)).getOrElse(
      throw new IllegalStateException(s"Key store resource $resource not found on classpath")
    )
    try store.load(in, password)
    finally in.close()

    // Check if alias is available
    if (!store.containsAlias("myAlias")) {
      throw new IllegalStateException("Alias myAlias not found")
    }
    store
  }

  /**
   * Builds the TLS context from the `/te_keystore` and `/te_truststore` classpath resources.
   *
   * @return an initialised SSL context
   * @throws IllegalStateException if a store is missing or lacks the `myAlias` entry
   */
  def initSslContext(): SSLContext = {
    // NOTE(review): hard-coded password; consider moving it to configuration.
    val password = "changeme"

    val keyStore = loadStore("/te_keystore", password.toCharArray)
    val trustStore = loadStore("/te_truststore", password.toCharArray)

    log.info(s"Using key manager algorithm: ${KeyManagerFactory.getDefaultAlgorithm}")

    val keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
    keyManagerFactory.init(keyStore, password.toCharArray)

    val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
    trustManagerFactory.init(trustStore)

    // Named tlsContext so that it does not shadow the actor's own `context`.
    // NOTE(review): the original asked how to get v1.2; "TLSv1.2" is presumably accepted by
    // SSLContext.getInstance as well; confirm against the target JVM before switching.
    val tlsContext = SSLContext.getInstance("TLS")
    tlsContext.init(keyManagerFactory.getKeyManagers, trustManagerFactory.getTrustManagers, new SecureRandom)

    tlsContext.getSupportedSSLParameters.getProtocols.foreach { protocol =>
      log.info(s"Supported protocol: $protocol")
    }

    tlsContext
  }

  override def preStart(): Unit = {
    // The sink callback runs inside the stream, not on this actor's thread, so it must not
    // touch `context`. It only forwards the connection to this actor, which then creates
    // the session child from within `receive`.
    val connectionHandler = ForeachSink[StreamTcp.IncomingConnection] { conn =>
      self ! conn
    }

    // Number of unaccepted connections the O/S kernel will hold for this port before
    // refusing connections.
    val backlog = 100
    val serverBinding = StreamTcp().bind(new InetSocketAddress("0.0.0.0", 16500), backlog)

    log.info(s"Server now listening on port 16500")
    serverBinding.connections.runWith(connectionHandler)
  }

  def receive = {
    case conn: StreamTcp.IncomingConnection =>
      val sslEngine = sslContext.createSSLEngine()
      sslEngine.setEnabledCipherSuites(Array("TLS_RSA_WITH_AES_128_CBC_SHA"))
      sslEngine.setUseClientMode(false)

      // Init connection actor
      context.actorOf(Props(classOf[ClientSessionActor], conn, sslEngine))

    case x => log.info(s"Received ${x.getClass}")
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment