Last active
July 5, 2019 06:26
-
-
Save Nanguage/d37ec45f5744a245f800b5ea9d0bfb53 to your computer and use it in GitHub Desktop.
Fetch pictures in Douban Album
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Fetch pictures in a Douban album
via Douban API V2 (https://douban-api-docs.zce.me/photo.html)
""" | |
import os | |
import sys | |
import json | |
from typing import Dict, List, Iterator, Tuple | |
import shutil | |
from concurrent.futures import ThreadPoolExecutor | |
from itertools import tee | |
import logging | |
import requests | |
from tqdm import tqdm | |
# Layout of every log record emitted by Fetch: level, timestamp, message.
LOGGING_FMT = "%(levelname)-7s @ %(asctime)s: %(message)s"
# strftime pattern used for the %(asctime)s part of LOGGING_FMT.
LOGGING_DATE_FMT = "%m/%d/%y %H:%M:%S"


class Fetch(object):
    """Download all photos of the given Douban albums into local folders.

    Each album is stored in its own sub-directory of ``saving_path``, named
    after the album's key in ``album_ids``.
    """

    def __init__(self, album_ids: Dict[str, int], saving_path: str, workers=10):
        """Store the configuration and set up the logger.

        Args:
            album_ids: Mapping of local folder name -> Douban album ID.
            saving_path: Directory under which the album folders are created.
            workers: Number of threads used to download photos concurrently.
        """
        self.album_ids = album_ids
        self.saving_path = saving_path
        self.workers = workers
        self.log = logging.getLogger(__name__)
        # getLogger returns the same object on every call, so only attach a
        # handler the first time; otherwise each new Fetch instance would
        # duplicate every log line.
        if not self.log.handlers:
            handler = logging.StreamHandler(sys.stderr)
            handler.setFormatter(
                logging.Formatter(fmt=LOGGING_FMT, datefmt=LOGGING_DATE_FMT)
            )
            self.log.addHandler(handler)
        self.log.setLevel(logging.DEBUG)

    def run(self):
        """Fetch and save every configured album.

        WARNING: an existing folder for an album is deleted and re-created
        before the download starts.
        """
        for album, id_ in self.album_ids.items():
            self.log.info("=" * 20 + album + "=" * 20)
            album_path = os.path.join(self.saving_path, album)
            if os.path.exists(album_path):
                self.log.warning(f"remove {album_path}")
                shutil.rmtree(album_path)
            # makedirs (not mkdir) so a missing saving_path is created too.
            os.makedirs(album_path)
            photo_list = self.fetch_photo_list(id_)
            self.log.info(f"Number of photos: {len(photo_list)}")
            self.log.info("begin download:")
            for fname, img in tqdm(self.fetch_photos(photo_list), total=len(photo_list)):
                path = os.path.join(album_path, fname)
                self.save_photo(path, img)

    def fetch_photo_list(self, alb_id: int) -> List[Dict]:
        """Return the photo records of album ``alb_id``.

        The first request is sent without paging parameters; the remaining
        photos are then requested in batches of at most 100 using the
        ``start``/``count`` parameters until ``total`` records are collected.

        Raises:
            IOError: if the list API answers with a non-200 status code.
        """
        def request(params=None):
            # Avoid a shared mutable default while still sending/logging {}.
            params = {} if params is None else params
            self.log.debug(f'request list API with params: {params}')
            r = requests.get(f"https://api.douban.com/v2/album/{alb_id}/photos", params=params)
            if r.status_code != 200:
                raise IOError(str(r), str(r.content))
            return json.loads(r.content)

        resp = request()
        photos = resp['photos']
        total = resp['total']
        self.log.info(f"Total photos: {total}")
        remain = total - len(photos)
        while remain > 0:
            batch = min(100, remain)
            resp = request({'start': len(photos), 'count': batch})
            page = resp['photos']
            if not page:
                # The API reports more photos than it hands out; without this
                # guard the loop would request the same offset forever.
                self.log.warning(
                    f"list API returned no photos at start={len(photos)}; "
                    f"got {len(photos)} of {total}"
                )
                break
            photos += page
            remain = total - len(photos)
        return photos

    def fetch_photos(self, photos: List[Dict]) -> Iterator[Tuple[str, bytes]]:
        """Yield ``(file name, image bytes)`` for each photo record, in order.

        The URL is taken from each record's ``'large'`` key and the file name
        is the last path segment of that URL. Downloads run concurrently on
        ``self.workers`` threads.

        Raises:
            requests.HTTPError: if a photo URL answers with an error status
                (instead of silently saving the error body as an image).
        """
        urls = [d['large'] for d in photos]

        def _download(url) -> bytes:
            r = requests.get(url)
            r.raise_for_status()
            return r.content

        # The context manager shuts the pool down once iteration finishes
        # (or the generator is closed), so worker threads are not leaked.
        with ThreadPoolExecutor(self.workers) as pool:
            for url, img in zip(urls, pool.map(_download, urls)):
                yield url.split('/')[-1], img

    def save_photo(self, path: str, img: bytes):
        """Write the raw image bytes ``img`` to the file at ``path``."""
        with open(path, 'wb') as f:
            f.write(img)
if __name__ == "__main__":
    # Directory that receives one sub-folder per downloaded album.
    target_dir = "/home/nanguage/Pictures/douban"
    # Local folder name -> Douban album ID.
    album_table = {"ent_manga": 35190909}
    Fetch(album_table, target_dir).run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment