Last active
August 29, 2015 13:58
-
-
Save hellertime/10020639 to your computer and use it in GitHub Desktop.
Using cascading-hive in Scalding
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import scala.collection.JavaConversions._ | |
import cascading.scheme.Scheme | |
import cascading.tap.SinkMode | |
import cascading.tuple.Fields | |
import com.twitter.scalding.{FixedPathSource, HadoopSchemeInstance, SchemedSource} | |
import org.apache.hadoop.mapred.{JobConf, OutputCollector, RecordReader} | |
trait HiveScheme extends SchemedSource {
  // cascading-hive Schemes take two parallel arrays as arguments:
  //   1: the array of column names
  //   2: the array of types for each column
  //
  // Implementations must supply `columns`; `columnTypes` may be left
  // unset (null) or partially specified and will be padded out by
  // `defaultTypes`. The null default is kept (rather than Option) for
  // backward compatibility with existing subclasses.
  val columns: List[String]
  val columnTypes: List[String] = null
  val defaultType: String = "string"

  /** Column types with missing entries defaulted.
    *
    * Convenience method which defaults missing column types to
    * `defaultType` ("string"). It presumes a partial `columnTypes`
    * list correctly specifies the types for the leading columns, and
    * pads the remainder. A null `columnTypes` yields all defaults; a
    * list already at least `columns.size` long is returned unchanged
    * (matching the original behavior, even if over-long).
    */
  def defaultTypes: List[String] =
    // Option(null) => None, so a null columnTypes becomes Nil before padding;
    // padTo is a no-op when the list is already long enough.
    Option(columnTypes).getOrElse(Nil).padTo(columns.size, defaultType)
}
/** A Scalding source backed by cascading-hive's RCFile scheme.
  *
  * @param p           path of the RCFile data on HDFS
  * @param columns     Hive column names
  * @param columnTypes Hive column types, padded with "string" when
  *                    partial or null (see HiveScheme.defaultTypes)
  * @param sinkMode    cascading sink mode, replaces existing data by default
  */
case class RCFile(p: String,
                  override val columns: List[String],
                  override val columnTypes: List[String] = null,
                  override val sinkMode: SinkMode = SinkMode.REPLACE)
    extends FixedPathSource(p)
    with HiveScheme {
  // The key step: installing the cascading-hive scheme as this source's
  // HDFS scheme. The cast erases the scheme's concrete type parameters
  // to match Scalding's expected Scheme signature.
  override val hdfsScheme = {
    val rcScheme = new cascading.hive.RCFile(columns.toArray, defaultTypes.toArray)
    rcScheme.asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
  }
}
/** A Scalding source backed by cascading-hive's ORCFile scheme;
  * follows the same pattern as RCFile.
  *
  * @param p           path of the ORC data on HDFS
  * @param columns     Hive column names
  * @param columnTypes Hive column types, padded with "string" when
  *                    partial or null (see HiveScheme.defaultTypes)
  * @param sinkMode    cascading sink mode, replaces existing data by default
  */
case class ORCFile(p: String,
                   override val columns: List[String],
                   override val columnTypes: List[String] = null,
                   override val sinkMode: SinkMode = SinkMode.REPLACE)
    extends FixedPathSource(p)
    with HiveScheme {
  // Install the cascading-hive ORC scheme, erasing its concrete type
  // parameters to match Scalding's expected Scheme signature.
  override val hdfsScheme = {
    val orcScheme = new cascading.hive.ORCFile(columns.toArray, defaultTypes.toArray)
    orcScheme.asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment