Skip to content

Instantly share code, notes, and snippets.

@thomasina-lee
Created August 24, 2014 02:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save thomasina-lee/6d5c21a92e48507badb1 to your computer and use it in GitHub Desktop.
Save thomasina-lee/6d5c21a92e48507badb1 to your computer and use it in GitHub Desktop.
Emitting file name with Scalding TextLine/MultipleTextLineFiles
package com.thomasinalee.example;
/**
* Created by Thomasina on 17/08/2014.
*
* This is for scalding 2.10 / cascading 2.5.5
*/
import java.io.IOException;
import java.nio.charset.Charset;
import cascading.flow.FlowProcess;
import cascading.scheme.SourceCall;
import cascading.tuple.Fields;
import cascading.tuple.TupleEntry;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import cascading.scheme.hadoop.TextLine ;
public class WholeTextLine extends TextLine{
protected String charsetName = DEFAULT_CHARSET;
public static final Fields DEFAULT_SOURCE_FIELDS = new Fields( "offset", "line" , "filename");
public WholeTextLine(Fields sourceFields, String charsetName){
super(sourceFields, charsetName);
setCharsetName(charsetName);
}
protected void setCharsetName( String charsetName )
{
if( charsetName != null )
this.charsetName = charsetName;
Charset.forName( this.charsetName );
}
protected void verify( Fields sourceFields )
{
if( sourceFields.size() < 1 || sourceFields.size() > 3 )
throw new IllegalArgumentException( "this scheme requires 1 to 3 source fields, given [" + sourceFields + "]" );
}
@Override
public void sourcePrepare( FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall )
{
if( sourceCall.getContext() == null )
sourceCall.setContext( new Object[ 4 ] );
sourceCall.getContext()[ 0 ] = sourceCall.getInput().createKey();
sourceCall.getContext()[ 1 ] = sourceCall.getInput().createValue();
sourceCall.getContext()[ 2 ] = Charset.forName( DEFAULT_CHARSET );
sourceCall.getContext()[3] = flowProcess.getStringProperty("cascading.source.path");
}
@Override
public boolean source( FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall ) throws IOException
{
if( !sourceReadInput( sourceCall ) )
return false;
sourceHandleInput( sourceCall );
return true;
}
private boolean sourceReadInput( SourceCall<Object[], RecordReader> sourceCall ) throws IOException
{
Object[] context = sourceCall.getContext();
return sourceCall.getInput().next( context[ 0 ], context[ 1 ] );
}
protected void sourceHandleInput( SourceCall<Object[], RecordReader> sourceCall )
{
TupleEntry result = sourceCall.getIncomingEntry();
int index = 0;
Object[] context = sourceCall.getContext();
// coerce into canonical forms
if( getSourceFields().size() >= 2 )
result.setLong( index++, ( (LongWritable) context[ 0 ] ).get() );
result.setString( index++, makeEncodedString( context ) );
if( getSourceFields().size() == 3 )
result.setString( index++, (String) context[3] );
}
}
/**
* Created by Thomasina on 15/08/2014.
*
* This is for scalding 2.10 / cascading 2.5.5
*/
package com.thomasinalee.example
import bg.clnr.hadoop.{WholeTextLine => CHWholeTextLine}
import cascading.scheme.hadoop.{TextLine => CHTextLine}
import cascading.scheme.Scheme
import org.apache.hadoop.mapred.{OutputCollector, JobConf, RecordReader}
import com.twitter.scalding._
trait WholeTextSourceScheme extends SchemedSource {
// The text-encoding to use when writing out the lines (default is UTF-8).
// scala do not inherit static members hence need to reference the parent class's static member
val textEncoding: String = CHTextLine.DEFAULT_CHARSET
override def localScheme = {
throw new UnsupportedOperationException("WholeTextSource cannot be used with --local");
}
override def hdfsScheme = {
val chtl = new CHWholeTextLine(CHWholeTextLine.DEFAULT_SOURCE_FIELDS, textEncoding)
chtl.asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
}
}
trait WholeTextLineScheme extends WholeTextSourceScheme with SingleMappable[String]
case class WholeTextLineFiles(p: String*) extends FixedPathSource(p: _*) with WholeTextLineScheme
@jonbonJoeB
Copy link

Hi there, I am wanting to use this to get the filenames of the input I am processing. However, I am getting a compiler error because of the import package "bg.clnr.hadoop.{ WholeTextLine => CHWholeTextLine}"

Is that a package of your own?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment