Skip to content

Instantly share code, notes, and snippets.

@colinmarc
Created July 26, 2014 17:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save colinmarc/454c34a4b366b41aa939 to your computer and use it in GitHub Desktop.
Save colinmarc/454c34a4b366b41aa939 to your computer and use it in GitHub Desktop.
mongo-hadoop + scalding
import java.io.IOException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import cascading.flow.FlowProcess;
import cascading.scheme.Scheme;
import cascading.scheme.SourceCall;
import cascading.scheme.SinkCall;
import cascading.tuple.Tuple;
import cascading.tap.Tap;
import com.mongodb.hadoop.mapred.MongoInputFormat;
import com.mongodb.hadoop.util.MongoConfigUtil;
import com.mongodb.hadoop.io.BSONWritable;
import com.mongodb.MongoURI;
/**
 * Read-only Cascading {@link Scheme} that reads MongoDB documents through
 * mongo-hadoop's {@code mapred} {@link MongoInputFormat}.
 *
 * <p>Each record is emitted as a single-element {@link Tuple} holding the object
 * returned by {@link BSONWritable#getDoc()}. Sinking is not supported: both sink
 * hooks throw {@link UnsupportedOperationException} and {@link #isSink()} is
 * {@code false}.
 */
public class MongoScheme extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]> {
    /** MongoDB connection URI passed to mongo-hadoop as the input URI. */
    String uri;

    /**
     * Creates a scheme reading from the given MongoDB URI.
     *
     * @param uri MongoDB connection string; must not be {@code null}
     * @throws NullPointerException if {@code uri} is {@code null}
     */
    public MongoScheme(String uri) {
        // Fail at construction time instead of deep inside job configuration,
        // where new MongoURI(null) would otherwise blow up.
        if (uri == null) {
            throw new NullPointerException("uri must not be null");
        }
        this.uri = uri;
    }

    /**
     * Configures the job to read from MongoDB: installs {@link MongoInputFormat},
     * turns off speculative map execution, registers the input URI and sets
     * mongo-hadoop's create-input-splits option to {@code false}.
     *
     * @param fp the current flow process (unused)
     * @param tap the tap this scheme is attached to (unused)
     * @param jobConf the job configuration to populate
     */
    @SuppressWarnings("rawtypes")
    @Override
    public void sourceConfInit(FlowProcess<JobConf> fp,
            Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) {
        jobConf.setInputFormat(MongoInputFormat.class);
        // NOTE(review): presumably disabled so duplicate task attempts do not
        // re-read the same data from the database; confirm intent.
        jobConf.setMapSpeculativeExecution(false);
        MongoConfigUtil.setInputURI(jobConf, new MongoURI(uri));
        MongoConfigUtil.setCreateInputSplits(jobConf, false);
    }

    /**
     * Reads the next record from the underlying reader and publishes its document
     * as the incoming tuple.
     *
     * @param fp the current flow process (unused)
     * @param sc source call giving access to the record reader and incoming entry
     * @return {@code true} if a record was read and a tuple was set, {@code false}
     *         once the reader reports no further records
     * @throws IOException if the underlying reader fails
     */
    @SuppressWarnings("unchecked") // RecordReader is used as a raw type here
    @Override
    public boolean source(FlowProcess<JobConf> fp, SourceCall<Object[], RecordReader> sc)
            throws IOException {
        RecordReader reader = sc.getInput();
        // Let the reader supply its own key object rather than assuming its
        // concrete key type; the key itself is not used.
        Object key = reader.createKey();
        // A fresh value per record keeps every emitted tuple independent of the
        // next read.
        BSONWritable value = (BSONWritable) reader.createValue();
        if (!reader.next(key, value)) {
            return false;
        }
        sc.getIncomingEntry().setTuple(new Tuple(value.getDoc()));
        return true;
    }

    /**
     * Always rejects sink configuration.
     *
     * @throws UnsupportedOperationException always
     */
    @SuppressWarnings("rawtypes")
    @Override
    public void sinkConfInit(FlowProcess<JobConf> arg0,
            Tap<JobConf, RecordReader, OutputCollector> arg1, JobConf arg2) {
        throw new UnsupportedOperationException("MongoScheme does not support Sinks");
    }

    /**
     * Always rejects sink writes.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void sink(FlowProcess<JobConf> arg0, SinkCall<Object[], OutputCollector> arg1)
            throws IOException {
        throw new UnsupportedOperationException("MongoScheme does not support Sinks");
    }

    /** @return {@code false}; this scheme can only be used as a source */
    @Override
    public boolean isSink() {
        return false;
    }
}
/**
 * Typed Scalding source yielding one `BSONObject` per record from the MongoDB
 * collection addressed by `uri`.
 *
 * The tap is always a `MongoTap` built from `uri`; neither the requested access
 * mode nor the implicit `Mode` influences which tap is created.
 */
case class MongoSource(uri : String) extends Source
  with TypedSource[BSONObject]
  with java.io.Serializable {

  /** Builds the Mongo-backed tap and exposes it under the wildcard `Tap` type. */
  override def createTap(readOrWrite: AccessMode)(implicit mode: Mode) = {
    val mongoTap = new MongoTap(uri)
    mongoTap.asInstanceOf[Tap[_,_,_]]
  }

  /** Widens the `BSONObject` tuple converter to any supertype `T`. */
  override def converter[T >: BSONObject] = {
    val bsonConverter = TupleConverter.of[BSONObject]
    TupleConverter.asSuperConverter[BSONObject, T](bsonConverter)
  }
}
import java.io.IOException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import cascading.flow.FlowProcess;
import cascading.tap.SourceTap;
import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator;
import cascading.tuple.TupleEntryIterator;
import cascading.tuple.TupleEntrySchemeIterator;
import com.mongodb.MongoURI;
/**
 * Source-only Cascading tap identified by a MongoDB URI; records are decoded by
 * a {@code MongoScheme} created from the same URI.
 */
class MongoTap extends SourceTap<JobConf, RecordReader> {
    /** URI given to the scheme and returned as this tap's identifier. */
    String uri;

    /**
     * @param uri MongoDB connection string used for both the scheme and the identifier
     */
    public MongoTap(String uri) {
        super(new MongoScheme(uri));
        this.uri = uri;
    }

    /**
     * Wraps the supplied record reader in a Hadoop scheme iterator bound to this tap.
     *
     * @param flowProcess the current flow process
     * @param reader the record reader to iterate over
     * @return an iterator over the tuple entries produced by this tap's scheme
     * @throws IOException if the iterator cannot be created
     */
    @Override
    public TupleEntryIterator openForRead(FlowProcess<JobConf> flowProcess, RecordReader reader) throws IOException {
        final TupleEntryIterator entries = new HadoopTupleEntrySchemeIterator(flowProcess, this, reader);
        return entries;
    }

    /** Unconditionally reports the resource as existing; no lookup is made. */
    @Override
    public boolean resourceExists(JobConf conf) throws IOException {
        return true;
    }

    /** Reports the current wall-clock time (milliseconds) as the modification time. */
    @Override
    public long getModifiedTime(JobConf conf) throws IOException {
        final long now = System.currentTimeMillis();
        return now;
    }

    /** @return the MongoDB URI this tap was constructed with */
    @Override
    public String getIdentifier() {
        return uri;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment