Skip to content

Instantly share code, notes, and snippets.

@akwodkiewicz
Last active March 28, 2018 23:02
Show Gist options
  • Save akwodkiewicz/93bb84346a8177801546ab881a69aeeb to your computer and use it in GitHub Desktop.
Quick usage of Polona API
# Quick, synchronous usage of the Polona API:
# search for matching entities, then download the PDF of each hit.
import requests
from pprint import pprint

base_url = 'https://polona.pl/api/'
entities = 'entities/?format=json'
filters = '&filters=category:serials+public:1+has_text_content:1+language:polski'
seed = '&seed=8'
size = '&size=45'

# One search query; the seed selects a server-side page of results.
resp = requests.get(base_url + entities + size + seed + filters)
answer = resp.json()['hits']
list_of_ids = [hit['id'] for hit in answer]
# pprint(list_of_ids)

for _id in list_of_ids:
    # Fetch the full entity record to find its downloadable resources.
    resp = requests.get(base_url + 'entities/' + _id + '?format=json')
    entity = resp.json()  # parse once; reused for resources, title and date below
    resources = entity['resources']
    pdf_url = None
    # Pick the resource whose MIME type marks it as a PDF
    # (if several exist, the last one wins, as before).
    for resource in resources:
        if resource['mime'] == 'application/pdf':
            pdf_url = resource['url']
    if not pdf_url:
        print("Id {} jest do kitu".format(_id))
    else:
        file_response = requests.get(pdf_url)
        with open('{}.pdf'.format(_id), 'wb') as f:
            f.write(file_response.content)
        print("Downloaded {}.pdf -- {} -- ({})".format(_id, entity['title'], entity['date_descriptive']))
import asyncio
import datetime
import sys
import requests
import pathlib
import functools
from pprint import pprint
# Root of the Polona REST API.
BASE_URL = 'https://polona.pl/api/'
# Directory (relative to the current working directory) where PDFs are saved.
_PDF_OUTPUT_PATH_STR = 'data'
# Search filters sent to the API: public Polish-language serials with text content.
_FILTERS_DICT = {
    "category": "serials",
    "public": 1,
    "has_text_content": 1,
    "language": "polski"
}
PDF_OUTPUT_PATH = pathlib.Path(_PDF_OUTPUT_PATH_STR)
# Filters serialized into the query-string format the API expects: `key:value+key:value`.
FILTERS = '+'.join([f"{k}:{v}" for k, v in _FILTERS_DICT.items()])
# Number of hits requested per API query (per seed).
SIZE = 45
# Number of PDFs downloaded concurrently in one batch.
CHUNK_SIZE = 10
def remove_dangerous_characters(filename):
    """Return `filename` with characters unsafe in file names substituted.

    Angle brackets, colons and (back)slashes become '-', a double quote
    becomes a single quote, and `|?*` become '#'.
    """
    substitutions = dict(zip('<>:"/\\|?*', "---'--###"))
    return filename.translate(str.maketrans(substitutions))
def get_entities(seed_start, seed_end):
    """
    Search for files that match `FILTERS`. Parameters `seed_start`, `seed_end` are used
    to iterate the seed for server-side pagination of results: one query is issued per
    seed in the half-open range [seed_start, seed_end), `SIZE` hits per query.
    Returns a list of `(url, filename)` tuples describing files that are not yet downloaded.
    Files inside `PDF_OUTPUT_PATH` with the same filename as `filename` are excluded from
    the result, as are hits that expose no PDF resource (their url is None).
    """

    def get_pdf_url_and_filename(hit):
        """Helper function that extracts `(url, filename)` tuple from `hit` from a json response"""
        recources = hit['resources']
        pdf_url = None
        # Look for the url to PDF in the resources list
        # (if several PDF resources exist, the last one wins)
        for resource in recources:
            if resource['mime'] == 'application/pdf':
                pdf_url = resource['url']
        # Format nicely the date inside `hit['date']`
        date = datetime.datetime.strptime(hit['date'],
                                          '%Y-%m-%dT%H:%M:%S').date()
        title = remove_dangerous_characters(hit['title'])
        filename = f"[{date}] {title}"
        return pdf_url, filename

    async def get_urls(seed):
        """
        Asynchronously search for files that match `FILTERS`. Parameter `seed` is used
        for server-side pagination of results.
        Returns a list of `(url, filename)` tuples
        """
        url = BASE_URL + f'entities/?format=json&seed={seed}&size={SIZE}&filters={FILTERS}'
        # `requests` is blocking, so the GET runs on the loop's default
        # thread-pool executor and is awaited from here.
        loop = asyncio.get_event_loop()
        future = loop.run_in_executor(None, requests.get, url)
        response = await future
        return [
            get_pdf_url_and_filename(hit) for hit in response.json()['hits']
        ]

    print(
        f'- Searching for {(seed_end-seed_start) * SIZE} entities matching filters: {_FILTERS_DICT}'
    )
    # Fire one query per seed concurrently and block until all complete.
    loop = asyncio.get_event_loop()
    tasks = []
    for seed in range(seed_start, seed_end):
        tasks.append(asyncio.ensure_future(get_urls(seed)))
    gathered = asyncio.gather(*tasks)
    loop.run_until_complete(gathered)
    # List of all the files returned from server queries
    all_entities = [item for sub in gathered.result() for item in sub]
    # List of files with the same filename found inside `PDF_OUTPUT_PATH` directory
    already_downloaded = [
        url for (url, info) in all_entities
        if pathlib.Path(PDF_OUTPUT_PATH, info).exists()
    ]
    print(
        f'- Skipping {len(already_downloaded)} urls (PDFs found in {PDF_OUTPUT_PATH.absolute()})'
    )
    # Keep only entries that have a PDF url and whose file is not already on disk.
    return [(url, info) for (url, info) in all_entities
            if url and url not in already_downloaded]
def get_pdfs(entities):
    """Download the PDFs described by `entities` (list of `(url, filename)` tuples)
    into `PDF_OUTPUT_PATH`, `CHUNK_SIZE` files at a time.

    Returns the number of successfully downloaded files. Timeouts, connection
    errors and file-write errors are reported but do not abort the run.
    """
    counter = 1

    async def get_pdf(url, filename):
        """Asynchronously download one pdf from `url` and save it to `filename`"""
        nonlocal counter
        loop = asyncio.get_event_loop()
        filepath = pathlib.Path(PDF_OUTPUT_PATH, filename)
        # Using `functools.partial()` to pass more than 1 argument to `requests.get()` inside `run_in_executor`
        future = loop.run_in_executor(None,
                                      functools.partial(
                                          requests.get, url, timeout=3))
        # Catching timeouts and connection errors
        try:
            response = await future
        except (requests.exceptions.ReadTimeout,
                requests.exceptions.ConnectionError) as e:
            print('! ' + str(e))
            return
        # Writing data to file. OSError covers both FileNotFoundError and
        # IOError; the previous FileNotFoundError branch lacked a `return`,
        # so a failed write was still reported (and counted) as a success.
        try:
            with open(filepath, 'wb') as f:
                f.write(response.content)
        except OSError as e:
            print('! ' + str(e))
            return
        print(f'[{counter}] Downloaded {filename}')
        counter += 1

    print(
        f'- Downloading PDFs to {PDF_OUTPUT_PATH.absolute()} in chunks of {CHUNK_SIZE}'
    )
    loop = asyncio.get_event_loop()
    # Downloading asynchronously `CHUNK_SIZE` number of PDFs at once
    # ...because we don't want to kill the server with `len(entities)` requests at the same time
    for i in range(0, len(entities), CHUNK_SIZE):
        print(f'--- Downloading PDFs {i+1}-{i+CHUNK_SIZE}:')
        # A fresh task list per chunk: previously the list accumulated across
        # iterations, so every chunk re-gathered all earlier (finished) tasks.
        tasks = [
            asyncio.ensure_future(get_pdf(url, filename))
            for url, filename in entities[i:i + CHUNK_SIZE]
        ]
        loop.run_until_complete(asyncio.gather(*tasks))
    return counter - 1
def main():
    """Entry point: parse the optional seed range from argv, report the run
    configuration, ensure the output directory exists, then search and download."""
    args = sys.argv[1:]
    helper_msg = ''
    if not args:
        # No arguments: single default seed.
        seed_min, seed_max = 0, 1
        helper_msg = f'\n- Using `python {sys.argv[0]} seed` you can change the seed of query'
    elif len(args) == 1:
        # One argument: single user-chosen seed.
        seed_min = int(args[0])
        seed_max = seed_min + 1
        helper_msg = f'\n- Using `python {sys.argv[0]} seed_min seed_max` you can iterate seeds'
    else:
        # Two (or more) arguments: half-open seed range [seed_min, seed_max).
        seed_min, seed_max = int(args[0]), int(args[1])

    separator = '-' * 80
    print(separator)
    print(f"""Running polona-api-downloader with:
seeds = [{seed_min},{seed_max})
SIZE={SIZE}
PDF_OUTPUT_PATH={PDF_OUTPUT_PATH.absolute()}
CHUNK_SIZE={CHUNK_SIZE}""" + helper_msg)
    print(separator)

    if not PDF_OUTPUT_PATH.is_dir():
        PDF_OUTPUT_PATH.mkdir(parents=True, exist_ok=True)
        print(f'- Created directory {PDF_OUTPUT_PATH.absolute()}')

    entities = get_entities(seed_min, seed_max)
    print(f'- Acquired {len(entities)} PDF urls.')
    downloaded_num = get_pdfs(entities)
    print(f'- Downloaded {downloaded_num} PDFs')


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment