Skip to content

Instantly share code, notes, and snippets.

@BharatKalluri
Last active December 9, 2021 18:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save BharatKalluri/b4978cd97250fcd69f90e3f9ee4590e2 to your computer and use it in GitHub Desktop.
Save BharatKalluri/b4978cd97250fcd69f90e3f9ee4590e2 to your computer and use it in GitHub Desktop.
Script that walks through zip files of cover images named by ISBN-13 and uploads the missing covers to OpenLibrary
import datetime
import os
import sys
import zipfile
from zipfile import ZipFile, ZipInfo
from olclient import OpenLibrary, config
from sqlmodel import Field, SQLModel, create_engine, Session, select
import logging
from typing import List
# Every record goes both to a persistent "debug.log" file and to the console
# stream, so a long batch run can be followed live and inspected afterwards.
_log_handlers = [
    logging.FileHandler("debug.log"),
    logging.StreamHandler(),
]
logging.basicConfig(
    format='%(asctime)s %(levelname)s:%(message)s',
    level=logging.INFO,
    handlers=_log_handlers,
)
# OpenLibrary credentials are taken from the environment; a missing variable
# raises KeyError at import time, before any work is attempted.
ol_access_key = os.environ['OL_ACCESS_KEY']
ol_secret_key = os.environ['OL_SECRET_KEY']
_ol_credentials = config.Credentials(access=ol_access_key, secret=ol_secret_key)

# Shared OpenLibrary client used by every function in this script.
ol = OpenLibrary(credentials=_ol_credentials)
class EditionCoverData(SQLModel, table=True):
    """Local bookkeeping row recording the cover status of one edition.

    A row is written after an ISBN has been checked against OpenLibrary, so
    that later runs can skip ISBNs whose cover is already known to exist.
    """

    # ISBN-13 of the edition; doubles as the table's primary key.
    isbn_13: str = Field(primary_key=True, default=None)
    # True when OpenLibrary already had a cover, or when our upload succeeded.
    cover_exists: bool = Field(default=False)
# Bookkeeping lives in a SQLite file named "db.sqlite", addressed by a
# relative URL (so it resolves against the current working directory).
_SQLITE_URL = "sqlite:///db.sqlite"
engine = create_engine(_SQLITE_URL, echo=False)

# Create the tables declared on SQLModel's metadata.
SQLModel.metadata.create_all(engine)

# One module-wide session shared by all helper functions below.
db_session = Session(engine)
def update_cover_for_edition(
        edition_olid: str,
        file_name: str,
        cover_data: bytes,
        mime_type: str
) -> bool:
    """Upload a cover image to the OpenLibrary edition ``edition_olid``.

    The image is posted to the edition's ``add-cover`` endpoint through the
    authenticated ``ol.session``.

    :param edition_olid: OpenLibrary ID of the edition receiving the cover.
    :param file_name: file name reported for the uploaded image.
    :param cover_data: raw image bytes.
    :param mime_type: MIME type reported for the image (e.g. ``image/jpeg``).
    :return: True only when the response is OK *and* its body contains the
        ``Saved!`` marker; False otherwise.
    """
    add_cover_url = f'https://openlibrary.org/books/{edition_olid}/-/add-cover'
    # Fields sent through ``files=``; the values of "url" and "upload" are
    # presumably what the site's add-cover web form submits -- TODO confirm.
    upload_fields = {
        "file": (file_name, cover_data, mime_type),
        "url": (None, "https://"),
        "upload": (None, "Submit"),
    }
    response = ol.session.post(add_cover_url, files=upload_fields)
    if not response.ok:
        return False
    # An OK status alone is not treated as success; the body must confirm it.
    return b'Saved!' in response.content
def is_cover_already_stored(isbn_13: str) -> bool:
    """Tell whether the local database already marks ``isbn_13`` as covered.

    :param isbn_13: ISBN-13 to look up in the ``EditionCoverData`` table.
    :return: True only when a row exists for the ISBN and its
        ``cover_exists`` flag is True; False when there is no row or the
        recorded flag is not True.
    """
    lookup = select(EditionCoverData).where(EditionCoverData.isbn_13 == isbn_13)
    row = db_session.execute(lookup).first()
    if row is None:
        return False
    # The selected entity is expected as the single element of the result row.
    if len(row) != 1:
        return False
    return row[0].cover_exists is True
def _record_cover_status(isbn_13: str, cover_exists: bool) -> None:
    """Insert or update the local bookkeeping row for ``isbn_13``."""
    # merge() reconciles by primary key: it updates the row when one already
    # exists (e.g. left behind by an earlier failed upload, recorded with
    # cover_exists=False) and inserts otherwise. A plain bulk insert would
    # fail with a primary-key conflict on such a retry.
    db_session.merge(EditionCoverData(isbn_13=isbn_13, cover_exists=cover_exists))
    db_session.commit()


def verify_and_update_cover(isbn_13: str, archive_contents: ZipFile) -> None:
    """Make sure the edition identified by ``isbn_13`` has a cover.

    Steps:
      1. Return immediately if the local database already records a cover.
      2. Look the edition up on OpenLibrary; return if it is not found.
      3. If OpenLibrary already lists a cover, record that locally and return.
      4. Otherwise upload ``<isbn_13>.jpg`` from the archive and record
         whether the upload succeeded.

    :param isbn_13: ISBN-13 of the edition; also the base name of the image
        inside the archive.
    :param archive_contents: open zip archive holding ``<isbn_13>.jpg``.
    """
    if is_cover_already_stored(isbn_13):
        return

    ol_edition = ol.Edition.get(isbn=isbn_13)
    if not ol_edition:
        return

    # TODO: double check this
    # ``[-1]`` is treated as "no cover"; an empty list currently counts as
    # "has cover" -- NOTE(review): confirm against real OpenLibrary records.
    covers = getattr(ol_edition, 'covers', None)
    cover_exists = (covers is not None) and (covers != [-1])
    if cover_exists:
        _record_cover_status(isbn_13, True)
        logging.debug(f'cover exists in OL for {isbn_13}')
        return

    cover_file_name = f'{isbn_13}.jpg'
    try:
        cover_data = archive_contents.read(cover_file_name)
    except KeyError:
        # ZipFile.read raises KeyError for a member that is not in the
        # archive; skip this ISBN instead of aborting the whole batch.
        logging.warning(f'{cover_file_name} not found in archive, skipping {isbn_13}')
        return

    is_success = update_cover_for_edition(
        edition_olid=ol_edition.olid,
        cover_data=cover_data,
        file_name=cover_file_name,
        mime_type='image/jpeg'
    )
    _record_cover_status(isbn_13, is_success)
    logging.info(f'cover update status {is_success} for {isbn_13}')
def main(cover_zip_path: str):
    """Process every cover image contained in one zip archive.

    Each file entry's name up to its first ``.`` is taken as the ISBN-13 and
    handed to ``verify_and_update_cover``. Start and end timestamps are
    logged around the run.

    :param cover_zip_path: path of the zip archive to process.
    """
    logging.info(f"start time: {datetime.datetime.now().timestamp()}")
    processed_count = 0
    # The context manager guarantees the archive is closed, even when an
    # entry fails part-way through.
    with zipfile.ZipFile(cover_zip_path, 'r') as archive_contents:
        for entry in archive_contents.infolist():
            # Directory entries carry no image data and no ISBN.
            if entry.is_dir():
                continue
            logging.info(f'processing {entry.filename}, processed {processed_count} files')
            isbn_of_file: str = entry.filename.split('.')[0]
            verify_and_update_cover(isbn_of_file, archive_contents)
            processed_count += 1
    logging.info(f"end time: {datetime.datetime.now().timestamp()}")
if __name__ == '__main__':
    # Exactly one command-line argument is expected: a zip file, or a
    # directory whose ``.zip`` files should all be processed.
    args = sys.argv
    if len(args) != 2:
        raise Exception('python main.py <zip path>')
    user_provided_path = args[1]

    if os.path.isdir(user_provided_path):
        zip_paths = [
            os.path.join(user_provided_path, entry_name)
            for entry_name in os.listdir(user_provided_path)
            if entry_name.endswith('.zip')
        ]
    else:
        # Anything that is not a directory is treated as a single archive path.
        zip_paths = [user_provided_path]

    for zip_path in zip_paths:
        logging.info(f"Processing: {zip_path}")
        main(zip_path)
sqlmodel
git+git://github.com/internetarchive/openlibrary-client
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment