Skip to content

Instantly share code, notes, and snippets.

@weizhou2273
Created August 13, 2017 00:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save weizhou2273/40b46b617aeaca434cb1c5be85511a08 to your computer and use it in GitHub Desktop.
Save weizhou2273/40b46b617aeaca434cb1c5be85511a08 to your computer and use it in GitHub Desktop.
from __future__ import print_function
import sys
import re
from operator import add
import pandas as pd
from pyspark.sql.types import StructField, StructType, StringType
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql import SQLContext
import json
import boto
import boto3
from boto.s3.key import Key
import boto.s3.connection
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import *
import credentials
#aws keys
aws_key_id = credentials.aws['key_id']
aws_key = credentials.aws['key']
def main():
conf = SparkConf().setAppName("first")
sc = SparkContext(conf=conf)
#print 40 * '-'
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId",aws_key_id)
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey",aws_key)
config_dict = {"fs.s3n.awsAccessKeyId":aws_key_id,
"fs.s3n.awsSecretAccessKey":aws_key}
bucket = "project4capstones3"
prefix = "/2017/07/*/*/*"
filename = "s3n://{}/Trump/{}".format(bucket, prefix)
rdd = sc.hadoopFile(filename,
'org.apache.hadoop.mapred.TextInputFormat',
'org.apache.hadoop.io.Text',
'org.apache.hadoop.io.LongWritable',
conf=config_dict)
spark = SparkSession.builder.appName("PythonWordCount").config("spark.files.overwrite","true").getOrCreate()
df = spark.read.json(rdd.map(lambda x: x[1]))
data_rm_na = df.filter(df["status_id"]!='None')
features_of_interest = ["rt_status_user_followers_count",\
'rt_status_user_friends_count',\
'rt_status_user_statuses_count',\
'rt_status_retweet_count',\
'rt_status_user_listed_count',\
'rt_status_user_id',\
'rt_status_created_at',\
'status_created_at',\
'rt_status_user_name',\
'rt_status_num_user_mentions',\
'searched_names',\
'rt_status_sentMag',\
'rt_status_sentScore',\
'rt_status_favorite_count',\
'status_id']
df_reduce= data_rm_na.select(features_of_interest)
df_reduce = df_reduce.withColumn("rt_status_user_followers_count", df_reduce["rt_status_user_followers_count"].cast(IntegerType()))
df_reduce = df_reduce.withColumn("rt_status_user_friends_count", df_reduce["rt_status_user_friends_count"].cast(IntegerType()))
df_reduce = df_reduce.withColumn("rt_status_user_statuses_count", df_reduce["rt_status_user_statuses_count"].cast(IntegerType()))
df_reduce = df_reduce.withColumn("rt_status_retweet_count", df_reduce["rt_status_retweet_count"].cast(IntegerType()))
df_reduce = df_reduce.withColumn("rt_status_user_listed_count", df_reduce["rt_status_user_listed_count"].cast(IntegerType()))
df_reduce = df_reduce.withColumn("rt_status_favorite_count", df_reduce["rt_status_favorite_count"].cast(IntegerType()))
df_reduce = df_reduce.withColumn("rt_status_num_user_mentions", df_reduce["rt_status_num_user_mentions"].cast(IntegerType()))
url_ = "jdbc:mysql://twittertalker1.csjkhjjygutf.us-east-1.rds.amazonaws.com:3306/innodb"
table_name_ = "retweet"
mode_ = "overwrite"
df_reduce.write.format("jdbc").option("url", url_)\
.option("dbtable", table_name_)\
.option("driver", "com.mysql.jdbc.Driver")\
.option("user", "XXXXXX")\
.option("password", "XXXXXXXX")\
.mode(mode_)\
.save()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment