Social media posts gathering: scripts for collecting tweets (via the Topsy and Twitter APIs) and Sina Weibo statuses into MySQL.
# get_tweets.py
# a collection of useful functions for getting tweets from the topsy api
# and getting them into a mysql database
# required packages:
# pip install requests
# pip install MySQL-python
from datetime import datetime, timedelta
import requests
import json
from xpath_json.xpath_json import get_value_by_xpath
import time
import warnings
topsy_datafields = {
'message_id' : 'tweet/id',
'user_handle' : 'author/nick',
'orig_user_id' : 'tweet/user/id',
'orig_user_handle' : 'tweet/user/screen_name',
'orig_date_posted' : 'firstpost_date',
'date_posted' : 'citation_date',
'created_at' : 'tweet/created_at',
'message' : 'tweet/text',
'orig_user_location' : 'tweet/user/location',
'tweet_location' : 'tweet/place/full_name',
'longitude' : 'tweet/coordinates/coordinates/0',
'latitude' : 'tweet/coordinates/coordinates/1',
'type' : 'type'
}
data_locations_daniel = {
'message_id' : 'id',
'user_handle' : 'user/screen_name',
'user_id' : 'user/id',
'user_location' : 'user/location',
'created_at' : 'created_at',
'retweet_count' : 'retweet_count',
'text' : 'text',
    'latitude' : 'geo/coordinates/0',   #note - Twitter's deprecated geo field stores [lat, lon]
    'longitude' : 'geo/coordinates/1',
'retweeted' : 'retweeted'
}
#TODO - find a way to verify these fields match the table columns?
def drop_table_if_exists(table_name, db_conn):
cr = db_conn.cursor()
cr.execute("DROP TABLE IF EXISTS {0}".format(table_name))
def create_table_daniel(table_name, db_conn):
db_cursor = db_conn.cursor()
db_cursor.execute("CREATE TABLE {0} ("
"id int NOT NULL AUTO_INCREMENT,"
"message_id bigint(20) NOT NULL,"
"user_handle varchar(128),"
"user_id bigint(20),"
"user_location varchar(128),"
"created_at datetime,"
"retweet_count int,"
"text text CHARACTER SET utf8 COLLATE utf8_general_ci ,"
"longitude float, latitude float,"
"retweeted varchar(128) DEFAULT NULL,"
"PRIMARY KEY (id),"
"UNIQUE unique_message_user(message_id, user_handle),"
"KEY userindex (user_id))".format(table_name))
def create_table_generic_topsy(table_name, db_conn):
cr = db_conn.cursor()
cr.execute("CREATE TABLE {0} ("
"id int NOT NULL AUTO_INCREMENT,"
"message_id bigint(20) NOT NULL,"
"user_handle varchar(128),"
"orig_user_id bigint(20),"
"orig_user_handle varchar(128),"
"type varchar(128),"
"created_at datetime,"
"orig_date_posted datetime,"
"date_posted datetime,"
"message text CHARACTER SET utf8 COLLATE utf8_general_ci ,"
"orig_user_location varchar(128) DEFAULT NULL,"
"tweet_location varchar(128) DEFAULT NULL,"
"longitude float,latitude float,"
"PRIMARY KEY (id),"
"UNIQUE unique_message_user(message_id, user_handle),"
"KEY userindex (orig_user_id))".format(table_name))
#TODO - also create a bulk insert_records variant
def insert_record(fields, table_name, db_conn):
'''
NOTE - the fields keys must correspond to the column names of the table
'''
columns = ','.join(fields.keys())
wildcards = ','.join(['%s']*len(fields))
values = fields.values()
insert_statement = "INSERT INTO {0} ({1}) values({2})".format(table_name, columns, wildcards)
db_cursor = db_conn.cursor()
try:
db_cursor.execute(insert_statement, values)
except Exception as e:
print "Error inserting data: {}{}".format(type(e), e)
def utc_from_dt(dt):
return int((dt - datetime(1970, 1, 1)).total_seconds())
def dt_from_utc(utc):
return datetime(1970, 1, 1) + timedelta(seconds=utc)
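# Quick sanity check (a minimal sketch; 1262304000 is 2010-01-01 00:00:00 UTC):
#   utc_from_dt(datetime(2010, 1, 1))   # -> 1262304000
#   dt_from_utc(1262304000)             # -> datetime(2010, 1, 1, 0, 0)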
def topsy_api_call(params, endpoint="tweets"):
'''
params - dict - all the query params except for the apikey
'''
#add apikey to params
##See API docs here - http://api.topsy.com/doc/resources/content/tweets/
params['apikey'] = "xxxxxxxx"
api_url = "http://api.topsy.com/v2/content/{}.json".format(endpoint)
return requests.get(api_url, params=params).text
def get_object_values(some_object, xpath_dict):
'''
:param some_object: a python object generated from json that represents a tweet
:param xpath_dict: a python dict with keys that represent data fields,
that point to xpaths that detail the data fields location in the object
:return: a dictionary where the xpaths have been replaced by values
'''
#TODO - put a warning for anytime a value is of type dict or array
res = {}
for (key, xpath) in xpath_dict.iteritems():
try:
res[key] = get_value_by_xpath(some_object, xpath)
        except KeyError:
            warnings.warn("no value found for '{}' (xpath: {})".format(key, xpath), RuntimeWarning)
res[key] = None
return res
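# Example (a minimal sketch, assuming get_value_by_xpath walks slash-separated
# keys and list indices, as the xpath tables above suggest):
#   obj = {'tweet': {'text': 'hi', 'coordinates': {'coordinates': [-75.16, 39.95]}}}
#   get_object_values(obj, {'message': 'tweet/text',
#                           'longitude': 'tweet/coordinates/coordinates/0'})
#   # -> {'message': 'hi', 'longitude': -75.16}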
def utc_to_mysql_date(utc_formatted_date):
    temp = dt_from_utc(utc_formatted_date)
return str(time.strftime("%Y-%m-%d %H:%M:%S", temp.timetuple()))
def reformat_date_to_mysql_string(date_string, date_format):
'''
:param date_string: a string that represents a date
:param date_format: a string that represents the format of the date as per time.strftime
:return: a string that represents a date in mysql format
'''
temp = time.strptime(date_string, date_format)
return str(time.strftime("%Y-%m-%d %H:%M:%S", temp))
def topsy_to_mysql_date(topsy_formatted_date):
temp = time.strptime(topsy_formatted_date, '%a %b %d %H:%M:%S +0000 %Y')
return str(time.strftime("%Y-%m-%d %H:%M:%S", temp))
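# For example:
#   utc_to_mysql_date(1262304000)                          # -> '2010-01-01 00:00:00'
#   topsy_to_mysql_date('Mon Jan 25 05:02:27 +0000 2010')  # -> '2010-01-25 05:02:27'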
def topsy_post_processing(fields):
fields['created_at'] = topsy_to_mysql_date(fields['created_at'])
fields['orig_date_posted'] = utc_to_mysql_date(fields['orig_date_posted'])
fields['date_posted'] = utc_to_mysql_date(fields['date_posted'])
return fields
def flatten_tweets(tweet_objects, data_xpaths, post_processing_func):
tweet_objects_flat = []
for tweet_object in tweet_objects:
fields = get_object_values(tweet_object, data_xpaths)
tweet_objects_flat.append(post_processing_func(fields))
return tweet_objects_flat
def put_in_database_table(tweet_objects_flat, table, db_conn):
'''
    tweet_objects_flat - list of flattened tweet dicts (see flatten_tweets), presumably built from a topsy response
table - table name
NOTE - table must already exist
'''
    #get a cursor on the db connection
cr = db_conn.cursor()
for tweet_object_flat in tweet_objects_flat:
columns = ','.join(tweet_object_flat.keys())
wildcards = ','.join(['%s']*len(tweet_object_flat))
values = tweet_object_flat.values()
insert_statement = "INSERT INTO {0} ({1}) values({2})".format(table, columns, wildcards)
try:
cr.execute(insert_statement, values)
except Exception as e:
print "Error inserting data: {}{}".format(type(e), e)
def get_time_ranges(date_first, date_last, interval):
'''
return appropriate time ranges for api calls
    date_first - can be before or after date_last
    interval - must be positive; adjust so that the tweets returned
               from each api call do not exceed the 500-per-call limit
'''
going_backwards = date_last < date_first
#TODO - wrap this up into an iterator?
if going_backwards:
interval = -interval
date_min = date_last
date_max = date_first
date_apicall_min = date_max + interval
date_apicall_max = date_max - timedelta(seconds=1) #subtract 1 second in order to avoid overlap
else:
date_min = date_first
date_max = date_last
date_apicall_min = date_min
date_apicall_max = date_min + interval - timedelta(seconds=1)
assert date_min < date_max
assert date_apicall_min < date_apicall_max
while date_min <= date_apicall_min and date_apicall_max <= date_max:
yield date_apicall_min, date_apicall_max
date_apicall_min += interval
date_apicall_max += interval
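# For example, a one-day span cut into 12-hour api windows (a minimal sketch):
#   list(get_time_ranges(datetime(2015, 1, 1), datetime(2015, 1, 2), timedelta(hours=12)))
#   # -> [(2015-01-01 00:00:00, 2015-01-01 11:59:59),
#   #     (2015-01-01 12:00:00, 2015-01-01 23:59:59)]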
def log_get_tweets(log_file, query_text, geocode_only, table_name, date_first, date_last, interval):
log_file.write(query_text + '\n')
log_file.write(str(geocode_only) + '\n')
log_file.write(table_name + '\n')
log_file.write(str(date_first) + '\n')
log_file.write(str(date_last) + '\n')
log_file.write(str(interval) + '\n')
def log_get_tweets_error_over_500(log_file, date_range):
log_file.write("Over 500 tweets at time {}\n".format(date_range))
def get_tweets(query_text, geocode_only, table_name, date_first, date_last, interval, db_conn):
'''
Get tweets using topsy API
query_text - eg 'diabetes OR insulin'
geocode_only - bool
table_name - output table
date_first - from when to start collecting tweets
date_last - from when to finish collecting tweets
    interval - the amount of time covered by each api call; adjust so that
               the 500 tweet limit is never hit
'''
log_file_name = "get_tweets_{}_{}.log".format(table_name, int(time.time()))
log_file = open(log_file_name, 'w')
log_get_tweets(log_file, query_text, geocode_only, table_name, date_first, date_last, interval)
api_time_ranges = get_time_ranges(date_first, date_last, interval)
for time_range in api_time_ranges:
#collect api parameters except for auth key
##See API docs here - http://api.topsy.com/doc/resources/content/tweets/
params = {
'q':query_text,
'mintime':utc_from_dt(time_range[0]),
'maxtime':utc_from_dt(time_range[1]),
'limit':500,
            'sort_by':'-date', #order by date, descending (newest first)
'latlong': 1 if geocode_only else 0,
'include_enrichment_all':1
}
        try:
            response_json_text = topsy_api_call(params)
            response_json = json.loads(response_json_text)
            tweets = response_json['response']['results']['list']
            print "Got {} tweets from timerange: {} to {}".format(len(tweets), time_range[0], time_range[1])
            tweets_flat = flatten_tweets(tweets, topsy_datafields, topsy_post_processing)
            put_in_database_table(tweets_flat, table_name, db_conn)
            if len(tweets) >= 500:
                log_get_tweets_error_over_500(log_file, time_range)
        except Exception as e:
            print "An exception occurred: {}".format(e)
            print "Trudging on..."
log_file.close()
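# Example usage (a minimal sketch; the database and table names here are
# hypothetical, and a real Topsy apikey must be filled in above):
#   import MySQLdb
#   db_conn = MySQLdb.connect(db="tweets", charset="utf8")
#   create_table_generic_topsy("diabetes_tweets", db_conn)
#   get_tweets("diabetes OR insulin", False, "diabetes_tweets",
#              datetime(2015, 1, 1), datetime(2015, 1, 8),
#              timedelta(hours=6), db_conn)
#   db_conn.commit()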
#!/usr/bin/env python
import datetime
import weibo
import time, sys, re
import MySQLdb, pprint
from weibo import APIClient
import xml.etree.ElementTree as ET
from warnings import filterwarnings
filterwarnings('ignore', category = MySQLdb.Warning)
client = APIClient(3682690754, 'xxxxxxxxx', 'http://www.upenn.edu')
client.set_access_token('2.xxxxxx', xxxxxxxxx)
db = MySQLdb.connect(db = "randomWeibo", charset = 'utf8')
cur = db.cursor()
MAX_INSERT_ATTEMPTS = 5
MAX_LIMIT = 2000
WEIBO_WAIT_TIME = 60*10
MySQLtable = """message_id bigint(20) primary key,
message text,
created_time datetime,
source varchar(128),
reposts_count int(8),
comments_count int(8),
attitudes_count int(8),
retweeted_message_id bigint(20),
in_reply_to_status_id bigint(20),
in_reply_to_user_id bigint(20),
message_geo varchar(128),
user_id bigint(20),
gender char(1),
followers_count int(8),
friends_count int(8),
bi_followers_count int(8),
statuses_count int(8),
lang char(5),
profile_created_time datetime,
location varchar(20),
province int(2),
city int(5),
verified int(1)"""
MySQLcols = [i.strip().split()[0] for i in MySQLtable.strip().split(',\n')]
MySQLtable = re.sub("\n"," ",MySQLtable)
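# MySQLcols is just the first token of each column description above, i.e.
# ['message_id', 'message', 'created_time', ..., 'verified']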
cols_corresp = {"message_id": "mid",
"message": "text",
"created_time": "created_at",
"source": "source",
"reposts_count": "reposts_count",
"comments_count": "comments_count",
"attitudes_count": "attitudes_count",
"retweeted_message_id": "retweeted_status.mid",
"in_reply_to_status_id": "in_reply_to_status_id",
"in_reply_to_user_id": "in_reply_to_user_id",
"message_geo": "geo",
"user_id": "user.idstr",
"gender": "user.gender",
"followers_count": "user.followers_count",
"friends_count": "user.friends_count",
"bi_followers_count": "user.bi_followers_count",
"statuses_count": "user.statuses_count",
"lang": "user.lang",
"profile_created_time": "user.created_at",
"location": "user.location",
"province": "user.province",
"city": "user.city",
"verified": "user.verified"
}
prevMonth = -1
# Todos:
# - message table needs to be called something like "messages_2014_6"
# - messages need to be sorted per month, not enough to check what month it is when inserting
# - make use of "dictionary", "eval" when doing cleanup (i.e. when turning json weibo into row format)
# - make sure it runs even when encountering errors
def _connect():
db = MySQLdb.connect(db = "randomWeibo", charset = 'utf8')
cur = db.cursor()
return cur
def _execute(cur, query, verbose = True, attempt = 0):
if attempt >= 5:
print "This SQL query yielded too many errors, moving on (some data may have been lost)"
return
if verbose:
print "SQL:\t%s" % query[:200]
ret = None
try:
ret = cur.execute(query)
except Exception as e:
print "Error encountered: %s" % str(e), "[attempt: %d]" % attempt
ret = _execute(_connect(), query, False, attempt + 1)
return ret
def _executemany(cur, query, values, verbose = True, attempt = 0):
if attempt >= 5:
print "This SQL query yielded too many errors, moving on (some data may have been lost)"
return
if verbose:
print "SQL:\t%s" % query[:200]
ret = None
try:
ret = cur.executemany(query, values)
except Exception as e:
print "Error encountered: %s" % str(e), "[attempt: %d]" % attempt
ret = _executemany(_connect(), query, values, False, attempt + 1)
return ret
def wait(s):
for i in xrange(s):
print "\r\t%s left to wait" % datetime.timedelta(seconds = (s-i)),
sys.stdout.flush()
time.sleep(1)
print
def weiboTimeToMysql(timestr):
#Mon Jan 25 05:02:27 +0800 2010
return str(time.strftime("%Y-%m-%d %H:%M:%S", time.strptime(timestr, '%a %b %d %H:%M:%S +0800 %Y')))
def weiboMonth(timestr):
t = time.strptime(timestr, '%a %b %d %H:%M:%S +0800 %Y')
return time.strftime('%Y_%m',t)
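# For example, using the sample timestamp from the comment above:
#   weiboTimeToMysql('Mon Jan 25 05:02:27 +0800 2010')  # -> '2010-01-25 05:02:27'
#   weiboMonth('Mon Jan 25 05:02:27 +0800 2010')        # -> '2010_01'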
def linearizeWeibo(jWeibo):
weibo = []
for myCol in MySQLcols:
item = None
try:
item = eval("jWeibo.%s" % cols_corresp[myCol])
item = item.encode('utf8')
if item == '':
item = None
if "time" in myCol:
item = weiboTimeToMysql(item)
if "source" in myCol and item:
try:
item = ET.fromstring(item).text
except Exception as e:
print "Parsing error on item: '%s'? [%s]" % (item,str(e))
except AttributeError as e:
pass
weibo.append(item)
return weibo
def createNewMySQLtable(tableName):
query = "create table %s (%s) engine=MyISAM" % (tableName, MySQLtable + ", index datedex (created_time)")
_execute(cur, query)
def insertWeibos(weiboDict, nbAttempts = 0):
# weiboDict = {"2014_06" : list of weibo rows}
    if nbAttempts >= MAX_INSERT_ATTEMPTS:
print "There was a problem with inserting rows, there will be some rows ignored."
return
for monthYear, weibos in weiboDict.iteritems():
# Check that monthly table exists
_execute(cur, "show tables like 'messages_%s'" % monthYear, False)
if len(cur.fetchall()) == 0:
# Create table if it doesn't exist
createNewMySQLtable("messages_%s" % monthYear)
# Disable keys for faster insertion
_execute(cur, "alter table messages_%s disable keys" % monthYear, False)
# Inserting rows
query = "REPLACE INTO messages_"+monthYear+" VALUES ("+", ".join("%s" for i in MySQLcols)+")"
print "SQL:\t%s" % query[:200]
try:
cur.executemany(query, weibos)
except Exception as e:
print "Oops, error when inserting:", e, "- retrying, attempt number %d" % (nbAttempts + 1)
insertWeibos(weiboDict, nbAttempts + 1)
# Reenable keys after insertion
_execute(cur, "alter table messages_%s enable keys" % monthYear, False)
# Confirm insertion
print "Sucessfully inserted %d weibos into 'messages_%s' [%s]" % (len(weibos), monthYear, time.strftime("%c"))
counter = 0
weibos = dict()
while True:
r = None
try:
r = client.statuses.public_timeline.get()
except weibo.APIError as e:
if ("error_code" in dir(e) and e.error_code == 10023) or "rate limit" in str(e):
# Rate Limit Reached
print "\rRate limit reached"
print "\rERROR: Weibo's out [%s], waiting %s, but saving all Weibos first (if there are any)" % (time.strftime("%c"), datetime.timedelta(seconds = WEIBO_WAIT_TIME))
if weibos and len(weibos.items()[0][1]):
insertWeibos(weibos)
weibos = dict()
counter = 0
wait(WEIBO_WAIT_TIME)
print "Time's up, trying again!"
except Exception as e:
# other error?
print "\rERROR: Encountered some unknown error [%s], waiting 2 seconds" % str(e)
time.sleep(2)
if not r: continue
for st in r.statuses:
weibo_month = weiboMonth(st.created_at)
lWeibo = linearizeWeibo(st)
if not lWeibo[0]: continue
try:
weibos[weibo_month].append(lWeibo)
if st.has_key("retweeted_status"):
try:
weibos[weiboMonth(st.retweeted_status.created_at)].append(linearizeWeibo(st.retweeted_status))
except KeyError:
weibos[weiboMonth(st.retweeted_status.created_at)] = [linearizeWeibo(st.retweeted_status)]
counter += 1
except KeyError:
weibos[weibo_month] = [lWeibo]
counter += 1
if counter % 9 == 0 :
print "\rGrabbed %d weibos" % (counter+1),
sys.stdout.flush()
if counter >= MAX_LIMIT:
print "\nReached limit, inserting %d weibos" % MAX_LIMIT
insertWeibos(weibos)
weibos = dict()
counter = 0
#! /usr/bin/python
__author__ = "Maarten Sap"
__email__ = "maartensap93@gmail.com"
__version__ = "0.3"
"""
TODOs:
pull retweeted message as well as original
Command Line interface
better geolocation?
"""
import datetime, time
import os, sys
import json, re
import MySQLdb
from TwitterAPI import TwitterAPI
from requests.exceptions import ChunkedEncodingError
import xml.etree.ElementTree as ET
from HTMLParser import HTMLParser
MAX_MYSQL_ATTEMPTS = 5
MAX_TWITTER_ATTEMPTS = 5
TWEET_LIMIT_BEFORE_INSERT = 100 #edited by selah
TWT_REST_WAIT = 15*60
DEFAULT_MYSQL_COL_DESC = ["user_id bigint(20)", "message_id bigint(20) primary key",
"message text", "created_time datetime",
"in_reply_to_message_id bigint(20)",
"in_reply_to_user_id bigint(20)", "retweet_message_id bigint(20)",
"source varchar(128)", "lang varchar(4)", "time_zone varchar(64)",
"friend_count int(6)", "followers_count int(6)",
"user_location varchar(200)", "tweet_location varchar(200)", "coordinates varchar(128)",
"coordinates_address varchar(64)", "coordinates_state varchar(3)",
"index useriddex (user_id)", "index datedex (created_time)"]
DEFAULT_TWEET_JSON_SQL_CORR = {'id': "['id_str']",
'message_id': "['id_str']",
'message': "['text']",
'created_time': "['created_at']",
'user_id': "['user']['id_str']",
'in_reply_to_message_id': "['in_reply_to_status_id_str']",
'in_reply_to_user_id': "['in_reply_to_user_id_str']",
'retweet_message_id': "['retweeted_status']['id']",
'user_location': "['user']['location']",
'tweet_location': "['place']['full_name']",
'friend_count': "['user']['friends_count']",
'followers_count': "['user']['followers_count']",
                               'time_zone': "['user']['time_zone']",
'lang': "['lang']",
'source': "['source']",
}
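# Each value above is a bracket path that _prepTweet (below) appends to the
# tweet variable's name and evals, e.g. (a minimal sketch):
#   jTweet = {'user': {'id_str': '123'}, 'text': 'hi'}
#   eval("jTweet%s" % DEFAULT_TWEET_JSON_SQL_CORR['user_id'])  # -> '123'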
class TwitterMySQL:
"""Wrapper for the integration of Twitter APIs into MySQL
Turns JSON tweets into row format
Failsafe connection to MySQL servers
Geolocates if tweet contains coordinates in the US
[TODO] Geolocates using the Google Maps API
"""
def _warn(self, *objs):
errorStream = open(self.errorFile, "a+") if self.errorFile else sys.stderr
print >> errorStream, "\rWARNING: ", " ".join(str(o) for o in objs)
def __init__(self, **kwargs):
"""
Required parameters:
- db MySQL database to connect to
- table table to insert Twitter responses in
- API_KEY Twitter API key (to connect to Twitter)
- API_SECRET Twitter API Secret
- ACCESS_TOKEN Twitter App Access token
- ACCESS_SECRET Twitter App Access token secret
Optional parameters:
- noWarnings disable MySQL warnings [Default: False]
- dropIfExists set to True to delete the existing table
- geoLocate a function that converts coordinates to state
and/or address.
Format:
(state, address) = your_method(lat, long)
- errorFile error logging file - warnings will be written to it
[Default: stderr]
- jTweetToRow JSON tweet to MySQL row tweet correspondence
(see help file for more info)
[Default: DEFAULT_TWEET_JSON_SQL_CORR]
- SQLfieldsExp SQL column description for MySQL table
[Default: DEFAULT_MYSQL_COL_DESC]
- host host where the MySQL database is on
[Default: localhost]
- any other MySQL.connect argument
"""
if "table" in kwargs:
self.table = kwargs["table"]
del kwargs["table"]
else:
raise ValueError("Table name missing")
if "dropIfExists" in kwargs:
self.dropIfExists = kwargs["dropIfExists"]
del kwargs["dropIfExists"]
else:
self.dropIfExists = False
if "geoLocate" in kwargs:
self.geoLocate = kwargs["geoLocate"]
del kwargs["geoLocate"]
else:
self.geoLocate = None
if "noWarnings" in kwargs and kwargs["noWarnings"]:
del kwargs["noWarnings"]
from warnings import filterwarnings
filterwarnings('ignore', category = MySQLdb.Warning)
if "errorFile" in kwargs:
self.errorFile = kwargs["errorFile"]
del kwargs["errorFile"]
else:
self.errorFile = None
if "jTweetToRow" in kwargs:
self.jTweetToRow = kwargs["jTweetToRow"]
del kwargs["jTweetToRow"]
else:
self.jTweetToRow = DEFAULT_TWEET_JSON_SQL_CORR
if "fields" in kwargs and "SQLfieldsExp" in kwargs:
# Fields from the JSON Tweet to pull out
self.columns = kwargs["fields"]
del kwargs["fields"]
self.columns_description = kwargs["SQLfieldsExp"]
del kwargs["SQLfieldsExp"]
if len([f for f in self.columns_description if "index" != f[:5]]) != len(self.columns):
raise ValueError("There was a mismatch between the number of columns in the 'fields' and the 'field_expanded' variable. Please check those and try again.")
elif "fields" in kwargs:
raise ValueError("Please provide a detailed MySQL column description of the fields you want grabbed. (keyword argument: 'SQLfieldsExp')")
elif "SQLfieldsExp" in kwargs:
self.columns_description = kwargs["SQLfieldsExp"]
del kwargs["SQLfieldsExp"]
self.columns = [f.split(' ')[0]
for f in self.columns_description
if f.split(' ')[0][:5] != "index"]
else:
self.columns_description = DEFAULT_MYSQL_COL_DESC
self.columns = [f.split(' ')[0]
for f in self.columns_description
if f.split(' ')[0][:5] != "index"]
if "api" in kwargs:
self._api = kwargs["api"]
del kwargs["api"]
elif ("API_KEY" in kwargs and
"API_SECRET" in kwargs and
"ACCESS_TOKEN" in kwargs and
"ACCESS_SECRET" in kwargs):
self._api = TwitterAPI(kwargs["API_KEY"], kwargs["API_SECRET"], kwargs["ACCESS_TOKEN"], kwargs["ACCESS_SECRET"])
del kwargs["API_KEY"], kwargs["API_SECRET"], kwargs["ACCESS_TOKEN"], kwargs["ACCESS_SECRET"]
else:
raise ValueError("TwitterAPI object or API_KEY, API_SECRET, ACCESS_TOKEN, ACCESS_SECRET needed to connect to Twitter. Please see dev.twitter.com for the keys.")
if not "charset" in kwargs:
kwargs["charset"] = 'utf8'
try:
self._connect(kwargs)
except TypeError as e:
print "You're probably using the wrong keywords, here's a list:\n"+self.__init__.__doc__
raise TypeError(e)
def _connect(self, kwargs = None):
"""Connecting to MySQL sometimes has to be redone"""
if kwargs:
self._SQLconnectKwargs = kwargs
elif not kwargs and self._SQLconnectKwargs:
kwargs = self._SQLconnectKwargs
self._connection = MySQLdb.connect(**kwargs)
self.cur = self._connection.cursor()
def _wait(self, t, verbose = True):
"""Wait function, offers a nice countdown"""
for i in xrange(t):
if verbose:
print "\rDone waiting in: %s" % datetime.timedelta(seconds=(t-i)),
sys.stdout.flush()
time.sleep(1)
if verbose:
print "\rDone waiting! "
def _execute(self, query, nbAttempts = 0, verbose = True):
if nbAttempts >= MAX_MYSQL_ATTEMPTS:
self._warn("Too many attempts to execute the query, moving on from this [%s]" % query[:300])
return 0
if verbose: print "SQL:\t%s" % query[:200]
try:
ret = self.cur.execute(query)
except Exception as e:
if "MySQL server has gone away" in str(e):
self._connect()
nbAttempts += 1
if not verbose: print "SQL:\t%s" % query[:200]
self._warn("%s [Attempt: %d]" % (str(e), nbAttempts))
self._wait(nbAttempts * 2)
ret = self._execute(query, nbAttempts, False)
return ret
def _executemany(self, query, values, nbAttempts = 0, verbose = True):
if nbAttempts >= MAX_MYSQL_ATTEMPTS:
self._warn("Too many attempts to execute the query, moving on from this [%s]" % query[:300])
return 0
if verbose: print "SQL:\t%s" % query[:200]
ret = None
try:
ret = self.cur.executemany(query, values)
except Exception as e:
if "MySQL server has gone away" in str(e):
self._connect()
nbAttempts += 1
if not verbose: print "SQL:\t%s" % query[:200]
self._warn("%s [Attempt: %d]" % (str(e), nbAttempts))
self._wait(nbAttempts * 2)
ret = self._executemany(query, values, nbAttempts, False)
return ret
def createTable(self, table = None):
"""
Creates the table specified during __init__().
By default, the table will be deleted if it already exists,
but there will be a 10 second grace period for the user
to cancel the deletion (by hitting CTRL-c).
To disable the grace period and have it be deleted immediately,
please use dropIfExists = True during construction
"""
table = self.table if not table else table
# Checking if table exists
SQL = """show tables like '%s'""" % table
self._execute(SQL)
SQL = """create table %s (%s) character set utf-8 collate utf8_general_ci """ % (table, ', '.join(self.columns_description))
if not self.cur.fetchall():
# Table doesn't exist
self._execute(SQL)
else:
# table does exist
if not self.dropIfExists:
USER_DELAY = 10
for i in xrange(USER_DELAY):
print "\rTable %s already exists, it will be deleted in %s, please hit CTRL-C to cancel the deletion" % (table, datetime.timedelta(seconds=USER_DELAY-i)),
sys.stdout.flush()
time.sleep(1)
print "\rTable %s already exists, it will be deleted" % table, " " * 150
SQL_DROP = """drop table %s""" % table
self._execute(SQL_DROP)
self._execute(SQL)
def insertRow(self, row, table = None, columns = None, verbose = True):
"""Inserts a row into the table specified using an INSERT SQL statement"""
return self.insertRows([row], table, columns, verbose)
def replaceRow(self, row, table = None, columns = None, verbose = True):
"""Inserts a row into the table specified using a REPLACE SQL statement."""
return self.replaceRows([row], table, columns, verbose)
def insertRows(self, rows, table = None, columns = None, verbose = True):
"""Inserts multiple rows into the table specified using an INSERT SQL statement
:param rows: list - list of rows, each row is itself a list with each column corresponding to a field
"""
table = self.table if not table else table
columns = self.columns if not columns else columns
EXISTS = "SHOW TABLES LIKE '%s'" % table
if not self._execute(EXISTS, verbose = False): self.createTable(table)
SQL = "INSERT INTO %s (%s) VALUES (%s)" % (table,
', '.join(columns),
', '.join("%s" for r in rows[0]))
return self._executemany(SQL, rows, verbose = verbose)
def replaceRows(self, rows, table = None, columns = None, verbose = True):
"""Inserts multiple rows into the table specified using a REPLACE SQL statement"""
table = self.table if not table else table
columns = self.columns if not columns else columns
EXISTS = "SHOW TABLES LIKE '%s'" % table
if not self._execute(EXISTS, verbose = False): self.createTable(table)
SQL = "REPLACE INTO %s (%s) VALUES (%s)" % (table,
', '.join(columns),
', '.join("%s" for r in rows[0]))
return self._executemany(SQL, rows, verbose = verbose)
    #modify this as necessary if a time arrives in a format different from what is expected
def _tweetTimeToMysql(self, timestr, parseFormat = '%a %b %d %H:%M:%S +0000 %Y'):
# Mon Jan 25 05:02:27 +0000 2010
return str(time.strftime("%Y-%m-%d %H:%M:%S", time.strptime(timestr, parseFormat)))
def _yearMonth(self, mysqlTime):
return time.strftime("%Y_%m",time.strptime(mysqlTime,"%Y-%m-%d %H:%M:%S"))
def _prepTweet(self, jTweet):
"""
:param jTweet: dict - dictionary representation of one tweet
:returns: list - each element corresponding to a field in a database table row
"""
tweet = {}
for SQLcol in self.columns:
try:
if SQLcol in self.jTweetToRow:
tweet[SQLcol] = eval("jTweet%s" % self.jTweetToRow[SQLcol])
if isinstance(tweet[SQLcol], str) or isinstance(tweet[SQLcol], unicode):
tweet[SQLcol] = HTMLParser().unescape(tweet[SQLcol]).encode("utf-8")
if SQLcol == "created_time":
tweet[SQLcol] = self._tweetTimeToMysql(tweet[SQLcol])
if SQLcol == "source":
try:
tweet[SQLcol] = ET.fromstring(re.sub("&", "&amp;", tweet[SQLcol])).text
except Exception as e:
raise NotImplementedError("OOPS", type(e), e, [tweet[SQLcol]])
else:
tweet[SQLcol] = None
except KeyError:
tweet[SQLcol] = None
if not any(tweet.values()):
raise NotImplementedError("OOPS", jTweet, tweet)
# Coordinates state and address
if "coordinates" in jTweet and jTweet["coordinates"]:
            lon, lat = map(float, jTweet["coordinates"]["coordinates"])
if self.geoLocate:
(state, address) = self.geoLocate(lat, lon)
else:
(state, address) = (None, None)
tweet["coordinates"] = str(jTweet["coordinates"]["coordinates"])
tweet["coordinates_state"] = str(state) if state else None
tweet["coordinates_address"] = str(address) if address else str({"lon": lon, "lat": lat})
# Tweet is dictionary of depth one, now has to be linearized
tweet = [tweet[SQLcol] for SQLcol in self.columns]
return tweet
def _apiRequest(self, twitterMethod, params):
done = False
nbAttempts = 0
while not done and nbAttempts < MAX_TWITTER_ATTEMPTS:
try:
r = self._api.request(twitterMethod, params)
except Exception as e:
# If the request doesn't work
if "timed out" in str(e).lower():
self._warn("Time out encountered, reconnecting immediately.")
self._wait(1, False)
else:
self._warn("Unknown error encountered: [%s]" % str(e))
self._wait(10)
nbAttempts += 1
continue
# Request was successful in terms of http connection
try:
for i, response in enumerate(r.get_iterator()):
# Checking for error messages
if isinstance(response, int) or "delete" in response:
continue
if i == 0 and "message" in response and "code" in response:
if response['code'] == 88: # Rate limit exceeded
self._warn("Rate limit exceeded, waiting 15 minutes before a restart")
self._wait(TWT_REST_WAIT)
else:
self._warn("Error message received from Twitter %s" % str(response))
continue
yield self._prepTweet(response)
done = True
except ChunkedEncodingError as e:
# nbAttempts += 1
self._warn("ChunkedEncodingError encountered, reconnecting immediately: [%s]" % e)
continue
except Exception as e:
nbAttempts += 1
self._warn("unknown exception encountered, waiting %d second: [%s]" % (nbAttempts * 2, str(e)))
self._wait(nbAttempts * 2)
continue
# If it makes it all the way here, there was no error encountered
nbAttempts = 0
if nbAttempts >= MAX_TWITTER_ATTEMPTS:
self._warn("Request attempted too many times (%d), it will not be executed anymore [%s]" % (nbAttempts, twitterMethod + str(params)))
return
def apiRequest(self, twitterMethod, **params):
"""
Takes in a Twitter API request and yields formatted responses in return
Use as follows:
for tweet in twtSQL.apiRequest('statuses/filter', track="Twitter API"):
print tweet
For more info (knowing which twitterMethod to use) see:
http://dev.twitter.com/rest/public
http://dev.twitter.com/streaming/overview
"""
for response in self._apiRequest(twitterMethod, params):
yield response
def _tweetsToMySQL(self, tweetsYielder, replace = False, monthlyTables = False):
"""
Tool function to insert tweets into MySQL tables in chunks,
while outputting counts.
        NOTE - the 4th column (created_time) must be a date or else this function will throw an exception
"""
tweetsDict = {}
i = 0
for tweet in tweetsYielder:
i += 1
try:
#TODO - this should not be hardcoded, change to allow for columns
tweetsDict[self._yearMonth(tweet[3])].append(tweet)
except KeyError:
tweetsDict[self._yearMonth(tweet[3])] = [tweet]
if i % 10 == 0:
print "\rNumber of tweets grabbed: %d" % i,
sys.stdout.flush()
if i % TWEET_LIMIT_BEFORE_INSERT == 0:
print
if monthlyTables:
for yearMonth, twts in tweetsDict.iteritems():
table = self.table+"_"+yearMonth
if replace:
print "Sucessfully replaced %4d tweets into '%s' (%4d rows affected) [%s]" % (i, table, self.replaceRows(twts, table = table, verbose = False), time.strftime("%c"))
else:
print "Sucessfully inserted %4d tweets into '%s' [%s]" % (self.insertRows(twts, table = table, verbose = False), table, time.strftime("%c"))
else:
tweets = [twt for twts in tweetsDict.values() for twt in twts]
if replace:
print "Sucessfully replaced %4d tweets into '%s' (%4d rows affected) [%s]" % (i, self.table, self.replaceRows(tweets, verbose = False), time.strftime("%c"))
else:
print "Sucessfully inserted %4d tweets into '%s' [%s]" % (self.insertRows(tweets, verbose = False), self.table, time.strftime("%c"))
i, tweetsDict = (0, {})
# If there are remaining tweets
if any(tweetsDict.values()):
print
if monthlyTables:
for yearMonth, twts in tweetsDict.iteritems():
table = self.table+"_"+yearMonth
if replace:
print "Sucessfully replaced %4d tweets into '%s' (%4d rows affected) [%s]" % (i, table, self.replaceRows(twts, table = table, verbose = False), time.strftime("%c"))
else:
print "Sucessfully inserted %4d tweets into '%s' [%s]" % (self.insertRows(twts, table = table, verbose = False), table, time.strftime("%c"))
else:
tweets = [twt for twts in tweetsDict.values() for twt in twts]
if replace:
print "Sucessfully replaced %4d tweets into '%s' (%4d rows affected) [%s]" % (i, self.table, self.replaceRows(tweets, verbose = False), time.strftime("%c"))
else:
print "Sucessfully inserted %4d tweets into '%s' [%s]" % (self.insertRows(tweets, verbose = False), self.table, time.strftime("%c"))
i, tweetsDict = (0, {})
def tweetsToMySQL(self, twitterMethod, **params):
"""
Ultra uber awesome function that takes in a Twitter API
request and inserts it into MySQL, all in one call
Here's some examples on how to use it:
For the Search API
twtSQL.tweetsToMySQL('search/tweets', q='"Taylor Swift" OR "Jennifer Lawrence"')
For hydrating (getting all available details) for a tweet
twtSQL.tweetsToMySQL('statuses/lookup', id="504710715954188288")
For more twitterMethods and info on how to use them, see:
http://dev.twitter.com/rest/public
http://dev.twitter.com/streaming/overview
"""
# Replace SQL command instead of insert
if "replace" in params:
replace = params["replace"]
del params["replace"]
else:
replace = False
if "monthlyTables" in params:
monthlyTables = params["monthlyTables"]
del params["monthlyTables"]
else:
monthlyTables = False
self._tweetsToMySQL(self._apiRequest(twitterMethod, params), replace = replace, monthlyTables = monthlyTables)
def randomSampleToMySQL(self, replace = False, monthlyTables = True):
"""
Takes the random sample of all tweets (~ 1%) and
inserts it into monthly table [tableName_20YY_MM].
For more info, see:
http://dev.twitter.com/streaming/reference/get/statuses/sample
"""
self.tweetsToMySQL('statuses/sample', replace = replace, monthlyTables = monthlyTables)
def filterStreamToMySQL(self, **params):
"""
Use this to insert the tweets from the FilterStream into MySQL
Here's an example:
twtSQL.filterStreamToMySQL(track="Taylor Swift")
Here's a second example (Continental US bounding box):
twtSQL.filterStreamToMySQL(locations="-124.848974,24.396308,-66.885444,49.384358")
More info here:
http://dev.twitter.com/streaming/reference/post/statuses/filter
"""
self.tweetsToMySQL('statuses/filter', **params)
def userTimeline(self, **params):
"""
For a given user, returns all the accessible tweets from that user,
starting with the most recent ones (Twitter imposes a 3200 tweet limit).
Here's an example of how to use it:
        for tweet in twtSQL.userTimeline(screen_name = "taylorswift13"):
print tweet
See http://dev.twitter.com/rest/reference/get/statuses/user_timeline for details
"""
ok = True
print "Finding tweets for %s" % ', '.join(str(k)+': '+str(v) for k,v in params.iteritems())
params["count"] = 200 # Twitter limits to 200 returns
i = 0
while ok:
tweets = [tweet for tweet in self._apiRequest('statuses/user_timeline', params)]
if not tweets:
# Warn about no tweets?
ok = False
if i != 0: print
else:
i += len(tweets)
print "\rNumber of tweets grabbed: %d" % i,
sys.stdout.flush()
params["max_id"] = str(long(tweets[-1][1])-1)
for tweet in tweets:
yield tweet
def userTimelineToMySQL(self, **params):
"""
For a given user, inserts all the accessible tweets from that user into,
the current table. (Twitter imposes a 3200 tweet limit).
Here's an example of how to use it:
        twtSQL.userTimelineToMySQL(screen_name = "taylorswift13")
For details on keywords to use, see
http://dev.twitter.com/rest/reference/get/statuses/user_timeline
"""
print "Grabbing users tweets and inserting into MySQL"
# Replace SQL command instead of insert
if "replace" in params:
replace = params["replace"]
del params["replace"]
else:
replace = False
if "monthlyTables" in params:
monthlyTables = params["monthlyTables"]
del params["monthlyTables"]
else:
monthlyTables = False
self._tweetsToMySQL(self.userTimeline(**params), replace = replace, monthlyTables = monthlyTables)
def search(self, **params):
"""
        Queries the Search API and keeps paging backwards (via max_id) until no results remain
"""
ok = True
print "Finding tweets for %s" % ', '.join(str(k)+': '+str(v) for k,v in params.iteritems())
params["count"] = 200 # Twitter limits to 200 returns
i = 0
while ok:
tweets = [tweet for tweet in self._apiRequest('search/tweets', params)]
if not tweets:
# Warn about no tweets?
ok = False
if i != 0: print
else:
i += len(tweets)
print "\rNumber of tweets grabbed: %d" % i,
sys.stdout.flush()
params["max_id"] = str(long(tweets[-1][1])-1)
for tweet in tweets:
yield tweet
def searchToMySQL(self, **params):
"""
Queries the Search API and pulls as many results as possible
        Here's an example of how to use it:
        twtSQL.searchToMySQL(q = '"Taylor Swift" OR "Jennifer Lawrence"')
        For details on keywords to use, see
        http://dev.twitter.com/rest/reference/get/search/tweets
"""
print "Grabbing users tweets and inserting into MySQL"
# Replace SQL command instead of insert
if "replace" in params:
replace = params["replace"]
del params["replace"]
else:
replace = False
if "monthlyTables" in params:
monthlyTables = params["monthlyTables"]
del params["monthlyTables"]
else:
monthlyTables = False
self._tweetsToMySQL(self.search(**params), replace = replace, monthlyTables = monthlyTables)
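# Example usage (a minimal sketch; the credentials and db/table names are
# hypothetical -- see __init__'s docstring for the full keyword list):
#   twtSQL = TwitterMySQL(db="tweets", table="taylor_swift",
#                         API_KEY="...", API_SECRET="...",
#                         ACCESS_TOKEN="...", ACCESS_SECRET="...")
#   twtSQL.filterStreamToMySQL(track="Taylor Swift")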