Skip to content

Instantly share code, notes, and snippets.

@alfonsonishikawa
Forked from VJ310/sentenceDetector.pig
Last active December 22, 2015 15:48
Show Gist options
  • Save alfonsonishikawa/6494478 to your computer and use it in GitHub Desktop.
Save alfonsonishikawa/6494478 to your computer and use it in GitHub Desktop.
-- Sentence-splitting job: reads reviews from review.json, splits each review's
-- text into sentences with a Java UDF, and writes one record per sentence.
--
-- Register the OpenNLP jars and the SentimentUDF jar (presumably the jar that
-- contains the com.Sentiment.udfSentence class used below).
register 'opennlp-tools-1.5.1-incubating.jar';
register 'opennlp-maxent-3.0.1-incubating.jar';
register 'SentimentUDF-1.0-SNAPSHOT.jar';
-- Bind the short alias getSentences to the UDF class com.Sentiment.udfSentence.
define getSentences com.Sentiment.udfSentence();
-- Load the reviews from the JSON file, declaring the schema of each record.
raw_review = LOAD 'review.json' USING JsonLoader('votes:(funny:int,useful:int,cool:int),user_id:chararray,review_id:chararray,stars:int,date:chararray,text:chararray,type:chararray,business_id:chararray');
-- Separate the review text into sentences using the Java UDF. flatten un-nests
-- the bag returned by getSentences, so each sentence becomes its own row that
-- carries the review_id and business_id of the review it came from.
sentences = FOREACH raw_review GENERATE review_id as review_id ,business_id as business_id, flatten(getSentences(text)) as sentence:chararray;
-- Store the separated sentences as JSON under the output location 'sentences'.
STORE sentences INTO 'sentences' USING JsonStorage();
import java.io.IOException;
import java.io.InputStream;
import opennlp.tools.sentdetect.SentenceDetectorME;
import opennlp.tools.sentdetect.SentenceModel;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.DefaultBagFactory;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
/**
 * Pig {@code EvalFunc} that splits a text into sentences using the OpenNLP
 * sentence detector.
 *
 * <p>Input: a tuple whose first field is the text (chararray). Output: a bag
 * holding one single-field tuple per detected sentence, in the order returned
 * by the detector.
 *
 * <p>The English sentence model is read once, at construction time, from the
 * classpath resource {@code opennlp/en-sent.bin}.
 */
public class udfSentence extends EvalFunc<DataBag> {

    /** Classpath location of the English sentence detection model. */
    private static final String ENGLISH_SENTMODEL_PATH = "opennlp/en-sent.bin";

    private static final TupleFactory tupleFactory = TupleFactory.getInstance();
    private static final BagFactory bagFactory = BagFactory.getInstance();

    /**
     * Detector built once from the loaded model and reused for every input
     * tuple instead of being re-created on each {@link #exec(Tuple)} call.
     * NOTE(review): this assumes a single UDF instance is never used by
     * several threads at once -- confirm against the Pig execution model.
     */
    private final SentenceDetectorME sentenceDetector;

    /**
     * Loads the sentence model from the classpath and prepares the detector.
     *
     * @throws IOException if the model resource cannot be found or cannot be
     *         read as a sentence model
     */
    public udfSentence() throws IOException {
        ClassLoader loader = getClass().getClassLoader();
        InputStream in = loader.getResourceAsStream(ENGLISH_SENTMODEL_PATH);
        if (in == null) {
            String message = String.format(
                    "Failed to find resource for sentence detection model: %s",
                    ENGLISH_SENTMODEL_PATH);
            throw new IOException(message);
        }
        SentenceModel model;
        try {
            model = new SentenceModel(in);
        } finally {
            // The stream is only needed while the model is parsed; always
            // release it, whether or not loading succeeded.
            in.close();
        }
        sentenceDetector = new SentenceDetectorME(model);
    }

    /**
     * Splits the text in the first field of {@code input} into sentences.
     *
     * @param input tuple whose first field is the text to split
     * @return a bag with one single-field tuple per sentence, or {@code null}
     *         when {@code input} is null, empty, or its first field is null
     * @throws IOException if the first field cannot be read from the tuple
     */
    @Override
    public DataBag exec(Tuple input) throws IOException {
        if (input == null || input.size() == 0 || input.isNull(0)) {
            return null;
        }
        String text = (String) input.get(0);
        DataBag bagOfSentences = bagFactory.newDefaultBag();
        for (String sentence : sentenceDetector.sentDetect(text)) {
            bagOfSentences.add(tupleFactory.newTuple(sentence));
        }
        return bagOfSentences;
    }

    /**
     * Validates the input schema and declares the output schema: a bag whose
     * schema holds a single field named "sentences" of type tuple.
     *
     * @param input schema of the arguments passed to the UDF; must describe
     *        exactly one chararray field
     * @return the output schema, or {@code null} if it could not be built
     * @throws RuntimeException if {@code input} is null, does not have exactly
     *         one field, or that field is not a chararray
     */
    @Override
    public Schema outputSchema(Schema input) {
        // Check the input schema: expected a (chararray).
        if (input == null) {
            // Unknown schemas are rejected for now; this could be relaxed.
            throw new RuntimeException("Unknown input schema");
        }
        if (input.size() != 1) {
            throw new RuntimeException(
                    "Expected only one parameter: (chararray)");
        }
        try {
            if (input.getField(0).type != DataType.CHARARRAY) {
                throw new RuntimeException(
                        "Expected input (chararray), received schema ("
                        + DataType.findTypeName(input.getField(0).type) + ")");
            }
        } catch (FrontendException e) {
            throw new RuntimeException(e);
        }

        // Define the output schema.
        Schema bagSchema = new Schema();
        bagSchema.add(new Schema.FieldSchema("sentences", DataType.TUPLE));
        try {
            return new Schema(new Schema.FieldSchema(
                    getSchemaName(this.getClass().getName().toLowerCase(), input),
                    bagSchema,
                    DataType.BAG));
        } catch (FrontendException e) {
            // Kept from the original: no schema is reported when the bag
            // field schema cannot be constructed.
            return null;
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment