

@darthmall
Created October 7, 2011 23:20
A simple Python 3 script to scrape toots from Twitter search and dump them into a SQLite database.
#!/usr/bin/env python3
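'''Scrape toots from the Twitter search API into a SQLite database.

Usage sketch (the script filename twitter_search.py is hypothetical):

    python3 twitter_search.py toots.db -q 'some query' --pages 5
    python3 twitter_search.py toots.db -o toots.csv

The first call scrapes five pages of search results into toots.db; the
second exports the filtered toots to CSV.
'''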
from urllib.parse import urlencode
from urllib.request import urlopen
import json
import sqlite3
# Constants
TWITTER_SEARCH_URL = 'http://search.twitter.com/search.json?'
# Simple database schema, largely for enforcing uniqueness
DATABASE_SCHEMA = '''
CREATE TABLE IF NOT EXISTS users (
    id INTEGER UNIQUE NOT NULL,
    name TEXT UNIQUE NOT NULL
);

CREATE TABLE IF NOT EXISTS toots (
    id INTEGER UNIQUE NOT NULL,
    content TEXT UNIQUE NOT NULL,
    timestamp TEXT NOT NULL,
    retoot INTEGER NOT NULL DEFAULT 0,
    user_id INTEGER
);
'''
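# Note: user_id above is only a soft reference into users. A stricter
# variant (a sketch, not what this script uses) would declare the
# relationship explicitly:
#
#     user_id INTEGER REFERENCES users (id)
#
# and run "PRAGMA foreign_keys = ON" on each connection, since SQLite
# leaves foreign key enforcement off by default.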
# Select statement to retrieve toots from the database, filtering out
# retoots and any toot containing the word "win" surrounded by spaces,
# because those are likely to be spam. (The LIKE pattern misses toots
# that start or end with the word.)
SELECT_TOOTS = '''
SELECT users.id, users.name, toots.content, toots.timestamp
FROM toots
JOIN users ON toots.user_id = users.id
WHERE toots.retoot = 0
  AND toots.content NOT LIKE '% win %';
'''
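# A tighter filter (a sketch, not used above) could match word boundaries
# by registering a Python regex as an SQL function; SQLite's REGEXP
# operator calls a user-defined regexp(pattern, value) function:
#
#     import re
#     conn.create_function('regexp', 2,
#         lambda pattern, value: re.search(pattern, value) is not None)
#
# and then in the query (a raw string keeps the backslashes intact):
#
#     r"AND toots.content NOT REGEXP '\bwin\b'"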
def search(query, **kwargs):
    '''Return a dictionary for the JSON object returned by the Twitter search.'''
    params = {'q': query}
    params.update(kwargs)
    print(params)  # Log the request parameters for debugging.
    response = urlopen(TWITTER_SEARCH_URL + urlencode(params))
    # Read the character encoding from the response headers rather than
    # hard-coding it, falling back to UTF-8 if none is declared.
    charset = response.headers.get_content_charset() or 'utf-8'
    return json.loads(response.read().decode(charset))
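# Example use (hypothetical query): fetch one page of ten English results.
#
#     data = search('python3', lang='en', rpp=10)
#     for item in data['results']:
#         print(item['from_user'], item['text'])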
def init_db(filename):
    '''Opens and initializes the database of toots.'''
    conn = sqlite3.connect(filename)
    cur = conn.cursor()
    cur.executescript(DATABASE_SCHEMA)
    conn.commit()
    return conn
def store_toot(cursor, user, user_id, timestamp, toot, toot_id):
    '''Writes a toot with the metadata I care about to the database.'''
    try:
        # Insert the author first if we haven't seen them before.
        cursor.execute("SELECT id FROM users WHERE id = ?", (user_id,))
        if not cursor.fetchone():
            cursor.execute("INSERT INTO users (id, name) VALUES (?, ?)",
                           (user_id, user))
        # Anything starting with 'RT' is flagged as a retoot.
        cursor.execute(
            "INSERT INTO toots (id, content, timestamp, retoot, user_id) VALUES (?, ?, ?, ?, ?)",
            (toot_id, toot, timestamp, toot.startswith('RT'), user_id))
    except sqlite3.IntegrityError:
        # The UNIQUE constraints reject duplicates; skip them quietly.
        print("Skipping: " + toot)
if __name__ == '__main__':
    from argparse import ArgumentParser
    import csv

    # Set up the command-line arguments.
    parser = ArgumentParser(description='Search Twitter.')
    parser.add_argument('file', metavar='DB_FILE', type=str,
                        help='filename of the sqlite3 database where the results are stored')
    parser.add_argument('-q', metavar='QUERY', dest='query', type=str,
                        help='search string')
    parser.add_argument('--rpp', dest='rpp', default=100, type=int,
                        help='results per page')
    parser.add_argument('--pages', dest='pages', default=1, type=int,
                        help='number of results pages')
    parser.add_argument('-o', '--output', dest='output', type=str,
                        help='filename for the CSV export')
    args = parser.parse_args()

    conn = init_db(args.file)
    cursor = conn.cursor()

    if args.output:
        # Export mode: dump the filtered toots to CSV. newline='' keeps
        # the csv module from writing blank lines on Windows.
        with open(args.output, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(('user_id', 'user', 'toot', 'timestamp'))
            for toot in cursor.execute(SELECT_TOOTS):
                writer.writerow(toot)
    else:
        # Scrape mode: fetch each page of results and store the toots.
        for p in range(args.pages):
            r = search(args.query, lang='en', rpp=args.rpp, page=p + 1)
            for toot in r['results']:
                store_toot(cursor, toot['from_user'], toot['from_user_id'],
                           toot['created_at'], toot['text'], toot['id_str'])

    conn.commit()
    conn.close()