Skip to content

Instantly share code, notes, and snippets.

Last active October 26, 2020 10:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ikagios/81577d0be47033659e5a5cf0197a305a to your computer and use it in GitHub Desktop.
Save ikagios/81577d0be47033659e5a5cf0197a305a to your computer and use it in GitHub Desktop.
Application for detecting fake news in Twitter
# Import necessary libraries
from __future__ import print_function

import datetime
import json
import logging
import os
import time
from http.client import IncompleteRead as http_incompleteRead
from ssl import SSLError

# pip install mysql-connector-python
import mysql.connector
# pip install --only-binary :all: mysqlclient
# for Python Shell instead run in cmd: pip install mysqlclient-1.4.4-cp37-cp37m-win32.whl
import MySQLdb
import numpy as np
import pandas as pd
import tweepy
from dateutil import parser
from requests.exceptions import Timeout, ConnectionError
from tweepy import Stream, StreamListener, OAuthHandler
from urllib3.exceptions import IncompleteRead as urllib3_incompleteRead
from urllib3.exceptions import ProtocolError
from urllib3.exceptions import ReadTimeoutError
# Hashtags or keywords to track, entered as one comma-separated line.
input_key_hash = input("Enter hashtags or keywords separated by commas, e.g. #bigdata, #AI, #datascience: ")
# Strip each entry so "#bigdata, #AI" yields "#AI" rather than " #AI", and
# drop empty entries produced by stray or trailing commas.
WORDS = [word.strip() for word in input_key_hash.split(",") if word.strip()]

# Moment at which tweet collection stops (naive datetime, compared against
# datetime.datetime.now() by the listener).
date_entry = input('Enter date in YYYY-MM-DD format, to stop collecting tweets: ')
time_entry = input('Enter time of day in HH:MM:SS format, to stop collecting tweets: ')
year, month, day = map(int, date_entry.split('-'))
hour, minute, second = map(int, time_entry.split(':'))
STOPDATE = datetime.datetime(year, month, day, hour, minute, second)

# Twitter Developer credentials.
# These are read from the environment instead of being written into the
# source file, so that secrets never end up in version control. A missing
# variable raises KeyError immediately instead of failing later during
# authentication.
# NOTE: any credential that was previously committed in this file should be
# treated as compromised and regenerated.
CONSUMER_KEY = os.environ["TWITTER_CONSUMER_KEY"]
CONSUMER_SECRET = os.environ["TWITTER_CONSUMER_SECRET"]
ACCESS_TOKEN = os.environ["TWITTER_ACCESS_TOKEN"]
ACCESS_TOKEN_SECRET = os.environ["TWITTER_ACCESS_TOKEN_SECRET"]

# MySQL database name and credentials.
# Non-secret settings keep their previous values as defaults; the password
# has no default and must be supplied through the environment.
HOST = os.environ.get("FAKENEWS_DB_HOST", "localhost")
USER = os.environ.get("FAKENEWS_DB_USER", "FakeNews")
PASSWD = os.environ["FAKENEWS_DB_PASSWORD"]
DATABASE = os.environ.get("FAKENEWS_DB_NAME", "fake")
def store_data1(WORDS, tweet_id, screen_name, user_id_str, created_at, text, reaction_user, react_screen_name, retweet_OR_reply):
    """Insert one collected tweet into table ``greek_test2``.

    Opens a connection to the MySQL database described by the module-level
    HOST, USER, PASSWD and DATABASE settings, inserts a single row, commits
    it and closes the connection again.

    Args:
        WORDS: Tracked keywords; a string, or a list/tuple of strings that is
            joined with commas before being stored.
        tweet_id: Id of the tweet.
        screen_name: Screen name of the tweet's author.
        user_id_str: User id of the tweet's author.
        created_at: Creation time of the tweet.
        text: Text of the tweet.
        reaction_user: User id of the account being replied to or retweeted,
            or an empty string.
        react_screen_name: Screen name of that account, or an empty string.
        retweet_OR_reply: "retweet", "reply" or an empty string.

    Raises:
        MySQLdb.Error: If connecting or inserting fails.
    """
    # A list parameter is rendered by the driver as a parenthesised SQL tuple,
    # which is not a valid value for a single column; store the keywords as
    # one comma-separated string instead.
    if isinstance(WORDS, (list, tuple)):
        WORDS = ",".join(WORDS)

    insert_query = "INSERT INTO greek_test2 (WORDS, tweet_id, screen_name, user_id_str, created_at, text, reaction_user, react_screen_name, retweet_OR_reply) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)"
    values = (WORDS, tweet_id, screen_name, user_id_str, created_at, text, reaction_user, react_screen_name, retweet_OR_reply)

    db = MySQLdb.connect(host=HOST, user=USER, passwd=PASSWD, db=DATABASE, charset="utf8mb4")
    try:
        cursor = db.cursor()
        try:
            cursor.execute(insert_query, values)
        finally:
            cursor.close()
        # Without an explicit commit the insert is discarded when the
        # connection goes away.
        db.commit()
    finally:
        # Always release the connection, even when the insert fails.
        db.close()
def store_data2(WORDS, tweet_id, created_at, user_id_str, screen_name, reaction_user, react_screen_name, retweet_OR_reply):
    """Insert one tweet's graph columns into table ``dataforgraph_greek3``.

    The table holds the subset of columns used later to plot the network
    graph. Exactly one row, built from the arguments, is written per call.
    Re-reading all of ``greek_test2`` on every call would copy every earlier
    tweet again each time a new tweet arrives and fill the table with
    duplicates.

    Args:
        WORDS: Tracked keywords; a string, or a list/tuple of strings that is
            joined with commas before being stored.
        tweet_id: Id of the tweet.
        created_at: Creation time of the tweet.
        user_id_str: User id of the tweet's author.
        screen_name: Screen name of the tweet's author.
        reaction_user: User id of the account being replied to or retweeted,
            or an empty string.
        react_screen_name: Screen name of that account, or an empty string.
        retweet_OR_reply: "retweet", "reply" or an empty string.

    Raises:
        MySQLdb.Error: If connecting or inserting fails.
    """
    # A list parameter is rendered by the driver as a parenthesised SQL tuple,
    # which is not a valid value for a single column; store the keywords as
    # one comma-separated string instead.
    if isinstance(WORDS, (list, tuple)):
        WORDS = ",".join(WORDS)

    insert_query = "INSERT INTO dataforgraph_greek3 (WORDS, tweet_id, created_at, user_id_str, screen_name, reaction_user, react_screen_name, retweet_OR_reply) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)"
    values = (WORDS, tweet_id, created_at, user_id_str, screen_name, reaction_user, react_screen_name, retweet_OR_reply)

    db = MySQLdb.connect(host=HOST, user=USER, passwd=PASSWD, db=DATABASE, charset="utf8mb4")
    try:
        cursor = db.cursor()
        try:
            cursor.execute(insert_query, values)
        finally:
            cursor.close()
        # Without an explicit commit the insert is discarded when the
        # connection goes away.
        db.commit()
    finally:
        # Always release the connection, even when the insert fails.
        db.close()
# Create a class inheriting from StreamListener
class StreamListener(tweepy.StreamListener):
    """Listener that stores streamed tweets in MySQL until STOPDATE is reached.

    Subclasses the listener class provided by tweepy to access the Twitter
    Streaming API. Every callback except the deadline check in ``on_data``
    returns True, which is intended to keep the stream running.
    """

    def on_connect(self):
        """Report that the connection to the Streaming API was established."""
        print("You are now connected to the streaming API.")

    def on_error(self, status_code):
        """Display the error / status code and keep the stream alive."""
        print('An Error has occured: ' + repr(status_code))
        return True

    def on_disconnect(self, notice):
        """Display the disconnect notice sent by Twitter and keep going."""
        print('Disconnect notice received: ' + repr(notice))
        return True

    def on_timeout(self):
        """Report a stream timeout and keep the stream alive."""
        print('Timeout...')
        return True

    def on_exception(self, exception):
        """Display an exception reported by the stream instead of hiding it."""
        print('Stream exception: ' + repr(exception))
        return True

    def on_data(self, data):
        """Decode one raw stream message and store the tweet in MySQL.

        Args:
            data: Raw JSON text of one message from the Streaming API.

        Returns:
            False once the current time has reached STOPDATE, which stops the
            collection; True otherwise, including after a handled error, so
            that collection continues.
        """
        # Tweets are collected until the given time is reached; after that
        # the collection stops and the stream disconnects.
        if datetime.datetime.now() >= STOPDATE:
            print('Timed out!')
            return False

        try:
            # Decode the JSON from Twitter
            datajson = json.loads(data)

            # Messages without a tweet body cannot be stored; skip them.
            # (presumably delete/limit notices — TODO confirm)
            if 'text' not in datajson:
                return True

            # grab the wanted data from the Tweet
            text = datajson['text']
            screen_name = datajson['user']['screen_name']
            user_id_str = datajson['user']['id_str']
            tweet_id = datajson['id']
            created_at = parser.parse(datajson['created_at'])

            # Work out whom this tweet reacts to, if anyone. Both keys are
            # read with .get() because an original tweet carries neither a
            # retweeted status nor a reply target.
            retweeted_status = datajson.get('retweeted_status')
            if retweeted_status:
                reaction_user = retweeted_status['user']['id_str']
                react_screen_name = retweeted_status['user']['screen_name']
                retweet_OR_reply = "retweet"
            elif datajson.get('in_reply_to_status_id'):
                reaction_user = datajson['in_reply_to_user_id_str']
                react_screen_name = datajson['in_reply_to_screen_name']
                retweet_OR_reply = "reply"
            else:
                reaction_user = ""
                react_screen_name = ""
                retweet_OR_reply = ""

            # print out a message that we have collected a tweet at a specific time and Authors' user_id
            print("Tweet collected at " + str(created_at))
            print("authored by user with user_id: " + str(user_id_str))
            # insert the data into MySQL database "fake", table "greek_test2"
            store_data1(WORDS, tweet_id, screen_name, user_id_str, created_at, text, reaction_user, react_screen_name, retweet_OR_reply)
            # insert the graph columns into table "dataforgraph_greek3"
            store_data2(WORDS, tweet_id, created_at, user_id_str, screen_name, reaction_user, react_screen_name, retweet_OR_reply)
        # The specific handlers come first: a broad handler placed ahead of
        # them would make every one of them unreachable.
        except http_incompleteRead as e:
            print("http.client Incomplete Read error: %s" % str(e))
            print("~~~ Restarting stream search in 5 seconds... ~~~")
        except urllib3_incompleteRead as e:
            print("urllib3 Incomplete Read error: %s" % str(e))
            print("~~~ Restarting stream search in 5 seconds... ~~~")
        except (ProtocolError, AttributeError) as e:
            print("Incomplete Read error: %s" % str(e))
            print("~~~ Restarting stream search in 5 seconds... ~~~")
        except (Timeout, SSLError, ReadTimeoutError, ConnectionError) as e:
            logging.warning("Network error occurred... %s", e)
        except Exception as e:
            # Exception rather than BaseException, so that Ctrl-C and
            # interpreter shutdown are not swallowed.
            print("Error on_data: %s, Pausing..." % str(e))
        return True
# Authentication Procedure
auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
# Set up the listener. The 'wait_on_rate_limit=True' is needed to help with Twitter API rate limiting.
# The API object is given the auth handler so that any call made through it is authenticated.
listener = StreamListener(api=tweepy.API(auth, wait_on_rate_limit=True))
# Establish a streaming session and route messages to StreamListener
streamer = tweepy.Stream(auth=auth, listener=listener)
print("Tracking: " + str(WORDS))
# We use filter to stream all tweets containing the given word. The track parameter is an array of search terms to stream.
streamer.filter(track=WORDS)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment