Skip to content

Instantly share code, notes, and snippets.

@skypenguins
Last active May 10, 2021 01:09
Show Gist options
  • Save skypenguins/509e6c66e246ae44bcb08953db2841ef to your computer and use it in GitHub Desktop.
Save skypenguins/509e6c66e246ae44bcb08953db2841ef to your computer and use it in GitHub Desktop.
Download media that an authorized user has RTed
import json, config
from requests_oauthlib import OAuth1Session
import datetime
import requests
import shutil
import re
import argparse
def get_user_timeline(max_id, screen_name=config.SCREEN_NAME):
    """Fetch one page of a user's timeline from the Twitter v1.1 REST API.

    Args:
        max_id: When truthy, sent to the API as the ``max_id`` query
            parameter; when falsy the parameter is omitted.
        screen_name: Account whose timeline is requested. Defaults to
            ``config.SCREEN_NAME``.

    Returns:
        The decoded JSON body of the response when the HTTP status is 200,
        otherwise ``False`` (after printing the status code).
    """
    # get tweets of greater than or equal to 140 characters
    endpoint = 'https://api.twitter.com/1.1/statuses/user_timeline.json?tweet_mode=extended'

    query = {'screen_name': screen_name, 'count': '200', 'include_rts': 1}
    if max_id:
        query['max_id'] = max_id

    session = OAuth1Session(config.CONSUMER_KEY, config.CONSUMER_SECRET,
                            config.ACCESS_TOKEN, config.ACCESS_TOKEN_SECRET)
    response = session.get(endpoint, params=query)

    if response.status_code != 200:
        print('HTTP error: %d' % response.status_code)
        return False
    return json.loads(response.text)
def get_retweet_list(year, month, day, fetch_page=None):
    """Collect the user's retweets made on or after the given date (JST).

    The timeline is walked page by page, newest first, until a retweet older
    than the cutoff date is seen, the timeline is exhausted, or a page cannot
    be fetched.

    Args:
        year: Cutoff year.
        month: Cutoff month.
        day: Cutoff day of month.
        fetch_page: Optional callable taking a ``max_id`` keyword argument and
            returning a list of status dicts (or a falsy non-list value on
            failure). Defaults to ``get_user_timeline``.

    Returns:
        A list of status dicts that contain a ``retweeted_status`` key and
        whose ``created_at`` (shifted by +9 hours) falls on or after the
        cutoff date. On a fetch error the retweets gathered so far are
        returned.
    """
    if fetch_page is None:
        fetch_page = get_user_timeline

    cutoff = datetime.date(year, month, day)
    jst_offset = datetime.timedelta(hours=9)  # JST
    rts = []
    max_id = None
    completed = False
    print('starting to get retweets...')
    while not completed:
        user_timeline = fetch_page(max_id=max_id)
        if not user_timeline:
            if isinstance(user_timeline, list):
                # An empty page means there is nothing older left to read.
                completed = True
            else:
                print('getting error: no timelime')
            break

        # max_id is inclusive on the API side: asking again with the last id
        # would return that same tweet forever, so step just below it.
        max_id = int(user_timeline[-1]['id_str']) - 1

        for status in user_timeline:
            if 'retweeted_status' not in status:
                continue
            dt = datetime.datetime.strptime(status['created_at'],
                                            '%a %b %d %H:%M:%S +0000 %Y')
            created_at = dt + jst_offset
            if cutoff <= created_at.date():
                rts.append(status)
                print(created_at)
            else:
                # Pages are newest-first, so everything after this is older.
                completed = True
                break

    if completed:
        print('RTs:', len(rts))
        print('getting RTs finished')
    return rts
def get_media_urls_from_tl(tweets):
    """Print a summary of each retweet and collect the URLs of its media.

    For every media entry of the retweeted status the ``media_url_https`` is
    collected. When the entry also carries ``video_info``, the URL of the
    variant with the highest ``bitrate`` is collected as well.

    Args:
        tweets: List of status dicts, each containing a ``retweeted_status``.

    Returns:
        A list of media URL strings in the order they were encountered.
    """
    time_format = '%a %b %d %H:%M:%S +0000 %Y'
    jst_offset = datetime.timedelta(hours=9)  # JST
    urls = []
    for tweet in tweets:
        original = tweet['retweeted_status']
        created_at = datetime.datetime.strptime(
            original['created_at'], time_format) + jst_offset
        user_status_created_at = datetime.datetime.strptime(
            tweet['created_at'], time_format) + jst_offset

        print('----------------------------------------------------')
        print('{name} @{screen_name}\n'.format(
            name=original['user']['name'],
            screen_name=original['user']['screen_name']))
        print(original['full_text'])
        print('\noriginal status id:', original['id_str'])
        print('user\'s status id:', tweet['id_str'])
        print('original date:', created_at)
        print('retweeted date:', user_status_created_at)

        if 'extended_entities' not in original:
            print('\nno media')
            continue

        for media in original['extended_entities']['media']:  # multiple images
            urls.append(media['media_url_https'])
            print('image url:', media['media_url_https'])

            # Only variants that declare a bitrate can be ranked; others
            # (e.g. playlist entries without one) are ignored.
            variants = media.get('video_info', {}).get('variants', [])
            ranked = [v for v in variants if 'bitrate' in v]
            if not ranked:
                # Nothing to rank: avoid calling max() on an empty sequence.
                continue
            # select best quality (a single variant even when bitrates tie)
            best = max(ranked, key=lambda v: int(v['bitrate']))
            urls.append(best['url'])
            print('video url:', best['url'])

    print('\nRTs:', len(tweets))
    print('extracted media:', len(urls))
    print('getting media urls finished')
    return urls
def dl_media(media_urls):
    """Download every URL in ``media_urls`` into the ``./media/`` directory.

    The file name is the last path segment of the URL with any query string
    removed. URLs whose file name does not contain ``.mp4`` get
    ``?name=large`` appended before the request. A failed request is reported
    and skipped so the remaining downloads still run.

    Args:
        media_urls: List of media URL strings.
    """
    import os  # local import: the file's import block does not provide os

    save_dir = './media/'
    # open() does not create missing directories, so make sure it exists.
    os.makedirs(save_dir, exist_ok=True)

    headers_dic = {
        "User-Agent":
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.97 Safari/537.36"
    }
    total = len(media_urls)
    # enumerate gives the right progress number even for duplicate URLs.
    for index, media_url in enumerate(media_urls, start=1):
        file_name = str(media_url.split("/")[-1])
        file_name = re.sub(r'\?.*', '', file_name)  # delete parameter
        full_path = save_dir + file_name
        print('downloading {file_name} ... ({index} of {length})'.format(
            file_name=file_name, index=index, length=total))
        if '.mp4' not in file_name:
            media_url = media_url + '?name=large'
        try:
            with requests.get(media_url, stream=True,
                              headers=headers_dic) as res:
                # Without this a 4xx/5xx error page would be saved as media.
                res.raise_for_status()
                with open(full_path, 'wb') as f:
                    res.raw.decode_content = True
                    shutil.copyfileobj(res.raw, f)
        except requests.exceptions.RequestException as err:
            # Best effort: report the failure and carry on with the next URL.
            print(str(err))
            print('止まるんじゃねぇぞ…')
        else:
            print('download finished')
    print('all download finished')
if __name__ == '__main__':
    # Command-line entry point: takes a year, month and day as positional
    # integers, then chains retweet collection, URL extraction and download.
    cli = argparse.ArgumentParser(
        description='download media that an authorized user has RTed')
    help_template = ('the {} of media RTed by the authorized user '
                     'that you want to get')
    for unit in ('year', 'month', 'day'):
        cli.add_argument(unit, type=int, help=help_template.format(unit))
    opts = cli.parse_args()

    retweets = get_retweet_list(opts.year, opts.month, opts.day)
    found_urls = get_media_urls_from_tl(retweets)
    dl_media(found_urls)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment