# Gist page residue (selahlynch/4e9a39b53a8b2465b86f), last active August 29, 2015 14:13.
# Description: social media posts gathering.
# Save selahlynch/4e9a39b53a8b2465b86f to your computer and use it in GitHub Desktop.
# This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
# Learn more about bidirectional Unicode characters
# get_tweets.py
# A collection of useful functions for getting tweets from the Topsy API
# and storing them in a MySQL database.
# Required packages:
#   pip install requests
#   pip install MySQL-python
# (The xpath_json module imported below must also be available.)
from datetime import datetime, timedelta | |
import requests | |
import json | |
from xpath_json.xpath_json import get_value_by_xpath | |
import time | |
import warnings | |
# Maps each column of the generic Topsy table (see create_table_generic_topsy)
# to the slash-separated xpath of its value inside one Topsy API result
# object; consumed by get_object_values().
topsy_datafields = {
    'message_id': 'tweet/id',
    'user_handle': 'author/nick',
    'orig_user_id': 'tweet/user/id',
    'orig_user_handle': 'tweet/user/screen_name',
    'orig_date_posted': 'firstpost_date',
    'date_posted': 'citation_date',
    'created_at': 'tweet/created_at',
    'message': 'tweet/text',
    'orig_user_location': 'tweet/user/location',
    'tweet_location': 'tweet/place/full_name',
    # tweet/coordinates/coordinates is [longitude, latitude]
    'longitude': 'tweet/coordinates/coordinates/0',
    'latitude': 'tweet/coordinates/coordinates/1',
    'type': 'type',
}
# Maps each column of the "daniel" table (see create_table_daniel) to the
# xpath of its value inside a raw Twitter status object.
data_locations_daniel = {
    'message_id': 'id',
    'user_handle': 'user/screen_name',
    'user_id': 'user/id',
    'user_location': 'user/location',
    'created_at': 'created_at',
    'retweet_count': 'retweet_count',
    'text': 'text',
    # The deprecated "geo" field stores [latitude, longitude] -- the reverse
    # of the GeoJSON "coordinates" field -- so index 0 is the latitude.
    'latitude': 'geo/coordinates/0',
    'longitude': 'geo/coordinates/1',
    'retweeted': 'retweeted',
}
#TODO - find a way to verify these fields match the table columns? | |
def drop_table_if_exists(table_name, db_conn):
    '''
    Drop table_name if it exists; a missing table is not an error.

    table_name - interpolated directly into the SQL (identifiers cannot be
                 bound as query parameters), so it must come from trusted code
    db_conn - DB-API connection, e.g. a MySQLdb connection
    '''
    cursor = db_conn.cursor()
    cursor.execute("DROP TABLE IF EXISTS {0}".format(table_name))
def create_table_daniel(table_name, db_conn):
    '''
    Create the MySQL table whose columns match the keys of data_locations_daniel.

    Each (message_id, user_handle) pair is unique and user_id is indexed.
    table_name - interpolated directly into the SQL; must be trusted
    db_conn - MySQLdb connection
    '''
    db_cursor = db_conn.cursor()
    db_cursor.execute("CREATE TABLE {0} ("
                      "id int NOT NULL AUTO_INCREMENT,"
                      "message_id bigint(20) NOT NULL,"
                      "user_handle varchar(128),"
                      "user_id bigint(20),"
                      "user_location varchar(128),"
                      "created_at datetime,"
                      "retweet_count int,"
                      "text text CHARACTER SET utf8 COLLATE utf8_general_ci ,"
                      "longitude float, latitude float,"
                      "retweeted varchar(128) DEFAULT NULL,"
                      "PRIMARY KEY (id),"
                      "UNIQUE unique_message_user(message_id, user_handle),"
                      "KEY userindex (user_id))".format(table_name))
def create_table_generic_topsy(table_name, db_conn):
    '''
    Create the MySQL table whose columns match the keys of topsy_datafields.

    Each (message_id, user_handle) pair is unique and orig_user_id is indexed.
    table_name - interpolated directly into the SQL; must be trusted
    db_conn - MySQLdb connection
    '''
    cursor = db_conn.cursor()
    cursor.execute("CREATE TABLE {0} ("
                   "id int NOT NULL AUTO_INCREMENT,"
                   "message_id bigint(20) NOT NULL,"
                   "user_handle varchar(128),"
                   "orig_user_id bigint(20),"
                   "orig_user_handle varchar(128),"
                   "type varchar(128),"
                   "created_at datetime,"
                   "orig_date_posted datetime,"
                   "date_posted datetime,"
                   "message text CHARACTER SET utf8 COLLATE utf8_general_ci ,"
                   "orig_user_location varchar(128) DEFAULT NULL,"
                   "tweet_location varchar(128) DEFAULT NULL,"
                   "longitude float,latitude float,"
                   "PRIMARY KEY (id),"
                   "UNIQUE unique_message_user(message_id, user_handle),"
                   "KEY userindex (orig_user_id))".format(table_name))
#TODO - make create insert_records also | |
def insert_record(fields, table_name, db_conn):
    '''
    Insert one row into table_name.

    fields - dict mapping column name -> value; NOTE - the keys must
             correspond to the column names of the table
    table_name - target table (interpolated into the SQL; must be trusted)
    db_conn - DB-API connection using %s placeholders (e.g. MySQLdb)

    Errors are printed and otherwise ignored so that one bad record does not
    abort a bulk load. Nothing is committed here.
    '''
    columns = list(fields.keys())
    # Build the values from the same key list so both stay aligned.
    values = [fields[column] for column in columns]
    insert_statement = "INSERT INTO {0} ({1}) values({2})".format(
        table_name, ','.join(columns), ','.join(['%s'] * len(columns)))
    db_cursor = db_conn.cursor()
    try:
        db_cursor.execute(insert_statement, values)
    except Exception as e:
        print("Error inserting data: {}{}".format(type(e), e))
def utc_from_dt(dt):
    '''Return the naive datetime dt as whole seconds since 1970-01-01 (truncated).'''
    return int((dt - datetime(1970, 1, 1)).total_seconds())
def dt_from_utc(utc):
    '''Return the naive datetime that is utc seconds after 1970-01-01 00:00:00.'''
    return datetime(1970, 1, 1) + timedelta(seconds=utc)
def topsy_api_call(params, endpoint="tweets", apikey="xxxxxxxx"):
    '''
    Call a Topsy v2 content endpoint and return the raw response body (text).

    params - dict - all the query params except for the apikey; it is not
             modified (the key is added to a copy)
    endpoint - content resource, e.g. "tweets"
    apikey - Topsy API key; the default is the redacted placeholder
    See API docs here - http://api.topsy.com/doc/resources/content/tweets/
    '''
    query = dict(params)
    query['apikey'] = apikey
    api_url = "http://api.topsy.com/v2/content/{}.json".format(endpoint)
    return requests.get(api_url, params=query).text
def get_object_values(some_object, xpath_dict):
    '''
    :param some_object: a python object generated from json that represents a tweet
    :param xpath_dict: a python dict with keys that represent data fields,
        that point to xpaths that detail the data fields location in the object
    :return: a dictionary where the xpaths have been replaced by values;
        a value that get_value_by_xpath cannot find (KeyError) becomes None
        and triggers a RuntimeWarning
    '''
    #TODO - put a warning for anytime a value is of type dict or array
    res = {}
    for key, xpath in xpath_dict.items():
        try:
            res[key] = get_value_by_xpath(some_object, xpath)
        except KeyError:
            # NOTE(review): only KeyError is treated as "missing"; confirm what
            # get_value_by_xpath raises when it walks through a null object.
            warnings.warn("value not found at {} for {}".format(xpath, key), RuntimeWarning)
            res[key] = None
    return res
def utc_to_mysql_date(utc_formatted_date):
    '''Convert seconds since the Unix epoch to a MySQL DATETIME string ("%Y-%m-%d %H:%M:%S").'''
    return dt_from_utc(utc_formatted_date).strftime("%Y-%m-%d %H:%M:%S")
def reformat_date_to_mysql_string(date_string, date_format):
    '''
    :param date_string: a string that represents a date
    :param date_format: the format of date_string, as understood by time.strptime
    :return: a string that represents the date in MySQL DATETIME format
    :raises ValueError: if date_string does not match date_format
    '''
    parsed = time.strptime(date_string, date_format)
    return str(time.strftime("%Y-%m-%d %H:%M:%S", parsed))
def topsy_to_mysql_date(topsy_formatted_date):
    '''
    Convert a Twitter-style UTC date, e.g. "Mon Jan 25 05:02:27 +0000 2010",
    to a MySQL DATETIME string. Only the literal "+0000" offset is accepted.
    '''
    parsed = time.strptime(topsy_formatted_date, '%a %b %d %H:%M:%S +0000 %Y')
    return str(time.strftime("%Y-%m-%d %H:%M:%S", parsed))
def topsy_post_processing(fields):
    '''
    Convert the date fields of a flattened Topsy record to MySQL DATETIME
    strings, in place, and return the same dict.

    created_at is a Twitter-style date string; orig_date_posted and
    date_posted are seconds since the Unix epoch. A field that is None
    (get_object_values sets None for values it could not find) is left as
    None instead of crashing the conversion.
    '''
    if fields['created_at'] is not None:
        fields['created_at'] = topsy_to_mysql_date(fields['created_at'])
    for key in ('orig_date_posted', 'date_posted'):
        if fields[key] is not None:
            fields[key] = utc_to_mysql_date(fields[key])
    return fields
def flatten_tweets(tweet_objects, data_xpaths, post_processing_func):
    '''
    Turn each tweet object into a flat dict of column -> value.

    tweet_objects - iterable of parsed JSON tweets
    data_xpaths - column -> xpath mapping (e.g. topsy_datafields)
    post_processing_func - applied to each flat dict; its return value is kept
    Returns a list with one flat dict per input tweet, in input order.
    '''
    return [post_processing_func(get_object_values(tweet_object, data_xpaths))
            for tweet_object in tweet_objects]
def put_in_database_table(tweet_objects_flat, table, db_conn):
    '''
    Insert flattened tweets into an existing table, one INSERT per tweet.

    tweet_objects_flat - list of dicts mapping column name -> value (as
                         produced by flatten_tweets)
    table - table name; NOTE - table must already exist
    db_conn - DB-API connection using %s placeholders (e.g. MySQLdb)

    A failing insert is printed and skipped. Nothing is committed here.
    '''
    cr = db_conn.cursor()
    for tweet_object_flat in tweet_objects_flat:
        columns = list(tweet_object_flat.keys())
        values = [tweet_object_flat[column] for column in columns]
        insert_statement = "INSERT INTO {0} ({1}) values({2})".format(
            table, ','.join(columns), ','.join(['%s'] * len(columns)))
        try:
            cr.execute(insert_statement, values)
        except Exception as e:
            print("Error inserting data: {}{}".format(type(e), e))
def get_time_ranges(date_first, date_last, interval):
    '''
    Yield (range_min, range_max) datetime pairs covering the period, for api calls.

    date_first - where to start; may be before or after date_last
    date_last - where to stop
    interval - timedelta, at least one second; adjust so that the tweets
               returned from each api call do not exceed the 500 limit

    Each range is inclusive at both ends with one-second granularity, and
    consecutive ranges do not overlap. Going forward, ranges run from
    date_first up to and including date_last. Going backwards, they run from
    one second before date_first down to date_last. The last range is
    clipped to the boundary rather than dropped.
    Raises ValueError (on first iteration) for an interval under one second
    or identical dates.
    '''
    one_second = timedelta(seconds=1)
    if interval < one_second:
        raise ValueError("interval must be at least one second")
    if date_first == date_last:
        raise ValueError("date_first and date_last must differ")
    if date_last < date_first:
        # Walk backwards from just before date_first down to date_last.
        range_max = date_first - one_second
        while range_max >= date_last:
            range_min = max(range_max - interval + one_second, date_last)
            yield range_min, range_max
            range_max = range_min - one_second
    else:
        range_min = date_first
        while range_min <= date_last:
            yield range_min, min(range_min + interval - one_second, date_last)
            range_min += interval
def log_get_tweets(log_file, query_text, geocode_only, table_name, date_first, date_last, interval):
    '''Write the parameters of a get_tweets() run to log_file, one value per line.'''
    log_file.write(query_text + '\n')
    for value in (geocode_only, table_name, date_first, date_last, interval):
        log_file.write(str(value) + '\n')
def log_get_tweets_error_over_500(log_file, date_range):
    '''Record in log_file that the api call for date_range hit the 500-tweet limit.'''
    log_file.write("Over 500 tweets at time {}\n".format(date_range))
def get_tweets(query_text, geocode_only, table_name, date_first, date_last, interval, db_conn):
    '''
    Get tweets using topsy API and insert them into table_name.

    query_text - eg 'diabetes OR insulin'
    geocode_only - bool
    table_name - output table (must already exist with the topsy_datafields columns)
    date_first - from when to start collecting tweets
    date_last - from when to finish collecting tweets
    interval - the amount of time for each api call.. adjust so that
               500 tweet limit is never hit
    db_conn - MySQLdb connection

    Parameters and every time range that hit the 500 limit are logged to
    get_tweets_<table>_<unix time>.log. Errors for one time range are
    printed and the next range is tried.
    '''
    log_file_name = "get_tweets_{}_{}.log".format(table_name, int(time.time()))
    with open(log_file_name, 'w') as log_file:
        log_get_tweets(log_file, query_text, geocode_only, table_name, date_first, date_last, interval)
        for range_start, range_end in get_time_ranges(date_first, date_last, interval):
            ##See API docs here - http://api.topsy.com/doc/resources/content/tweets/
            params = {
                'q': query_text,
                'mintime': utc_from_dt(range_start),
                'maxtime': utc_from_dt(range_end),
                'limit': 500,
                # NOTE(review): earlier comment said "order by time, oldest
                # first"; confirm the meaning of '-date' in the Topsy docs.
                'sort_by': '-date',
                'latlong': 1 if geocode_only else 0,
                'include_enrichment_all': 1,
            }
            # Reset per range so a failed call never reuses the previous
            # range's tweets (or hits an undefined name) in the check below.
            tweets = []
            try:
                response_json = json.loads(topsy_api_call(params))
                tweets = response_json['response']['results']['list']
                print("Got {} tweets from timerange: {} to {}".format(len(tweets), range_start, range_end))
                flat_tweets = flatten_tweets(tweets, topsy_datafields, topsy_post_processing)
                put_in_database_table(flat_tweets, table_name, db_conn)
            except Exception as e:
                print("An exception occurred: {}".format(e))
                print("Trudging on...")
            if len(tweets) >= 500:
                log_get_tweets_error_over_500(log_file, (range_start, range_end))
# This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
# Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import datetime | |
import weibo | |
import time, sys, re | |
import MySQLdb, pprint | |
from weibo import APIClient | |
import xml.etree.ElementTree as ET | |
from warnings import filterwarnings | |
# Silence MySQLdb warnings for the whole script.
filterwarnings('ignore', category = MySQLdb.Warning)

# Weibo API client; the app secret and access token are redacted in this copy.
client = APIClient(3682690754, 'xxxxxxxxx', 'http://www.upenn.edu')
# NOTE(review): the second argument is a redacted placeholder and is an
# undefined name as written -- supply the real value before running.
client.set_access_token('2.xxxxxx', xxxxxxxxx)

db = MySQLdb.connect(db = "randomWeibo", charset = 'utf8')
cur = db.cursor()

MAX_INSERT_ATTEMPS = 5     # attempts for one batch insert (name kept: used elsewhere)
MAX_LIMIT = 2000           # buffered rows that force a batch insert
WEIBO_WAIT_TIME = 60*10    # seconds to back off once the rate limit is hit
# Column definitions shared by every monthly messages_YYYY_MM table. Each
# definition sits on its own line and ends with "," so that the column names
# can be extracted below.
MySQLtable = """message_id bigint(20) primary key,
message text,
created_time datetime,
source varchar(128),
reposts_count int(8),
comments_count int(8),
attitudes_count int(8),
retweeted_message_id bigint(20),
in_reply_to_status_id bigint(20),
in_reply_to_user_id bigint(20),
message_geo varchar(128),
user_id bigint(20),
gender char(1),
followers_count int(8),
friends_count int(8),
bi_followers_count int(8),
statuses_count int(8),
lang char(5),
profile_created_time datetime,
location varchar(20),
province int(2),
city int(5),
verified int(1)"""
# Column names in table order: the first word of each definition.
MySQLcols = [i.strip().split()[0] for i in MySQLtable.strip().split(',\n')]
# Collapse to one line for use inside CREATE TABLE.
MySQLtable = re.sub("\n", " ", MySQLtable)
# MySQL column -> dotted attribute path inside a Weibo status object
# (resolved by linearizeWeibo).
cols_corresp = {"message_id": "mid",
                "message": "text",
                "created_time": "created_at",
                "source": "source",
                "reposts_count": "reposts_count",
                "comments_count": "comments_count",
                "attitudes_count": "attitudes_count",
                "retweeted_message_id": "retweeted_status.mid",
                "in_reply_to_status_id": "in_reply_to_status_id",
                "in_reply_to_user_id": "in_reply_to_user_id",
                "message_geo": "geo",
                "user_id": "user.idstr",
                "gender": "user.gender",
                "followers_count": "user.followers_count",
                "friends_count": "user.friends_count",
                "bi_followers_count": "user.bi_followers_count",
                "statuses_count": "user.statuses_count",
                "lang": "user.lang",
                "profile_created_time": "user.created_at",
                "location": "user.location",
                "province": "user.province",
                "city": "user.city",
                "verified": "user.verified"
                }
# NOTE(review): not referenced anywhere else in this script.
prevMonth = -1

# Todos:
# - message table needs to be called something like "messages_2014_6"
# - messages need to be sorted per month, not enough to check what month it is when inserting
# - make use of "dictionary", "eval" when doing cleanup (i.e. when turning json weibo into row format)
# - make sure it runs even when encountering errors
def _connect():
    """Reconnect to the randomWeibo database and return a fresh cursor.

    The new connection and cursor also replace the module-level ``db`` and
    ``cur``, so code that uses the globals picks up the live connection.
    Keeping ``db`` referenced also matters: MySQL-python cursors hold only a
    weak proxy to their connection, so a cursor whose connection object was
    discarded stops working. In the previous version that made every retry
    fail.
    """
    global db, cur
    db = MySQLdb.connect(db = "randomWeibo", charset = 'utf8')
    cur = db.cursor()
    return cur
def _execute(cur, query, verbose = True, attempt = 0): | |
if attempt >= 5: | |
print "This SQL query yielded too many errors, moving on (some data may have been lost)" | |
return | |
if verbose: | |
print "SQL:\t%s" % query[:200] | |
ret = None | |
try: | |
ret = cur.execute(query) | |
except Exception as e: | |
print "Error encountered: %s" % str(e), "[attempt: %d]" % attempt | |
ret = _execute(_connect(), query, False, attempt + 1) | |
return ret | |
def _executemany(cur, query, values, verbose = True, attempt = 0): | |
if attempt >= 5: | |
print "This SQL query yielded too many errors, moving on (some data may have been lost)" | |
return | |
if verbose: | |
print "SQL:\t%s" % query[:200] | |
ret = None | |
try: | |
ret = cur.executemany(query, values) | |
except Exception as e: | |
print "Error encountered: %s" % str(e), "[attempt: %d]" % attempt | |
ret = _executemany(_connect(), query, values, False, attempt + 1) | |
return ret | |
def wait(s):
    """Sleep for ``s`` seconds, showing a countdown that overwrites one console line."""
    for elapsed in range(s):
        sys.stdout.write("\r\t%s left to wait" % datetime.timedelta(seconds = (s - elapsed)))
        sys.stdout.flush()
        time.sleep(1)
def weiboTimeToMysql(timestr):
    """Convert a Weibo date such as "Mon Jan 25 05:02:27 +0800 2010" to a
    MySQL DATETIME string. The clock time is kept as-is (still +0800)."""
    parsed = time.strptime(timestr, '%a %b %d %H:%M:%S +0800 %Y')
    return str(time.strftime("%Y-%m-%d %H:%M:%S", parsed))
def weiboMonth(timestr):
    """Return the "YYYY_MM" suffix of the monthly table for a Weibo date string."""
    parsed = time.strptime(timestr, '%a %b %d %H:%M:%S +0800 %Y')
    return time.strftime('%Y_%m', parsed)
def linearizeWeibo(jWeibo):
    """Flatten one Weibo status into a row ordered like MySQLcols.

    jWeibo is accessed by attribute (presumably the weibo SDK's JsonDict).
    Each column's value is found by following the dotted path in
    cols_corresp (e.g. "user.idstr"); a missing attribute gives None.
    String values are UTF-8 encoded and empty strings become None. Values of
    columns whose name contains "time" are converted to MySQL DATETIME
    strings. The "source" markup is parsed as XML and reduced to its root
    element's text.
    """
    row = []
    for myCol in MySQLcols:
        item = None
        try:
            # Walk the dotted path one attribute at a time (this replaces an
            # eval of "jWeibo.<path>").
            value = jWeibo
            for attr in cols_corresp[myCol].split('.'):
                value = getattr(value, attr)
            item = value
            # Non-string values have no .encode(); the AttributeError handler
            # below keeps them unchanged.
            item = item.encode('utf8')
            if item == '':
                item = None
            # Guard against None: an empty date string used to crash here.
            if "time" in myCol and item is not None:
                item = weiboTimeToMysql(item)
            if "source" in myCol and item:
                try:
                    item = ET.fromstring(item).text
                except Exception as e:
                    print("Parsing error on item: '%s'? [%s]" % (item, str(e)))
        except AttributeError:
            pass
        row.append(item)
    return row
def createNewMySQLtable(tableName):
    """Create a MyISAM table with the MySQLtable columns, indexed on created_time."""
    query = "create table %s (%s) engine=MyISAM" % (tableName, MySQLtable + ", index datedex (created_time)")
    _execute(cur, query)
def insertWeibos(weiboDict, nbAttempts = 0):
    """Write buffered rows into their monthly messages_YYYY_MM tables.

    weiboDict -- {"2014_06": [row, ...], ...}; rows are ordered like MySQLcols
    nbAttempts -- attempts already used; each month's batch gets up to
                  MAX_INSERT_ATTEMPS - nbAttempts tries

    Missing monthly tables are created. Rows are written with REPLACE, so a
    row whose message_id (the primary key) already exists is overwritten.
    A failing month is retried on its own and is not reported as inserted.
    """
    if nbAttempts >= MAX_INSERT_ATTEMPS:
        print("There was a problem with inserting rows, there will be some rows ignored.")
        return
    for monthYear, weibos in weiboDict.items():
        tableName = "messages_%s" % monthYear
        # Create the monthly table if it doesn't exist yet
        _execute(cur, "show tables like '%s'" % tableName, False)
        if len(cur.fetchall()) == 0:
            createNewMySQLtable(tableName)
        # Disable keys for faster insertion
        _execute(cur, "alter table %s disable keys" % tableName, False)
        query = "REPLACE INTO " + tableName + " VALUES (" + ", ".join("%s" for _ in MySQLcols) + ")"
        print("SQL:\t%s" % query[:200])
        inserted = False
        for attempt in range(nbAttempts, MAX_INSERT_ATTEMPS):
            try:
                cur.executemany(query, weibos)
                inserted = True
                break
            except Exception as e:
                print("Oops, error when inserting: %s - retrying, attempt number %d" % (e, attempt + 1))
        # Reenable keys after insertion
        _execute(cur, "alter table %s enable keys" % tableName, False)
        if inserted:
            print("Sucessfully inserted %d weibos into 'messages_%s' [%s]" % (len(weibos), monthYear, time.strftime("%c")))
        else:
            print("There was a problem with inserting rows, there will be some rows ignored.")
# Main collection loop: poll the public timeline forever and buffer rows per
# month. Rows are flushed to MySQL every MAX_LIMIT rows, before a rate-limit
# back-off, and when the script is interrupted with Ctrl-C.
counter = 0
weibos = dict()
try:
    while True:
        r = None
        try:
            r = client.statuses.public_timeline.get()
        except weibo.APIError as e:
            if ("error_code" in dir(e) and e.error_code == 10023) or "rate limit" in str(e):
                # Rate Limit Reached
                print("\rRate limit reached")
                print("\rERROR: Weibo's out [%s], waiting %s, but saving all Weibos first (if there are any)" % (time.strftime("%c"), datetime.timedelta(seconds = WEIBO_WAIT_TIME)))
                if any(weibos.values()):
                    insertWeibos(weibos)
                weibos = dict()
                counter = 0
                wait(WEIBO_WAIT_TIME)
                print("Time's up, trying again!")
            else:
                # Any other API error: back off briefly instead of retrying in a tight loop
                print("\rERROR: Encountered some unknown error [%s], waiting 2 seconds" % str(e))
                time.sleep(2)
        except Exception as e:
            # other error?
            print("\rERROR: Encountered some unknown error [%s], waiting 2 seconds" % str(e))
            time.sleep(2)
        if not r:
            continue
        for st in r.statuses:
            weibo_month = weiboMonth(st.created_at)
            lWeibo = linearizeWeibo(st)
            if not lWeibo[0]:
                continue
            weibos.setdefault(weibo_month, []).append(lWeibo)
            counter += 1
            # Retweeted originals are stored too, in the table of their own month
            if "retweeted_status" in st:
                rt = st.retweeted_status
                lRetweet = linearizeWeibo(rt)
                if lRetweet[0] and rt.get("created_at"):
                    weibos.setdefault(weiboMonth(rt.created_at), []).append(lRetweet)
                    counter += 1
            if counter % 9 == 0:
                sys.stdout.write("\rGrabbed %d weibos" % counter)
                sys.stdout.flush()
            if counter >= MAX_LIMIT:
                print("\nReached limit, inserting %d weibos" % counter)
                insertWeibos(weibos)
                weibos = dict()
                counter = 0
except KeyboardInterrupt:
    # Do not lose the rows buffered since the last insert
    if any(weibos.values()):
        print("\nInterrupted, inserting %d buffered weibos" % counter)
        insertWeibos(weibos)
    raise
# This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
# Learn more about bidirectional Unicode characters
#! /usr/bin/python | |
__author__ = "Maarten Sap"
__email__ = "maartensap93@gmail.com"
__version__ = "0.3"
"""
TODOs:
  pull retweeted message as well as original
  Command Line interface
  better geolocation?
"""
import datetime, time | |
import os, sys | |
import json, re | |
import MySQLdb | |
from TwitterAPI import TwitterAPI | |
from requests.exceptions import ChunkedEncodingError | |
import xml.etree.ElementTree as ET | |
from HTMLParser import HTMLParser | |
MAX_MYSQL_ATTEMPTS = 5           # failed attempts before a SQL statement is abandoned
MAX_TWITTER_ATTEMPTS = 5         # failed attempts before a Twitter request is abandoned
TWEET_LIMIT_BEFORE_INSERT = 100  # edited by selah; not referenced in the excerpt shown
TWT_REST_WAIT = 15*60            # seconds to wait after Twitter's rate-limit error (code 88)
# Default MySQL column definitions. Entries starting with "index" declare
# indexes; every other entry's first word is a column name.
DEFAULT_MYSQL_COL_DESC = ["user_id bigint(20)", "message_id bigint(20) primary key",
                          "message text", "created_time datetime",
                          "in_reply_to_message_id bigint(20)",
                          "in_reply_to_user_id bigint(20)", "retweet_message_id bigint(20)",
                          "source varchar(128)", "lang varchar(4)", "time_zone varchar(64)",
                          "friend_count int(6)", "followers_count int(6)",
                          "user_location varchar(200)", "tweet_location varchar(200)", "coordinates varchar(128)",
                          "coordinates_address varchar(64)", "coordinates_state varchar(3)",
                          "index useriddex (user_id)", "index datedex (created_time)"]
# Default SQL column -> subscript path into a JSON tweet dict (evaluated as
# "jTweet" + path by TwitterMySQL._prepTweet).
DEFAULT_TWEET_JSON_SQL_CORR = {'id': "['id_str']",
                               'message_id': "['id_str']",
                               'message': "['text']",
                               'created_time': "['created_at']",
                               'user_id': "['user']['id_str']",
                               'in_reply_to_message_id': "['in_reply_to_status_id_str']",
                               'in_reply_to_user_id': "['in_reply_to_user_id_str']",
                               'retweet_message_id': "['retweeted_status']['id']",
                               'user_location': "['user']['location']",
                               'tweet_location': "['place']['full_name']",
                               'friend_count': "['user']['friends_count']",
                               'followers_count': "['user']['followers_count']",
                               # previously mapped to ['user']['id_str'] by mistake
                               'time_zone': "['user']['time_zone']",
                               'lang': "['lang']",
                               'source': "['source']",
                               }
class TwitterMySQL: | |
"""Wrapper for the integration of Twitter APIs into MySQL | |
Turns JSON tweets into row format | |
Failsafe connection to MySQL servers | |
Geolocates if tweet contains coordinates in the US | |
[TODO] Geolocates using the Google Maps API | |
""" | |
def _warn(self, *objs):
    """Write "WARNING:" followed by the space-joined ``objs`` as one line.

    Appends to self.errorFile when one is configured, otherwise writes to
    stderr. The file is opened and closed per call (the previous version
    leaked one open file handle per warning).
    """
    message = "\rWARNING:  " + " ".join(str(o) for o in objs) + "\n"
    if self.errorFile:
        with open(self.errorFile, "a") as errorStream:
            errorStream.write(message)
    else:
        sys.stderr.write(message)
def __init__(self, **kwargs):
    """
    Required parameters:
    - db            MySQL database to connect to
    - table         table to insert Twitter responses in
    - API_KEY       Twitter API key (to connect to Twitter)
    - API_SECRET    Twitter API Secret
    - ACCESS_TOKEN  Twitter App Access token
    - ACCESS_SECRET Twitter App Access token secret
                    (alternatively: api, an already built TwitterAPI object)
    Optional parameters:
    - noWarnings    disable MySQL warnings [Default: False]
    - dropIfExists  set to True to delete the existing table
    - geoLocate     a function that converts coordinates to state
                    and/or address.
                    Format:
                    (state, address) = your_method(lat, long)
    - errorFile     error logging file - warnings will be written to it
                    [Default: stderr]
    - jTweetToRow   JSON tweet to MySQL row tweet correspondence
                    (see help file for more info)
                    [Default: DEFAULT_TWEET_JSON_SQL_CORR]
    - SQLfieldsExp  SQL column description for MySQL table
                    [Default: DEFAULT_MYSQL_COL_DESC]
    - fields        column names to fill (requires SQLfieldsExp; by default
                    taken from the non-index entries of SQLfieldsExp)
    - host          host where the MySQL database is on
                    [Default: localhost]
    - any other MySQL.connect argument
    """
    if "table" not in kwargs:
        raise ValueError("Table name missing")
    self.table = kwargs.pop("table")
    self.dropIfExists = kwargs.pop("dropIfExists", False)
    self.geoLocate = kwargs.pop("geoLocate", None)
    # Always consume noWarnings: left in kwargs (e.g. noWarnings=False) it
    # would be passed on to MySQLdb.connect, which rejects it.
    if kwargs.pop("noWarnings", False):
        from warnings import filterwarnings
        filterwarnings('ignore', category = MySQLdb.Warning)
    self.errorFile = kwargs.pop("errorFile", None)
    self.jTweetToRow = kwargs.pop("jTweetToRow", DEFAULT_TWEET_JSON_SQL_CORR)

    if "fields" in kwargs and "SQLfieldsExp" not in kwargs:
        raise ValueError("Please provide a detailed MySQL column description of the fields you want grabbed. (keyword argument: 'SQLfieldsExp')")
    self.columns_description = kwargs.pop("SQLfieldsExp", DEFAULT_MYSQL_COL_DESC)
    if "fields" in kwargs:
        # Fields from the JSON Tweet to pull out
        self.columns = kwargs.pop("fields")
        if len([f for f in self.columns_description if "index" != f[:5]]) != len(self.columns):
            raise ValueError("There was a mismatch between the number of columns in the 'fields' and the 'field_expanded' variable. Please check those and try again.")
    else:
        # Column names are the first word of every non-index definition
        self.columns = [f.split(' ')[0]
                        for f in self.columns_description
                        if f.split(' ')[0][:5] != "index"]

    apiKeys = ("API_KEY", "API_SECRET", "ACCESS_TOKEN", "ACCESS_SECRET")
    if "api" in kwargs:
        self._api = kwargs.pop("api")
    elif all(k in kwargs for k in apiKeys):
        self._api = TwitterAPI(*[kwargs.pop(k) for k in apiKeys])
    else:
        raise ValueError("TwitterAPI object or API_KEY, API_SECRET, ACCESS_TOKEN, ACCESS_SECRET needed to connect to Twitter. Please see dev.twitter.com for the keys.")

    # Whatever is left is handed to MySQLdb.connect
    kwargs.setdefault("charset", 'utf8')
    try:
        self._connect(kwargs)
    except TypeError:
        print("You're probably using the wrong keywords, here's a list:\n" + self.__init__.__doc__)
        raise
def _connect(self, kwargs = None):
    """(Re)connect to MySQL and refresh self._connection and self.cur.

    kwargs -- MySQLdb.connect keyword arguments. They are remembered, so
              later calls without arguments (e.g. after "MySQL server has
              gone away") reuse them.
    """
    if kwargs:
        self._SQLconnectKwargs = kwargs
    else:
        # getattr: the attribute does not exist until a first call with kwargs
        kwargs = getattr(self, "_SQLconnectKwargs", None) or {}
    previous = getattr(self, "_connection", None)
    if previous is not None:
        # Best effort only: the old connection is usually already dead.
        try:
            previous.close()
        except Exception:
            pass
    self._connection = MySQLdb.connect(**kwargs)
    self.cur = self._connection.cursor()
def _wait(self, t, verbose = True):
    """Sleep ``t`` seconds; when verbose, show a one-line countdown."""
    for i in range(t):
        if verbose:
            sys.stdout.write("\rDone waiting in: %s" % datetime.timedelta(seconds=(t-i)))
            sys.stdout.flush()
        time.sleep(1)
    if verbose:
        print("\rDone waiting! ")
def _execute(self, query, nbAttempts = 0, verbose = True):
    """Execute ``query`` on self.cur, retrying after errors.

    After each failure: reconnect if the connection was lost, log a warning
    and wait 2 * attempt seconds. Gives up after MAX_MYSQL_ATTEMPTS failures.
    Returns cursor.execute's result, or 0 when the query was abandoned.
    """
    if nbAttempts >= MAX_MYSQL_ATTEMPTS:
        self._warn("Too many attempts to execute the query, moving on from this [%s]" % query[:300])
        return 0
    if verbose:
        print("SQL:\t%s" % query[:200])
    try:
        return self.cur.execute(query)
    except Exception as e:
        error = str(e)
        if "MySQL server has gone away" in error or "Lost connection to MySQL server" in error:
            self._connect()
        nbAttempts += 1
        if not verbose:
            print("SQL:\t%s" % query[:200])
        self._warn("%s [Attempt: %d]" % (error, nbAttempts))
        self._wait(nbAttempts * 2)
        return self._execute(query, nbAttempts, False)
def _executemany(self, query, values, nbAttempts = 0, verbose = True):
    """Execute ``query`` once per item of ``values`` on self.cur, retrying after errors.

    After each failure: reconnect if the connection was lost, log a warning
    and wait 2 * attempt seconds. Gives up after MAX_MYSQL_ATTEMPTS failures.
    Returns cursor.executemany's result, or 0 when the query was abandoned.
    """
    if nbAttempts >= MAX_MYSQL_ATTEMPTS:
        self._warn("Too many attempts to execute the query, moving on from this [%s]" % query[:300])
        return 0
    if verbose:
        print("SQL:\t%s" % query[:200])
    try:
        return self.cur.executemany(query, values)
    except Exception as e:
        error = str(e)
        if "MySQL server has gone away" in error or "Lost connection to MySQL server" in error:
            self._connect()
        nbAttempts += 1
        if not verbose:
            print("SQL:\t%s" % query[:200])
        self._warn("%s [Attempt: %d]" % (error, nbAttempts))
        self._wait(nbAttempts * 2)
        return self._executemany(query, values, nbAttempts, False)
def createTable(self, table = None):
    """
    Creates the table specified during __init__() (or ``table``).
    If the table already exists it is dropped and recreated. Unless
    dropIfExists = True was given during construction, there is first a
    10 second grace period for the user to cancel the deletion (by
    hitting CTRL-c).
    """
    table = table if table else self.table
    # Checking if table exists
    self._execute("""show tables like '%s'""" % table)
    # MySQL spells the charset "utf8"; "utf-8" is rejected by the server.
    SQL = """create table %s (%s) character set utf8 collate utf8_general_ci """ % (table, ', '.join(self.columns_description))
    if not self.cur.fetchall():
        # Table doesn't exist
        self._execute(SQL)
        return
    # table does exist
    if not self.dropIfExists:
        USER_DELAY = 10
        for i in range(USER_DELAY):
            sys.stdout.write("\rTable %s already exists, it will be deleted in %s, please hit CTRL-C to cancel the deletion" % (table, datetime.timedelta(seconds=USER_DELAY-i)))
            sys.stdout.flush()
            time.sleep(1)
    # Trailing spaces overwrite the rest of the countdown line
    print("\rTable %s already exists, it will be deleted" % table + " " * 151)
    self._execute("""drop table %s""" % table)
    self._execute(SQL)
def insertRow(self, row, table = None, columns = None, verbose = True):
    """Insert a single row with an INSERT statement; returns insertRows' result."""
    return self.insertRows([row], table, columns, verbose)
def replaceRow(self, row, table = None, columns = None, verbose = True):
    """Insert a single row with a REPLACE statement; returns replaceRows' result."""
    return self.replaceRows([row], table, columns, verbose)
def insertRows(self, rows, table = None, columns = None, verbose = True):
    """Inserts multiple rows into the table specified using an INSERT SQL statement

    :param rows: list - list of rows, each row is itself a list with each column corresponding to a field
    :param table: target table [Default: self.table]; created if missing
    :param columns: column names [Default: self.columns]
    :returns: the result of _executemany, or 0 when rows is empty
    """
    if not rows:
        # Nothing to insert (rows[0] below would raise IndexError)
        return 0
    table = table if table else self.table
    columns = columns if columns else self.columns
    # A falsy result from SHOW TABLES means the table must be created first
    if not self._execute("SHOW TABLES LIKE '%s'" % table, verbose = False):
        self.createTable(table)
    SQL = "INSERT INTO %s (%s) VALUES (%s)" % (table,
                                               ', '.join(columns),
                                               ', '.join(["%s"] * len(rows[0])))
    return self._executemany(SQL, rows, verbose = verbose)
def replaceRows(self, rows, table = None, columns = None, verbose = True):
    """Inserts multiple rows into the table specified using a REPLACE SQL statement

    :param rows: list - list of rows, each a list with one value per column
    :param table: target table [Default: self.table]; created if missing
    :param columns: column names [Default: self.columns]
    :returns: the result of _executemany, or 0 when rows is empty
    """
    if not rows:
        # Nothing to write (rows[0] below would raise IndexError)
        return 0
    table = table if table else self.table
    columns = columns if columns else self.columns
    # A falsy result from SHOW TABLES means the table must be created first
    if not self._execute("SHOW TABLES LIKE '%s'" % table, verbose = False):
        self.createTable(table)
    SQL = "REPLACE INTO %s (%s) VALUES (%s)" % (table,
                                                ', '.join(columns),
                                                ', '.join(["%s"] * len(rows[0])))
    return self._executemany(SQL, rows, verbose = verbose)
# Pass a different parseFormat if Twitter times arrive in another format.
def _tweetTimeToMysql(self, timestr, parseFormat = '%a %b %d %H:%M:%S +0000 %Y'):
    """Convert a tweet date such as "Mon Jan 25 05:02:27 +0000 2010" to a
    MySQL DATETIME string; raises ValueError if it does not match parseFormat."""
    parsed = time.strptime(timestr, parseFormat)
    return str(time.strftime("%Y-%m-%d %H:%M:%S", parsed))
def _yearMonth(self, mysqlTime):
    """Return "YYYY_MM" for a MySQL DATETIME string such as "2010-01-25 05:02:27"."""
    parsed = time.strptime(mysqlTime, "%Y-%m-%d %H:%M:%S")
    return time.strftime("%Y_%m", parsed)
def _prepTweet(self, jTweet):
    """
    :param jTweet: dict - dictionary representation of one tweet
    :returns: list - each element corresponding to a field in a database table row,
        ordered like self.columns
    :raises NotImplementedError: if every mapped field is empty, or if the
        "source" markup cannot be parsed
    """
    textTypes = (str, type(u""))
    tweet = {}
    for SQLcol in self.columns:
        if SQLcol not in self.jTweetToRow:
            tweet[SQLcol] = None
            continue
        try:
            # NOTE(review): the path comes from self.jTweetToRow (caller
            # configuration, not tweet data) but is still eval'd -- never
            # build it from untrusted input.
            value = eval("jTweet%s" % self.jTweetToRow[SQLcol])
        except (KeyError, IndexError, TypeError):
            # KeyError: the field is absent (e.g. retweeted_status on a
            # non-retweet). TypeError: an intermediate object is null
            # (e.g. "place"). Previously the TypeError escaped, so the whole
            # API request was treated as failed and retried.
            tweet[SQLcol] = None
            continue
        if isinstance(value, textTypes):
            value = HTMLParser().unescape(value).encode("utf-8")
        if SQLcol == "created_time":
            value = self._tweetTimeToMysql(value)
        if SQLcol == "source":
            try:
                # unescape() above turned "&amp;" into a bare "&", which is not
                # well-formed XML, so escape it again before parsing.
                value = ET.fromstring(re.sub("&", "&amp;", value)).text
            except Exception as e:
                raise NotImplementedError("OOPS", type(e), e, [value])
        tweet[SQLcol] = value
    if not any(tweet.values()):
        raise NotImplementedError("OOPS", jTweet, tweet)
    # Coordinates state and address
    if jTweet.get("coordinates"):
        lon, lat = map(float, jTweet["coordinates"]["coordinates"])
        if self.geoLocate:
            (state, address) = self.geoLocate(lat, lon)
        else:
            (state, address) = (None, None)
        tweet["coordinates"] = str(jTweet["coordinates"]["coordinates"])
        tweet["coordinates_state"] = str(state) if state else None
        tweet["coordinates_address"] = str(address) if address else str({"lon": lon, "lat": lat})
    # Tweet is dictionary of depth one, now has to be linearized
    return [tweet[SQLcol] for SQLcol in self.columns]
def _apiRequest(self, twitterMethod, params):
    """
    Issue a Twitter API request and yield each returned tweet as a row.

    The request is retried while fewer than MAX_TWITTER_ATTEMPTS failures
    have been counted:
      - A failure to open the request waits 1 if the error mentions
        "timed out", or 10 otherwise (presumably seconds), and counts as an
        attempt.
      - A first response carrying Twitter error code 88 (rate limit) waits
        TWT_REST_WAIT and then re-issues the same request.
      - Any other first-response error message is warned about and skipped.
      - A ChunkedEncodingError reconnects immediately without counting an
        attempt.
      - Any other exception backs off 2 * attempts and counts an attempt.
    Because a retry re-issues the whole request, rows already yielded before
    a mid-stream failure may be yielded again.

    :param twitterMethod: str - Twitter endpoint, e.g. 'statuses/filter'
    :param params: dict - request parameters passed to self._api.request
    :yields: list - one row per tweet, as built by self._prepTweet
    """
    done = False
    nbAttempts = 0
    while not done and nbAttempts < MAX_TWITTER_ATTEMPTS:
        try:
            r = self._api.request(twitterMethod, params)
        except Exception as e:
            # The request could not be issued at all
            if "timed out" in str(e).lower():
                self._warn("Time out encountered, reconnecting immediately.")
                self._wait(1, False)
            else:
                self._warn("Unknown error encountered: [%s]" % str(e))
                self._wait(10)
            nbAttempts += 1
            continue
        # The HTTP connection succeeded; now consume the responses.
        rateLimited = False
        try:
            for i, response in enumerate(r.get_iterator()):
                # Ignore non-tweet messages (ints, deletion notices)
                if isinstance(response, int) or "delete" in response:
                    continue
                if i == 0 and "message" in response and "code" in response:
                    if response['code'] == 88:  # Rate limit exceeded
                        self._warn("Rate limit exceeded, waiting 15 minutes before a restart")
                        self._wait(TWT_REST_WAIT)
                        # Nothing has been yielded yet, so the same request
                        # can be re-issued now that the wait is over.
                        rateLimited = True
                        break
                    self._warn("Error message received from Twitter %s" % str(response))
                    continue
                yield self._prepTweet(response)
        except ChunkedEncodingError as e:
            # Deliberately not counted as an attempt
            self._warn("ChunkedEncodingError encountered, reconnecting immediately: [%s]" % e)
            continue
        except Exception as e:
            nbAttempts += 1
            self._warn("unknown exception encountered, waiting %d second: [%s]" % (nbAttempts * 2, str(e)))
            self._wait(nbAttempts * 2)
            continue
        # No error encountered: finish, unless a rate-limit retry is pending.
        done = not rateLimited
        nbAttempts = 0
    if nbAttempts >= MAX_TWITTER_ATTEMPTS:
        self._warn("Request attempted too many times (%d), it will not be executed anymore [%s]" % (nbAttempts, twitterMethod + str(params)))
def apiRequest(self, twitterMethod, **params):
    """
    Yield formatted rows for a Twitter API request.

    This is the public wrapper around self._apiRequest; it accepts the
    request parameters as keyword arguments. Example:
        for tweet in twtSQL.apiRequest('statuses/filter', track="Twitter API"):
            print tweet
    To pick a twitterMethod, see:
        http://dev.twitter.com/rest/public
        http://dev.twitter.com/streaming/overview
    """
    rows = self._apiRequest(twitterMethod, params)
    for row in rows:
        yield row
def _tweetsToMySQL(self, tweetsYielder, replace = False, monthlyTables = False): | |
""" | |
Tool function to insert tweets into MySQL tables in chunks, | |
while outputting counts. | |
NOTE - the 4th column must be a date or else this function will throw an exception | |
""" | |
tweetsDict = {} | |
i = 0 | |
# TWEET_LIMIT_BEFORE_INSERT = 100 | |
for tweet in tweetsYielder: | |
i += 1 | |
try: | |
#TODO - this should not be hardcoded, change to allow for columns | |
tweetsDict[self._yearMonth(tweet[3])].append(tweet) | |
except KeyError: | |
tweetsDict[self._yearMonth(tweet[3])] = [tweet] | |
if i % 10 == 0: | |
print "\rNumber of tweets grabbed: %d" % i, | |
sys.stdout.flush() | |
if i % TWEET_LIMIT_BEFORE_INSERT == 0: | |
if monthlyTables: | |
for yearMonth, twts in tweetsDict.iteritems(): | |
table = self.table+"_"+yearMonth | |
if replace: | |
print "Sucessfully replaced %4d tweets into '%s' (%4d rows affected) [%s]" % (i, table, self.replaceRows(twts, table = table, verbose = False), time.strftime("%c")) | |
else: | |
print "Sucessfully inserted %4d tweets into '%s' [%s]" % (self.insertRows(twts, table = table, verbose = False), table, time.strftime("%c")) | |
else: | |
tweets = [twt for twts in tweetsDict.values() for twt in twts] | |
if replace: | |
print "Sucessfully replaced %4d tweets into '%s' (%4d rows affected) [%s]" % (i, self.table, self.replaceRows(tweets, verbose = False), time.strftime("%c")) | |
else: | |
print "Sucessfully inserted %4d tweets into '%s' [%s]" % (self.insertRows(tweets, verbose = False), self.table, time.strftime("%c")) | |
i, tweetsDict = (0, {}) | |
# If there are remaining tweets | |
if any(tweetsDict.values()): | |
if monthlyTables: | |
for yearMonth, twts in tweetsDict.iteritems(): | |
table = self.table+"_"+yearMonth | |
if replace: | |
print "Sucessfully replaced %4d tweets into '%s' (%4d rows affected) [%s]" % (i, table, self.replaceRows(twts, table = table, verbose = False), time.strftime("%c")) | |
else: | |
print "Sucessfully inserted %4d tweets into '%s' [%s]" % (self.insertRows(twts, table = table, verbose = False), table, time.strftime("%c")) | |
else: | |
tweets = [twt for twts in tweetsDict.values() for twt in twts] | |
if replace: | |
print "Sucessfully replaced %4d tweets into '%s' (%4d rows affected) [%s]" % (i, self.table, self.replaceRows(tweets, verbose = False), time.strftime("%c")) | |
else: | |
print "Sucessfully inserted %4d tweets into '%s' [%s]" % (self.insertRows(tweets, verbose = False), self.table, time.strftime("%c")) | |
i, tweetsDict = (0, {}) | |
def tweetsToMySQL(self, twitterMethod, **params):
    """
    Run a Twitter API request and store every returned tweet in MySQL.

    Two keyword arguments are consumed here and are not sent to Twitter:
        replace       - bool, use REPLACE instead of INSERT (default False)
        monthlyTables - bool, write into per-month tables (default False)
    All other keywords are forwarded as request parameters. Examples:
        Search API:
            twtSQL.tweetsToMySQL('search/tweets', q='"Taylor Swift" OR "Jennifer Lawrence"')
        Hydrating (fetching all available details for) a tweet:
            twtSQL.tweetsToMySQL('statuses/lookup', id="504710715954188288")
    For more twitterMethods and how to use them, see:
        http://dev.twitter.com/rest/public
        http://dev.twitter.com/streaming/overview
    """
    replace = params.pop("replace", False)
    monthlyTables = params.pop("monthlyTables", False)
    requestRows = self._apiRequest(twitterMethod, params)
    self._tweetsToMySQL(requestRows, replace = replace, monthlyTables = monthlyTables)
def randomSampleToMySQL(self, replace = False, monthlyTables = True):
    """
    Store Twitter's random sample stream (~1% of all tweets) in MySQL.

    With the default monthlyTables=True, rows go to monthly tables named
    [tableName_20YY_MM]. For more info, see:
        http://dev.twitter.com/streaming/reference/get/statuses/sample
    """
    options = {"replace": replace, "monthlyTables": monthlyTables}
    self.tweetsToMySQL('statuses/sample', **options)
def filterStreamToMySQL(self, **params):
    """
    Store tweets from the filter stream in MySQL.

    All keywords are passed on to tweetsToMySQL for 'statuses/filter'.
    Examples:
        twtSQL.filterStreamToMySQL(track="Taylor Swift")
        Continental US bounding box:
        twtSQL.filterStreamToMySQL(locations="-124.848974,24.396308,-66.885444,49.384358")
    More info:
        http://dev.twitter.com/streaming/reference/post/statuses/filter
    """
    endpoint = 'statuses/filter'
    self.tweetsToMySQL(endpoint, **params)
def userTimeline(self, **params): | |
""" | |
For a given user, returns all the accessible tweets from that user, | |
starting with the most recent ones (Twitter imposes a 3200 tweet limit). | |
Here's an example of how to use it: | |
for tweet in userTimeline(screen_name = "taylorswift13"): | |
print tweet | |
See http://dev.twitter.com/rest/reference/get/statuses/user_timeline for details | |
""" | |
ok = True | |
print "Finding tweets for %s" % ', '.join(str(k)+': '+str(v) for k,v in params.iteritems()) | |
params["count"] = 200 # Twitter limits to 200 returns | |
i = 0 | |
while ok: | |
tweets = [tweet for tweet in self._apiRequest('statuses/user_timeline', params)] | |
if not tweets: | |
# Warn about no tweets? | |
ok = False | |
if i != 0: print | |
else: | |
i += len(tweets) | |
print "\rNumber of tweets grabbed: %d" % i, | |
sys.stdout.flush() | |
params["max_id"] = str(long(tweets[-1][1])-1) | |
for tweet in tweets: | |
yield tweet | |
def userTimelineToMySQL(self, **params): | |
""" | |
For a given user, inserts all the accessible tweets from that user into, | |
the current table. (Twitter imposes a 3200 tweet limit). | |
Here's an example of how to use it: | |
userTimelineToMySQL(screen_name = "taylorswift13") | |
For details on keywords to use, see | |
http://dev.twitter.com/rest/reference/get/statuses/user_timeline | |
""" | |
print "Grabbing users tweets and inserting into MySQL" | |
# Replace SQL command instead of insert | |
if "replace" in params: | |
replace = params["replace"] | |
del params["replace"] | |
else: | |
replace = False | |
if "monthlyTables" in params: | |
monthlyTables = params["monthlyTables"] | |
del params["monthlyTables"] | |
else: | |
monthlyTables = False | |
self._tweetsToMySQL(self.userTimeline(**params), replace = replace, monthlyTables = monthlyTables) | |
def search(self, **params): | |
""" | |
Search API | |
""" | |
ok = True | |
print "Finding tweets for %s" % ', '.join(str(k)+': '+str(v) for k,v in params.iteritems()) | |
params["count"] = 200 # Twitter limits to 200 returns | |
i = 0 | |
while ok: | |
tweets = [tweet for tweet in self._apiRequest('search/tweets', params)] | |
if not tweets: | |
# Warn about no tweets? | |
ok = False | |
if i != 0: print | |
else: | |
i += len(tweets) | |
print "\rNumber of tweets grabbed: %d" % i, | |
sys.stdout.flush() | |
params["max_id"] = str(long(tweets[-1][1])-1) | |
for tweet in tweets: | |
yield tweet | |
def searchToMySQL(self, **params): | |
""" | |
Queries the Search API and pulls as many results as possible | |
Here's an example of how to use it: | |
userTimelineToMySQL(screen_name = "taylorswift13") | |
For details on keywords to use, see | |
http://dev.twitter.com/rest/reference/get/statuses/user_timeline | |
""" | |
print "Grabbing users tweets and inserting into MySQL" | |
# Replace SQL command instead of insert | |
if "replace" in params: | |
replace = params["replace"] | |
del params["replace"] | |
else: | |
replace = False | |
if "monthlyTables" in params: | |
monthlyTables = params["monthlyTables"] | |
del params["monthlyTables"] | |
else: | |
monthlyTables = False | |
self._tweetsToMySQL(self.search(**params), replace = replace, monthlyTables = monthlyTables) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment