#!/usr/bin/env python
__author__ = "Mageswaran Dhandapani"
__copyright__ = "Copyright 2020, The Spark Structured Playground Project"
__credits__ = []
__license__ = "Apache License"
__version__ = "2.0"
__maintainer__ = "Mageswaran Dhandapani"
__email__ = "mageswaran1989@gmail.com"
__status__ = "Education Purpose"
import argparse
import re
import json
import threading
from tweepy.auth import OAuthHandler
from tweepy.streaming import Stream
from tweepy.streaming import StreamListener
CEND = '\33[0m'
CBLUE = '\33[34m'
CYELLOW2 = '\33[93m'
CRED = '\33[31m'
CGREEN2 = '\33[92m'
def print_info(*args):
"""
Prints the string in green color
:param args: user string information
:return: stdout
"""
print(CGREEN2 + str(*args) + CEND)
def print_error(*args):
"""
Prints the string in red color
:param args: user string information
:return: stdout
"""
print(CRED + str(*args) + CEND)
def print_warn(*args):
"""
Prints the string in yellow color
:param args: user string information
:return: stdout
"""
print(CYELLOW2 + str(*args) + CEND)
def print_debug(*args):
"""
Prints the string in blue color
:param args: user string information
:return: stdout
"""
print(CBLUE + str(*args) + CEND)
class AIKeyWords(object):
AI = "#AI|Artificial Intelligence|robotics"
ML = "machinelearningengineer|Machine Learning|scikit|#ML|mathematics"
DL = "DeepLearning|Deep Learning|#DL|Tensorflow|Pytorch|Neural Network|NeuralNetwork"
CV = "computervision|computer vision|machine vision|machinevision|convolutional network|convnet|image processing"
NLP = "NLP|naturallanguageprocessing|natural language processing|text processing|text analytics|nltk|spacy"
DATA = "iot|datasets|dataengineer|analytics|bigdata|big data|data science|data analytics|data insights|data mining|distributed computing|parallel processing|apache spark|hadoop|apache hive|airflow|mlflow|apache kafka|hdfs|apache|kafka"
TWEET_HASH_TAGS = "dataanalysis|AugmentedIntelligence|datascience|machinelearning|rnd|businessintelligence|DigitalTransformation|datamanagement|ArtificialIntelligence"
FALSE_POSITIVE = "gpu|nvidia|maths|mathematics|intelligence|conspiracy|astrology|vedic|tamil|text|computer|ebook|pdf|learning|big|insights|processing|network|machine|artifical|data|science|parallel|computing|deep|vision|natural|language|data"
RANDOM_TOPICS = "nature|climate|space|earth|animals|plants|astrology|horoscope|occult|hidden science|conspiracy|hinduism|hindu|vedic"
POSITIVE = AI + "|" + ML + "|" + DL + "|" + CV + "|" + NLP + "|" + DATA + "|" + TWEET_HASH_TAGS
ALL = POSITIVE + "|" + FALSE_POSITIVE + "|" + RANDOM_TOPICS
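# A small sketch (not part of the original flow) of how these pipe-separated keyword
# groups are consumed below: they are split into plain lists that Tweepy's filter API
# accepts as its `track` parameter.
#
#   ai_keywords = AIKeyWords.POSITIVE.split("|")   # ["#AI", "Artificial Intelligence", "robotics", ...]
#   all_keywords = AIKeyWords.ALL.split("|")       # positives + false positives + random topics
#   # twitter_stream.filter(track=ai_keywords, languages=["en"])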
def pick_text(text, rtext, etext):
"""
Twitter Json data has three level of text. This function picks what is available in the order etext > rtext > text
:param text: Plain text at top level of the Json with stipped content and an URL
:param rtext: Retweeted full text
:param etext: Extended retweeted full text
:return:
"""
ret = ""
if etext:
ret = etext
elif rtext:
ret = rtext
elif text:
ret = text
else:
ret = ""
return re.sub("\n|\r", "", ret).strip()
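# Illustrative calls with assumed values (not taken from a live tweet), showing the
# precedence implemented above: extended retweet text, then retweeted text, then plain text.
#
#   pick_text(text="truncated...", rtext=None, etext="full extended text")  # -> "full extended text"
#   pick_text(text="truncated...", rtext="retweet full text", etext=None)   # -> "retweet full text"
#   pick_text(text="plain text\n", rtext=None, etext=None)                  # -> "plain text"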
class TweetsListener(StreamListener):
"""
Tweepy StreamListener.
Reference: http://docs.tweepy.org/en/latest/streaming_how_to.html
    :param is_ai: (bool) Used to differentiate AI tweets, printed in green, from other category tweets, printed in red
"""
def __init__(self,
is_ai=False):
StreamListener.__init__(self)
self._is_ai = is_ai
def on_data(self, data):
"""
Gets triggered by the Twitter stream API
:param data: Tweet Json data
    :return: (bool) True to keep the stream alive; the tweet text is printed to the screen
"""
data_dict = json.loads(data)
# Debug info
if "text" in data_dict.keys():
text = data_dict["text"]
else:
text = None
if "extended_tweet" in data_dict.keys():
etext = data_dict["extended_tweet"]["full_text"]
else:
etext = None
if "retweeted_status" in data_dict.keys():
if "extended_tweet" in data_dict["retweeted_status"].keys():
rtext = data_dict["retweeted_status"]["extended_tweet"]["full_text"]
else:
rtext = None
else:
rtext = None
text = pick_text(text=text, rtext=rtext, etext=etext)
if self._is_ai:
print_info(text)
else:
print_error(text)
# with open("/tmp/tweets/{}.json".format(json.loads(data)["id_str"]), "wt", encoding='utf-8') as file:
# file.write(data)
return True
    def on_error(self, status):
        # Tweepy error callback; returning True keeps the stream alive on errors
        print_error(status)
        return True
class TwitterProducer(object):
"""
Twitter data ingestion. Gets the twitter stream data.
:param twitter_consumer_key: (str) Twitter Consumer Key
:param twitter_consumer_secret: (str) Twitter Consumer secret
:param twitter_access_token: (str) Twitter Access token
:param twitter_access_secret: (str) Twitter Access secret
:param topic_1: (str) Tweet stream topic
:param topic_2: (str) Tweet stream topic
:param topic_2_filter_words: (list) Filter words to be used for second stream
"""
def __init__(self,
twitter_consumer_key='2DBNLKerbSN0RclFcCGA96z9D',
twitter_consumer_secret='888Gz6h6fsuHM5yqTnJSYjVc4K99Kl41yvduIcWqyoekSHKUux',
twitter_access_token='1203929079688220672-iSKyHRz6UeXYXcRW4iWSp5kKiSMb7t',
twitter_access_secret='A9ANwQrHilp8upBnIbwnXhQ5Xtly71dmN0lmo03Hc3LQA',
topic_1='ai_tweets_topic',
topic_2='mix_tweets_topic',
topic_2_filter_words=None):
self._twitter_consumer_key = twitter_consumer_key
self._twitter_consumer_secret = twitter_consumer_secret
self._twitter_access_token = twitter_access_token
self._twitter_access_secret = twitter_access_secret
self._topic_1 = topic_1
self._topic_2 = topic_2
self._topic_2_filter_words = topic_2_filter_words
def _twitter_stream(self, topic_name, keywords, is_ai=False):
"""
:param topic_name:
:param keywords:
:param is_ai:
:return:
"""
auth = OAuthHandler(self._twitter_consumer_key, self._twitter_consumer_secret)
auth.set_access_token(self._twitter_access_token, self._twitter_access_secret)
print_info("\n\n---------------------------------------------------------------------------------\n\n")
print_info(f"Topic name: {topic_name}")
print_info(f"Twitter Keywords : {keywords}")
print_info("\n\n---------------------------------------------------------------------------------\n\n")
while True:
try:
twitter_stream = Stream(auth, TweetsListener(is_ai=is_ai))
# https://developer.twitter.com/en/docs/tweets/filter-realtime/api-reference/post-statuses-filter
# https://developer.twitter.com/en/docs/tweets/filter-realtime/guides/basic-stream-parameters
twitter_stream.filter(track=keywords, languages=["en"])
            except Exception as e:
                print_error("Error: {}. Restarting the twitter stream".format(e))
def run(self, stream="both"):
"""
Starts two tweet streams
:return: None
"""
if self._topic_2_filter_words is None:
self._topic_2_filter_words = AIKeyWords.ALL.split("|")
if stream == "topic1":
ai_stream = threading.Thread(target=self._twitter_stream, args=(self._topic_1, AIKeyWords.POSITIVE.split("|"), True,))
            ai_stream.daemon = True
ai_stream.start()
ai_stream.join()
elif stream == "topic2":
non_ai_stream = threading.Thread(target=self._twitter_stream,
args=(self._topic_2, self._topic_2_filter_words,))
            non_ai_stream.daemon = True
non_ai_stream.start()
non_ai_stream.join()
else:
ai_stream = threading.Thread(target=self._twitter_stream,
args=(self._topic_1, AIKeyWords.POSITIVE.split("|"), True,))
non_ai_stream = threading.Thread(target=self._twitter_stream,
args=(self._topic_2, self._topic_2_filter_words,))
            ai_stream.daemon = True
            non_ai_stream.daemon = True
ai_stream.start()
non_ai_stream.start()
ai_stream.join()
non_ai_stream.join()
if __name__ == "__main__":
    optparse = argparse.ArgumentParser(description="Twitter Spark Text Processor pipeline")
    optparse.add_argument("-sw", "--mode",
                          type=str,
                          default="both",
                          required=False,
                          choices=["topic1", "topic2", "both"],
                          help="Which tweet stream(s) to run: topic1, topic2 or both")
parsed_args = optparse.parse_args()
    producer = TwitterProducer()  # TODO: add your credential details!
producer.run(stream=parsed_args.mode)
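# Example invocations (assuming this file is saved as twitter_producer.py and valid
# Twitter API credentials are passed to TwitterProducer):
#
#   python twitter_producer.py                # run both streams
#   python twitter_producer.py --mode topic1  # AI tweets only, printed in green
#   python twitter_producer.py --mode topic2  # mixed tweets, printed in red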