Skip to content

Instantly share code, notes, and snippets.

@muller
Created November 16, 2016 13:47
Show Gist options
  • Save muller/d03c05ffaddecf6dab7c0b564e74c6f7 to your computer and use it in GitHub Desktop.
import java.security.cert.X509Certificate
import javax.net.ssl.{KeyManager, SSLContext, X509TrustManager}
import akka.http.scaladsl.ConnectionContext
/**
 * Factory for an HTTPS connection context that performs no certificate checks.
 *
 * The trust manager installed here accepts every client and server certificate
 * chain without inspecting it, so connections made with this context are not
 * protected against impersonation of the remote peer.
 */
object NoSSL {

  /** Builds a fresh "TLS" `SSLContext` around an accept-everything trust manager and wraps it for Akka HTTP. */
  def apply() = {
    // Both check methods return normally for any input, which is how a chain is reported as trusted.
    val acceptAnything: X509TrustManager = new X509TrustManager {
      override def checkClientTrusted(chain: Array[X509Certificate], authType: String): Unit = ()

      override def checkServerTrusted(chain: Array[X509Certificate], authType: String): Unit = ()

      override def getAcceptedIssuers: Array[X509Certificate] = Array.empty[X509Certificate]
    }

    val tls = SSLContext.getInstance("TLS")
    // No key managers are supplied; passing null as the SecureRandom is part of the JDK init contract.
    tls.init(Array.empty[KeyManager], Array[javax.net.ssl.TrustManager](acceptAnything), null)
    ConnectionContext.https(tls)
  }
}
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.headers.{Authorization, OAuth2BearerToken}
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, Uri}
import akka.http.scaladsl.unmarshalling.{Unmarshal, Unmarshaller}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Framing, Source}
import akka.util.ByteString
import play.api.libs.json._
import scala.collection.immutable
import scala.concurrent.ExecutionContext
/**
 * Command-line entry point.
 *
 * Expects two arguments: the OAuth2 bearer token and the URI to read from.
 * Every parsed JSON value produced by [[ReactiveNakadi.read]] is printed to
 * standard output. When the stream completes or fails, the failure (if any) is
 * reported on standard error and the actor system is terminated.
 */
object ReactiveNakadiApp extends App {
  // Check the arguments first so a usage error is reported clearly instead of
  // surfacing as an ArrayIndexOutOfBoundsException from args(0) / args(1).
  if (args.length < 2) {
    System.err.println("Usage: ReactiveNakadiApp <oauth2-token> <uri>")
    sys.exit(1)
  }

  implicit val system: ActorSystem = ActorSystem()
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executionContext: scala.concurrent.ExecutionContextExecutor = materializer.executionContext

  new ReactiveNakadi(args(0), args(1))
    .read()
    .runForeach(println)
    .onComplete { outcome =>
      // The completion Future was previously dropped, so failures went unnoticed.
      outcome.failed.foreach(error => System.err.println(s"Event stream failed: $error"))
      // Shut the actor system down once there is nothing left to read.
      system.terminate()
    }
}
/**
 * Reads a newline-delimited stream of JSON values over HTTP.
 *
 * A single request carrying an OAuth2 bearer token is sent to `uri`; the
 * response body is split on `'\n'` and every line is parsed as JSON.
 *
 * @param token  OAuth2 bearer token placed in the `Authorization` header
 * @param uri    address the request is sent to
 * @param system actor system used by the Akka HTTP client
 * @param mat    materializer used for the request, for unmarshalling and for draining error bodies
 * @param ec     execution context used while unmarshalling
 */
class ReactiveNakadi(token: String, uri: String)(implicit system: ActorSystem, mat: ActorMaterializer, ec: ExecutionContext) {

  /** `Authorization: Bearer <token>` header attached to the request. */
  val authorization: Authorization = Authorization(OAuth2BearerToken(token))

  /** The request issued by [[read]]; all fields other than URI and headers keep their defaults. */
  val request: HttpRequest = HttpRequest(
    uri = Uri(uri),
    headers = immutable.Seq(authorization))

  /** Decodes a chunk of bytes as UTF-8 text and parses it as JSON. */
  implicit val eventUnmarshaller: Unmarshaller[ByteString, JsValue] =
    Unmarshaller.strict[ByteString, JsValue](bytes => Json.parse(bytes.utf8String))

  /**
   * Turns a response into its body split into lines (delimiter `'\n'`, no
   * practical limit on line length).
   *
   * A response whose status is not a success fails the stream instead of having
   * its error body framed and parsed as if it were event data.
   */
  val tail: Flow[HttpResponse, ByteString, akka.NotUsed] = Flow[HttpResponse]
    .flatMapConcat { response =>
      if (response.status.isSuccess()) {
        response.entity.dataBytes
      } else {
        // Drain the body so the underlying connection is not left waiting on an unread entity.
        response.entity.dataBytes.runWith(akka.stream.scaladsl.Sink.ignore)
        Source.failed[ByteString](
          new IllegalStateException(s"Request to $uri failed with status ${response.status}"))
      }
    }
    .via(Framing.delimiter(ByteString('\n'), Int.MaxValue))

  /** Parses each line with [[eventUnmarshaller]]; a line that is not valid JSON fails the stream. */
  val parse: Flow[ByteString, JsValue, akka.NotUsed] =
    Flow[ByteString].mapAsync(Int.MaxValue)(Unmarshal(_).to[JsValue])

  /**
   * Sends the request (with certificate checks disabled via `NoSSL`) and returns
   * the parsed JSON values of the response body as a stream.
   *
   * The request is dispatched as soon as this method is called, not when the
   * returned source is materialized.
   */
  def read(): Source[JsValue, akka.NotUsed] = {
    val response = Http().singleRequest(request, NoSSL())
    Source.fromFuture(response)
      .via(tail)
      .via(parse)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment