Skip to content

Instantly share code, notes, and snippets.

@sblack4
Last active February 26, 2018 21:13
Show Gist options
  • Save sblack4/589ddf15796e6f7491fdee50bee55520 to your computer and use it in GitHub Desktop.
Save sblack4/589ddf15796e6f7491fdee50bee55520 to your computer and use it in GitHub Desktop.
Spark-twitter stuff for POC
config.py
.vscode
*.pyc
*.jar
#!/bin/python
from __future__ import print_function
# general packages
import sys
import config
import logging
import json
from random import choice
# pyspark streaming
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
# spark sql
from pyspark.sql import HiveContext
from pyspark.sql.types import *
from pyspark.sql.functions import desc
from pyspark.sql.functions import col
from pyspark.sql.functions import expr
# tweet_struct = StructType([
# StructField('id', LongType(), False),
# StructField('created_at', StringType(), False),
# StructField('text', StringType(), False),
# StructField('favorite_count', IntegerType(), True),
# StructField('quote_count', IntegerType(), True),
# StructField('retweet_count', IntegerType(), True),
# StructField('reply_count', IntegerType(), True),
# StructField('lang', StringType(), True),
# StructField('coordinates', StringType(), True),
# StructField('place', StringType(), True),
# StructField('possibly_sensitive', StringType(), True),
# StructField('user', MapType(StringType(), StringType()), False)
# ])
def HandleJson(df):
    """Append one micro-batch of parsed tweets to Hive ``default.tweets`` and ``default.users``.

    Rows flagged ``possibly_sensitive`` are dropped first. Nullable tweet
    fields are COALESCEd to placeholder values. The ``date``, ``state`` and
    ``provider`` columns come from the ``good_day``, ``rand_state`` and
    ``rand_provider`` SQL UDFs, which must already be registered on the SQL
    context.

    :param df: DataFrame produced by ``sqlContext.read.json`` over raw tweet JSON.
    """
    # Drop sensitive tweets row by row. The old check compared the return value
    # of DataFrame.show() (always None) with "true", so it never fired, and it
    # printed the whole column on every batch. The column only exists when some
    # tweet in the batch carried the field, and JSON inference may type it as
    # boolean or string, so compare on its string form.
    if "possibly_sensitive" in df.columns:
        df = df.filter(
            "COALESCE(CAST(possibly_sensitive AS STRING), 'false') != 'true'")

    tweets = df.select("id",
                       "created_at",
                       expr('COALESCE(text, "null") AS text'),
                       expr('COALESCE(favorite_count, 0) AS favorite_count'),
                       expr('COALESCE(retweet_count, 0) AS retweet_count'),
                       expr('COALESCE(quote_count, 0) AS quote_count'),
                       expr('COALESCE(reply_count, 0) as reply_count'),
                       expr('COALESCE(lang, "und") as lang'),
                       expr('COALESCE(coordinates, 0) as coordinates'),
                       expr('COALESCE(place, "null") as place'),
                       col("user.id").alias("user_id"),
                       expr("good_day() as date"),
                       expr("rand_state() as state"),
                       expr("rand_provider() as provider")
                       )
    tweets.write.mode("append").insertInto("default.tweets")

    users = df.select("user.id",
                      "user.name",
                      "user.description",
                      "user.followers_count",
                      "user.location",
                      "user.friends_count",
                      "user.screen_name"
                      )
    users.write.mode("append").insertInto("default.users")
def handleRDD(rdd):
    """Parse one Kafka micro-batch as JSON and pass it to ``HandleJson``.

    Failures are logged with a traceback and swallowed, so one bad batch does
    not stop the streaming job.

    :param rdd: RDD of records from ``KafkaUtils.createDirectStream``. Element
        ``[1]`` of each record is read as the JSON text (presumably the Kafka
        message value).
    """
    # ``not rdd`` only catches None: an RDD object is always truthy. Without
    # the isEmpty() check, every idle batch produced a column-less DataFrame
    # and select() raised.
    if rdd is None or rdd.isEmpty():
        return
    try:
        df = sqlContext.read.json(rdd.map(lambda record: record[1]))
        HandleJson(df)
    except Exception:
        # Best effort: report the failure and keep the stream alive.
        logging.exception("failed to process micro-batch")
if __name__ == "__main__":
    sc = SparkContext("yarn", "TweetConsumer")
    ssc = StreamingContext(sc, 1)
    # Module-level on purpose: handleRDD reads this global.
    sqlContext = HiveContext(sc)

    def _lookup_values(query):
        """Run ``query`` on the driver and return its first column, UTF-8 encoded.

        NULLs are skipped. If nothing is left, raise ValueError here, so an
        empty lookup table fails at startup instead of inside every UDF call
        on the executors.
        """
        values = [row[0].encode('utf-8')
                  for row in sqlContext.sql(query).collect()
                  if row[0] is not None]
        if not values:
            raise ValueError("lookup query returned no values: %s" % query)
        return values

    days = _lookup_values("select distinct date from transactions")
    states = _lookup_values("select State from us_states")

    def good_day():
        """Return a random date taken from the transactions table."""
        return choice(days)

    def rand_state():
        """Return a random state name taken from the us_states table."""
        return choice(states)

    def rand_provider():
        """Return a random streaming provider name."""
        return choice(['pandora', 'spotify'])

    sqlContext.udf.register("good_day", good_day)
    sqlContext.udf.register("rand_state", rand_state)
    sqlContext.udf.register("rand_provider", rand_provider)

    zkQuorum, topic = config.zkQuorum, config.topic
    # NOTE(review): this value is passed as the Kafka *broker* list; a direct
    # stream talks to brokers, not ZooKeeper, despite the config name. Confirm
    # that config.zkQuorum holds host:port of the brokers.
    lines = KafkaUtils.createDirectStream(
        ssc, [topic], {"metadata.broker.list": zkQuorum})
    lines.foreachRDD(handleRDD)
    ssc.start()
    ssc.awaitTermination()
#!/usr/bin/env python
# standard libraries
from __future__ import print_function
import config
import json
import logging
# tweepy
import tweepy
from tweepy import Stream
from tweepy import OAuthHandler
from tweepy.streaming import StreamListener
# kafka-python libraries
from kafka import KafkaProducer
from kafka.errors import KafkaError
from any_broker import run_listener
class AnyListener(StreamListener):
    """Tweepy StreamListener that forwards every raw stream message to a callback.

    Each message is UTF-8 encoded before it is passed on. Errors are printed
    and swallowed, so one bad message does not end the stream.
    """

    def __init__(self, callback):
        """:param callback: called with each raw message as UTF-8 encoded bytes."""
        # StreamListener defines its own __init__ (tweepy 3.x sets self.api
        # there); the original skipped it.
        super(AnyListener, self).__init__()
        self.callback = callback

    def on_data(self, data):
        try:
            self.callback(data.encode('utf-8'))
        except Exception as e:
            # Best effort: report and keep streaming. Catch Exception rather
            # than BaseException so KeyboardInterrupt/SystemExit still stop the
            # process.
            print(e)
        return True

    def on_error(self, status):
        # Report the HTTP status instead of silently dropping it. As before,
        # return True so the listener does not ask tweepy to stop.
        print("stream error: %s" % status)
        return True
def run_listener(callback):
    """Stream tweets matching ``config.track_string`` and pass each raw message to ``callback``.

    ``callback`` receives UTF-8 encoded bytes via ``AnyListener``.
    NOTE(review): this definition shadows ``run_listener`` imported from
    ``any_broker`` at the top of the file.
    """
    auth = OAuthHandler(config.consumer_key, config.consumer_secret)
    auth.set_access_token(config.access_token, config.access_secret)
    # split() with no argument ignores repeated, leading and trailing
    # whitespace, so no empty "" term reaches the track filter.
    track = config.track_string.split()
    twitter_stream = Stream(auth, AnyListener(callback))
    twitter_stream.filter(track=track)
def _to_utf8_bytes(value):
    """Return ``value`` as UTF-8 bytes; values that are already bytes pass through.

    AnyListener already encodes each message, so the old serializer
    (``m.encode('utf-8')``) encoded twice. On Python 3 ``bytes`` has no
    ``encode``, so every send failed.
    """
    if isinstance(value, bytes):
        return value
    return value.encode('utf-8')


def run_kafka_broker():
    """Stream tweets from Twitter into the Kafka topic ``config.topic``.

    Logs to run_kafka_broker.log at ``config.log_level``. The producer is
    flushed and closed when the stream ends, so queued messages are not lost.
    """
    logging.basicConfig(filename='run_kafka_broker.log', level=config.log_level)
    logging.info("logging to " + config.kafka_host)
    producer = KafkaProducer(
        bootstrap_servers=[config.kafka_host],
        value_serializer=_to_utf8_bytes,
        api_version=(0, 10, 1),
    )
    try:
        run_listener(lambda vals: producer.send(config.topic, vals))
    finally:
        # send() only queues records; deliver whatever is still buffered
        # before releasing the producer.
        producer.flush()
        producer.close()


if __name__ == "__main__":
    run_kafka_broker()
#!/bin/python
from __future__ import print_function
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# from pyspark.sql import SQLContext
from pyspark.sql.functions import desc
from pyspark.sql import HiveContext
from os import getcwd
import logging
from pyspark.sql.functions import col
sc = SparkContext("local[2]", "TweetConsumer")
ssc = StreamingContext(sc, 10)
sqlContext = HiveContext(sc)
socket_stream = ssc.socketTextStream("127.0.0.1", 5555)
# Tumbling 20-second window: the slide equals the window length. With the
# default slide (the 10-second batch interval) consecutive windows overlap, so
# every tweet was appended to the Hive tables twice.
lines = socket_stream.window(20, 20)
def HandleJson(df):
    """Append one batch of parsed tweets to Hive ``default.tweets`` and ``default.users``.

    Rows flagged ``possibly_sensitive`` are dropped first.

    :param df: DataFrame produced by ``sqlContext.read.json`` over raw tweet JSON.
    """
    # Drop sensitive tweets row by row. The old check compared the return value
    # of DataFrame.show() (always None) with "true", so it never fired, and it
    # printed the whole column on every batch. The column only exists when some
    # tweet in the batch carried the field, and JSON inference may type it as
    # boolean or string, so compare on its string form.
    if "possibly_sensitive" in df.columns:
        df = df.filter(
            "COALESCE(CAST(possibly_sensitive AS STRING), 'false') != 'true'")

    tweets = df.select("id",
                       "created_at",
                       "text",
                       "favorite_count",
                       "retweet_count",
                       "quote_count",
                       "reply_count",
                       "lang",
                       "coordinates",
                       "place",
                       col("user.id").alias("user_id")
                       )
    tweets.write.mode("append").insertInto("default.tweets")

    users = df.select("user.id",
                      "user.name",
                      "user.description",
                      "user.followers_count",
                      "user.location",
                      "user.friends_count",
                      "user.screen_name"
                      )
    users.write.mode("append").insertInto("default.users")
def handleRDD(rdd):
    """Parse one windowed batch of socket lines as JSON and pass it to ``HandleJson``.

    Failures (including JSON parsing) are logged with a traceback and
    swallowed, so one bad batch does not stop the streaming job.

    :param rdd: RDD of text lines read from the socket stream.
    """
    # ``not rdd`` only catches None: an RDD object is always truthy, so empty
    # windows used to reach read.json/select and fail.
    if rdd is None or rdd.isEmpty():
        return
    try:
        # Parsing sits inside the try: it used to run outside it, so a read
        # failure escaped and ended the job.
        df = sqlContext.read.json(rdd)
        HandleJson(df)
    except Exception:
        logging.exception("failed to process windowed batch")


lines.foreachRDD(handleRDD)
ssc.start()
ssc.awaitTermination()
#!/bin/python
from __future__ import print_function
import tweepy
from tweepy import Stream
from tweepy import OAuthHandler
from tweepy.streaming import StreamListener
import time
import config
import json
import socket
import logging
class PortListener(StreamListener):
    """Tweepy StreamListener that writes every raw stream message to a connected socket.

    The Spark consumer in this project reads from that socket with
    ``socketTextStream("127.0.0.1", 5555)``.
    """

    def __init__(self, filename):
        """:param filename: the connected client socket to write to. The
        misleading parameter name is kept for backward compatibility."""
        # StreamListener defines its own __init__ (tweepy 3.x sets self.api
        # there); the original skipped it.
        super(PortListener, self).__init__()
        self.client_socket = filename

    def on_data(self, data):
        try:
            # Sockets carry bytes; text must be encoded first (an unencoded
            # str raises TypeError on Python 3).
            payload = data if isinstance(data, bytes) else data.encode('utf-8')
            # sendall() keeps writing until the whole message is sent; send()
            # may write only part of it.
            self.client_socket.sendall(payload)
        except socket.error as e:
            # The consumer connection is broken and is never re-accepted, so
            # every later send would fail too: stop the stream.
            logging.error("Error on_data: %s" % str(e))
            return False
        except Exception as e:
            # Best effort for anything else: log and keep streaming.
            logging.error("Error on_data: %s" % str(e))
        return True

    def on_error(self, status):
        logging.error(status)
        return True
def run(c_socket):
    """Stream tweets matching ``config.track_string`` into the connected socket ``c_socket``."""
    auth = OAuthHandler(config.consumer_key, config.consumer_secret)
    auth.set_access_token(config.access_token, config.access_secret)
    # split() with no argument ignores repeated, leading and trailing
    # whitespace, so no empty "" term reaches the track filter.
    track = config.track_string.split()
    twitter_stream = Stream(auth, PortListener(c_socket))
    twitter_stream.filter(track=track)
if __name__ == "__main__":
    logging.basicConfig(filename="port_broker.log",
                        level=config.log_level)
    host = "127.0.0.1"  # loopback; must match the consumer's socketTextStream host
    port = 5555  # must match the consumer's socketTextStream port
    s = socket.socket()
    # Let the broker restart immediately even while the previous run's port
    # is still in TIME_WAIT.
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    try:
        s.bind((host, port))
        logging.info("Listening on port: %s" % str(port))
        s.listen(5)
        # Serve exactly one client: the Spark consumer.
        c, addr = s.accept()
        logging.info("Received request from: " + str(addr))
        try:
            run(c)
        finally:
            c.close()
    finally:
        s.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment