Created
August 18, 2016 21:17
-
-
Save ftrossbach/c3c7431cf74e24fbd604da23fd03869a to your computer and use it in GitHub Desktop.
Meetup Druid example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package de.ftrossbach.meetup | |
import java.util.concurrent.TimeUnit | |
import akka.actor.ActorSystem | |
import akka.{Done, NotUsed} | |
import akka.http.scaladsl.Http | |
import akka.stream.ActorMaterializer | |
import akka.stream.scaladsl._ | |
import akka.http.scaladsl.model._ | |
import akka.http.scaladsl.model.headers.RawHeader | |
import akka.http.scaladsl.model.ws._ | |
import com.lambdaworks.jacks.JacksMapper | |
import scala.annotation.tailrec | |
import scala.concurrent.Future | |
import scala.concurrent.Promise | |
/**
 * Connects to the Meetup RSVP WebSocket stream, flattens every RSVP into an
 * `RSVP` record, prints it as JSON and POSTs it to a local HTTP endpoint.
 */
object SingleWebSocketRequest {

  import scala.util.{Failure, Success}
  import scala.util.control.NonFatal

  /**
   * Looks up a value in a nested JSON map using a dot-separated path,
   * e.g. `"member.other_services.twitter.identifier"`.
   *
   * @param key     dot-separated path of map keys
   * @param jsonMap the parsed JSON object
   * @return the value at the path, or `None` if any segment is missing, is JSON
   *         `null`, or an intermediate segment is not a JSON object
   */
  private def extract(key: String, jsonMap: Map[String, Any]): Option[Any] =
    extractValue(key.split("\\.").toList, jsonMap)

  /**
   * Recursive worker behind [[extract]]: walks one path segment per call.
   *
   * @param key     remaining path segments
   * @param jsonMap the JSON object the first remaining segment is looked up in
   * @return the value at the end of the path, if present and non-null
   */
  @tailrec
  private def extractValue(key: List[String], jsonMap: Map[String, Any]): Option[Any] =
    key match {
      // An empty path addresses nothing (previously a MatchError).
      case Nil => None
      // Option(_) turns an explicit JSON null into None.
      case last :: Nil => jsonMap.get(last).flatMap(Option(_))
      case head :: rest =>
        jsonMap.get(head) match {
          // Only descend into real JSON objects; a scalar or null in the middle
          // of the path means the value is absent (previously a ClassCastException).
          case Some(nested: Map[_, _]) => extractValue(rest, nested.asInstanceOf[Map[String, Any]])
          case _ => None
        }
    }

  /** Value at `key` if it is a string. */
  private def stringAt(key: String, jsonMap: Map[String, Any]): Option[String] =
    extract(key, jsonMap).collect { case s: String => s }

  /**
   * Value at `key` as a `Long` if it is numeric. The JSON parser may hand back
   * `Integer` for small values, so an erased cast to `Option[Long]` is not safe.
   */
  private def longAt(key: String, jsonMap: Map[String, Any]): Option[Long] =
    extract(key, jsonMap).collect { case n: Number => n.longValue }

  /** Value at `key` as a `Double` if it is numeric. */
  private def doubleAt(key: String, jsonMap: Map[String, Any]): Option[Double] =
    extract(key, jsonMap).collect { case n: Number => n.doubleValue }

  /**
   * Program entry point: opens the WebSocket connection and keeps forwarding
   * RSVPs until the process is stopped.
   *
   * @param args command line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem()
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    // POSTs one JSON document to the local ingestion endpoint and prints the outcome.
    def publish(jsonString: String): Unit = {
      val request =
        HttpRequest(uri = "http://localhost:8200/v1/post/meetup", method = HttpMethods.POST)
          .withEntity(ContentTypes.`application/json`, jsonString.getBytes("UTF-8"))

      Http().singleRequest(request).onComplete { result =>
        println(result)
        // The response entity must always be consumed, otherwise the pooled
        // connection is never released and later requests stall.
        result.foreach(_.entity.dataBytes.runWith(Sink.ignore))
      }
    }

    // Parses one RSVP document, prints its flattened JSON form and publishes it.
    def handleText(text: String): Unit =
      try {
        val jsonMap = JacksMapper.readValue[Map[String, Any]](text)
        // mtime is the only mandatory field; everything else stays optional.
        longAt("mtime", jsonMap) match {
          case Some(rsvpTime) =>
            val rsvp = RSVP(
              rsvpTime,
              stringAt("response", jsonMap),
              longAt("guests", jsonMap),
              stringAt("event.event_name", jsonMap),
              longAt("event.time", jsonMap),
              stringAt("group.group_name", jsonMap),
              stringAt("group.group_city", jsonMap),
              stringAt("group.group_country", jsonMap),
              stringAt("member.member_name", jsonMap),
              longAt("member.member_id", jsonMap),
              stringAt("member.other_services.twitter.identifier", jsonMap),
              stringAt("venue.venue_name", jsonMap),
              doubleAt("venue.lat", jsonMap),
              doubleAt("venue.lon", jsonMap)
            )
            val jsonString = JacksMapper.writeValueAsString(rsvp)
            println(jsonString)
            publish(jsonString)
          case None =>
            System.err.println(s"Skipping RSVP without a numeric mtime: $text")
        }
      } catch {
        // A single malformed message must not tear down the stream, but fatal
        // errors (OutOfMemoryError, InterruptedException, ...) must propagate.
        case NonFatal(e) => e.printStackTrace()
      }

    // Handles one incoming WebSocket message. Every message is consumed:
    // an undrained streamed message would back-pressure the whole connection.
    def processMessage(msg: Message): Unit =
      msg match {
        case TextMessage.Strict(text) =>
          handleText(text)
        case streamed: TextMessage =>
          streamed.textStream.runFold("")(_ + _).onComplete {
            case Success(text) => handleText(text)
            case Failure(e)    => e.printStackTrace()
          }
        case binary: BinaryMessage =>
          // Binary frames carry nothing we use; drain and drop them.
          binary.dataStream.runWith(Sink.ignore)
          ()
      }

    // Incoming messages go to processMessage; Source.maybe never emits, so the
    // outgoing side stays open until the materialized promise is completed.
    val flow: Flow[Message, Message, Promise[Option[Message]]] =
      Flow.fromSinkAndSourceMat(
        Sink.foreach[Message](processMessage),
        Source.maybe[Message])(Keep.right)

    val (upgradeResponse, closed) =
      Http().singleWebSocketRequest(WebSocketRequest("ws://stream.meetup.com/2/rsvps"), flow)

    val connected = upgradeResponse.map { upgrade =>
      if (upgrade.response.status == StatusCodes.SwitchingProtocols) Done
      else throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
    }

    // Surface the handshake outcome instead of silently dropping a failed future.
    connected.onComplete {
      case Success(_) =>
        System.err.println("WebSocket connection established")
      case Failure(e) =>
        e.printStackTrace()
        // Nothing will ever arrive: close the outgoing side and shut down so
        // the process does not hang with an idle actor system.
        closed.trySuccess(None)
        system.terminate()
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment