Skip to content

Instantly share code, notes, and snippets.

@Nanguage
Last active July 5, 2019 06:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Nanguage/d37ec45f5744a245f800b5ea9d0bfb53 to your computer and use it in GitHub Desktop.
Save Nanguage/d37ec45f5744a245f800b5ea9d0bfb53 to your computer and use it in GitHub Desktop.
Fetch pictures in Douban Album
"""
Fetch pictures from Douban albums
via the Douban API V2 (https://douban-api-docs.zce.me/photo.html).
"""
import os
import sys
import json
from typing import Dict, List, Iterator, Tuple
import shutil
from concurrent.futures import ThreadPoolExecutor
from itertools import tee
import logging
import requests
from tqdm import tqdm
# Layout of each log line: level name left-justified in 7 columns,
# then the timestamp, then the message.
LOGGING_FMT = "%(levelname)-7s @ %(asctime)s: %(message)s"
# Timestamp layout used for %(asctime)s: month/day/two-digit-year hour:minute:second.
LOGGING_DATE_FMT = "%m/%d/%y %H:%M:%S"
class Fetch(object):
    """Download all photos of one or more Douban albums into local folders.

    The photo list of each album is read from the Douban API V2 album
    endpoint; the URL stored under each photo's ``large`` key is then
    downloaded using a thread pool and written to disk.
    """

    # Maximum number of photos requested per follow-up list call.
    _BATCH_SIZE = 100

    def __init__(self, album_ids: Dict[str, int], saving_path: str, workers=10):
        """Store the configuration and set up a logger that writes to stderr.

        Args:
            album_ids: Mapping of album name (used as sub-folder name) to
                the album ID passed to the API.
            saving_path: Directory under which one folder per album is created.
            workers: Maximum number of threads used to download photos.
        """
        self.album_ids = album_ids
        self.saving_path = saving_path
        self.workers = workers
        self.log = logging.getLogger(__name__)
        # getLogger returns the same object on every call, so only attach a
        # handler once; otherwise each new Fetch would duplicate every line.
        if not self.log.handlers:
            handler = logging.StreamHandler(sys.stderr)
            handler.setFormatter(
                logging.Formatter(fmt=LOGGING_FMT, datefmt=LOGGING_DATE_FMT)
            )
            self.log.addHandler(handler)
        self.log.setLevel(logging.DEBUG)

    def run(self):
        """Download every configured album into ``saving_path/<album name>``.

        An already existing album folder is removed first, so the folder
        only contains the photos fetched by this run.
        """
        for album, id_ in self.album_ids.items():
            self.log.info("="*20+album+"="*20)
            album_path = os.path.join(self.saving_path, album)
            if os.path.exists(album_path):
                self.log.warning(f"remove {album_path}")
                shutil.rmtree(album_path)
            # makedirs also creates saving_path itself when it is missing.
            os.makedirs(album_path)
            photo_list = self.fetch_photo_list(id_)
            self.log.info(f"Number of photos: {len(photo_list)}")
            self.log.info("begin download:")
            downloads = self.fetch_photos(photo_list)
            for fname, img in tqdm(downloads, total=len(photo_list)):
                self.save_photo(os.path.join(album_path, fname), img)

    def fetch_photo_list(self, alb_id: int) -> List[Dict]:
        """Return the photo entries of album ``alb_id`` from the list API.

        The first request is sent without parameters; further requests page
        through the album with ``start``/``count`` until the number of
        collected entries reaches the ``total`` reported by the API.

        Raises:
            IOError: If the API answers with a status code other than 200.
        """
        def request(params=None):
            # Avoid a shared mutable default; an empty dict keeps both the
            # debug message and the request identical to a bare call.
            params = {} if params is None else params
            self.log.debug(f'request list API with params: {params}')
            r = requests.get(f"https://api.douban.com/v2/album/{alb_id}/photos", params=params)
            if r.status_code != 200:
                raise IOError(str(r), str(r.content))
            return json.loads(r.content)

        resp = request()
        photos = list(resp['photos'])
        total = resp['total']
        self.log.info(f"Total photos: {total}")
        while len(photos) < total:
            batch = min(self._BATCH_SIZE, total - len(photos))
            page = request({'start': len(photos), 'count': batch})['photos']
            if not page:
                # The API delivered fewer photos than it announced; stop
                # here instead of requesting the same empty page forever.
                self.log.warning(
                    f"API returned no more photos; got {len(photos)} of {total}"
                )
                break
            photos += page
        return photos

    @staticmethod
    def _filename_from_url(url: str) -> str:
        """Return the last ``/``-separated component of ``url``."""
        return url.split('/')[-1]

    def fetch_photos(self, photos: List[Dict]) -> Iterator[Tuple[str, bytes]]:
        """Download the ``large`` URL of every photo entry concurrently.

        Yields:
            ``(file name, image bytes)`` pairs in the order of ``photos``.
        """
        urls = [d['large'] for d in photos]

        def _download(url) -> bytes:
            r = requests.get(url)
            if r.status_code != 200:
                # Stay best-effort (the content is still returned), but make
                # the failure visible instead of saving it silently.
                self.log.warning(f"download of {url} returned HTTP {r.status_code}")
            return r.content

        # The context manager shuts the pool down once the generator is
        # exhausted or closed, so worker threads are not leaked.
        with ThreadPoolExecutor(self.workers) as pool:
            for url, img in zip(urls, pool.map(_download, urls)):
                yield self._filename_from_url(url), img

    def save_photo(self, path: str, img: bytes):
        """Write the raw bytes ``img`` to the file ``path``, replacing it."""
        with open(path, 'wb') as f:
            f.write(img)
if __name__ == "__main__":
    # Directory in which one sub-folder per album is created.
    saving_path = "/home/nanguage/Pictures/douban"
    # Album name -> album ID.
    albums = {"ent_manga": 35190909}
    Fetch(albums, saving_path).run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment