Skip to content

Instantly share code, notes, and snippets.

@Alexhuszagh
Last active November 23, 2022 20:56
Show Gist options
  • Save Alexhuszagh/298a1bcf5034cbdbc2074c18e737ba38 to your computer and use it in GitHub Desktop.
Save Alexhuszagh/298a1bcf5034cbdbc2074c18e737ba38 to your computer and use it in GitHub Desktop.
Script to export Twitter data (statuses, friends, followers).
#!/usr/bin/env python
'''
export_twitter
==============
Export data from Twitter, including friends, followers, statuses,
and more. Note that the API JSON document should look like:
{
"consumer_key": "...",
"consumer_secret": "...",
"access_token": "...",
"access_token_secret": "..."
}
Sample Usage:
./export_twitter.py \
--friends \
--api api.json \
--user kardonice \
--output kardonice.csv \
--save-media \
--format csv \
--verbose
Requirements:
Python 3.6+ (f-strings are used throughout)
tweepy==3.10.0
requests>=2.25
'''
__version__ = '0.0.0-dev'
__author__ = 'Alex Huszagh <ahuszagh@gmail.com>'
__license__ = 'Unlicense (Public Domain)'
import argparse
import csv
import json
import os
import requests
import tweepy
import urllib.parse
def print_verbose(message, verbose=True):
    '''Print `message` to stdout, unless `verbose` is falsy.'''
    if not verbose:
        return
    print(message)
def generate_api(path):
    '''
    Build an authenticated tweepy API client from a JSON credentials file.

    The JSON document at `path` must provide the keys `consumer_key`,
    `consumer_secret`, `access_token` and `access_token_secret`; a
    missing key raises KeyError.
    '''
    # Read through a context manager so the credentials file is closed
    # as soon as it has been parsed instead of lingering until garbage
    # collection.
    with open(path) as file:
        api_data = json.load(file)

    consumer_key = api_data['consumer_key']
    consumer_secret = api_data['consumer_secret']
    access_token = api_data['access_token']
    access_token_secret = api_data['access_token_secret']

    auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)
    return tweepy.API(
        auth,
        timeout=5,
        wait_on_rate_limit=True,
        wait_on_rate_limit_notify=True,
        compression=True,
    )
def get_user(api, screen_name):
    '''Look up `screen_name` through `api` and return the first result.'''
    matches = api.lookup_users(screen_names=[screen_name])
    return matches[0]
def get_cursor(cursor, default):
    '''Return `cursor`, falling back to `default` only when it is None.'''
    return default if cursor is None else cursor
def get_followers(api, user, args):
    '''
    Yield the followers of `user`, one item at a time.

    Results are fetched page by page through `api.followers`, starting
    at `args.cursor` (or -1 when no cursor was supplied). The iterator's
    next cursor is reported after each page when `args.verbose` is set.
    '''
    pager = tweepy.Cursor(
        api.followers,
        user_id=user.id,
        screen_name=None,
        cursor=get_cursor(args.cursor, -1),
    )
    for batch in pager.pages():
        position = pager.iterator.next_cursor
        print_verbose(f'Current cursor at {position}', args.verbose)
        for follower in batch:
            yield follower
def get_friends(api, user, args):
    '''
    Yield the friends of `user`, one item at a time.

    Results are fetched page by page through `api.friends`, starting at
    `args.cursor` (or -1 when no cursor was supplied). The iterator's
    next cursor is reported after each page when `args.verbose` is set.
    '''
    pager = tweepy.Cursor(
        api.friends,
        user_id=user.id,
        screen_name=None,
        cursor=get_cursor(args.cursor, -1),
    )
    for batch in pager.pages():
        position = pager.iterator.next_cursor
        print_verbose(f'Current cursor at {position}', args.verbose)
        for friend in batch:
            yield friend
def get_statuses(api, user, args):
    '''
    Yield the statuses on the timeline of `user`, one item at a time.

    Results are fetched page by page through `api.user_timeline`, with
    `args.cursor` (possibly None) passed as `max_id`. The iterator's
    current `max_id` is reported after each page when `args.verbose`
    is set.
    '''
    pager = tweepy.Cursor(
        api.user_timeline,
        user_id=user.id,
        screen_name=None,
        max_id=get_cursor(args.cursor, None),
    )
    for batch in pager.pages():
        position = pager.iterator.max_id
        print_verbose(f'Current cursor at {position}', args.verbose)
        for status in batch:
            yield status
# Per export mode: (label, attribute name) used by print_item_verbose to
# build its 'Writing <label> <attribute value>' progress message.
ITEM_MESSAGE = {
    'followers': ('user', 'screen_name'),
    'friends': ('user', 'screen_name'),
    'statuses': ('status', 'id_str'),
}
# Default field names for user items; SIMPLE_FIELDS maps both the
# 'followers' and the 'friends' mode to this list.
SIMPLE_USER_FIELDS = [
    'created_at',
    'default_profile',
    'default_profile_image',
    'description',
    'favourites_count',
    'followers_count',
    'friends_count',
    'id',
    'id_str',
    'listed_count',
    'location',
    'name',
    'profile_banner_url',
    'profile_image_url_https',
    'protected',
    'screen_name',
    'statuses_count',
    'url',
    'verified',
    'withheld_scope',
]
# Default field names for status items; SIMPLE_FIELDS maps the
# 'statuses' mode to this list.
SIMPLE_STATUS_FIELDS = [
    'contributors',
    'created_at',
    'favorite_count',
    'favorited',
    'filter_level',
    'geo',
    'id',
    'id_str',
    'in_reply_to_screen_name',
    'in_reply_to_status_id',
    'in_reply_to_status_id_str',
    'in_reply_to_user_id',
    'in_reply_to_user_id_str',
    'is_quote_status',
    'lang',
    'possibly_sensitive',
    'quote_count',
    'quoted_status_id',
    'quoted_status_id_str',
    'retweet_count',
    'retweeted',
    'source',
    'text',
    'truncated',
    'withheld_copyright',
    'withheld_scope',
]
# Export mode -> default field list; write_csv falls back to this when
# no explicit fields were requested.
SIMPLE_FIELDS = {
    'followers': SIMPLE_USER_FIELDS,
    'friends': SIMPLE_USER_FIELDS,
    'statuses': SIMPLE_STATUS_FIELDS,
}
def print_item_verbose(item, mode, verbose):
    '''
    Report, when `verbose` is set, which item is about to be written.

    The label and the identifying attribute of `item` are taken from
    the ITEM_MESSAGE entry for `mode`.
    '''
    label, attr_name = ITEM_MESSAGE[mode]
    identifier = getattr(item, attr_name)
    print_verbose(f'Writing {label} {identifier}', verbose)
def extract_fields(item, fields):
    '''
    Return the data of `item` restricted to `fields`.

    `item` must expose its raw payload as the dict `item._json`.
    When `fields` is None or empty, the complete payload is returned
    (the same dict object, not a copy). Otherwise a new dict is built
    with exactly the requested keys; keys missing from the payload map
    to None.
    '''
    # `--fields` is declared with nargs='*', so passing the flag without
    # values produces an empty list. That means "all fields", exactly
    # like omitting the flag, and must not produce empty records.
    if not fields:
        return item._json
    return {k: item._json.get(k) for k in fields}
def write_items(location, iterable, mode, args):
    '''
    Create the output directories, then write `iterable` to disk.

    The parent directory of the output file is always created; the
    media directory only when `args.save_media` is set. The actual
    writing is delegated to the module-level `write_<args.format>`
    function.
    '''
    os.makedirs(location.parent, exist_ok=True)
    if args.save_media:
        os.makedirs(location.directory_path, exist_ok=True)

    writer = globals()[f'write_{args.format}']
    writer(location, iterable, mode, args)
def write_csv(location, iterable, mode, args):
    '''
    Append the items of `iterable` to the output file as tab-separated rows.

    Columns are `args.fields`, or the SIMPLE_FIELDS entry for `mode`
    when no fields were requested. The header row is written only when
    the output file is still empty, and only once the first item has
    arrived. The file is flushed every 10 items, and media attachments
    are saved per item when `args.save_media` is set.
    '''
    with open(location.file_path, 'a', newline='') as file:
        writer = None
        fields = args.fields or SIMPLE_FIELDS[mode]
        # The file is opened for appending so an interrupted export can
        # be resumed. In that case a header row already exists, and
        # writing a second one would leave a bogus record in the middle
        # of the data.
        needs_header = os.fstat(file.fileno()).st_size == 0
        for index, item in enumerate(iterable):
            if writer is None:
                # Created lazily so an empty iterable writes nothing.
                writer = csv.DictWriter(file, fieldnames=fields, dialect='excel-tab')
                if needs_header:
                    writer.writeheader()
            print_item_verbose(item, mode, args.verbose)
            data = extract_fields(item, fields)
            writer.writerow(data)
            # Flush regularly so little is lost if the run is interrupted.
            if index % 10 == 0:
                file.flush()
            if args.save_media:
                save_media_urls(location, item, args)
def write_json(location, iterable, mode, args):
    '''
    Append the items of `iterable` to the output file, one JSON object per line.

    Writing a series of standalone objects (rather than a single JSON
    array) means that everything written so far stays usable even when
    the export fails part-way. The file is flushed every 10 items, and
    media attachments are saved per item when `args.save_media` is set.
    '''
    wanted = args.fields
    with open(location.file_path, 'a') as file:
        for count, item in enumerate(iterable):
            print_item_verbose(item, mode, args.verbose)
            record = json.dumps(extract_fields(item, wanted))
            file.write(record + '\n')
            if count % 10 == 0:
                file.flush()
            if args.save_media:
                save_media_urls(location, item, args)
def save_media_urls(location, item, args):
    '''
    Save every media attachment referenced by `item`.

    Media entries are collected from both the `entities` and the
    `extended_entities` sections of `item._json`. An entry that appears
    in both sections (same `id_str` and `media_url_https`) is saved
    only once. The payload in `item._json` is left unmodified.
    '''
    seen = set()
    for section in ('entities', 'extended_entities'):
        # Iterate over the stored lists directly instead of
        # concatenating with `+=`, which would extend the list held
        # inside item._json in place.
        for media_item in item._json.get(section, {}).get('media', []):
            key = (media_item.get('id_str'), media_item.get('media_url_https'))
            if key in seen:
                # Same attachment listed in both sections; downloading
                # it again would only rewrite the same file.
                continue
            seen.add(key)
            save_media_item(location, item, media_item, args)
def save_media_item(location, item, media_item, args):
    '''
    Save a single media entry, if it carries a `media_url_https` URL.

    The file name is built from the item ID, the media ID and the last
    path component of the URL. Entries without a URL are skipped.
    '''
    url = media_item.get('media_url_https')
    if url is None:
        return

    url_path = urllib.parse.urlparse(url).path
    filename = os.path.basename(url_path)
    unique_filename = f'{item.id_str}-{media_item["id_str"]}-{filename}'
    save_media(location, url, unique_filename, args)
def save_media(location, url, unique_filename, args):
    '''
    Download `url` and store it as `unique_filename` in the media directory.

    The target directory is `location.directory_path`. When the server
    answers with an error status, an error message is printed and no
    file is written.
    '''
    # Honour --verbose like every other progress message in this script.
    print_verbose(
        f'Saving media at url "{url}" with unique ID {unique_filename}.',
        args.verbose,
    )
    response = requests.get(url)
    if not response.ok:
        print(f'\033[31mError:\033[0m Unable to save media attachment at url "{url}".')
        # Do not store the body of an error response as if it were the
        # media file.
        return

    path = os.path.join(location.directory_path, unique_filename)
    with open(path, 'wb') as file:
        file.write(response.content)
def get_mode(args):
    '''
    Return the export mode selected on `args`.

    The candidates 'friends', 'followers' and 'statuses' are checked in
    that order, and the first one with a truthy attribute on `args`
    wins. Raises ValueError when none is set.
    '''
    candidates = ('friends', 'followers', 'statuses')
    selected = next((name for name in candidates if getattr(args, name, None)), None)
    if selected is None:
        raise ValueError('Currently unknown export mode')
    return selected
class OutputLocation:
    '''
    Where an export is written: the output file and its media directory.

    `path` is resolved with os.path.realpath and split into the parent
    directory, the file name without extension, and the extension. When
    `path` has no extension, `.<file_format>` is used instead.
    '''

    # Was misspelled `_slots_`, which Python treats as an ordinary class
    # attribute, so the declaration had no effect.
    __slots__ = ('parent', 'filename', 'extension')

    def __init__(self, path, file_format):
        realpath = os.path.realpath(path)
        self.parent = os.path.dirname(realpath)
        basename = os.path.basename(realpath)
        self.filename, self.extension = os.path.splitext(basename)
        if not self.extension:
            self.extension = f'.{file_format}'

    @property
    def file_path(self):
        '''Full path of the output file, including its extension.'''
        return os.path.join(self.parent, f'{self.filename}{self.extension}')

    @property
    def directory_path(self):
        '''Full path of the media directory: the output file path without extension.'''
        return os.path.join(self.parent, self.filename)
def main():
    '''
    Command-line entry point.

    Parses the arguments, determines the export mode and output
    location, builds the API client, looks up the requested user, and
    writes the selected data (friends, followers or statuses) in the
    requested format. When no output name is given, it defaults to
    `<user>_<mode>.<format>`.
    '''
    parser = argparse.ArgumentParser(description='Twitter API exporter parameters.')
    action_group = parser.add_mutually_exclusive_group(required=True)
    action_group.add_argument(
        '--friends',
        help='Export a list of friends (accounts you follow)',
        action='store_true',
    )
    action_group.add_argument(
        '--followers',
        help='Export a list of followers (accounts that follow you)',
        action='store_true',
    )
    action_group.add_argument(
        '--statuses',
        help='Export a list of statuses from a user account',
        action='store_true',
    )
    parser.add_argument(
        '-a',
        '--api',
        help='JSON document with the API credentials.',
        default='api.json',
    )
    parser.add_argument(
        '-u',
        '--user',
        help='Screen name of user to get data from.',
        required=True,
    )
    parser.add_argument(
        '-o',
        '--output',
        help='Output file name, the extension will be added if not provided.',
    )
    parser.add_argument(
        '-V',
        '--version',
        action='version',
        version=f'%(prog)s {__version__}'
    )
    parser.add_argument(
        '-sm',
        '--save-media',
        action='store_true',
        help='Save media attachments. The directory name defaults to the filename.',
    )
    parser.add_argument(
        '-f',
        '--fields',
        help='Fields to extract from each item. Leave empty for all',
        nargs='*',
    )
    parser.add_argument(
        '--format',
        help='Export format.',
        default='json',
        choices=['json', 'csv'],
    )
    parser.add_argument(
        '-c',
        '--cursor',
        help='Current cursor position (in case of interrupted run).',
        # Convert while parsing, so a malformed cursor is rejected
        # before any API call. The previous conversion was stored in a
        # dict that was never used, so the raw string reached the API.
        type=int,
    )
    parser.add_argument(
        '-v',
        '--verbose',
        action='store_true',
        help='Print verbose debugging information.',
    )
    args = parser.parse_args()

    mode = get_mode(args)
    output = args.output
    if output is None:
        output = f'{args.user}_{mode}.{args.format}'
    location = OutputLocation(output, args.format)

    api = generate_api(args.api)
    user = get_user(api, args.user)
    # Dispatch to get_friends / get_followers / get_statuses by mode.
    iterable = globals()[f'get_{mode}'](api, user, args)
    write_items(location, iterable, mode, args)
# Run the exporter only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment