@animeshtrivedi
Last active January 11, 2017 13:24
A null writer class that discards every Row/InternalRow passed to it. The class does, however, keep some accounting (row count and timing).
package org.apache.spark.sql.execution.datasources.atr

import org.apache.hadoop.mapreduce.TaskAttemptContext

import org.apache.spark.internal.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.OutputWriter

/**
 * Created by atr on 16.12.16.
 *
 * A sink-only OutputWriter: every row handed to it is dropped, so the
 * reported numbers measure the bare row-delivery cost of the write path.
 */
class AtrOutputWriter(path: String, context: TaskAttemptContext)
  extends OutputWriter with Logging {

  // Spark's own Logging trait replaces the original (accidental) import of
  // scala.tools.jline_embedded.internal.Log, which only resolves when the
  // embedded jline happens to be on the classpath.

  private val init = System.nanoTime()      // construction time
  private var start = 0L                    // time of the first writeInternal()
  private var itemsInternalRow: Long = 0L   // rows seen so far

  override def close(): Unit = {
    val end = System.nanoTime()
    val tx = end - start
    // Guard against a division by zero when the task wrote no rows.
    val perRow = if (itemsInternalRow > 0) tx / itemsInternalRow else 0L
    logInfo("closing Atr(Null)OutputWriter. " +
      "initPause: " + (start - init).toFloat / 1000 + " usec " +
      " RunTime: " + tx.toFloat / 1000 + " usec " +
      " InternalRows: " + itemsInternalRow +
      " time/row: " + perRow + " nsec")
  }

  // Spark drives this writer through writeInternal(), so the public
  // Row-based entry point is intentionally unsupported.
  override def write(row: Row): Unit =
    throw new UnsupportedOperationException("call writeInternal")

  override def writeInternal(row: InternalRow): Unit = {
    // Start the clock at the first row so that setup cost is excluded.
    if (itemsInternalRow == 0) {
      start = System.nanoTime()
    }
    itemsInternalRow += 1
  }
}
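
Because the writer never touches the file system, it can be exercised in isolation. Below is a minimal driver sketch (not part of the gist): the object name, the dummy path, and the row count are all placeholders, and in a real Spark job the TaskAttemptContext is supplied by the data source framework rather than built by hand.

package org.apache.spark.sql.execution.datasources.atr

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.TaskAttemptID
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

import org.apache.spark.sql.catalyst.InternalRow

object AtrOutputWriterDriver {
  def main(args: Array[String]): Unit = {
    // The null writer never consults the context or the path, so dummies suffice.
    val ctx = new TaskAttemptContextImpl(new Configuration(), new TaskAttemptID())
    val writer = new AtrOutputWriter("/tmp/ignored", ctx)
    // Push a million empty rows through the hot path being measured.
    (1 to 1000000).foreach(_ => writer.writeInternal(InternalRow.empty))
    writer.close() // logs initPause, RunTime, row count, and time/row
  }
}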
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
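To drive the writer from df.write instead, it has to be handed out by an OutputWriterFactory. The sketch below follows the general Spark 2.1-era FileFormat shape; the class name is hypothetical, and the exact signatures moved between 2.x releases (Spark 2.0 passed a bucketId to newInstance, and later releases expect a getFileExtension(context) override as well), so adjust to the version in use.

package org.apache.spark.sql.execution.datasources.atr

import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.types.StructType

class AtrFileFormat extends FileFormat {
  // Write-only source: there is nothing on disk to infer a schema from.
  override def inferSchema(
      sparkSession: SparkSession,
      options: Map[String, String],
      files: Seq[FileStatus]): Option[StructType] = None

  override def prepareWrite(
      sparkSession: SparkSession,
      job: Job,
      options: Map[String, String],
      dataSchema: StructType): OutputWriterFactory = new OutputWriterFactory {
    override def newInstance(
        path: String,
        dataSchema: StructType,
        context: TaskAttemptContext): OutputWriter = new AtrOutputWriter(path, context)
  }
}

With that in place, something like df.write.format("org.apache.spark.sql.execution.datasources.atr.AtrFileFormat").save("/tmp/null-out") should route every partition's rows through the null writer.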