Skip to content

Instantly share code, notes, and snippets.

@pallavipr
Last active November 24, 2015 11:21
Show Gist options
  • Save pallavipr/95dedf2e6ad512b582e5 to your computer and use it in GitHub Desktop.
Save pallavipr/95dedf2e6ad512b582e5 to your computer and use it in GitHub Desktop.
Insert Live Twitter User Data into DB2
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.twitter._
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkContext
import java.util.Properties
import java.sql.DriverManager
import java.sql.PreparedStatement
/**
 * Streams live tweets matching a fixed filter list and inserts the display name
 * of every user seen in the current 60-second window into the DB2 table
 * `PALLAVIPR.TWITTERUSERS`.
 *
 * The entry point is an explicit `main` rather than `extends App`: with `App`
 * the object body runs through `DelayedInit`, so fields may still be
 * uninitialised when they are referenced from Spark tasks.
 */
object Db2WriteTwitter {

  // JDBC settings. Credentials are placeholders and must be replaced before running.
  val jdbcDriver = "com.ibm.db2.jcc.DB2Driver"
  val jdbcUrl = "jdbc:db2://localhost:50000/sample:currentSchema=pallavipr;user=pallavipr;password=XXX;"
  val SQL = "insert into PALLAVIPR.TWITTERUSERS values (?)"

  // Twitter OAuth credentials (placeholders).
  val consumerKey = "XXXX"
  val consumerSecret = "XXXX"
  val accessToken = "XXXX"
  val accessTokenSecret = "XXXX"

  /**
   * Configures twitter4j, builds the streaming pipeline and blocks until the
   * streaming context terminates.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // twitter4j reads its OAuth credentials from system properties.
    System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
    System.setProperty("twitter4j.oauth.accessToken", accessToken)
    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

    val sparkConf = new SparkConf().setAppName("TwitterPopularTags").setMaster("local[16]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // NOTE(review): the filter term is listed twice in the original; kept unchanged.
    val filters = Array("Paris", "Paris")
    val stream = TwitterUtils.createStream(ssc, None, filters)
    val users = stream.map(status => status.getUser.getName)

    // One (user, tweetCount) pair per distinct user over the last 60 seconds.
    // NOTE(review): no slide duration is given, so the window is presumably
    // re-evaluated every batch and a user active in the window is re-inserted
    // on each batch - confirm that duplicate rows are intended.
    val recentUsers = users.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))

    recentUsers.foreachRDD { rdd =>
      println("\nNumber of users in last 60 seconds (%s total):".format(rdd.count()))
      // Only the user name is stored; the per-user count is dropped.
      rdd.map(_._1).foreachPartition(names => insertUsers(names))
    }

    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Inserts every user name of one RDD partition into the DB2 table.
   *
   * The connection is opened inside the task (one per non-empty partition)
   * because JDBC connections and statements are not serializable and cannot
   * be created on the driver and shipped to executors. Both handles are
   * always closed, even when an insert fails.
   *
   * @param names user names contained in the partition
   */
  private def insertUsers(names: Iterator[String]): Unit = {
    if (names.hasNext) {
      Class.forName(jdbcDriver)
      val con = DriverManager.getConnection(jdbcUrl)
      try {
        val ps = con.prepareStatement(SQL)
        try {
          names.foreach { user =>
            println("%s ".format(user))
            // Bind the name as-is: the original passed it through `format(user)`,
            // i.e. used it as a format string, which throws on names containing '%'.
            ps.setString(1, user)
            ps.execute()
            println("Inserted Twitter User into DB: " + user)
          }
        } finally {
          ps.close()
        }
      } finally {
        con.close()
      }
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment