Created
August 24, 2014 02:34
-
-
Save thomasina-lee/6d5c21a92e48507badb1 to your computer and use it in GitHub Desktop.
Emitting the file name with Scalding TextLine/MultipleTextLineFiles
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.thomasinalee.example; | |
/** | |
* Created by Thomasina on 17/08/2014. | |
* | |
* This is for scalding 2.10 / cascading 2.5.5 | |
*/ | |
import java.io.IOException; | |
import java.nio.charset.Charset; | |
import cascading.flow.FlowProcess; | |
import cascading.scheme.SourceCall; | |
import cascading.tuple.Fields; | |
import cascading.tuple.TupleEntry; | |
import org.apache.hadoop.io.LongWritable; | |
import org.apache.hadoop.mapred.JobConf; | |
import org.apache.hadoop.mapred.RecordReader; | |
import cascading.scheme.hadoop.TextLine ; | |
public class WholeTextLine extends TextLine{ | |
protected String charsetName = DEFAULT_CHARSET; | |
public static final Fields DEFAULT_SOURCE_FIELDS = new Fields( "offset", "line" , "filename"); | |
public WholeTextLine(Fields sourceFields, String charsetName){ | |
super(sourceFields, charsetName); | |
setCharsetName(charsetName); | |
} | |
protected void setCharsetName( String charsetName ) | |
{ | |
if( charsetName != null ) | |
this.charsetName = charsetName; | |
Charset.forName( this.charsetName ); | |
} | |
protected void verify( Fields sourceFields ) | |
{ | |
if( sourceFields.size() < 1 || sourceFields.size() > 3 ) | |
throw new IllegalArgumentException( "this scheme requires 1 to 3 source fields, given [" + sourceFields + "]" ); | |
} | |
@Override | |
public void sourcePrepare( FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall ) | |
{ | |
if( sourceCall.getContext() == null ) | |
sourceCall.setContext( new Object[ 4 ] ); | |
sourceCall.getContext()[ 0 ] = sourceCall.getInput().createKey(); | |
sourceCall.getContext()[ 1 ] = sourceCall.getInput().createValue(); | |
sourceCall.getContext()[ 2 ] = Charset.forName( DEFAULT_CHARSET ); | |
sourceCall.getContext()[3] = flowProcess.getStringProperty("cascading.source.path"); | |
} | |
@Override | |
public boolean source( FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall ) throws IOException | |
{ | |
if( !sourceReadInput( sourceCall ) ) | |
return false; | |
sourceHandleInput( sourceCall ); | |
return true; | |
} | |
private boolean sourceReadInput( SourceCall<Object[], RecordReader> sourceCall ) throws IOException | |
{ | |
Object[] context = sourceCall.getContext(); | |
return sourceCall.getInput().next( context[ 0 ], context[ 1 ] ); | |
} | |
protected void sourceHandleInput( SourceCall<Object[], RecordReader> sourceCall ) | |
{ | |
TupleEntry result = sourceCall.getIncomingEntry(); | |
int index = 0; | |
Object[] context = sourceCall.getContext(); | |
// coerce into canonical forms | |
if( getSourceFields().size() >= 2 ) | |
result.setLong( index++, ( (LongWritable) context[ 0 ] ).get() ); | |
result.setString( index++, makeEncodedString( context ) ); | |
if( getSourceFields().size() == 3 ) | |
result.setString( index++, (String) context[3] ); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Created by Thomasina on 15/08/2014. | |
* | |
* This is for scalding 2.10 / cascading 2.5.5 | |
*/ | |
package com.thomasinalee.example | |
import cascading.scheme.Scheme
import cascading.scheme.hadoop.{TextLine => CHTextLine}
// WholeTextLine.java lives in this same package; the alias is kept for existing references.
import com.thomasinalee.example.{WholeTextLine => CHWholeTextLine}
import com.twitter.scalding._
import org.apache.hadoop.mapred.{OutputCollector, JobConf, RecordReader}
/**
 * Scalding scheme mixin backed by the Java `WholeTextLine` Cascading scheme, configured to
 * emit ('offset, 'line, 'filename) for every line read.
 *
 * Only Hadoop mode is supported; `--local` runs fail fast.
 */
trait WholeTextSourceScheme extends SchemedSource {
  // The text encoding used for the lines (default is UTF-8).
  // Scala does not inherit Java static members, so the constant is read from the parent
  // class (cascading's TextLine) directly.
  val textEncoding: String = CHTextLine.DEFAULT_CHARSET

  override def localScheme = {
    throw new UnsupportedOperationException("WholeTextSource cannot be used with --local")
  }

  override def hdfsScheme = {
    val chtl = new CHWholeTextLine(CHWholeTextLine.DEFAULT_SOURCE_FIELDS, textEncoding)
    // Widen to the Hadoop scheme type expected by Scalding's hdfsScheme.
    chtl.asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
  }
}

/** Single-column (`String`) view of [[WholeTextSourceScheme]] for typed reads. */
trait WholeTextLineScheme extends WholeTextSourceScheme with SingleMappable[String] {
  // Column 0 is the byte offset, column 1 the decoded line and column 2 the file name.
  // Project column 1 for typed reads, as Scalding's own TextLineScheme does; without this
  // override the Mappable default presumably projects column 0 (the Long offset).
  override def sourceFields = Dsl.intFields(Seq(1))
}

/** Reads one or more text files, emitting ('offset, 'line, 'filename) tuples. */
case class WholeTextLineFiles(p: String*) extends FixedPathSource(p: _*) with WholeTextLineScheme
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi there, I want to use this to get the file names of the input I am processing. However, I am getting a compiler error because of the import "bg.clnr.hadoop.{WholeTextLine => CHWholeTextLine}".
Is that a package of your own?