Skip to content

Instantly share code, notes, and snippets.

@WillisN
WillisN / twitter_reader.py
Created September 30, 2020 02:51
twitter_reader
import json
import requests
import socket
import sys
import twitter_secure as ts
# Twitter API credentials, built once at import time by twitter_secure.secure().
# NOTE(review): presumably a requests_oauthlib auth object (twitter_secure imports
# requests_oauthlib); it is what get_tweets passes as `auth=` when calling the
# Twitter API URL to get the response for a stream of tweets — confirm.
my_auth = ts.secure()
# Host name for a local connection; presumably the endpoint the tweet stream is
# served on for Spark — confirm against the rest of this file.
host = "localhost"
@WillisN
WillisN / twitter_secure.py
Last active September 30, 2020 02:25
twitter_secure
import json
import requests_oauthlib
def secure():
path = '.../secret/'
# Load Twitter API secrets from a json file
secrets = json.loads(open(path + 'secrets.json').read())
api_key = secrets['API_key']
api_secret_key = secrets['API_secret_key']
access_token = secrets['access_token']
@WillisN
WillisN / streaming_structured.py
Created September 29, 2020 02:29
streaming_structured
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
def main():
spark = SparkSession\
.builder\
.appName("StructuredNetworkWordCount")\
.getOrCreate()
@WillisN
WillisN / spark_streaming.py
Created September 29, 2020 02:21
spark_streaming
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
def updateCount(newCounts, state):
    """Fold one batch of new counts into a key's running total.

    The ``(newCounts, state)`` signature matches the update function expected
    by Spark Streaming's ``updateStateByKey`` (presumably how it is used here;
    confirm against the caller).

    :param newCounts: iterable of numeric counts seen for the key in the
        current batch.
    :param state: the key's previous running total, or ``None`` when the key
        has no prior state.
    :returns: ``state + sum(newCounts)``, or just ``sum(newCounts)`` when
        ``state`` is ``None``.
    """
    batch_total = sum(newCounts)
    # Compare to None by identity: `state == None` dispatches to
    # state.__eq__, which for array-like states returns an elementwise
    # result whose truth value is ambiguous (and raises).
    if state is None:
        return batch_total
    return state + batch_total
@WillisN
WillisN / main.py
Created September 29, 2020 02:08
main
def main():
TCP_IP = "localhost"
TCP_PORT = 9009
conn = None
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((TCP_IP, TCP_PORT))
s.listen(1)
print("Waiting for TCP connection...")
conn, addr = s.accept()
print("Connected... Starting getting tweets.")
@WillisN
WillisN / send_tweets_to_spark.py
Created September 29, 2020 02:03
send_tweets_to_spark
def send_tweets_to_spark(http_resp, tcp_connection):
for line in http_resp.iter_lines():
try:
full_tweet = json.loads(line)
datetime = full_tweet['created_at'][:20]
tweet = full_tweet['text']
print (f"---------------{datetime}--------------------------")
print(tweet)
@WillisN
WillisN / get_tweets.py
Last active September 29, 2020 02:00
code
def get_tweets():
    """Open a streaming request to Twitter's v1.1 ``statuses/filter`` endpoint.

    Filters on ``language=en`` and the bounding box ``-130,-20,100,50``, and
    authenticates with the module-level ``my_auth``.

    :returns: the ``requests`` response for the call. It is opened with
        ``stream=True`` so the body can be consumed incrementally (e.g. with
        ``iter_lines()``) rather than downloaded up front.
    """
    url = 'https://stream.twitter.com/1.1/statuses/filter.json'
    query_data = [('language', 'en'), ('locations', '-130,-20,100,50')]
    # Let requests build and percent-encode the query string. Joining
    # "key=value" pairs by hand breaks as soon as a value contains a reserved
    # character such as '&', '#' or a space (e.g. a `track` keyword).
    response = requests.get(url, params=query_data, auth=my_auth, stream=True)
    return response