Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Seleniumを使用してTwitterメディアダウンローダを自動化するスクリプト。
import argparse
import glob
import os
import re
import shutil
import signal
import time
import zipfile
from selenium import webdriver
from selenium.webdriver import ChromeOptions
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions
from selenium.webdriver.support.ui import WebDriverWait
# Filesystem layout used by the script. Everything lives next to this file.
# abspath() is applied first because __file__ may be a bare relative name, in
# which case dirname() would return "" and the paths below would silently
# resolve against the drive root instead of the script's folder.
_base_dir = os.path.dirname(os.path.abspath(__file__))

# Every *_dir value ends with a path separator (the trailing "" in join());
# other code in this file builds paths by plain string concatenation.
download_dir = os.path.join(_base_dir, "download", "")
temp_dir = os.path.join(download_dir, "temp", "")
# Created at import time; setup_driver() passes temp_dir to Chrome as the
# download directory.
os.makedirs(temp_dir, exist_ok=True)
extract_dir = os.path.join(temp_dir, "extract", "")
zip_dir = os.path.join(download_dir, "zip", "")

# Chrome user-data directory and profile handed to ChromeOptions.
# NOTE(review): the user-data path is hard-coded to one Windows account;
# adjust it for the machine the script runs on.
user_data_dir = "C:\\Users\\kood\\AppData\\Local\\Google\\Chrome\\User Data\\"
profile_name = "Default"

# chromedriver binary expected beside this script.
driver_path = os.path.join(_base_dir, "chromedriver.exe")
class TwitterUser:
    """A Twitter account whose media is fetched via the Twitter Media Downloader UI.

    Attributes:
        name: Screen name without the leading "@".
        a_name: Screen name with the leading "@"; also the per-user folder name.
        url: Profile URL opened in the browser.
        dir: Per-user destination folder under ``download_dir`` (ends with a
            path separator).
        downloaded_id: Largest ID recovered from existing ``*.log`` files in
            ``dir``, or ``None`` when there is none or the lookup was skipped.
    """

    # Finds every "-<digits>(" fragment in a log file path. The second
    # fragment of each path is the one interpreted as the downloaded ID.
    _LOG_ID_PATTERN = re.compile(r"-[0-9]*\(")

    # Target state for each option checkbox in the downloader dialog:
    # images, GIFs and videos on; "no media" and "include retweets" off.
    _CHECKBOX_TARGETS = (
        ("twMediaDownloader_checkbox_image", True),
        ("twMediaDownloader_checkbox_gif", True),
        ("twMediaDownloader_checkbox_video", True),
        ("twMediaDownloader_checkbox_nomedia", False),
        ("twMediaDownloader_checkbox_include_retweets", False),
    )

    def __init__(self, name, download_all):
        """Store the account details and look up the last downloaded ID.

        Args:
            name: Screen name without "@".
            download_all: When true, skip the lookup of the previously
                downloaded ID so ``downloaded_id`` stays ``None``.
        """
        self.name = name
        self.a_name = "@" + name
        self.url = "https://twitter.com/" + name
        # Trailing "" keeps the separator at the end of the folder path.
        self.dir = os.path.join(download_dir, self.a_name, "")
        self.downloaded_id = None
        self.get_latest_downloaded_id(download_all)

    def get_latest_downloaded_id(self, download_all):
        """Set ``downloaded_id`` to the largest ID found in this user's log files.

        Log files whose path does not contain two "-<digits>(" fragments, or
        whose second fragment has no digits, are ignored.

        Args:
            download_all: When true, nothing is looked up.
        """
        if download_all or not os.path.exists(self.dir):
            return
        ids = []
        for log_path in glob.glob(os.path.join(self.dir, "*.log")):
            fragments = self._LOG_ID_PATTERN.findall(log_path)
            if len(fragments) < 2:
                continue
            digits = fragments[1].strip("-(")
            if digits:
                ids.append(int(digits))
        if ids:
            self.downloaded_id = max(ids)

    @staticmethod
    def _set_checkbox(wrapper, checked):
        """Click ``wrapper`` if its inner <input> is not in the ``checked`` state."""
        if wrapper.find_element(By.TAG_NAME, "input").is_selected() != checked:
            wrapper.click()

    def media_downloader(self, driver, limit):
        """Drive the Twitter Media Downloader dialog for this user.

        Opens the profile page, fills in the dialog (ID range, limit,
        checkboxes), starts the download and polls the status bar until it
        reports completion.

        Args:
            driver: Selenium WebDriver to operate.
            limit: Value typed into the dialog's "limit" field.

        Returns:
            True once the status bar reports completion; False when the
            download button does not become visible on the profile page.
        """
        # Imported here so this class does not depend on changes to the
        # module-level import list.
        from selenium.common.exceptions import (
            NoSuchElementException,
            StaleElementReferenceException,
            WebDriverException,
        )

        print("ユーザー名 :", self.a_name)
        driver.get(self.url)

        # Wait at most 5 seconds for the extension's button to appear.
        wait = WebDriverWait(driver, 5)
        locator = (By.CLASS_NAME, "twMediaDownloader_download_button_container")
        try:
            container = wait.until(
                expected_conditions.visibility_of_element_located(locator)
            )
        except WebDriverException:
            print("Twitterメディアダウンローダ : このユーザーからはダウンロード出来ません。")
            return False

        # Open the dialog.
        container.find_element(
            By.CLASS_NAME, "twMediaDownloader_download_button"
        ).click()

        # Clear the ID range, then set the lower bound and the limit.
        toolbox = driver.find_element(By.CLASS_NAME, "twMediaDownloader_toolbox")
        since_box = toolbox.find_element(By.NAME, "since_id")
        until_box = toolbox.find_element(By.NAME, "until_id")
        since_box.clear()
        until_box.clear()
        if self.downloaded_id is not None:
            since_box.send_keys(str(self.downloaded_id))
        limit_box = toolbox.find_element(By.NAME, "limit")
        limit_box.clear()
        limit_box.send_keys(str(limit))

        # Bring every option checkbox to its target state.
        options_box = driver.find_element(
            By.CLASS_NAME, "twMediaDownloader_checkbox_container"
        )
        for class_name, checked in self._CHECKBOX_TARGETS:
            self._set_checkbox(
                options_box.find_element(By.CLASS_NAME, class_name), checked
            )

        # Make sure "scan only" (dry run) is off.
        self._set_checkbox(
            driver.find_element(By.CLASS_NAME, "twMediaDownloader_checkbox_dry_run"),
            False,
        )

        # Start.
        print("Twitterメディアダウンローダ : 開始します。")
        driver.find_element(By.CLASS_NAME, "twMediaDownloader_button_start").click()

        # Poll the status bar every 3 seconds until it reports completion.
        print("Twitterメディアダウンローダ : 処理中...")
        while True:
            time.sleep(3)
            try:
                status_bar = driver.find_element(
                    By.CLASS_NAME, "twMediaDownloader_status_bar"
                )
                status_text = status_bar.get_attribute("innerHTML") or ""
            except (NoSuchElementException, StaleElementReferenceException):
                # The status bar is missing or was re-rendered; retry on the
                # next tick. Other driver errors (e.g. the browser was closed)
                # propagate instead of spinning forever.
                continue
            if "Done" in status_text or "No applicable tweets found" in status_text:
                # Give the browser a few seconds to finish writing the ZIP;
                # otherwise extract() may run before the file exists.
                time.sleep(5)
                break
        print("Twitterメディアダウンローダ : 完了しました。")
        return True

    def extract(self, zip_remove):
        """Unpack this user's downloaded ZIPs and file the contents under ``dir``.

        Archives are looked up in ``temp_dir`` by the ``<name>*.zip`` pattern
        and extracted into ``extract_dir``. Each archive is then either deleted
        or moved to ``zip_dir``. Extracted files are moved into the user's
        folder; a file whose name already exists there is discarded.

        Args:
            zip_remove: When true, delete each archive after extraction
                instead of keeping it in ``zip_dir``.
        """
        os.makedirs(extract_dir, exist_ok=True)
        archives = glob.glob(temp_dir + self.name + "*.zip")
        if not archives:
            if self.downloaded_id is not None:
                print(self.a_name, "は以前のチェック以降に画像を投稿していません。")
            else:
                print(self.a_name, "からは何もダウンロード出来ませんでした。")
            return

        total = len(archives)
        for index, archive in enumerate(archives, start=1):
            progress = "{}/{}".format(index, total)
            print("ZIPを展開 :", progress)
            with zipfile.ZipFile(archive) as zip_file:
                zip_file.extractall(extract_dir)
            if zip_remove:
                print("ZIPを削除 :", progress)
                os.remove(archive)
                continue
            os.makedirs(zip_dir, exist_ok=True)
            if os.path.exists(zip_dir + os.path.basename(archive)):
                # An archive with this name is already kept; drop the copy.
                os.remove(archive)
            else:
                shutil.move(archive, zip_dir)

        # NOTE(review): the original indentation was lost; this step is
        # assumed to run once, after every archive has been unpacked.
        os.makedirs(self.dir, exist_ok=True)
        extracted = glob.glob(extract_dir + "*")
        image_count = sum(1 for path in extracted if "-img" in path)
        video_count = sum(1 for path in extracted if ".mp4" in path)
        print("画像の枚数 :", image_count)
        print("動画の個数 :", video_count)
        for path in extracted:
            if os.path.exists(os.path.join(self.dir, os.path.basename(path))):
                os.remove(path)
            else:
                shutil.move(path, self.dir)
def setup_driver():
    """Create a Chrome WebDriver bound to the configured profile.

    The browser is started with ``user_data_dir`` / ``profile_name`` as its
    profile, ``temp_dir`` as its default download directory, and the
    "enable-logging" switch excluded.

    Returns:
        The started ``webdriver.Chrome`` instance.
    """
    options = ChromeOptions()
    options.add_argument("user-data-dir=" + user_data_dir)
    options.add_argument("profile-directory=" + profile_name)
    options.add_experimental_option(
        "prefs", {"download.default_directory": temp_dir}
    )
    options.add_experimental_option("excludeSwitches", ["enable-logging"])

    # Newer Selenium releases take the driver path through a Service object
    # and no longer accept it positionally; older releases reject the
    # ``service`` keyword with TypeError, in which case the legacy call is used.
    try:
        from selenium.webdriver.chrome.service import Service

        return webdriver.Chrome(service=Service(driver_path), options=options)
    except TypeError:
        return webdriver.Chrome(driver_path, options=options)
def get_names():
    """Return the names of the "@"-prefixed entries directly under download_dir.

    Each name is the final path component (text after the last ``os.sep``) of
    a match for ``download_dir + "@*"``, so it still carries the leading "@".
    An empty list is returned when nothing matches.
    """
    separator = os.sep
    return [
        found.rsplit(separator, 1)[-1]
        for found in glob.glob(download_dir + "@*")
    ]
def main(names, limit, download_all, zip_remove):
    """Download and unpack media for every requested user, then exit.

    Args:
        names: Screen names (a leading "@" is stripped). If the first entry
            is "@@", the list is replaced by the result of ``get_names()``.
        limit: Limit value passed to ``TwitterUser.media_downloader``.
        download_all: Passed to ``TwitterUser``; skips the last-ID lookup.
        zip_remove: Passed to ``TwitterUser.extract``; deletes ZIPs after
            extraction.

    Raises:
        SystemExit: Always, once processing finishes or when there are no
            names to process. Errors raised while downloading propagate
            unchanged after the browser has been shut down.
    """
    if names and names[0] == "@@":
        names = get_names()
    if not names:
        print("ユーザー名が用意出来ないため、プログラムを終了します。")
        raise SystemExit

    # Bound before the try so the cleanup below is safe even when
    # setup_driver() itself fails.
    driver = None
    try:
        driver = setup_driver()
        for name in names:
            user = TwitterUser(name.replace("@", ""), download_all)
            if user.media_downloader(driver, limit):
                user.extract(zip_remove)
    finally:
        print("プログラムを終了します。")
        if driver is not None:
            # To leave the browser open instead, replace quit() with
            # os.kill(driver.service.process.pid, signal.SIGTERM).
            driver.quit()
    # Raised outside the finally block so it cannot mask an in-flight error.
    raise SystemExit
def _manual_test():
    """Run main() with fixed arguments instead of the command line (debug aid)."""
    names = ["テストに使用するID"]
    limit = 200
    download_all = False
    zip_remove = False
    main(names, limit, download_all, zip_remove)


if __name__ == "__main__":
    # Command-line entry point. Errors are no longer swallowed: argparse
    # failures and runtime exceptions surface with their normal messages.
    arg_parser = argparse.ArgumentParser()
    help1 = "ユーザー名を入力。複数設定する場合はカンマ区切りで入力。@を2つ入力で、過去にDLした事があるユーザーの最新画像をチェック。"
    help2 = "Twitterメディアダウンローダの制限数。(設定しない場合は200)"
    arg_parser.add_argument("name", help=help1)
    # argparse validates the integer and supplies the default of 200.
    arg_parser.add_argument("-l", "--limit", type=int, default=200, help=help2)
    arg_parser.add_argument("-a", "--all", action="store_true", help="ダウンロード済みの最新IDのチェックを行わないフラグ。")
    arg_parser.add_argument("-r", "--remove", action="store_true", help="ZIP解凍後にZIPを削除するフラグ。")
    args = arg_parser.parse_args()
    # Comma-separated names; surrounding whitespace is ignored.
    names = [part.strip() for part in args.name.split(",")]
    main(names, args.limit, args.all, args.remove)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment