BWB import bot

How the Pipeline Works:

  • Our project lives in a directory called /imports
  • Within /imports we have a SQLite database called /imports/bwb-import-state.db
  • Each month a cron job creates a directory called /imports/YYYY-MM and downloads several CSVs into it
  • We concatenate these data files into a single monthly CSV called yyyy-mm.csv, which gets fed into bwb-import-bot.py (a minimal sketch of this step follows this list)
  • bwb-import-bot.py loads each record of yyyy-mm.csv into /imports/bwb-import-state.db and sets a batch_id of yyyy-mm
  • The end result is a single SQLite table (import_state, inside bwb-import-state.db) which contains all unique records from every batch yyyy-mm
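
A minimal sketch of the monthly concatenation step, assuming the cron job has already downloaded the per-source CSVs into /imports/YYYY-MM. The helper name concat_month.py and the *.csv glob are assumptions for illustration, not part of the actual pipeline:

#!/usr/bin/env python3
# concat_month.py - hypothetical helper that builds the monthly yyyy-mm.csv
import glob
import shutil
import sys


def concat_month(month_dir: str, out_csv: str) -> None:
    # Append every downloaded CSV for the month into a single file
    with open(out_csv, "wb") as out:
        for path in sorted(glob.glob(f"{month_dir}/*.csv")):
            with open(path, "rb") as part:
                shutil.copyfileobj(part, out)


if __name__ == "__main__":
    # e.g. python3 concat_month.py /imports/2021-05 /imports/2021-05.csv
    concat_month(sys.argv[1], sys.argv[2])

Assuming the downloaded files are pipe-delimited with no header row (which matches how bwb-import-bot.py reads the combined file: sep="|", header=None), plain byte-level concatenation is sufficient.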

Notes:

  • Batches: The batch_id field lets us run several months in parallel, remove or roll back failed batches, and check progress on or resume a batch
  • Idempotence: If the same batch (e.g. yyyy-mm) is run multiple times, we can compare wc -l (the line count of yyyy-mm.csv) against the count of records with batch_id=yyyy-mm in /imports/bwb-import-state.db to know where to resume (see the sketch after this list)
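
A rough sketch of that progress/resume check, using the import_state table and status values from bwb-import-bot.py below; the helper name batch_check.py is hypothetical:

#!/usr/bin/env python3
# batch_check.py - hypothetical helper for checking batch progress
import sqlite3
import sys


def batch_progress(db_path: str, csv_path: str, batch_id: str) -> None:
    # Equivalent of `wc -l yyyy-mm.csv`
    with open(csv_path, "rb") as f:
        csv_lines = sum(1 for _ in f)
    conn = sqlite3.connect(db_path)
    loaded, imported = conn.execute(
        "SELECT COUNT(*), SUM(status = 'SUCCESS') FROM import_state WHERE batch_id = ?",
        (batch_id,),
    ).fetchone()
    print(f"{batch_id}: {csv_lines} csv lines, {loaded} loaded, {imported or 0} imported")


if __name__ == "__main__":
    # e.g. python3 batch_check.py bwb-import-state.db 2021-05.csv 2021-05
    batch_progress(sys.argv[1], sys.argv[2], sys.argv[3])

Note that the loader drops duplicate ISBNs before inserting, so the loaded count can legitimately be smaller than the raw line count.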

Questions:

  • When an entry is imported successfully, will it be removed from the SQLite DB?
    • Currently there is no cleanup; this will be worked on next! (One possible shape is sketched below.)
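
One possible shape for that future cleanup, assuming we only ever want to drop rows that reached SUCCESS. This is a sketch of an unimplemented idea, not current behaviour:

# Hypothetical cleanup step - not part of the current bot
import sqlite3


def cleanup_successful(db_path: str, batch_id: str) -> int:
    conn = sqlite3.connect(db_path)
    cur = conn.execute(
        "DELETE FROM import_state WHERE batch_id = ? AND status = 'SUCCESS'",
        (batch_id,),
    )
    conn.commit()
    # Number of rows removed for this batch
    return cur.rowcount
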
bwb-import-bot.py:

#!/usr/bin/env python3
import csv
import datetime
import json
import logging
import os.path
import sqlite3
import sys
from enum import Enum
from sqlite3 import Connection, Cursor
from typing import Optional
import numpy as np
import pandas as pd
from olclient import OpenLibrary
from pandarallel import pandarallel
from requests.adapters import HTTPAdapter, Response
from requests.packages.urllib3.util.retry import Retry
ol = OpenLibrary()
OL_IMPORT_API_URL = "https://www.openlibrary.org/api/import"
adapter = HTTPAdapter(max_retries=Retry(total=5, read=5, connect=5, backoff_factor=0.3))
ol.session.mount("https://", adapter)
pandarallel.initialize()
# fake = Faker()
# Faker.seed(57)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("bwb-import-bot")
CLI_HELP_TEXT = """
usage:
python3.9 bwb-import-bot.py setup_db 2021-05.csv bwb-import-state.db batch_id_here
python3.9 bwb-import-bot.py process bwb-import-state.db 1
The last arg is the limit; if not provided, it defaults to 10000
"""
COL_NUMBER_TO_COL_NAME_MAP = {
    10: "title",
    135: "publisher",
    20: "publication_date",
    19: "copyright",
    124: "isbn",
    36: "pages",
    37: "language",
    54: "issn",
    145: "doi",
    146: "lccn",
    147: "lc_class",
    49: "dewey",
    39: "weight",
    40: "length",
    41: "width",
    42: "height",
}
class ImportStatusEnum(Enum):
    SUCCESS = "SUCCESS"
    ERROR = "ERROR"
    TO_BE_IMPORTED = "TO_BE_IMPORTED"
def pre_setup_db(sqlite_conn: Connection) -> None:
    # Setting up the table and a unique index on month and line number to prevent repeated imports of the same data
    sqlite_conn.execute(
        """
        CREATE TABLE if not exists "import_state" (
            "line_number" INTEGER,
            "title" TEXT,
            "publisher" TEXT,
            "publication_date" INTEGER,
            "copyright" REAL,
            "isbn" INTEGER,
            "pages" INTEGER,
            "language" TEXT,
            "issn" TEXT,
            "doi" TEXT,
            "lccn" TEXT,
            "lc_class" TEXT,
            "dewey" TEXT,
            "weight" REAL,
            "length" REAL,
            "width" REAL,
            "height" REAL,
            "subjects" TEXT,
            "contributors_dicts" TEXT,
            "status" TEXT,
            "batch_id" TEXT,
            "comment" TEXT
        )
        """
    )
    sqlite_conn.execute(
        """
        create unique index if not exists batch_id_line_no_uniq
        on import_state(batch_id, line_number)
        """
    )
def get_subjects(row: pd.Series) -> list[str]:
    subjects_list = row.iloc[91:100]
    return [
        s.capitalize().replace("_", ", ")
        for s in subjects_list
        if s and pd.notnull(s) and isinstance(s, str)
    ]
def make_author(contributor: list[str]) -> dict:
    author = {"name": contributor[0]}
    if contributor[2] == "X":
        # set corporate contributor
        author["entity_type"] = "org"
    # TODO: sort out contributor types
    # AU = author
    # ED = editor
    return author
def get_contributors(row: pd.Series) -> list[dict]:
    row_as_list = list(row)
    contributors = []
    for i in range(5):
        contributors.append(
            [row_as_list[21 + i * 3], row_as_list[22 + i * 3], row_as_list[23 + i * 3]]
        )
    # form list of author dicts
    return [make_author(c) for c in contributors if c[0] and pd.notnull(c[0])]
def setup_db_from_csv_dump(
    sqlite_connection: Connection, csv_file_path: str, batch_id_str: str
) -> None:
    # can also import multiple csv files here using glob patterns
    df: pd.DataFrame = pd.read_csv(
        csv_file_path, sep="|", header=None, quoting=csv.QUOTE_NONE
    )
    df["subjects"] = df.parallel_apply(
        lambda row: json.dumps(get_subjects(row)), axis=1
    )
    df["contributors_dicts"] = df.parallel_apply(
        lambda row: json.dumps(get_contributors(row)), axis=1
    )
    required_col_list = list(COL_NUMBER_TO_COL_NAME_MAP.keys())
    required_col_list.extend(["subjects", "contributors_dicts"])
    df = df[required_col_list].rename(columns=COL_NUMBER_TO_COL_NAME_MAP)
    # Dropping duplicate ISBNs
    df = df.drop_duplicates(subset=["isbn"], keep="last")
    df["status"] = ImportStatusEnum.TO_BE_IMPORTED.name
    df["batch_id"] = batch_id_str
    df["comment"] = np.NaN
    logger.info(f"Inserting {len(df)} rows into database")
    # Inserting into the sqlite db
    df.to_sql(
        con=sqlite_connection,
        name="import_state",
        chunksize=10000,
        method="multi",
        if_exists="append",
        index_label="line_number",
    )
def update_status_and_comment(
    sqlite_conn: Connection,
    cursor: Cursor,
    batch_id_str: str,
    line_number: int,
    status: str,
    comment: Optional[str],
):
    # TODO: Check success here
    # Parameterized query so comments containing quotes cannot break the SQL
    cursor.execute(
        "UPDATE import_state SET comment=?, status=? "
        "WHERE import_state.line_number=? AND import_state.batch_id=?",
        (comment, status, line_number, batch_id_str),
    )
    sqlite_conn.commit()
def ol_import(payload: dict):
    # TODO: refactor this, very verbose code!
    import_payload = {}
    title = payload.get("title")
    isbn_13 = payload.get("isbn")
    publish_date = payload.get("publication_date")
    publisher = payload.get("publisher")
    authors = payload.get("contributors_dicts")
    lc_classifications = payload.get("lc_class")
    no_pages = payload.get("pages")
    languages = payload.get("language")
    subjects = payload.get("subjects")
    if title:
        import_payload["title"] = title
    if isbn_13:
        import_payload["isbn_13"] = isbn_13
    if publish_date:
        # keep only the leading four characters (the year)
        import_payload["publish_date"] = str(publish_date)[:4]
    if publisher:
        import_payload["publishers"] = [publisher]
    if authors:
        import_payload["authors"] = authors
    if lc_classifications:
        import_payload["lc_classifications"] = lc_classifications
    if no_pages is not None:
        import_payload["number_of_pages"] = no_pages
    if languages:
        import_payload["languages"] = [languages]
    if subjects:
        import_payload["subjects"] = subjects
    logger.info("hitting import with")
    logger.info(import_payload)
    # Excluding source records here. TODO
    r: Response = ol.session.post(OL_IMPORT_API_URL, data=json.dumps(import_payload))
    if r.status_code != 200:
        logger.error(
            {"status_code": r.status_code, "content": r.content, "level": "error"}
        )
        raise Exception(r.content)
def process_row_and_import_to_ol(
    sqlite_conn: Connection, cursor: Cursor, row_data: pd.Series
):
    bwb_data = dict(row_data)
    batch_id_from_data = bwb_data["batch_id"]
    line_number = bwb_data["line_number"]
    assert bwb_data["status"] == ImportStatusEnum.TO_BE_IMPORTED.name
    try:
        bwb_data["subjects"] = json.loads(bwb_data["subjects"])
        bwb_data["contributors_dicts"] = json.loads(bwb_data["contributors_dicts"])
        ol_import(bwb_data)
        update_status_and_comment(
            sqlite_conn=sqlite_conn,
            cursor=cursor,
            batch_id_str=batch_id_from_data,
            line_number=line_number,
            status=ImportStatusEnum.SUCCESS.name,
            comment=None,
        )
    except Exception as e:
        update_status_and_comment(
            sqlite_conn=sqlite_conn,
            cursor=cursor,
            batch_id_str=batch_id_from_data,
            line_number=line_number,
            status=ImportStatusEnum.ERROR.name,
            comment=str(e),
        )
def process_imports_in_batches(sqlite_conn: Connection, batch_size: int) -> None:
    while True:
        df = pd.read_sql(
            f"SELECT * FROM import_state WHERE import_state.status=='TO_BE_IMPORTED' LIMIT {batch_size}",
            sqlite_conn,
        )
        logger.info(f"Processing {len(df)} records")
        if len(df) == 0:
            # All TO_BE_IMPORTED are processed!
            return
        cursor = sqlite_conn.cursor()
        df.apply(
            lambda row_data: process_row_and_import_to_ol(
                sqlite_conn=sqlite_conn, cursor=cursor, row_data=row_data
            ),
            axis=1,
        )
if __name__ == "__main__":
start_time: datetime.datetime = datetime.datetime.now()
cli_args: list[str] = sys.argv
logger.info(f"started at {start_time}. with args {cli_args[1:]}")
LIMIT = 1
if len(cli_args) > 0 and cli_args[1] == "setup_db":
csv_path = cli_args[2]
db_path = cli_args[3]
batch_id_from_args = cli_args[4]
batch_id_file_name = os.path.split(csv_path)
batch_id, ext = batch_id_file_name[-1].split(".")
db_connection: Connection = sqlite3.connect(db_path)
pre_setup_db(db_connection)
setup_db_from_csv_dump(
sqlite_connection=db_connection,
csv_file_path=csv_path,
batch_id_str=batch_id_from_args,
)
elif len(cli_args) > 0 and cli_args[1] == "process":
db_path = cli_args[2]
limit = cli_args[3] if len(cli_args) > 2 else 10000
db_connection: Connection = sqlite3.connect(db_path)
process_imports_in_batches(db_connection, limit)
else:
print(CLI_HELP_TEXT)
end_time = datetime.datetime.now()
logger.info(f"Ended on {end_time}. Took {(end_time - start_time).seconds} seconds")
Requirements:

pandas==1.2.4
numpy==1.20.2
pandarallel==1.5.2
Faker==8.1.2
openlibrary-client==0.0.30
requests==2.23.0
@mekarpeles commented:

We may also have to import csv
