Alternative viewer for tweets in a Twitter data export
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""A simple script to preview the text of tweets in tweet.js from a Twitter
dump.

Accepts Zip files fresh from Twitter or extracted tweet.js files as input.
"""

# Prevent Python 2.x PyLint from complaining if run on this file
from __future__ import (absolute_import, division, print_function,
                        with_statement, unicode_literals)

__author__ = "Stephan Sokolow (deitarion/SSokolow)"
__appname__ = "Quick Twitter dump viewer hack"
__version__ = "0.1"
__license__ = "MIT"

import html, json, logging, os, re, textwrap, zipfile

log = logging.getLogger(__name__)

# The text that, in my experiments, was all that was preventing tweet.js from
# being valid JSON
JS_PREFIX = "window.YTD.tweet.part0 = "
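
# Once that prefix is stripped, the remainder appears to be a JSON list of
# {"tweet": {...}} objects with fields like "full_text" and "entities".
# (This is an observation from the dumps I've looked at, not a documented
# format guarantee from Twitter.)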

# Borrowed from my ssokolow.com/scripts/index.cgi
# TODO: Look up the actual rules Twitter applies when encountering URLs
#       interacting with things like parens.
hyperlinkable_url_re = re.compile(
    r"""((?:ht|f)tps?://[^\s()]+(?:\([^\s()]*\)[^\s()]*)*)""",
    re.IGNORECASE | re.UNICODE)
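
# For example, this should match "https://t.co/AbCdEf123" as well as URLs
# containing balanced parentheses such as
# "https://en.wikipedia.org/wiki/Example_(test)"
# (illustrative URLs, not taken from a real dump).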

# Retrieve the width to word-wrap to
# (the intentionally bogus default forces the ValueError fallback to 80
# columns when the COLUMNS environment variable is unset)
try:
    cols = int(os.environ.get('COLUMNS', '<nothing>'))
except ValueError:
    cols = 80

# Simple indenting and word-wrapping for top-level tweets and replies
wrapper_l1 = textwrap.TextWrapper(width=cols,
    initial_indent='* ', subsequent_indent=' ' * 2)
wrapper_l2 = textwrap.TextWrapper(width=cols,
    initial_indent=' ' * 4 + '* ', subsequent_indent=' ' * 6)
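
# The resulting output looks roughly like this (illustrative text):
#
#   * A top-level tweet, word-wrapped with a two-space hanging indent on
#     continuation lines.
#       * A reply, indented one level further.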


def load_tweet_json(path):
    """Load tweet data from a Twitter dump zip file or bare tweet.js"""
    if zipfile.is_zipfile(path):
        with zipfile.ZipFile(path) as zobj:
            with zobj.open('data/tweet.js') as fobj:
                data = fobj.read().decode('utf8')
    else:
        # tweet.js is UTF-8, so don't rely on the platform default encoding
        with open(path, encoding='utf8') as fobj:
            data = fobj.read()

    if data.startswith(JS_PREFIX):
        data = data[len(JS_PREFIX):]

    return json.loads(data)


def make_tags_re(tag_list):
    """Make a regular expression that matches any of the given hashtags

    Strings containing spaces will be treated as lists of tags.

    Tag names will have leading and trailing whitespace stripped and will
    have # prepended if not already present.
    """
    # Simplify calling this by allowing input to be empty
    if not tag_list:
        return None

    # Normalize whitespace use
    # (Turn boundaries between entries into spaces, strip leading and trailing
    # spaces on the string as a whole to avoid empty entries, then re-split,
    # treating each span of arbitrary whitespace as one split)
    tag_list = ' '.join(tag_list).strip().split()

    # Prepend # conditionally since I don't know whether Twitter considers
    # something like ##foo to be distinct from #foo
    prepared_tags = []
    for tag in tag_list:
        tag = tag.strip()
        if not tag.startswith('#'):
            tag = '#' + tag
        prepared_tags.append(tag)

    # Escape each hashtag and construct a regex string that will match any
    # tag surrounded by whitespace, string start/end, or word boundary
    re_str = r'(^|\b|\s)({})($|\b|\s)'.format('|'.join(
        re.escape(x) for x in prepared_tags))

    # Compile and return the regex
    return re.compile(re_str)
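
# For example (illustrative tags), make_tags_re(['python', '#foo bar'])
# produces a pattern matching "#python", "#foo", or "#bar" wherever they
# occur in a tweet's text, so tweets containing any of them can be skipped.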


def process_arg(path, skip_replies=False, skipped_tags=None):
    """Pretty-print the tweets in the given file"""
    data = load_tweet_json(path)
    for entry in data:
        if 'tweet' in entry:
            tweet = entry['tweet']
            text = tweet['full_text']

            # Pre-compute lookup for shortened->full URL mappings
            url_map = {x['url']: x['expanded_url']
                       for x in tweet.get('entities', {}).get('urls', [])}
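            # e.g. {'https://t.co/AbCdEf123': 'https://example.com/page'}
            # (illustrative values, not taken from a real export)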

            def find_full_url(match_obj):
                """Regex replacement function to resolve shortened URLs.

                Implemented as a closure for simplicity.
                """
                # Fall back to just passing the shortened URL through
                # (And silence PyLint being too stupid to recognize that
                # all *use* sites are within the same loop body's scope)
                # pylint: disable=cell-var-from-loop
                short_url = match_obj.group(1)
                return url_map.get(short_url, short_url)

            if skipped_tags and skipped_tags.search(text):
                continue

            # Since Twitter doesn't allow HTML tags in tweets, just unescape
            # entities and call it done
            # TODO: Check what Twitter does to URLs, to verify that this
            #       shouldn't be *after* the un-shortening
            text = html.unescape(text)

            # Un-shorten URLs
            text = hyperlinkable_url_re.sub(find_full_url, text)

            # Indent reply tweets
            if 'in_reply_to_screen_name' in tweet:
                if not skip_replies:
                    print('\n'.join(wrapper_l2.wrap(text)))
            else:
                print('\n'.join(wrapper_l1.wrap(text)))


def main():
    """The main entry point, compatible with setuptools entry points."""
    from argparse import ArgumentParser, RawDescriptionHelpFormatter

    parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter,
        description=__doc__.replace('\r\n', '\n').split('\n--snip--\n')[0])
    parser.add_argument('--version', action='version',
        version="%%(prog)s v%s" % __version__)
    # parser.add_argument('-v', '--verbose', action="count",
    #     default=2, help="Increase the verbosity. Use twice for extra effect")
    # parser.add_argument('-q', '--quiet', action="count",
    #     default=0, help="Decrease the verbosity. Use twice for extra effect")
    parser.add_argument('path', action="store", nargs="+",
        help="Dump file(s) to display")
    parser.add_argument('--skip-hashtags', action='append',
        help="Omit tweets containing any of the given hashtags. Will "
             "automatically add the # character if missing. Values will be "
             "concatenated if given multiple times.")
    parser.add_argument('--skip-replies', action='store_true', default=False,
        help="Omit replies from the output.")

    args = parser.parse_args()

    # Set up clean logging to stderr
    # log_levels = [logging.CRITICAL, logging.ERROR, logging.WARNING,
    #               logging.INFO, logging.DEBUG]
    # args.verbose = min(args.verbose - args.quiet, len(log_levels) - 1)
    # args.verbose = max(args.verbose, 0)
    # logging.basicConfig(level=log_levels[args.verbose],
    #                     format='%(levelname)s: %(message)s')

    for path in args.path:
        process_arg(path,
            skip_replies=args.skip_replies,
            skipped_tags=make_tags_re(args.skip_hashtags))


if __name__ == '__main__':
    main()

# vim: set sw=4 sts=4 expandtab :