Skip to content

Instantly share code, notes, and snippets.

@nicmarti
Created August 1, 2017 21:43
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save nicmarti/08e25ecf509b9e26fc1c8775ad4bb67c to your computer and use it in GitHub Desktop.
Save nicmarti/08e25ecf509b9e26fc1c8775ad4bb67c to your computer and use it in GitHub Desktop.
Simple Akka HTTP Server-sent-event client
/*
* Copyright 2017 Lunatech
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamdata
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.client.RequestBuilding.Get
import akka.http.scaladsl.model.sse.ServerSentEvent
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
/**
* This is an Akka client that uses Server-Sent events, a lightweight technology that is used by Streamdata.io
* to push events and updates to the client.
* Since Akka HTTP 10.0.8 we have native support for SSE.
*
* @author Nicolas Martignole, Lunatech
*/
object AkkaServerSentClientApp {
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()
import akka.http.scaladsl.unmarshalling.sse.EventStreamUnmarshalling._
import system.dispatcher
// TODO create an account on streamdata.io and copy here your token
val YOUR_TOKEN:String = "HERE YOUR TOKEN"
println("Simple Akka Stream Server sent event client that tries to connect to streamdata")
println("--------------------------------------------------------------------------------")
println(s"Streamdata API Key ${YOUR_TOKEN}")
println("Type Ctrl-C to stop")
Http()
.singleRequest(Get(s"https://streamdata.motwin.net/http://stockmarket.streamdata.io/prices?X-Sd-Token=${YOUR_TOKEN}"))
.flatMap(Unmarshal(_).to[Source[ServerSentEvent, NotUsed]])
.foreach(source => source.runForeach(elem => println(prettyPrint(elem)) ))
}
def prettyPrint(event:ServerSentEvent):String =
s"""
|ID: ${event.id.getOrElse("No ID")}
|DATA: ${event.data}
|EVENT_TYPE: ${event.eventType.getOrElse("?")}
|RETRY: ${event.retry.getOrElse("")}
""".stripMargin
}
# Store this file to src/main/resources
akka {
loglevel = DEBUG
stdout-loglevel = INFO
loggers = ["akka.event.slf4j.Slf4jLogger"]
log-dead-letters = 100
log-dead-letters-during-shutdown = true
}
name := "scala-streamdata-io"
version := "1.0"
scalaVersion := "2.12.1"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-http" % "10.0.9",
"com.typesafe.akka" %% "akka-stream" % "2.5.3",
"com.typesafe.akka" %% "akka-slf4j" % "2.5.3",
"ch.qos.logback" % "logback-classic" % "1.1.7",
"com.typesafe.akka" %% "akka-stream-testkit" % "2.5.3" % Test
)
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<!-- Save this file to src/main/resources -->
<statusListener class="ch.qos.logback.core.status.OnConsoleStatusListener" />
<logger name="akka" level="${LOGLEVEL_AKKA:-DEBUG}" />
<logger name="scalapenos" level="${LOGLEVEL_APP:-DEBUG}" />
<root level="${LOGLEVEL_ROOT:-INFO}">
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<target>System.out</target>
<encoder>
<pattern>%d{ISO8601} %-5level [%logger{0}] - %msg%n</pattern>
</encoder>
</appender>
</root>
</configuration>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment