@hellertime
Last active August 29, 2015 13:58
Using cascading-hive in Scalding
import scala.collection.JavaConversions._
import cascading.scheme.Scheme
import cascading.tap.SinkMode
import cascading.tuple.Fields
import com.twitter.scalding.{FixedPathSource, HadoopSchemeInstance, SchemedSource}
import org.apache.hadoop.mapred.{JobConf, OutputCollector, RecordReader}
trait HiveScheme extends SchemedSource {
  // cascading-hive Schemes take two parallel arrays as arguments:
  //   1. the array of column names
  //   2. the array of types for each column
  //
  // Implementations should override these as required.
  val columns: List[String]
  val columnTypes: List[String] = null
  val defaultType: String = "string"

  // Convenience method which defaults missing column types to "string".
  // It presumes that a partial columnTypes list correctly specifies the
  // types of the leading columns, and pads the remaining columns with
  // defaultType.
  def defaultTypes: List[String] = {
    columnTypes match {
      case null => List.fill(columns.size)(defaultType)
      case _ =>
        if (columnTypes.size < columns.size)
          columnTypes ++ List.fill(columns.size - columnTypes.size)(defaultType)
        else columnTypes
    }
  }
}
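// A quick illustration of the padding behaviour (hypothetical column names,
// not part of the original gist):
//   columns      = List("id", "name", "score")
//   columnTypes  = List("int")
//   defaultTypes == List("int", "string", "string")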
case class RCFile(p: String,
                  override val columns: List[String],
                  override val columnTypes: List[String] = null,
                  override val sinkMode: SinkMode = SinkMode.REPLACE)
    extends FixedPathSource(p) with HiveScheme {

  // This is the most important step, as it's what wires the cascading-hive
  // scheme into the source.
  override val hdfsScheme = new cascading.hive.RCFile(columns.toArray, defaultTypes.toArray)
    .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
}
// The ORCFile source follows the same pattern as RCFile
case class ORCFile(p: String,
                   override val columns: List[String],
                   override val columnTypes: List[String] = null,
                   override val sinkMode: SinkMode = SinkMode.REPLACE)
    extends FixedPathSource(p) with HiveScheme {

  override val hdfsScheme = new cascading.hive.ORCFile(columns.toArray, defaultTypes.toArray)
    .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
}
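// Usage sketch (not part of the original gist): a minimal Scalding Job that
// reads a Hive RCFile-backed table and writes it back out as ORC. The paths,
// column names, and types below are hypothetical placeholders.
import com.twitter.scalding.{Args, Job}

class CopyRcToOrc(args: Args) extends Job(args) {
  val cols  = List("user_id", "event", "ts")
  val types = List("bigint", "string", "string")

  // Read the RCFile source; the cascading-hive scheme exposes the Hive
  // columns as Cascading fields, so the pipe can be written directly to
  // the ORC sink.
  RCFile("/warehouse/events_rc", cols, types)
    .read
    .write(ORCFile("/warehouse/events_orc", cols, types))
}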