Skip to content

Instantly share code, notes, and snippets.

@krishnanraman
Last active December 18, 2015 09:29
Show Gist options
  • Save krishnanraman/5761643 to your computer and use it in GitHub Desktop.
Save krishnanraman/5761643 to your computer and use it in GitHub Desktop.
Scalding job that reads a TSV of ~6000 "Sample" records and writes them to a pail according to a partition criterion: all Samples with a common service go to the same pail subdirectory. Note that each record is written to its own pail file, so ~6000 pail files are created.
$ tree pailtest/tfe
pailtest/tfe
├── part-000130.pailfile
├── part-000140.pailfile
├── part-000440.pailfile
├── part-000451.pailfile
├── part-000460.pailfile
├── part-000470.pailfile
├── part-000481.pailfile
├── part-000491.pailfile
├── part-000531.pailfile
├── part-000540.pailfile
├── part-000550.pailfile
├── part-000560.pailfile
├── part-000570.pailfile
├── part-000580.pailfile
├── part-000980.pailfile
├── part-000990.pailfile
├── part-001000.pailfile
├── part-001010.pailfile
├── part-001321.pailfile
├── part-001330.pailfile
├── part-001340.pailfile
├── part-001352.pailfile
├── part-001360.pailfile
├── part-001371.pailfile
├── part-0013825.pailfile
├── part-001392.pailfile
├── part-001400.pailfile
├── part-001410.pailfile
├── part-001420.pailfile
├── part-001430.pailfile
├── part-001440.pailfile
├── part-001821.pailfile
├── part-001830.pailfile
├── part-001850.pailfile
├── part-001860.pailfile
├── part-001870.pailfile
├── part-001880.pailfile
├── part-002190.pailfile
├── part-002200.pailfile
├── part-002210.pailfile
├── part-002220.pailfile
├── part-002230.pailfile
├── part-002240.pailfile
├── part-002250.pailfile
├── part-002260.pailfile
├── part-002280.pailfile
└── part-002290.pailfile
0 directories, 47 files
import com.twitter.scalding._
import com.twitter.scalding.commons.source.PailSource
import com.twitter.bijection.Injection
import scala.util.control.Exception.allCatch
/**
 * One record of the input data: a service name, the source it came from, and
 * a raw metrics string.
 *
 * @param service service name
 * @param source  source name
 * @param metrics comma-separated list of `key#value` entries
 */
case class Sample(service: String, source: String, metrics: String) {
  /**
   * Parses `metrics` into `(key, value)` pairs.
   *
   * The string is split on `,`; each entry is trimmed and split on `#`.
   * Entries that do not split into exactly two parts are silently dropped.
   * This method is pure: it performs no logging or printing.
   *
   * @return the well-formed metric pairs, in their original order
   */
  def metricKeyValues: List[(String, String)] =
    metrics.split(",").toList.flatMap { metric =>
      metric.trim.split("#") match {
        case Array(key, value) => Some((key, value))
        case _                 => None
      }
    }
}
/**
 * Scalding job with two modes, selected by the required `io` argument:
 *
 *  - `write`: reads `obslogs.tsv`, turns each row into a `Sample`, and writes
 *    it to a pail under `rootpath`, partitioned by service.
 *  - `read`: reads the `tfe` pail subdirectory back and writes a grouped count
 *    to the `dummy` Tsv.
 *
 * Any other `io` value is rejected with an `IllegalArgumentException`.
 */
class SampleReadWrite(args : Args) extends Job(args) {
  // The pipeline is assembled while the job is constructed. Everything used
  // below is a `def`, so there is no initialization-order hazard.
  args("io") match {
    case "read" => readjob
    case "write" => writepail
    case other =>
      throw new IllegalArgumentException(
        "Unsupported value for 'io': '" + other + "' (expected 'read' or 'write')")
  }

  /** Root directory of the pail. */
  def rootpath = "./pailtest2"

  /** Separator placed between the three `Sample` fields in the serialized form. */
  def delim = "~~~"

  // alternate target Fn - it's too granular : List("./" + List(sample.service, sample.source).mkString("/")))
  /** Maps a sample to its pail subdirectory: one directory per service. */
  def targetFn = ((sample: Sample) => List("./" + sample.service))

  /** Accepts every target path. */
  def validator = ((x:List[String])=>true)

  def mytype = classOf[Sample]

  // injection related mess
  /**
   * Serializes a sample as `service~~~source~~~metrics`, encoded as UTF-8.
   * The charset is fixed so that bytes written on one machine decode the same
   * way on another, regardless of the platform default.
   */
  def to = ((a:Sample) => List(a.service, a.source, a.metrics).mkString(delim).getBytes("UTF-8"))

  /**
   * Inverse of `to`. Yields `None` when the bytes do not decode into three
   * delimited fields.
   *
   * The split uses a limit of 3 so that a trailing empty `metrics` field is
   * kept (a plain `split` drops trailing empty strings, which made such
   * records unreadable) and so that a delimiter occurring inside `metrics`
   * stays part of `metrics`.
   */
  def from = ((b:Array[Byte]) => allCatch.opt {
    val str: String = new String(b, "UTF-8")
    val arr = str.split(java.util.regex.Pattern.quote(delim), 3)
    Sample(arr(0), arr(1), arr(2))
  })

  def myinjection = Injection.build(to)(from)

  /** Pail sink that places each sample according to `targetFn`. */
  def mkPailSink = PailSource.sink[Sample](rootpath, targetFn, validator, mytype, myinjection)

  /** Pipe over the given pail subdirectories below `rootpath`. */
  def mkPailSrc(subdir: Array[List[String]]) = PailSource.source[Sample](rootpath, validator, mytype, myinjection, subdir).read

  /**
   * Reads the `tfe` subdirectory, groups by `'metricKey`, and writes the group
   * sizes to the `dummy` Tsv.
   */
  def readjob = {
    mkPailSrc(Array(List("tfe")))
      .flatMapTo(1 -> ('metricKey, 'metricValue)){
        s:Sample =>
          // NOTE(review): after `unzip`, one tuple is emitted per sample, holding
          // the list of all keys and the list of all values - not one tuple per
          // metric. Confirm that grouping by the whole key list is intended.
          val res = s.metricKeyValues.unzip
          Some(res._1,res._2)
      }.groupBy('metricKey){
        _.size
      }.write(Tsv("dummy"))
  }

  /** Replaces every `#` and `:` with `-`. */
  def sanitize(s:String) = s.replaceAll("[#:]", "-")

  /**
   * Reads `obslogs.tsv`, keeps the service, source, and metrics columns as a
   * `Sample` (service and source sanitized), and writes the samples to the pail.
   */
  def writepail = {
    val inputpipe = Tsv("obslogs.tsv", ('service, 'source, 'time, 'granularity, 'metrics))
      .read
      .mapTo(('service, 'source, 'metrics)->'sample){
        x:(String, String, String) =>
          val (service, source, metrics) = x
          Sample(sanitize(service), sanitize(source), metrics)
      } // .limit(10000)
    val sink = mkPailSink
    inputpipe.write(sink)
  }
}
pailtest
├── [ 136] -0-serviceName
├── [ 748] kestrel2
├── [ 1224] macaw-relevance
├── [ 680] mobileweb
├── [ 680] monorail
├── [ 748] ostrichagg
├── [ 3468] services.msys
├── [ 15504] services.ntpd
├── [ 15504] services.scribe
├── [ 4284] services.zookeeper
├── [ 15504] system.bios
├── [ 15504] system.cpu
├── [ 15504] system.diskstats
├── [ 612] system.hpacucli
├── [ 612] system.ipmi_fru
├── [ 612] system.ipmi_sensors
├── [ 15504] system.kernel
├── [ 15504] system.load
├── [ 15504] system.mcelog
├── [ 15504] system.mdstat
├── [ 15504] system.memory
├── [ 15504] system.monit
├── [ 15504] system.mount
├── [ 15504] system.msr
├── [ 15504] system.network
├── [ 15504] system.smart
├── [ 15504] system.uptime
├── [ 4080] tfe
├── [ 15504] twitter.appapp
├── [ 15504] twitter.apps
├── [ 15504] twitter.git
├── [ 15504] twitter.ldapupdateusers
├── [ 15504] twitter.lsof
├── [ 15368] twitter.mesos
├── [ 15504] twitter.puppetreport
├── [ 15504] twitter.svn_syncdelta
├── [ 15504] vex
└── [ 4080] woodstar
38 directories, 5984 files
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment