Skip to content

Instantly share code, notes, and snippets.

@K41R1
Created June 14, 2020 02:16
Show Gist options
  • Save K41R1/04923a909679cb84ba6efe90c0f8df63 to your computer and use it in GitHub Desktop.
Save K41R1/04923a909679cb84ba6efe90c0f8df63 to your computer and use it in GitHub Desktop.
package twitter.jobs;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeoutException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.from_json;
import static org.apache.spark.sql.functions.to_json;
import static org.apache.spark.sql.functions.current_timestamp;
import static org.apache.spark.sql.functions.unix_timestamp;
import static org.apache.spark.sql.functions.window;
import static org.apache.spark.sql.functions.lit;
import static org.apache.spark.sql.functions.struct;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.split;
import static org.apache.spark.sql.functions.explode;
import static org.apache.spark.sql.functions.lower;
import static org.apache.spark.sql.functions.regexp_replace;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import edu.stanford.nlp.ling.CoreAnnotations.SentencesAnnotation;
import edu.stanford.nlp.neural.rnn.RNNCoreAnnotations;
import edu.stanford.nlp.pipeline.Annotation;
import edu.stanford.nlp.pipeline.StanfordCoreNLP;
import edu.stanford.nlp.sentiment.SentimentCoreAnnotations;
import edu.stanford.nlp.trees.Tree;
import edu.stanford.nlp.util.CoreMap;
import static twitter.sentiment.LanguageCheck.correctSpell;
public class CoronaSentimentTweets {
static Properties props = new Properties();
static {
props.setProperty("annotators", "tokenize,ssplit,pos,parse,sentiment");
}
static StanfordCoreNLP pipeline = new StanfordCoreNLP(props);
public static void main(String[] args) throws StreamingQueryException, TimeoutException {
SparkSession spark = getSparkSession();
spark.sparkContext().setLogLevel("ERROR");
spark.sqlContext().udf().register("sentiment", (String s) -> getSentiment(s), DataTypes.DoubleType);
spark.sqlContext().udf().register("sentimentToText", (Double d) -> sentimentToText(d), DataTypes.StringType);
Dataset<Row> stream = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "tweets")
.option("startingOffsets", "earliest")
.load();
Dataset<Row> jsonTweetDF = stream.selectExpr("CAST(value AS STRING)");
StructType schema = getTweetSchema();
Dataset<Row> tweetsDF = jsonTweetDF
.select(from_json(col("value"), schema).as("tweet"))
.select("tweet.*")
.select(split(col("text"), " ").as("words"), col("text"), col("user.name"))
.withColumn("tokens", explode(col("words")))
.filter(col("tokens").startsWith("#"))
.withColumn("lower_tokens", lower(col("tokens")))
.withColumn("hashtag", regexp_replace(col("lower_tokens"), "[^a-zA-Z]", ""))
.filter(col("hashtag").contains("covid").or(col("hashtag").contains("corona")))
.selectExpr("sentiment(text) as seVal", "sentimentToText(sentiment(text)) as sentiment")
.withColumn("EventTime", lit(current_timestamp()))
.withColumn("timestamp", unix_timestamp(col("EventTime"), "MM/dd/yyyy hh:mm:ss aa").cast(DataTypes.TimestampType))
.withWatermark("timestamp", "1 minutes")
.groupBy(col("sentiment"), window(col("timestamp"), "1 minutes"))
.count();
tweetsDF
.withColumn("value", to_json(struct(col("sentiment"), col("count"))))
.selectExpr("CAST(value as STRING)")
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "corona_sentiment")
.option("checkpointLocation", "/tmp/checkpoints")
.outputMode(OutputMode.Append())
.start()
.awaitTermination();
spark.stop();
}
private static SparkSession getSparkSession() {
return SparkSession.builder()
.appName("Corona Tweets Sentiment")
.config("spark.executor.memory", "6g")
.config("spark.cores.max", "3")
.master("spark://127.0.0.1:7077")
.getOrCreate();
}
public static String sentimentToText(Double sentiment) {
String s;
if(sentiment < -1) {
s = "very negative";
}else if (sentiment >= -1 & sentiment < 0) {
s = "negative";
}else if(sentiment >= 0 && sentiment < 1) {
s = "neutral";
}else if (sentiment >= 1 && sentiment < 1) {
s = "positive";
}else {
s = "very positive";
}
return s;
}
public static Double getSentiment(String text) {
String checkedText = correctSpell(text);
Annotation document = new Annotation(checkedText);
pipeline.annotate(document);
List<CoreMap> sentences = document.get(SentencesAnnotation.class);
Double sum = 0.0;
for (CoreMap sentence : sentences) {
Tree tree = sentence.get(SentimentCoreAnnotations.SentimentAnnotatedTree.class);
int sentiment = RNNCoreAnnotations.getPredictedClass(tree);
int scaled = sentiment - 2;
sum = sum + scaled;
}
Double total = sum / sentences.size();
return total;
}
private static StructType getTweetSchema() {
StructType userSchema = DataTypes.createStructType(new StructField[] {
DataTypes.createStructField("name", DataTypes.StringType, true),
DataTypes.createStructField("location", DataTypes.StringType, true),
DataTypes.createStructField("friendsCount", DataTypes.IntegerType, true)
});
StructType geoLocationType = DataTypes.createStructType(new StructField[] {
DataTypes.createStructField("longitude", DataTypes.DoubleType, true),
DataTypes.createStructField("latitude", DataTypes.DoubleType, true)
});
StructType tweetSchema = DataTypes.createStructType(new StructField[] {
DataTypes.createStructField("createdAt", DataTypes.StringType, true),
DataTypes.createStructField("text", DataTypes.StringType, true),
DataTypes.createStructField("hashtags",DataTypes.createArrayType(DataTypes.StringType), true),
DataTypes.createStructField("user", userSchema, true),
DataTypes.createStructField("source", DataTypes.StringType, true),
DataTypes.createStructField("retweetCount", DataTypes.IntegerType, true),
DataTypes.createStructField("geo", geoLocationType, true)
});
return tweetSchema;
}
}
package twitter.sentiment;
import java.util.List;
import org.languagetool.JLanguageTool;
import org.languagetool.language.AmericanEnglish;
import org.languagetool.rules.RuleMatch;
/**
 * LanguageCheck: best-effort spelling/grammar correction backed by LanguageTool.
 */
public class LanguageCheck {
  /** Language definition shared by all checkers; it only needs to be created once. */
  private static final AmericanEnglish LANGUAGE = new AmericanEnglish();

  /**
   * One checker per thread. JLanguageTool is not thread-safe, and Spark runs several tasks
   * concurrently in the same executor JVM, all of which may call {@link #correctSpell}.
   */
  private static final ThreadLocal<JLanguageTool> LANG_TOOL =
      ThreadLocal.withInitial(() -> new JLanguageTool(LANGUAGE));

  /**
   * Returns {@code text} with each LanguageTool match replaced by its first suggested replacement.
   * Matches that have no suggestion, or that overlap an already-applied replacement, are left as
   * written (previously either case discarded every correction). If the check itself fails, the
   * original text is returned unchanged.
   *
   * @param text text to correct; {@code null} is returned as-is
   * @return the corrected text, or {@code text} when it is null or checking fails
   */
  public static String correctSpell(String text) {
    if (text == null) {
      return null;
    }
    List<RuleMatch> matches;
    try {
      matches = LANG_TOOL.get().check(text);
    } catch (Exception e) {
      // Best effort by design: a failing checker must not break the calling pipeline.
      return text;
    }
    StringBuilder result = new StringBuilder(text.length());
    int lastPos = 0;
    for (RuleMatch match : matches) {
      List<String> suggestions = match.getSuggestedReplacements();
      int from = match.getFromPos();
      int to = match.getToPos();
      // Skip matches we cannot apply cleanly instead of abandoning all corrections.
      if (suggestions == null || suggestions.isEmpty()
          || from < lastPos || from > to || to > text.length()) {
        continue;
      }
      result.append(text, lastPos, from).append(suggestions.get(0));
      lastPos = to;
    }
    result.append(text, lastPos, text.length());
    return result.toString();
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment