Skip to content

Instantly share code, notes, and snippets.

@samklr
Forked from epishkin/SaveCountersToHdfs.scala
Last active August 29, 2015 14:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save samklr/2665496512ada116d244 to your computer and use it in GitHub Desktop.
import java.io.PrintWriter
import cascading.stats.CascadingStats
import com.twitter.scalding._
/**
 * Mixin for scalding [[Job]]s that writes all custom counters into a
 * tab-separated file at args("counters-file"), if that argument is set.
 *
 * Output format (one counter per line):
 * counter_name\tvalue
 *
 * Counters are written only when the job completes successfully.
 */
trait SaveCountersToHdfs extends Job {

  // Destination path for the counters file; None when "counters-file" was not passed.
  private lazy val countersFile = args.optional("counters-file")

  /**
   * Hook called by scalding once the cascading flow finishes.
   * Delegates to super first, then dumps counters on success.
   */
  override protected def handleStats(statsData: CascadingStats) = {
    super.handleStats(statsData)
    // foreach on the Option replaces the nonEmpty/get pair — no unsafe Option.get.
    if (statsData.isSuccessful) {
      countersFile.foreach(saveCounters(statsData, _))
    }
  }

  /** Writes every custom counter as "name<TAB>value" to outputPath; no-op when there are none. */
  private def saveCounters(statsData: CascadingStats, outputPath: String) = {
    // Stats.getAllCustomCounters reads from the implicit stats provider.
    implicit val statProvider = statsData
    val jobStats = Stats.getAllCustomCounters
    // Avoid creating an empty file when the job registered no custom counters.
    if (jobStats.nonEmpty) {
      writeToFile(outputPath) { writer =>
        jobStats.foreach {
          case (counter, value) =>
            writer.println("%s\t%s".format(counter, value))
        }
      }
    }
  }

  /**
   * Opens the file at location on the default Hadoop FileSystem, passes a
   * PrintWriter to fn, and always flushes and closes the writer — even when
   * fn throws. Note: FileSystem.create overwrites an existing file by default.
   */
  private def writeToFile(location: String)(fn: PrintWriter => Unit) = {
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    val fs = FileSystem.get(new Configuration())
    val path = new Path(location)
    val writer = new PrintWriter(fs.create(path))
    try {
      fn(writer)
    } finally {
      writer.flush()
      writer.close()
    }
  }
}
// Example job showing how to use the SaveCountersToHdfs mixin.
// NOTE(review): this is a stub — the actual pipeline body is intentionally absent.
class SampleJob(args: Args) extends Job(args) with SaveCountersToHdfs {
// Custom counters created via scalding's Stat; the names below ("sample.foo",
// "sample.bar") become the counter_name column in the output file.
val counterFoo = Stat("sample.foo")
val counterBar = Stat("sample.bar")
//implement your job here
//when the job finishes SaveCountersToHdfs should write all custom counters to a file args("counters-file")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment