Last active
December 18, 2015 09:29
-
-
Save krishnanraman/5761643 to your computer and use it in GitHub Desktop.
Scalding job that reads a Tsv of ~6000 "Sample" records and writes them to a pail, partitioned by service: all Samples that share a service go to the same pail subdirectory.
Note that each record is written to its own pailfile, so ~6000 pailfiles are created.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
$ tree pailtest/tfe | |
pailtest/tfe | |
├── part-000130.pailfile | |
├── part-000140.pailfile | |
├── part-000440.pailfile | |
├── part-000451.pailfile | |
├── part-000460.pailfile | |
├── part-000470.pailfile | |
├── part-000481.pailfile | |
├── part-000491.pailfile | |
├── part-000531.pailfile | |
├── part-000540.pailfile | |
├── part-000550.pailfile | |
├── part-000560.pailfile | |
├── part-000570.pailfile | |
├── part-000580.pailfile | |
├── part-000980.pailfile | |
├── part-000990.pailfile | |
├── part-001000.pailfile | |
├── part-001010.pailfile | |
├── part-001321.pailfile | |
├── part-001330.pailfile | |
├── part-001340.pailfile | |
├── part-001352.pailfile | |
├── part-001360.pailfile | |
├── part-001371.pailfile | |
├── part-0013825.pailfile | |
├── part-001392.pailfile | |
├── part-001400.pailfile | |
├── part-001410.pailfile | |
├── part-001420.pailfile | |
├── part-001430.pailfile | |
├── part-001440.pailfile | |
├── part-001821.pailfile | |
├── part-001830.pailfile | |
├── part-001850.pailfile | |
├── part-001860.pailfile | |
├── part-001870.pailfile | |
├── part-001880.pailfile | |
├── part-002190.pailfile | |
├── part-002200.pailfile | |
├── part-002210.pailfile | |
├── part-002220.pailfile | |
├── part-002230.pailfile | |
├── part-002240.pailfile | |
├── part-002250.pailfile | |
├── part-002260.pailfile | |
├── part-002280.pailfile | |
└── part-002290.pailfile | |
0 directories, 47 files |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.twitter.scalding._ | |
import com.twitter.scalding.commons.source.PailSource | |
import com.twitter.bijection.Injection | |
import scala.util.control.Exception.allCatch | |
/**
 * A single record made of a service name, a source name and a raw metrics string.
 *
 * @param service service field of the record
 * @param source  source field of the record
 * @param metrics comma-separated list of `key#value` entries
 */
case class Sample(service: String, source: String, metrics: String) {

  /**
   * Parses `metrics` into key/value pairs.
   *
   * The string is split on `,`; each entry is trimmed and then split on `#`.
   * Entries that do not split into exactly two parts are dropped.
   *
   * @return the well-formed `(key, value)` pairs, in their original order
   */
  def metricKeyValues: List[(String, String)] =
    metrics.split(",").toList.flatMap { metric =>
      // No per-entry println here: this runs for every metric of every record.
      metric.trim.split("#") match {
        case Array(key, value) => Some((key, value))
        case _                 => None
      }
    }
}
/**
 * Scalding job that either writes `Sample` records into a pail or reads them back.
 *
 * The mode is selected by the `io` argument:
 *  - `write` runs [[writepail]]
 *  - `read`  runs [[readjob]]
 *
 * @param args job arguments; must contain `io` set to `read` or `write`
 */
class SampleReadWrite(args: Args) extends Job(args) {

  // This dispatch runs in the constructor. Every member it reaches is a `def`,
  // so nothing is read before it has been initialised; keep them as `def`s.
  args("io") match {
    case "read"  => readjob
    case "write" => writepail
    case other   =>
      throw new IllegalArgumentException(
        "unsupported io value '" + other + "'; expected 'read' or 'write'")
  }

  /** Root directory of the pail. */
  def rootpath = "./pailtest2"

  /** Field separator used when a `Sample` is serialized to bytes. */
  def delim = "~~~"

  // Alternate target fn - it's too granular:
  //   List("./" + List(sample.service, sample.source).mkString("/"))
  /** Maps a sample to its pail subdirectory: one directory per service. */
  def targetFn = ((sample: Sample) => List("./" + sample.service))

  /** Validator handed to the pail source/sink; accepts every path. */
  def validator = ((x: List[String]) => true)

  /** Runtime class of the records stored in the pail. */
  def mytype = classOf[Sample]

  // injection related mess

  /**
   * Serializes a sample as `service~~~source~~~metrics`.
   * The charset is pinned to UTF-8 so the bytes do not depend on the writer's platform default.
   */
  def to = ((a: Sample) =>
    List(a.service, a.source, a.metrics).mkString(delim).getBytes("UTF-8"))

  /**
   * Deserializes bytes produced by [[to]]; yields `None` when they cannot be decoded into a sample.
   *
   * The split is limited to three fields so that trailing empty strings are kept
   * (a sample with empty `metrics` still round-trips) and so that a delimiter
   * occurring inside `metrics` does not truncate it. The delimiter is quoted
   * because `String.split` treats its argument as a regular expression.
   */
  def from = ((b: Array[Byte]) => allCatch.opt {
    val fields = new String(b, "UTF-8").split(java.util.regex.Pattern.quote(delim), 3)
    Sample(fields(0), fields(1), fields(2))
  })

  /** Injection between `Sample` and its byte representation, built from [[to]] and [[from]]. */
  def myinjection = Injection.build(to)(from)

  /** Pail sink rooted at [[rootpath]] that places records according to [[targetFn]]. */
  def mkPailSink = PailSource.sink[Sample](rootpath, targetFn, validator, mytype, myinjection)

  /**
   * Opens a pail source rooted at [[rootpath]] and reads it.
   *
   * @param subdir passed through to `PailSource.source`; presumably the pail
   *               subdirectories to read - TODO confirm against the PailSource API
   */
  def mkPailSrc(subdir: Array[List[String]]) =
    PailSource.source[Sample](rootpath, validator, mytype, myinjection, subdir).read

  /**
   * Reads the `tfe` samples from the pail, extracts their metric keys and values,
   * groups by `'metricKey`, counts each group and writes the result to the Tsv `dummy`.
   */
  def readjob = {
    mkPailSrc(Array(List("tfe")))
      .flatMapTo(1 -> ('metricKey, 'metricValue)) {
        s: Sample =>
          // NOTE(review): `unzip` yields one (keys, values) pair of lists per sample, so the
          // grouping below is on the whole key list rather than on individual keys.
          // Returning `s.metricKeyValues` directly would emit one tuple per metric - confirm intent.
          val res = s.metricKeyValues.unzip
          Some((res._1, res._2))
      }.groupBy('metricKey) {
        _.size
      }.write(Tsv("dummy"))
  }

  /** Replaces every `#` and `:` in `s` with `-`. */
  def sanitize(s: String) = s.replaceAll("[#:]", "-")

  /**
   * Reads `obslogs.tsv`, builds one `Sample` per row from the service, source and metrics
   * columns (service and source are sanitized), and writes the samples to the pail sink.
   */
  def writepail = {
    val inputpipe = Tsv("obslogs.tsv", ('service, 'source, 'time, 'granularity, 'metrics))
      .read
      .mapTo(('service, 'source, 'metrics) -> 'sample) {
        x: (String, String, String) =>
          val (service, source, metrics) = x
          Sample(sanitize(service), sanitize(source), metrics)
      } // .limit(10000)
    val sink = mkPailSink
    inputpipe.write(sink)
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
pailtest | |
├── [ 136] -0-serviceName | |
├── [ 748] kestrel2 | |
├── [ 1224] macaw-relevance | |
├── [ 680] mobileweb | |
├── [ 680] monorail | |
├── [ 748] ostrichagg | |
├── [ 3468] services.msys | |
├── [ 15504] services.ntpd | |
├── [ 15504] services.scribe | |
├── [ 4284] services.zookeeper | |
├── [ 15504] system.bios | |
├── [ 15504] system.cpu | |
├── [ 15504] system.diskstats | |
├── [ 612] system.hpacucli | |
├── [ 612] system.ipmi_fru | |
├── [ 612] system.ipmi_sensors | |
├── [ 15504] system.kernel | |
├── [ 15504] system.load | |
├── [ 15504] system.mcelog | |
├── [ 15504] system.mdstat | |
├── [ 15504] system.memory | |
├── [ 15504] system.monit | |
├── [ 15504] system.mount | |
├── [ 15504] system.msr | |
├── [ 15504] system.network | |
├── [ 15504] system.smart | |
├── [ 15504] system.uptime | |
├── [ 4080] tfe | |
├── [ 15504] twitter.appapp | |
├── [ 15504] twitter.apps | |
├── [ 15504] twitter.git | |
├── [ 15504] twitter.ldapupdateusers | |
├── [ 15504] twitter.lsof | |
├── [ 15368] twitter.mesos | |
├── [ 15504] twitter.puppetreport | |
├── [ 15504] twitter.svn_syncdelta | |
├── [ 15504] vex | |
└── [ 4080] woodstar | |
38 directories, 5984 files | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment