Skip to content

Instantly share code, notes, and snippets.

@sadache
Created June 16, 2013 16:58
Show Gist options
  • Save sadache/5792650 to your computer and use it in GitHub Desktop.
Save sadache/5792650 to your computer and use it in GitHub Desktop.
package controllers
import scala.concurrent._
import ExecutionContext.Implicits.global
import play.api.libs.iteratee._
import play.api._
import play.api.mvc._
import play.api.libs._
import play.api.libs.ws._
import play.api.libs.json._
import play.api.libs.functional._
import play.api.libs.functional.syntax._
import play.api.libs.json.Reads._
/**
 * Demo controller: renders the index page and streams tweets that carry an
 * image, as JSON, over a `text/event-stream` response.
 */
object Application extends Controller {

  /** Responds 200 OK with the rendered `views.html.index` template. */
  def index = Action { implicit req =>
    Ok(views.html.index())
  }

  /** JSON serializer for `Tweet`, derived by the `Json.writes` macro. */
  implicit val writeTweetAsJson = Json.writes[Tweet]

  /**
   * Streams the tweets found for `query` to the client as server-sent events.
   *
   * @param query the search expression handed to `Tweet.search`
   */
  def search(query: String) = Action {
    // Turn every Tweet into its JSON form using the implicit Writes above.
    val asJson: Enumeratee[Tweet, JsValue] =
      Enumeratee.map[Tweet](tweet => Json.toJson(tweet))

    // Serve a 200 OK text/event-stream
    Ok.feed(
      Tweet.search(query) &> asJson &> EventSource()
    ).as("text/event-stream")
  }

  /** A tweet reduced to its id, its text and the URL of its first image. */
  case class Tweet(id: String, text: String, image: String)

  object Tweet {

    /**
     * Creates a stream of Tweet objects from a Twitter query (such as #devoxx).
     *
     * Pages are requested one after another through `fetchPage`, starting at
     * page 1, until a page comes back empty.
     *
     * @param query the search expression forwarded to `fetchPage`
     * @return an enumerator emitting the fetched tweets one at a time
     */
    def search(query: String): Enumerator[Tweet] = {
      // Flatten an Enumerator[Seq[Tweet]] into an Enumerator[Tweet]
      val flatten = Enumeratee.mapConcat[Seq[Tweet]](identity)

      // Pace the output: every tweet goes through a timeout of 1000
      // (milliseconds by Play's default unit — TODO confirm) before being emitted.
      val schedule = Enumeratee.mapM[Tweet](t => play.api.libs.concurrent.Promise.timeout(t, 1000))

      // Create a stream of tweet batches from multiple Twitter API calls.
      // The state is the page number; an empty batch ends the stream.
      val batches = Enumerator.unfoldM(1) { page =>
        fetchPage(query, page).map { fetched =>
          Option(fetched).filterNot(_.isEmpty).map(batch => (page + 1, batch))
        }
      }

      // Compose the final stream
      batches &> flatten &> schedule
    }

    // Reads the Twitter search API response as a Seq[Tweet]
    implicit val readTweet: Reads[Seq[Tweet]] =
      // Start with the 'statuses' property
      (__ \ "statuses").read(
        // It contains an array, so for each item
        seq(
          (
            // Read the tweet id encoded as String
            (__ \ "id_str").read[String] and
            // Read the tweet text content
            (__ \ "text").read[String] and
            // If there is an {entities: {media: [] ... property
            (__ \ "entities" \ "media").readNullable(
              // It contains an array, so for each item
              seq(
                // Read the image URL
                (__ \ "media_url").read[String]
              )
            // Let's transform the Option[Seq[String]] to a simple Option[String]
            // since we care only about the first image URL if there is any.
            ).map(_.flatMap(_.headOption))
          // Transform all this to a (String, String, Option[String]) tuple
          ).tupled
        )
        .map(
          // Keep only the tuples containing an image (third part of the tuple is Some)
          // and transform them to Tweet instances.
          _.collect {
            case (id, text, Some(image)) => Tweet(id, text, image)
          }
        )
      )

    /**
     * Calls the Twitter search API for `query` and decodes the tweets that
     * carry an image.
     *
     * @param query the search expression, sent as the `q` query-string parameter
     * @param page  the page number requested by `search`
     * @return the decoded tweets, or `Nil` when the 200 response body cannot be
     *         read as a `Seq[Tweet]`; the future fails (via `sys.error`) for any
     *         other response status
     */
    def fetchPage(query: String, page: Int): Future[Seq[Tweet]] = {
      // NOTE(review): `page` is not sent to the API, so every call asks for the
      // same first page and `search` keeps re-emitting it. The v1.1 search
      // endpoint appears to paginate with `max_id` rather than a page number —
      // confirm against the Twitter documentation before wiring it in.
      // NOTE(review): the documented maximum for `count` may be lower than 1000 — verify.
      // Fetch the twitter search API with the corresponding parameters (see the Twitter API documentation)
      WS.url("https://api.twitter.com/1.1/search/tweets.json").withQueryString(
        "include_entities" -> "true",
        // Use the caller's query (this used to be hard-coded to "qcon").
        "q" -> query,
        "count" -> "1000"
      ).withHeaders(
        "Authorization" -> """OAuth oauth_consumer_key="XXX", oauth_nonce="XXX", oauth_signature="XXX", oauth_signature_method="HMAC-SHA1", oauth_timestamp="1371136440", oauth_token="XXX", oauth_version="1.0""""
      ).get().map(r => r.status match {
        // We got a 200 OK response, try to convert the JSON body to a Seq[Tweet]
        // by using the previously defined implicit Reads[Seq[Tweet]]
        case 200 => r.json.asOpt[Seq[Tweet]].getOrElse(Nil)
        // Any other status is unexpected: fail the future
        case x => sys.error(s"Bad response status $x")
      })
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment