Created
January 4, 2022 02:46
Revisions
-
msakamoto-sf created this gist
Jan 4, 2022 .There are no files selected for viewing
# Download the images from tweets you have "liked" on Twitter into a local folder.
# (assumes you run it against your own account)
#
# requirements: python3 (>= 3.10)
#
# setup:
# 1. `python3 -m venv venv`
# 2. `pip install tweepy==4.4.0`
#    (if you installed via windows installer, try 'py -m venv venv')
#
# prepare key and secrets:
# 1. sign up https://developer.twitter.com/en
# 2. create project and app
# 3. save your app's consumer keys (API Key and Secret)
# 4. open your app's "Keys and tokens" page, then generate your Access Token and Secret
#
# run:
# 1. set environment values (see python script below)
# 2. `python3 liked-image-downloader.py`
#    - (for win: `py liked-image-downloader.py`)
#
# reference:
# - https://www.tweepy.org/
# - https://docs.tweepy.org/en/v4.4.0/index.html
# - https://kinocolog.com/favo_image_download/
# - https://qiita.com/hxbdy625/items/3806bdc2b36f64986119

import datetime
import os
import time
# NOTE: a bare `import urllib` does NOT load these submodules; the original
# script only worked because tweepy's dependencies imported them as a side
# effect. Import them explicitly.
import urllib.error
import urllib.parse
import urllib.request

import tweepy

# Filename prefix -> photo URL, accumulated across every page of liked tweets.
PHOTO_PREFIX_TO_URLS = {}


def collect_photo_urls(response):
    """Collect photo URLs from one page of a liked-tweets response.

    Populates the module-level PHOTO_PREFIX_TO_URLS dict, keyed by
    '<created_at timestamp><last 3 digits of tweet id>_<attachment index>'
    so downloaded files sort chronologically and rarely collide.
    """
    liked_tweets = response.data or []  # response.data is None on an empty page
    result_count = response.meta['result_count']
    # 'next_token' is absent from meta on the LAST page; .get avoids a KeyError.
    next_token = response.meta.get('next_token')
    print(f'==> {len(liked_tweets)} tweets retrieved, result_count={result_count}, next_token=[{next_token}]')

    # Build media_key -> photo URL for every media object expanded on this page.
    # Non-photo media (videos, gifs) carry no 'url' attribute and are skipped.
    media_key_to_photo_urls = {}
    for media in response.includes.get('media', []):
        media_url = media.url if hasattr(media, 'url') else ''
        if media_url:
            media_key_to_photo_urls[media.media_key] = media_url

    for liked_tweet in liked_tweets:
        tweet_id = liked_tweet.id
        timestamp_prefix = liked_tweet.created_at.strftime('%Y%m%d%H%M%S')
        prefix0 = timestamp_prefix + str(tweet_id)[-3:]
        attachments = liked_tweet.attachments
        if not attachments or 'media_keys' not in attachments:
            continue  # text-only tweet, nothing to download
        for idx, media_key in enumerate(attachments['media_keys']):
            prefix = f'{prefix0}_{idx}'
            photo_url = media_key_to_photo_urls.get(media_key, '')
            if not photo_url:
                print(f'#### NOTICE tweet id={tweet_id}, text={liked_tweet.text} SKIPPED cause url is empty.')
                continue
            if prefix in PHOTO_PREFIX_TO_URLS:
                print(f'#### WARN prefix {prefix} duplicated, SKIPPED for url={photo_url}')
                continue
            PHOTO_PREFIX_TO_URLS[prefix] = photo_url


def main():
    """Authenticate, page through liked tweets, then download every photo URL."""
    twitter_username = os.getenv('TWITTER_USERNAME', '')
    client = tweepy.Client(
        consumer_key=os.getenv('CONSUMER_KEY', '<consumer-key>'),
        consumer_secret=os.getenv('CONSUMER_SECRET', '<consumer-secret>'),
        access_token=os.getenv('TOKEN', '<token>'),
        access_token_secret=os.getenv('TOKEN_SECRET', '<token-secret>'))

    # Resolve the numeric user id for the configured screen name.
    res = client.get_user(username=twitter_username, user_auth=True)
    twitter_user_id = res.data.id
    print(f'==> twitter username[{twitter_username}], id = [{twitter_user_id}]')

    MAX_RESULTS = 100  # tweets per API page (API maximum)
    LIMIT = 30         # max pages to fetch -> up to 3000 liked tweets
    for response in tweepy.Paginator(client.get_liked_tweets, id=twitter_user_id,
                                     user_auth=True, max_results=MAX_RESULTS,
                                     expansions=['attachments.media_keys'],
                                     media_fields=['type', 'url'],
                                     tweet_fields=['attachments', 'created_at'],
                                     limit=LIMIT):
        collect_photo_urls(response)
        time.sleep(1)  # stay well under the rate limit

    total_size = len(PHOTO_PREFIX_TO_URLS)
    print(f'==> photo url : {total_size} urls collected.')

    # Save into a fresh timestamped directory so repeated runs never clobber.
    save_dirname = './images_' + datetime.datetime.now().strftime('%Y-%m-%d_%H%M%S')
    os.mkdir(save_dirname)
    for count, (prefix, url) in enumerate(PHOTO_PREFIX_TO_URLS.items(), start=1):
        urlpath = urllib.parse.urlparse(url).path
        urlext = os.path.splitext(urlpath)[1]  # we get ".ext" string
        localfilepath = f'{save_dirname}/{prefix}{urlext}'
        print(f'{count}/{total_size}, {url} -> {localfilepath}')
        try:
            with urllib.request.urlopen(url) as urlconn:
                data = urlconn.read()
            with open(localfilepath, mode='wb') as localfile:
                localfile.write(data)
        except urllib.error.URLError as e:
            # Best-effort download: log the failure and keep going.
            print(e)
        time.sleep(1)


if __name__ == '__main__':
    main()