Last active
April 17, 2020 15:38
-
-
Save Mageswaran1989/5ff85e5482df42f32b785c387abb7b7d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
"""Streams AI-related and mixed-topic tweets from the Twitter streaming API and prints their text in color."""

# Authorship
__author__ = "Mageswaran Dhandapani"
__maintainer__ = "Mageswaran Dhandapani"
__email__ = "mageswaran1989@gmail.com"
__credits__ = []

# Legal
__copyright__ = "Copyright 2020, The Spark Structured Playground Project"
__license__ = "Apache License"

# Release information
__version__ = "2.0"
__status__ = "Education Purpose"
import argparse
import json
import os
import re
import threading
import time

from tweepy.auth import OAuthHandler
from tweepy.streaming import Stream
from tweepy.streaming import StreamListener
# ANSI escape sequences: CEND resets the terminal color, the others set a foreground color.
CEND = '\33[0m'
CBLUE = '\33[34m'
CYELLOW2 = '\33[93m'
CRED = '\33[31m'
CGREEN2 = '\33[92m'


def _print_colored(color, *args):
    """
    Prints all positional arguments wrapped in an ANSI color code.

    Arguments are converted with ``str`` and separated by a single space, like the
    built-in ``print``. The previous ``str(*args)`` raised TypeError for two or more
    arguments, because ``str`` interprets extra positionals as encoding/errors.
    :param color: ANSI escape sequence that starts the color
    :param args: values to print
    :return: None; the text is written to stdout
    """
    print(color + " ".join(str(arg) for arg in args) + CEND)


def print_info(*args):
    """
    Prints the arguments in green color
    :param args: user string information
    :return: None; the text is written to stdout
    """
    _print_colored(CGREEN2, *args)


def print_error(*args):
    """
    Prints the arguments in red color
    :param args: user string information
    :return: None; the text is written to stdout
    """
    _print_colored(CRED, *args)


def print_warn(*args):
    """
    Prints the arguments in yellow color
    :param args: user string information
    :return: None; the text is written to stdout
    """
    _print_colored(CYELLOW2, *args)


def print_debug(*args):
    """
    Prints the arguments in blue color
    :param args: user string information
    :return: None; the text is written to stdout
    """
    _print_colored(CBLUE, *args)
class AIKeyWords(object):
    """
    Keyword groups for the Twitter ``track`` filter.

    Every attribute is a single string of phrases separated by "|"; callers split it
    on "|" to get the keyword list.
    """

    # Topic groups that identify AI / data related tweets
    AI = "#AI|Artificial Intelligence|robotics"
    ML = "machinelearningengineer|Machine Learning|scikit|#ML|mathematics"
    DL = "DeepLearning|Deep Learning|#DL|Tensorflow|Pytorch|Neural Network|NeuralNetwork"
    CV = "computervision|computer vision|machine vision|machinevision|convolutional network|convnet|image processing"
    NLP = "NLP|naturallanguageprocessing|natural language processing|text processing|text analytics|nltk|spacy"
    DATA = "iot|datasets|dataengineer|analytics|bigdata|big data|data science|data analytics|data insights|data mining|distributed computing|parallel processing|apache spark|hadoop|apache hive|airflow|mlflow|apache kafka|hdfs|apache|kafka"
    TWEET_HASH_TAGS = "dataanalysis|AugmentedIntelligence|datascience|machinelearning|rnd|businessintelligence|DigitalTransformation|datamanagement|ArtificialIntelligence"

    # Generic words that overlap with the groups above, plus unrelated topics
    FALSE_POSITIVE = "gpu|nvidia|maths|mathematics|intelligence|conspiracy|astrology|vedic|tamil|text|computer|ebook|pdf|learning|big|insights|processing|network|machine|artifical|data|science|parallel|computing|deep|vision|natural|language|data"
    RANDOM_TOPICS = "nature|climate|space|earth|animals|plants|astrology|horoscope|occult|hidden science|conspiracy|hinduism|hindu|vedic"

    # Combined groups
    POSITIVE = "|".join((AI, ML, DL, CV, NLP, DATA, TWEET_HASH_TAGS))
    ALL = "|".join((POSITIVE, FALSE_POSITIVE, RANDOM_TOPICS))
def pick_text(text, rtext, etext):
    """
    Picks the most complete tweet text available, in the order etext > rtext > text.

    Twitter Json data has three levels of text. Runs of line breaks are replaced by a
    single space (rather than deleted) so that words or a trailing URL on adjacent
    lines are not glued together. The result is then stripped.
    :param text: Plain text at top level of the Json with stripped content and an URL
    :param rtext: Retweeted full text
    :param etext: Extended tweet full text
    :return: (str) the chosen text on a single line, or "" when every input is empty/None
    """
    chosen = etext or rtext or text or ""
    return re.sub(r"[\r\n]+", " ", chosen).strip()
class TweetsListener(StreamListener):
    """
    Tweepy StreamListener that prints the text of every incoming tweet to stdout.

    Reference: http://docs.tweepy.org/en/latest/streaming_how_to.html
    :param is_ai: (bool) Used to differentiate AI tweets with green color and red for other category tweets
    """

    def __init__(self,
                 is_ai=False):
        StreamListener.__init__(self)
        self._is_ai = is_ai

    def on_data(self, data):
        """
        Gets triggered by the Twitter stream API for every raw message.

        :param data: Tweet Json data (a JSON string)
        :return: True, so the stream keeps running; the picked text is printed to stdout
        """
        data_dict = json.loads(data)
        text = data_dict.get("text")
        # ``or {}`` keeps the nested lookups safe when an object (or its key) is absent.
        etext = (data_dict.get("extended_tweet") or {}).get("full_text")
        retweeted = data_dict.get("retweeted_status") or {}
        rtext = (retweeted.get("extended_tweet") or {}).get("full_text")

        text = pick_text(text=text, rtext=rtext, etext=etext)
        if self._is_ai:
            print_info(text)
        else:
            print_error(text)
        return True

    def on_error(self, status):
        """
        Error hook called by tweepy with the HTTP status code of a failed stream request.

        The original method was named ``if_error``. That name is not a tweepy hook, so it was
        never called, and tweepy's default ``on_error`` disconnected the stream instead. Per the
        tweepy streaming docs, returning False disconnects. Returning True keeps the stream
        alive and lets tweepy retry.
        :param status: HTTP status code reported by tweepy
        :return: True
        """
        print(status)
        return True

    # Backward-compatible alias for the original (misspelled) method name.
    if_error = on_error
class TwitterProducer(object):
    """
    Twitter data ingestion. Gets the twitter stream data.

    Credentials that are not passed explicitly are read from the environment variables
    TWITTER_CONSUMER_KEY, TWITTER_CONSUMER_SECRET, TWITTER_ACCESS_TOKEN and
    TWITTER_ACCESS_SECRET, so that secrets never have to be committed in source code.

    :param twitter_consumer_key: (str) Twitter Consumer Key
    :param twitter_consumer_secret: (str) Twitter Consumer secret
    :param twitter_access_token: (str) Twitter Access token
    :param twitter_access_secret: (str) Twitter Access secret
    :param topic_1: (str) Label of the AI tweet stream (printed when that stream starts)
    :param topic_2: (str) Label of the mixed tweet stream (printed when that stream starts)
    :param topic_2_filter_words: (list) Filter words to be used for second stream;
        when None, ``run`` uses AIKeyWords.ALL split on "|"
    """

    # Seconds to wait before reconnecting after the stream stops or raises.
    # This avoids a tight reconnect loop against the API on persistent failures.
    RETRY_DELAY_SECONDS = 5

    def __init__(self,
                 twitter_consumer_key=None,
                 twitter_consumer_secret=None,
                 twitter_access_token=None,
                 twitter_access_secret=None,
                 topic_1='ai_tweets_topic',
                 topic_2='mix_tweets_topic',
                 topic_2_filter_words=None):
        # Explicit arguments win; otherwise fall back to the environment (None if unset).
        self._twitter_consumer_key = twitter_consumer_key or os.environ.get("TWITTER_CONSUMER_KEY")
        self._twitter_consumer_secret = twitter_consumer_secret or os.environ.get("TWITTER_CONSUMER_SECRET")
        self._twitter_access_token = twitter_access_token or os.environ.get("TWITTER_ACCESS_TOKEN")
        self._twitter_access_secret = twitter_access_secret or os.environ.get("TWITTER_ACCESS_SECRET")
        self._topic_1 = topic_1
        self._topic_2 = topic_2
        self._topic_2_filter_words = topic_2_filter_words

    def _missing_credentials(self):
        """
        Lists the environment-variable names of the credentials that are not set.

        :return: (list) names such as "TWITTER_CONSUMER_KEY"; empty when all are present
        """
        credentials = (
            ("TWITTER_CONSUMER_KEY", self._twitter_consumer_key),
            ("TWITTER_CONSUMER_SECRET", self._twitter_consumer_secret),
            ("TWITTER_ACCESS_TOKEN", self._twitter_access_token),
            ("TWITTER_ACCESS_SECRET", self._twitter_access_secret),
        )
        return [name for name, value in credentials if not value]

    def _twitter_stream(self, topic_name, keywords, is_ai=False):
        """
        Runs one filtered Twitter stream forever, reconnecting whenever it stops.

        :param topic_name: (str) label printed in the start-up banner
        :param keywords: (list) phrases passed to the stream's ``track`` filter
        :param is_ai: (bool) forwarded to TweetsListener to choose the print color
        :return: never returns
        """
        auth = OAuthHandler(self._twitter_consumer_key, self._twitter_consumer_secret)
        auth.set_access_token(self._twitter_access_token, self._twitter_access_secret)
        print_info("\n\n---------------------------------------------------------------------------------\n\n")
        print_info(f"Topic name: {topic_name}")
        print_info(f"Twitter Keywords : {keywords}")
        print_info("\n\n---------------------------------------------------------------------------------\n\n")
        while True:
            try:
                twitter_stream = Stream(auth, TweetsListener(is_ai=is_ai))
                # https://developer.twitter.com/en/docs/tweets/filter-realtime/api-reference/post-statuses-filter
                # https://developer.twitter.com/en/docs/tweets/filter-realtime/guides/basic-stream-parameters
                twitter_stream.filter(track=keywords, languages=["en"])
            except Exception as e:
                # Keep the stream alive, but surface the cause instead of swallowing it.
                print(f"Error: Restarting the twitter stream ({e!r})")
            time.sleep(self.RETRY_DELAY_SECONDS)

    def run(self, stream="both"):
        """
        Starts the requested tweet stream(s) in daemon threads and blocks until they end.

        :param stream: (str) "topic1" starts only the AI stream, "topic2" only the mixed
            stream; any other value starts both
        :return: None
        :raises ValueError: if any Twitter credential is missing
        """
        missing = self._missing_credentials()
        if missing:
            raise ValueError("Missing Twitter credentials; pass them to TwitterProducer or set: "
                             + ", ".join(missing))

        if self._topic_2_filter_words is None:
            self._topic_2_filter_words = AIKeyWords.ALL.split("|")

        ai_args = (self._topic_1, AIKeyWords.POSITIVE.split("|"), True)
        mix_args = (self._topic_2, self._topic_2_filter_words)
        if stream == "topic1":
            stream_args = [ai_args]
        elif stream == "topic2":
            stream_args = [mix_args]
        else:
            stream_args = [ai_args, mix_args]

        # ``daemon=True`` replaces the deprecated Thread.setDaemon(True).
        threads = [threading.Thread(target=self._twitter_stream, args=args, daemon=True)
                   for args in stream_args]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()
def _build_arg_parser():
    """
    Builds the command-line parser for this script.

    :return: (argparse.ArgumentParser) parser with a single ``-sw/--mode`` option
    """
    # The first positional argument of ArgumentParser is ``prog``, so the text is
    # passed as ``description`` to keep the usage line readable.
    parser = argparse.ArgumentParser(description="Twitter Spark Text Processor pipeline:")
    parser.add_argument("-sw", "--mode",
                        type=str,
                        default="both",
                        required=False,
                        # Enforce the values the help text advertises.
                        choices=["topic1", "topic2", "both"],
                        help="[topic1, topic2, both]")
    return parser


if __name__ == "__main__":
    parsed_args = _build_arg_parser().parse_args()
    producer = TwitterProducer()  # TODO add your credential details!
    producer.run(stream=parsed_args.mode)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment