Skip to content

Instantly share code, notes, and snippets.

@dkhenry
Created March 9, 2013 19:25
Show Gist options
  • Save dkhenry/5125408 to your computer and use it in GitHub Desktop.
Save dkhenry/5125408 to your computer and use it in GitHub Desktop.
Using the MongoDB aggregation framework from Scala to bucket time-stamped data
import com.mongodb.casbah.Imports._
/**
 * Aggregation queries over ticker data stored in MongoDB.
 *
 * Documents have this general schema (among other fields):
 * {{{
 * { "_id" : ObjectId("5138c8bd5e0ee06dd30a6ee1"),
 *   "LAST_SIZE" : NumberLong(200),
 *   "SYMBOL" : "AAPL",
 *   "LAST_PRICE" : 428.31,
 *   "TOTAL_VOLUME" : NumberLong(8923961),
 *   "TIMESTAMP" : ISODate("2013-03-07T17:05:01.144Z")
 * }
 * }}}
 *
 * There are indexes on TIMESTAMP and SYMBOL, so the leading `$match` stage can use them.
 */
object TickerQuery {
  // Casbah helper that enables Joda-Time DateTime values to be encoded into query documents.
  RegisterJodaTimeConversionHelpers()

  // Presumably a handle to the "transactions" database: it receives a db-level `aggregate`
  // command that names the "ticks" collection. MongoDBConnection is defined elsewhere — confirm.
  val _transactions = MongoDBConnection("transactions")

  /**
   * Prints per-minute price statistics for `symbol` over the 24 hours preceding the call,
   * one result document per line.
   *
   * Each printed document has:
   *  - `_id`: the bucket `{year, day, hour, minute}` derived from TIMESTAMP
   *    (`day` is the day of the year);
   *  - `total`: number of ticks in the bucket;
   *  - `avg_price`, `max_price`, `min_price`: statistics over LAST_PRICE.
   *
   * If the server response has no `result` array, the full response is written to stderr
   * instead of being silently discarded.
   *
   * @param symbol ticker symbol matched exactly against the SYMBOL field, e.g. "AAPL"
   */
  def ticksInRange(symbol: String): Unit = {
    val pipebuilder = MongoDBList.newBuilder

    // Take "now" once so both window bounds refer to the same instant.
    val now = new DateTime()
    // Window is (now - 1 day, now). The bounds were previously inverted
    // ($gt now, $lt now - 1 day), which is an empty range that matched no documents.
    val window = MongoDBObject(
      "$gt" -> now.minusDays(1),
      "$lt" -> now
    )

    // Filter first, so the SYMBOL/TIMESTAMP indexes can narrow the input.
    pipebuilder += MongoDBObject("$match" -> MongoDBObject(
      "SYMBOL" -> symbol,
      "TIMESTAMP" -> window
    ))

    // Minute-granularity bucket key. Key order matters: $group compares _id documents field by field.
    val bucket = MongoDBObject(
      "year" -> MongoDBObject("$year" -> "$TIMESTAMP"),
      "day" -> MongoDBObject("$dayOfYear" -> "$TIMESTAMP"),
      "hour" -> MongoDBObject("$hour" -> "$TIMESTAMP"),
      "minute" -> MongoDBObject("$minute" -> "$TIMESTAMP")
    )

    // Reduce each tick to its bucket and price.
    pipebuilder += MongoDBObject("$project" -> MongoDBObject(
      "bucket" -> bucket,
      "price" -> "$LAST_PRICE"
    ))

    // One output document per bucket, with the tick count and price statistics.
    pipebuilder += MongoDBObject("$group" -> MongoDBObject(
      "_id" -> "$bucket",
      "total" -> MongoDBObject("$sum" -> 1),
      "avg_price" -> MongoDBObject("$avg" -> "$price"),
      "max_price" -> MongoDBObject("$max" -> "$price"),
      "min_price" -> MongoDBObject("$min" -> "$price")
    ))

    val pipeline = pipebuilder.result()
    val response = _transactions.command(MongoDBObject("aggregate" -> "ticks", "pipeline" -> pipeline))

    // NOTE(review): the inline "result" array is only returned by pre-3.6 MongoDB servers.
    // 3.6+ requires a `cursor` option and returns data under `cursor.firstBatch`.
    // Confirm the target server version.
    response.get("result") match {
      case results: BasicDBList => results.foreach(println(_))
      case _ => Console.err.println(s"aggregate on 'ticks' returned no result array: $response")
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment