Created February 23, 2015 17:03
Save erasmas/0cd9c797159255f24b81 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.mycompany; | |
import cascading.flow.Flow; | |
import cascading.flow.FlowConnector; | |
import cascading.flow.FlowDef; | |
import cascading.flow.hadoop2.Hadoop2MR1FlowConnector; | |
import cascading.pipe.GroupBy; | |
import cascading.pipe.Pipe; | |
import cascading.property.AppProps; | |
import cascading.scheme.Scheme; | |
import cascading.scheme.hadoop.TextDelimited; | |
import cascading.tap.Tap; | |
import cascading.tap.hadoop.Hfs; | |
import cascading.tuple.Fields; | |
import com.scaleunlimited.cascading.scheme.hadoop.SolrScheme; | |
import org.xml.sax.SAXException; | |
import javax.xml.parsers.ParserConfigurationException; | |
import java.io.IOException; | |
import java.util.Properties; | |
public class SolrIndexer { | |
public static void main(String[] args) throws ParserConfigurationException, SAXException, IOException { | |
final String filePath = "/data/in/inpatient/people.txt.bz2"; | |
final String outPath = "/data/out/inpatient-solr"; | |
final String solrHome = "/data/in/inpatient/solr"; | |
final int numSinkParts = 3; | |
final Fields fields = new Fields("name", "age", "description"); | |
Properties properties = new Properties(); | |
AppProps.setApplicationJarClass(properties, SolrIndexer.class); | |
FlowConnector flowConnector = new Hadoop2MR1FlowConnector(properties); | |
// create source and sink taps | |
Tap tap = new Hfs(new TextDelimited(fields, true, "^"), filePath); | |
final Scheme scheme = new SolrScheme(fields, solrHome); | |
scheme.setNumSinkParts(numSinkParts); | |
Tap sink = new Hfs(scheme, outPath); | |
Pipe pipe = new Pipe("Solr indexing"); | |
pipe = new GroupBy(pipe, new Fields("age")); | |
FlowDef flowDef = FlowDef.flowDef() | |
.setName("flow") | |
.addSource(pipe, tap) | |
.addTailSink(pipe, sink); | |
Flow wcFlow = flowConnector.connect(flowDef); | |
wcFlow.complete(); | |
} | |
} |
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.