Skip to content

Instantly share code, notes, and snippets.

@al3x
Created November 26, 2009 01:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save al3x/243161 to your computer and use it in GitHub Desktop.
Save al3x/243161 to your computer and use it in GitHub Desktop.
import com.facebook.thrift.{TBase, TDeserializer}
import java.io.{BufferedInputStream, DataInputStream, File, FileInputStream, InputStream}
import java.util.zip.GZIPInputStream
import net.lag.logging.Logger
import scala.reflect.Manifest
// you'll want to import your generated Thrift package too!
/**
 * Scans a file of length-prefixed, Thrift-serialized records and passes each
 * decoded record to a callback.
 *
 * The file is read as a sequence of entries, each consisting of a 4-byte
 * length (as read by `DataInputStream.readInt`) followed by that many bytes,
 * which are decoded with a default `TDeserializer`. File names ending in
 * ".gz" are transparently gunzipped.
 *
 * An instance keeps the stream it is currently reading in `bufferedIn`, so a
 * single instance should scan one file at a time.
 *
 * @tparam T the generated Thrift record type; a fresh instance is created
 *           reflectively for every record, so it needs a no-argument
 *           constructor
 */
class ThriftFileScanner[T <: TBase](implicit man: Manifest[T]) {
  val log = Logger.get

  // Stream currently being scanned; null until allRecordsFromFile opens a file.
  var bufferedIn: BufferedInputStream = null

  val BUFFER_SIZE = 2048

  /**
   * Reports whether at least one more byte can be read from `bufferedIn`.
   *
   * Peeks a single byte using mark/reset, so the stream position is left
   * unchanged. Returns false when no stream has been opened yet.
   */
  def streamHasNext: Boolean = {
    if (bufferedIn == null) {
      false
    } else {
      bufferedIn.mark(1)
      val nextByte = bufferedIn.read()
      bufferedIn.reset()
      nextByte != -1
    }
  }

  /**
   * Reads every record in `fileName` and invokes `f` on each one in file order.
   *
   * Entries whose length prefix is not positive are logged and skipped. The
   * underlying stream is always closed before this method returns, including
   * when reading, deserialization, or `f` throws. A file that ends in the
   * middle of a record results in a `java.io.EOFException`.
   *
   * @param fileName path of the file to scan; a ".gz" suffix enables gunzipping
   * @param f        callback applied to each successfully decoded record
   */
  def allRecordsFromFile(fileName: String)(f: T => Unit): Unit = {
    val fileIn = new FileInputStream(new File(fileName))
    bufferedIn = try {
      if (fileName.endsWith(".gz")) {
        new BufferedInputStream(new GZIPInputStream(fileIn, BUFFER_SIZE), BUFFER_SIZE)
      } else {
        new BufferedInputStream(fileIn, BUFFER_SIZE)
      }
    } catch {
      // GZIPInputStream reads the gzip header eagerly; do not leak the file
      // handle when that fails.
      case e: java.io.IOException =>
        fileIn.close()
        throw e
    }

    val stream = new DataInputStream(bufferedIn)
    val deserializer = new TDeserializer()
    try {
      while (streamHasNext) {
        val arraySize = stream.readInt()
        if (arraySize > 0) {
          val bytes = new Array[Byte](arraySize)
          // read() may return fewer bytes than requested (notably on gzip
          // streams); readFully blocks until the whole payload is available.
          stream.readFully(bytes)
          val record = man.erasure.newInstance.asInstanceOf[T]
          deserializer.deserialize(record, bytes)
          f(record)
        } else {
          log.error("negative object size in stream")
        }
      }
    } finally {
      // Closing the DataInputStream also closes bufferedIn and the file.
      stream.close()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment