Skip to content

Instantly share code, notes, and snippets.

@nbraud
Last active April 27, 2018 08:55
Show Gist options
  • Save nbraud/900f5ee5718e3277d113b242b6fd71ea to your computer and use it in GitHub Desktop.
Save nbraud/900f5ee5718e3277d113b242b6fd71ea to your computer and use it in GitHub Desktop.
Tweet dump
/keys.py
__pycache__/
*.state
*.xz
#!/usr/bin/env python3
import cbor, lzma, tweepy, sys
def dump_user(api, user, since=None):
user_id = api.get_user(user).id
if since:
return tweepy.Cursor(api.user_timeline,
id=user_id, since_id=since).items()
else:
return tweepy.Cursor(api.user_timeline, id=user_id).items()
def process(state, out, tweet, flush=False):
from util import debug
debug('process', tweet)
cbor.dump(tweet._json, out)
user = tweet.user.screen_name
if state[user]:
state[user] = max(state[user], tweet.id)
else:
state[user] = tweet.id
if flush:
out.flush()
def argparser():
import argparse
from util import OverwriteablePath
parser = argparse.ArgumentParser(
description="Dump statuses from Tweeter."
)
parser.add_argument('--state', '-s',
help='State file (to support resumption)',
type=OverwriteablePath)
parser.add_argument('--follow', '-f',
action='store_true',
help='Wait for new tweets.')
parser.add_argument('--output', '-o',
default=sys.stdout.buffer,
help='Output file/stream (default: stdout)',
type=argparse.FileType('ab'))
parser.add_argument('user', nargs='*',
help='User(s) to follow')
return parser
if __name__ == "__main__":
from keys import consumer, access
from atomicwrites import atomic_write
import os.path
auth = tweepy.OAuthHandler(*consumer)
auth.set_access_token(*access)
api = tweepy.API(auth)
args = argparser().parse_args()
state = { user: None for user in args.user }
if args.state and os.path.exists(args.state):
with open(args.state, 'rb') as f:
state.update(cbor.load(f))
with lzma.open(args.output, "a", check=lzma.CHECK_SHA256) as out:
try:
for user, last_id in state.items():
for tweet in dump_user(api, user, since=last_id):
process(state, out, tweet)
if args.follow:
from util import StreamTweets, debug
from sys import stderr
out.flush()
print('Done fetching old tweets, now streaming.',
file=stderr, flush=True)
for tweet in StreamTweets(api, state.keys()):
debug('main/for:', tweet)
process(state, out, tweet, flush=True)
finally:
if args.state:
with atomic_write(args.state, mode='wb', overwrite=True) as f:
cbor.dump(state, f)
consumer = ("consumer token",
"consumer secret")
access = ("OAuth access token",
"OAuth secret")
#!/usr/bin/env python3
import cbor, lzma, sys
def cbors(input):
try:
while True:
yield cbor.load(input)
except EOFError:
return
def argparser():
import argparse
from jsonpath_rw import jsonpath as jp
from jsonpath_rw import parse as jparse
parser = argparse.ArgumentParser(
description="Extract data from Twitter dumps."
)
parser.add_argument('--path', '-p',
default=jp.Fields('text'),
help='The JSONpath to extract (default: text)',
type=jparse)
parser.add_argument('file', nargs='*',
default=[sys.stdin.buffer],
help='Compressed file(s) to process (default: stdin)',
type=argparse.FileType('rb'))
return parser
if __name__ == "__main__":
args = argparser().parse_args()
for in_file in args.file:
with lzma.open(in_file, "r") as inp:
for i, tweet in enumerate(cbors(inp)):
print(in_file.name, i,
[ '{0.full_path}: {0.value}'.format(match)
for match in args.path.find(tweet)
]
)
#!/bin/sh -e
if [ "$#" -ne 1 ]; then
echo "Usage: $0 twitter_handle" >&2
exit 1
fi
./dump.py -s "$1.state" "$1" | \
tee -a "$1.xz" | \
./load.py
def OverwriteablePath(path):
import os
from argparse import ArgumentError
path = os.path.abspath(path)
dir = os.path.dirname(path)
if os.path.exists(path):
if not os.access(path, os.R_OK | os.W_OK):
raise ArgumentError("File cannot be read or written to.")
if not os.path.exists(dir):
raise ArgumentError("Parent directory does not exist.")
if not os.access(dir, os.R_OK | os.W_OK | os.X_OK):
raise ArgumentError("Parent directory not accessible.")
return path
def StreamTweets(api, follows):
import tweepy
from queue import Queue
q = Queue()
follows = list(map(lambda user: api.get_user(user).id_str, follows))
class Producer(tweepy.StreamListener):
def on_status(self, status):
debug('on_status:', status)
q.put(status)
def on_error(self, status):
debug('on_error:', status)
stream = tweepy.Stream(api.auth, listener=Producer())
debug('stream:', stream)
debug('follow:', follows)
stream.filter(follow=follows, async=True)
while True:
debug('q.get()')
yield q.get()
import os
if 'DEBUG' in os.environ:
def debug(txt, obj=None):
from tweepy import Status
from sys import stderr
if isinstance(obj, Status):
print(txt, obj.created_at, obj.text, file=stderr, flush=True)
elif obj is not None:
print(txt, obj, file=stderr, flush=True)
else:
print(txt, file=stderr, flush=True)
else:
def debug(txt, obj=None):
pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment