Skip to content

Instantly share code, notes, and snippets.

@colinmarc
Created September 4, 2014 12:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save colinmarc/ca4a54b9ae9364471b8d to your computer and use it in GitHub Desktop.
Save colinmarc/ca4a54b9ae9364471b8d to your computer and use it in GitHub Desktop.
A Scalding source for Sequins data.
import com.twitter.scalding._
import cascading.pipe.Pipe
import spray.json.JsonWriter
import org.apache.hadoop.io.BytesWritable
/**
 * A Scalding sink that writes `(String, V)` pairs to a Hadoop SequenceFile at path `p`.
 * Both the key and the value are stored as `BytesWritable`.
 *
 * The key is the UTF-8 encoding of the `String`. The value is the compact JSON rendering of `V`
 * produced by the implicit spray-json `JsonWriter`, also UTF-8 encoded.
 *
 * @param p      path handed to `FixedPathSource`, i.e. where the SequenceFile is written
 * @param writer spray-json serializer used to turn each value into JSON
 * @tparam V     type of the values being written
 */
case class Sequins[V](p : String)(implicit writer: JsonWriter[V])
  extends FixedPathSource(p)
  with WritableSequenceFileScheme
  with TypedSink[(String, V)] {
  import Dsl._
  import java.nio.charset.StandardCharsets.UTF_8

  // Two positional fields: 0 = key, 1 = value.
  override val fields = Dsl.intFields(0 to 1)
  override val keyType = classOf[BytesWritable]
  override val valueType = classOf[BytesWritable]

  override def setter[Z <: (String, V)] = TupleSetter.asSubSetter(TupleSetter.of[(String, V)])

  /**
   * Replaces each `(String, V)` tuple in fields 0 and 1 with a `(BytesWritable, BytesWritable)`
   * pair, matching the declared `keyType` / `valueType`.
   */
  override def transformForWrite(pipe: Pipe) : Pipe =
    pipe.map((0, 1) -> (0, 1)) { pair: (String, V) =>
      val (key, value) = pair
      // Encode explicitly as UTF-8. A bare String.getBytes() uses the JVM's platform default
      // charset, so the same key or JSON value could be written as different bytes on
      // different machines.
      val keyBytes = key.getBytes(UTF_8)
      val jsonBytes = writer.write(value).compactPrint.getBytes(UTF_8)
      (new BytesWritable(keyBytes), new BytesWritable(jsonBytes))
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.