@msakamoto-sf
Created January 4, 2022 02:46
Python script that downloads images you "liked" on Twitter to your local machine. (Assumes your own account.)
# Python script that downloads images you "liked" on Twitter to your local machine.
# (Assumes your own account.)
#
# requirements: python3 (>= 3.10)
#
# setup:
# 1. `python3 -m venv venv`
#    (if you installed Python via the Windows installer, try `py -m venv venv`)
# 2. activate the venv: `source venv/bin/activate` (win: `venv\Scripts\activate`)
# 3. `pip install tweepy==4.4.0`
#
# prepare keys and secrets:
# 1. sign up at https://developer.twitter.com/en
# 2. create a project and an app
# 3. save your app's consumer keys (API Key and Secret)
# 4. open your app's "Keys and tokens" page, then generate your Access Token and Secret
#
# run:
# 1. set environment variables (see the example below and the python script that follows)
# 2. `python3 liked-image-downloader.py`
#    - (for win: `py liked-image-downloader.py`)
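#
# for example, on a unix-like shell (the values below are placeholders, not real credentials):
#   export TWITTER_USERNAME=your_screen_name
#   export CONSUMER_KEY=xxxx
#   export CONSUMER_SECRET=xxxx
#   export TOKEN=xxxx
#   export TOKEN_SECRET=xxxx
#   (for win: use `set NAME=value` in cmd.exe instead of `export`)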
#
# reference:
# - https://www.tweepy.org/
# - https://docs.tweepy.org/en/v4.4.0/index.html
# - PHP + Twitter API to bulk-save images "liked" on Twitter | キノコログ
# - https://kinocolog.com/favo_image_download/
# - Bot that periodically saves Twitter "liked" images - Qiita
# - https://qiita.com/hxbdy625/items/3806bdc2b36f64986119
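#
# overall flow:
# 1. resolve the numeric user id for TWITTER_USERNAME
# 2. page through liked tweets, collecting photo urls keyed by a filename prefix
#    (tweet timestamp + last 3 digits of tweet id + media index)
# 3. download each photo into a newly created ./images_<timestamp>/ directory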
import os
import datetime
import time
from pprint import pprint
# a bare `import urllib` does not import the submodules used below,
# so import them explicitly
import urllib.error
import urllib.parse
import urllib.request
import tweepy
twitter_username = os.getenv('TWITTER_USERNAME', '')
oauth_consumer_key = os.getenv('CONSUMER_KEY', '<consumer-key>')
oauth_consumer_secret = os.getenv('CONSUMER_SECRET', '<consumer-secret>')
oauth_access_token = os.getenv('TOKEN', '<token>')
oauth_access_token_secret = os.getenv('TOKEN_SECRET', '<token-secret>')
client = tweepy.Client(
    consumer_key=oauth_consumer_key,
    consumer_secret=oauth_consumer_secret,
    access_token=oauth_access_token,
    access_token_secret=oauth_access_token_secret)
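# the liked-tweets endpoint takes a numeric user id rather than a screen name,
# so resolve the id from TWITTER_USERNAME first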
res = client.get_user(username=twitter_username, user_auth=True)
#pprint(res.data)
twitter_user_id = res.data.id
print(f'==> twitter username[{twitter_username}], id = [{twitter_user_id}]')
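# maps filename prefix ('<YYYYmmddHHMMSS><last-3-of-tweet-id>_<media-index>') -> photo url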
PHOTO_PREFIX_TO_URLS = {}
def collect_photo_urls(response):
    #pprint(response)
    liked_tweets = response.data or []
    result_count = response.meta['result_count']
    # the last page has no 'next_token' key, so use .get() to avoid a KeyError
    next_token = response.meta.get('next_token')
    print(f'==> {len(liked_tweets)} tweets retrieved, result_count={result_count}, next_token=[{next_token}]')
    # build media_key -> photo url from the expanded media objects of this page.
    # note: the 'url' media field is only populated for photos, so videos and
    # animated gifs are skipped (their url stays empty).
    media_key_to_photo_urls = {}
    medias = response.includes.get('media', [])
    for media in medias:
        media_key = media.media_key
        media_type = media.type
        media_url = media.url if hasattr(media, 'url') else ''
        #print(f'===>> key={media_key}, type={media_type}, url={media_url}')
        if media_url:
            media_key_to_photo_urls[media_key] = media_url
    #pprint(media_key_to_photo_urls)
    for liked_tweet in liked_tweets:
        tweet_id = liked_tweet.id
        tweet_text = liked_tweet.text
        tweet_attachments = liked_tweet.attachments
        tweet_created_at = liked_tweet.created_at
        #print(f'===>> id={tweet_id}, created_at={tweet_created_at}, attachments={tweet_attachments}')
        # filename prefix = tweet timestamp + last 3 digits of the tweet id
        # (the id suffix disambiguates tweets created in the same second)
        # timestamp_prefix = datetime.datetime.fromisoformat(tweet_created_at).strftime('%Y%m%d%H%M%S')
        timestamp_prefix = tweet_created_at.strftime('%Y%m%d%H%M%S')
        id_suffix3 = str(tweet_id)[-3:]
        prefix0 = timestamp_prefix + id_suffix3
        #print(prefix0)
        if not tweet_attachments:
            continue
        if 'media_keys' not in tweet_attachments:
            continue
        for idx, media_key in enumerate(tweet_attachments['media_keys']):
            prefix = f'{prefix0}_{idx}'
            photo_url = media_key_to_photo_urls.get(media_key, '')
            #print(f'===>> prefix={prefix}, url={photo_url}')
            if not photo_url:
                print(f'#### NOTICE tweet id={tweet_id}, text={tweet_text} SKIPPED because url is empty.')
                continue
            if prefix in PHOTO_PREFIX_TO_URLS:
                print(f'#### WARN prefix {prefix} duplicated, SKIPPED for url={photo_url}')
                continue
            PHOTO_PREFIX_TO_URLS[prefix] = photo_url
    #pprint(PHOTO_PREFIX_TO_URLS)
# MAX_RESULTS = 100
# LIMIT = 10
MAX_RESULTS = 100
LIMIT = 30
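# up to MAX_RESULTS tweets per page * LIMIT pages = 3000 liked tweets scanned per run.
# the liked-tweets endpoint is rate limited, so requests are spaced out with sleep(1) below.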
for response in tweepy.Paginator(client.get_liked_tweets,
                                 id=twitter_user_id,
                                 user_auth=True,
                                 max_results=MAX_RESULTS,
                                 expansions=['attachments.media_keys'],
                                 media_fields=['type', 'url'],
                                 tweet_fields=['attachments', 'created_at'],
                                 limit=LIMIT):
    collect_photo_urls(response)
    time.sleep(1)
#pprint(PHOTO_PREFIX_TO_URLS)
TOTAL_SIZE = len(PHOTO_PREFIX_TO_URLS)
print(f'==> photo url : {TOTAL_SIZE} urls collected.')
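# download into a timestamped directory so repeated runs don't overwrite earlier results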
save_dirname = './images_' + datetime.datetime.now().strftime('%Y-%m-%d_%H%M%S')
os.mkdir(save_dirname)
count = 1
for prefix, url in PHOTO_PREFIX_TO_URLS.items():
    urlpath = urllib.parse.urlparse(url).path
    urlext = os.path.splitext(urlpath)[1]  # we get ".ext" string
    localfilepath = f'{save_dirname}/{prefix}{urlext}'
    print(f'{count}/{TOTAL_SIZE}, {url} -> {localfilepath}')
    try:
        with urllib.request.urlopen(url) as urlconn:
            data = urlconn.read()
        with open(localfilepath, mode='wb') as localfile:
            localfile.write(data)
    except urllib.error.URLError as e:
        print(e)
    count += 1
    time.sleep(1)