Skip to content

Instantly share code, notes, and snippets.

@zcox

zcox/Main.scala

Created Mar 5, 2015
Embed
What would you like to do?
// sbt build definition for the random-tranquility demo project.
// Project coordinates.
organization := "com.banno"
name := "random-tranquility"
version := "0.1.0"
// Scala version used to compile the project and to pick the binary suffix for %% dependencies.
scalaVersion := "2.10.4"
// Additional repositories appended to the default resolver list.
// NOTE(review): the clojars URL uses plain http — confirm whether an https endpoint should be used instead.
resolvers ++= Seq(
"Metamarkets" at "https://metamx.artifactoryonline.com/metamx/pub-libs-releases-local/",
"clojars" at "http://clojars.org/repo/"
)
// Tranquility is the only direct dependency; everything else on the classpath arrives transitively.
// NOTE(review): running this build has been observed to fail with a NoSuchMethodError inside
// jackson-module-scala, which usually points at mismatched Jackson artifact versions among the
// transitive dependencies — inspect the resolved versions and pin them if needed (TODO confirm).
// NOTE(review): no SLF4J binding is declared here, so SLF4J presumably falls back to its
// no-op logger at runtime — add a binding if log output is wanted.
libraryDependencies ++= Seq(
"com.metamx" %% "tranquility" % "0.3.4"
)
✗ sbt run
[info] Set current project to random-tranquility (in build file:/Users/zcox/code/druid-docker/random-tranquility/)
[info] Running com.banno.Main
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
[error] (run-main) java.lang.NoSuchMethodError: com.fasterxml.jackson.module.scala.deser.BigDecimalDeserializer$.handledType()Ljava/lang/Class;
java.lang.NoSuchMethodError: com.fasterxml.jackson.module.scala.deser.BigDecimalDeserializer$.handledType()Ljava/lang/Class;
at com.fasterxml.jackson.module.scala.deser.NumberDeserializers$.<init>(ScalaNumberDeserializersModule.scala:49)
at com.fasterxml.jackson.module.scala.deser.NumberDeserializers$.<clinit>(ScalaNumberDeserializersModule.scala)
at com.fasterxml.jackson.module.scala.deser.ScalaNumberDeserializersModule$class.$init$(ScalaNumberDeserializersModule.scala:61)
at com.fasterxml.jackson.module.scala.DefaultScalaModule.<init>(DefaultScalaModule.scala:19)
at com.fasterxml.jackson.module.scala.DefaultScalaModule$.<init>(DefaultScalaModule.scala:35)
at com.fasterxml.jackson.module.scala.DefaultScalaModule$.<clinit>(DefaultScalaModule.scala)
at com.metamx.common.scala.Jackson$$anonfun$newObjectMapper$1.apply(Jackson.scala:70)
at com.metamx.common.scala.Jackson$$anonfun$newObjectMapper$1.apply(Jackson.scala:68)
at com.metamx.common.scala.Predef$EffectOps.withEffect(Predef.scala:44)
at com.metamx.common.scala.Jackson$class.newObjectMapper(Jackson.scala:67)
at com.metamx.common.scala.Jackson$.newObjectMapper(Jackson.scala:10)
at com.metamx.common.scala.Jackson$class.newObjectMapper(Jackson.scala:64)
at com.metamx.common.scala.Jackson$.newObjectMapper(Jackson.scala:10)
at com.metamx.common.scala.Jackson$class.$init$(Jackson.scala:14)
at com.metamx.common.scala.Jackson$.<init>(Jackson.scala:10)
at com.metamx.common.scala.Jackson$.<clinit>(Jackson.scala)
at com.metamx.tranquility.druid.DruidBeams$BuilderConfig$$anon$8.<init>(DruidBeams.scala:233)
at com.metamx.tranquility.druid.DruidBeams$BuilderConfig.buildAll(DruidBeams.scala:232)
at com.metamx.tranquility.druid.DruidBeams$Builder.buildBeam(DruidBeams.scala:153)
at com.metamx.tranquility.druid.DruidBeams$Builder.buildService(DruidBeams.scala:202)
at com.banno.Main$delayedInit$body.apply(Main.scala:60)
at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
at scala.App$class.main(App.scala:71)
at com.banno.Main$.main(Main.scala:23)
at com.banno.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
[trace] Stack trace suppressed: run last compile:run for the full output.
java.lang.RuntimeException: Nonzero exit code: 1
at scala.sys.package$.error(package.scala:27)
[trace] Stack trace suppressed: run last compile:run for the full output.
[error] (compile:run) Nonzero exit code: 1
[error] Total time: 2 s, completed Mar 5, 2015 2:46:27 PM
package com.banno
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import com.metamx.tranquility.druid.{DruidBeams, DruidLocation, DruidRollup, SpecificDruidDimensions}
import com.metamx.tranquility.beam.ClusteredBeamTuning
import com.metamx.common.Granularity
import io.druid.query.aggregation.{CountAggregatorFactory, LongSumAggregatorFactory}
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory
import io.druid.granularity.QueryGranularity
import org.joda.time.{DateTime, Period}
import scala.util.Random
/**
 * A single synthetic data point.
 *
 * @param category  label drawn from [[Event.categories]] when produced by the companion helpers
 * @param value     integer payload
 * @param timestamp instant attached to the event
 */
case class Event(category: String, value: Int, timestamp: DateTime)

/** Helpers for producing randomly populated [[Event]] instances. */
object Event {
  /** The fixed pool of category labels that random events are drawn from. */
  val categories: Seq[String] = Seq("C1", "C2", "C3")

  /** Picks one entry of [[categories]] at a uniformly random index. */
  def randomCategory: String = {
    val index = Random.nextInt(categories.length)
    categories(index)
  }

  /** Returns a random integer in the range 0 (inclusive) to 100 (exclusive). */
  def randomValue: Int = Random.nextInt(100)

  /** Builds an event with a random category, a random value and a freshly constructed timestamp. */
  def newRandomEvent: Event =
    Event(
      category = randomCategory,
      value = randomValue,
      timestamp = new DateTime()
    )
}
/**
 * Demo entry point: connects to ZooKeeper through Curator, builds a Tranquility
 * service for the configured Druid data source, and then sends one random
 * [[Event]] roughly every 100 ms until the process is stopped.
 *
 * The ZooKeeper connect string defaults to `192.168.59.103:2181` and can be
 * overridden with the `zookeeper.connect` JVM system property.
 */
object Main extends App {
  val indexService = "overlord" // Your overlord's druid.service, with slashes replaced by colons.
  val firehosePattern = "druid:firehose:%s" // Make up a service pattern, include %s somewhere in it.
  val discoveryPath = "/druid/discovery" // Your overlord's druid.discovery.curator.path.
  val dataSource = "tweets"
  val dimensions = IndexedSeq("category")
  val aggregators = Seq(
    new CountAggregatorFactory("count"),
    new LongSumAggregatorFactory("total_value", "value"))

  // Tranquility needs to be able to extract timestamps from your object type (in this case, Event).
  val timestamper = (event: Event) => event.timestamp

  // ZooKeeper connect string; the previous hard-coded address is kept as the default so
  // existing invocations behave the same. Override with -Dzookeeper.connect=host:port.
  val zookeeperConnect = sys.props.getOrElse("zookeeper.connect", "192.168.59.103:2181")

  val curator = CuratorFrameworkFactory
    .builder()
    .connectString(zookeeperConnect)
    .retryPolicy(new ExponentialBackoffRetry(1000, 20, 30000))
    .build()
  curator.start()

  // Everything after curator.start() runs inside try/finally so the ZooKeeper client is
  // closed if building the service or sending an event throws, instead of being left running.
  try {
    // Tranquility needs to be able to serialize your object type. By default this is done with Jackson. If you want to
    // provide an alternate serializer, you can provide your own via ```.objectWriter(...)```. In this case, we won't
    // provide one, so we're just using Jackson:
    val druidService = DruidBeams
      .builder(timestamper)
      .curator(curator)
      .discoveryPath(discoveryPath)
      .location(DruidLocation(indexService, firehosePattern, dataSource))
      .rollup(DruidRollup(SpecificDruidDimensions(dimensions), aggregators, QueryGranularity.NONE))
      .tuning(
        ClusteredBeamTuning(
          segmentGranularity = Granularity.MINUTE,
          windowPeriod = new Period("PT10M"),
          partitions = 1,
          replicants = 1
        )
      )
      .buildService()

    while (true) {
      // The send is asynchronous; report failures instead of silently discarding the result.
      druidService(Seq(Event.newRandomEvent)).onFailure { e =>
        System.err.println(s"Failed to send event to Druid: $e")
      }
      // Throttle the generator to roughly ten events per second.
      Thread.sleep(100)
    }
  } finally {
    curator.close()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment