Skip to content

Instantly share code, notes, and snippets.

@ftrossbach
Created August 18, 2016 21:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save ftrossbach/c3c7431cf74e24fbd604da23fd03869a to your computer and use it in GitHub Desktop.
Save ftrossbach/c3c7431cf74e24fbd604da23fd03869a to your computer and use it in GitHub Desktop.
Meetup Druid example
package de.ftrossbach.meetup
import java.util.concurrent.TimeUnit
import akka.actor.ActorSystem
import akka.{Done, NotUsed}
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.RawHeader
import akka.http.scaladsl.model.ws._
import com.lambdaworks.jacks.JacksMapper
import scala.annotation.tailrec
import scala.concurrent.Future
import scala.concurrent.Promise
/**
 * Connects to the Meetup RSVP WebSocket stream, flattens every incoming RSVP JSON
 * document into an `RSVP` record, prints the re-serialised JSON to stdout and POSTs
 * it to a local HTTP endpoint.
 */
object SingleWebSocketRequest {
  import scala.util.{Failure, Success}
  import scala.util.control.NonFatal

  /** WebSocket endpoint the RSVPs are read from. */
  private val RsvpStreamUri = "ws://stream.meetup.com/2/rsvps"

  /** HTTP endpoint every flattened RSVP is POSTed to. */
  private val IngestUri = "http://localhost:8200/v1/post/meetup"

  /**
   * Looks up a value in a nested JSON map using a dot-separated path.
   *
   * @param key     path such as `"member.other_services.twitter.identifier"`
   * @param jsonMap parsed JSON object
   * @return the value stored under the path, or `None` when any path segment is
   *         missing or an intermediate value is not a JSON object
   */
  def extract(key: String, jsonMap: Map[String, Any]): Option[Any] =
    extractValue(key.split("\\.").toList, jsonMap)

  /**
   * Walks `jsonMap` along the path segments in `key`.
   *
   * @param key     remaining path segments, outermost first
   * @param jsonMap JSON object the first segment is resolved against
   * @return the value under the last segment; `None` for an empty path, a missing
   *         segment, or an intermediate value that is not a map
   */
  @tailrec
  def extractValue(key: List[String], jsonMap: Map[String, Any]): Option[Any] =
    key match {
      case Nil         => None
      case last :: Nil => jsonMap.get(last)
      case head :: rest =>
        jsonMap.get(head) match {
          // The type pattern also rejects null, so scalars and JSON nulls on the
          // path yield None instead of a ClassCastException / NullPointerException.
          // Key and value types are erased, hence the remaining cast.
          case Some(nested: Map[_, _]) => extractValue(rest, nested.asInstanceOf[Map[String, Any]])
          case _                       => None
        }
    }

  /** Value under `key` if it is a string, otherwise `None`. */
  private def stringAt(key: String, json: Map[String, Any]): Option[String] =
    extract(key, json).collect { case s: String => s }

  /** Value under `key` as a `Long` if it is any kind of number, otherwise `None`. */
  private def longAt(key: String, json: Map[String, Any]): Option[Long] =
    extract(key, json).collect { case n: Number => n.longValue }

  /** Value under `key` as a `Double` if it is any kind of number, otherwise `None`. */
  private def doubleAt(key: String, json: Map[String, Any]): Option[Double] =
    extract(key, json).collect { case n: Number => n.doubleValue }

  /**
   * Flattens a parsed RSVP document into an `RSVP`.
   *
   * @return `None` when the mandatory numeric `mtime` field is missing
   */
  private def toRsvp(json: Map[String, Any]): Option[RSVP] =
    longAt("mtime", json).map { rsvpTime =>
      RSVP(
        rsvpTime,
        stringAt("response", json),
        longAt("guests", json),
        stringAt("event.event_name", json),
        longAt("event.time", json),
        stringAt("group.group_name", json),
        stringAt("group.group_city", json),
        stringAt("group.group_country", json),
        stringAt("member.member_name", json),
        longAt("member.member_id", json),
        stringAt("member.other_services.twitter.identifier", json),
        stringAt("venue.venue_name", json),
        doubleAt("venue.lat", json),
        doubleAt("venue.lon", json)
      )
    }

  /**
   * Opens the WebSocket and forwards every RSVP until the process is stopped.
   * If the WebSocket handshake fails, the error is reported on stderr and the
   * actor system is terminated so the JVM can exit.
   */
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem()
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    // Parses one RSVP document, prints the flattened JSON and POSTs it.
    // Failures are reported but never propagate into the WebSocket stream.
    def publish(text: String): Unit =
      try {
        val json = JacksMapper.readValue[Map[String, Any]](text)
        toRsvp(json) match {
          case None =>
            System.err.println(s"Skipping RSVP without a numeric mtime: $text")
          case Some(rsvp) =>
            val jsonString = JacksMapper.writeValueAsString(rsvp)
            println(jsonString)
            val request = HttpRequest(method = HttpMethods.POST, uri = IngestUri)
              .withEntity(ContentTypes.`application/json`, jsonString.getBytes("UTF-8"))
            Http().singleRequest(request).onComplete { result =>
              println(result)
              // The response entity must be consumed, otherwise the pooled
              // connection is never released for the next request.
              result.foreach(_.entity.dataBytes.runWith(Sink.ignore))
            }
        }
      } catch {
        case NonFatal(e) => e.printStackTrace()
      }

    // Handles one incoming WebSocket message. Every message's payload is consumed
    // so that an unread streamed message cannot stall the connection.
    def processMessage(msg: Message): Unit = msg match {
      case strict: TextMessage.Strict =>
        publish(strict.text)
      case streamed: TextMessage =>
        // Streamed text arrives in chunks; concatenate them before parsing.
        streamed.textStream.runFold("")(_ + _).onComplete {
          case Success(text) => publish(text)
          case Failure(e)    => e.printStackTrace()
        }
      case binary: BinaryMessage =>
        // Binary frames are not expected on this feed; drain and drop them.
        binary.dataStream.runWith(Sink.ignore)
        ()
    }

    // Inbound messages go to processMessage; Source.maybe never emits, which keeps
    // the outbound side (and therefore the connection) open.
    val flow: Flow[Message, Message, Promise[Option[Message]]] =
      Flow.fromSinkAndSourceMat(
        Sink.foreach[Message](processMessage),
        Source.maybe[Message])(Keep.right)

    val (upgradeResponse, _) =
      Http().singleWebSocketRequest(WebSocketRequest(RsvpStreamUri), flow)

    upgradeResponse.onComplete {
      case Success(upgrade) if upgrade.response.status == StatusCodes.SwitchingProtocols =>
        () // handshake succeeded; messages are now flowing into processMessage
      case Success(upgrade) =>
        System.err.println(s"Connection failed: ${upgrade.response.status}")
        system.terminate()
      case Failure(e) =>
        System.err.println(s"Connection failed: ${e.getMessage}")
        system.terminate()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment