Python script to ingest PDFs and other text document types into Elasticsearch in a simple schema, page by page.
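The "simple schema" here is one flat Elasticsearch document per page. As an illustrative sketch (the field values below are made up, not taken from a real run), each indexed page looks like this:

# Shape of one indexed page (illustrative values only):
{
    "id": "Moby Dick by Herman Melville.pdf_42",
    "author": "Herman Melville",
    "page_number": 42,
    "filename": "Moby Dick by Herman Melville.pdf",
    "category": "pdfs",
    "content": "Call me Ishmael. Some years ago...",
    "@timestamp": "2023-05-22T16:14:00",
}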
import datetime
import os
from pathlib import Path

from elasticsearch import Elasticsearch
from PyPDF2 import PdfReader
import mammoth
# ====================== Settings ======================
# The folder containing the files to ingest
folder_path = "./pdfs"

# Target index name
index_name = "documents_by_pages"

# Elasticsearch connection details
es_host = "es.host.com"
es_port = 443

# Elasticsearch credentials, if required
es_username = "myUser"
es_password = "myPa$$word"

# Elasticsearch connection setup
es = Elasticsearch(
    hosts=[{"scheme": "https", "host": es_host, "port": es_port}],
    basic_auth=(es_username, es_password) if es_username and es_password else None,
)
# ====================== Script ======================
def upload_bulk_to_elasticsearch(es, actions):
    """Send a batch of bulk actions and report how many succeeded."""
    response = es.bulk(index=index_name, operations=actions)
    success = 0
    fails = 0
    for result in response.body["items"]:
        if result["index"]["status"] in (200, 201):
            success += 1
        else:
            print(result["index"]["status"])
            fails += 1
    print(f"Successful: {success} Fails: {fails}")
def extract_text_from_pdf(pdf_path):
    """Return a list with one text string per PDF page."""
    pages = []
    # Open the PDF
    try:
        pdf = PdfReader(pdf_path)
        print(f"Ingestion type: PDF, Length: {len(pdf.pages)} pages, Path: {pdf_path}")
    except Exception:
        print(f" == ERROR | BOOK == Skipping book {pdf_path}, due to error.")
        return pages
    # Extract text page by page; skip pages that fail instead of aborting the book
    for page_number in range(len(pdf.pages)):
        try:
            pages.append(pdf.pages[page_number].extract_text())
        except Exception:
            print(f" == ERROR | PAGE == Skipping page {page_number} from book: {pdf_path}, due to error.")
    return pages
def extract_text_from_txt(txt_path):
    """Return the whole text file as a single 'page'."""
    with open(txt_path, "r", encoding="utf-8") as file:
        content = file.read()
    return [content]


def extract_text_from_markdown(md_path):
    """Return the whole Markdown file as a single 'page' (read as plain text;
    mammoth only parses .docx, so it cannot be used here)."""
    with open(md_path, "r", encoding="utf-8") as file:
        content = file.read()
    return [content]


def extract_text_from_word(docx_path):
    """Return the raw text of a .docx file as a single 'page'."""
    with open(docx_path, "rb") as file:
        # mammoth.extract_raw_text takes the file object; result.value is already a str
        result = mammoth.extract_raw_text(file)
    return [result.value]
def extract_text_from_file(file_path):
    """Dispatch on file extension; always return a list of page strings."""
    file_extension = file_path.suffix.lower()
    if file_extension == ".pdf":
        return extract_text_from_pdf(file_path)
    elif file_extension == ".txt":
        return extract_text_from_txt(file_path)
    elif file_extension == ".md":
        return extract_text_from_markdown(file_path)
    elif file_extension == ".docx":  # mammoth supports .docx only, not legacy .doc
        return extract_text_from_word(file_path)
    else:
        return []
# ====================== Main ======================
# Iterate over the files in the folder and bulk-index them page by page
actions = []
for filename in os.listdir(folder_path):
    # Extract the author from the filename if possible
    author = "NA"
    if " by " in filename:  # anything in the title after " by "
        author = Path(filename).stem.split(" by ")[1]
    file_path = os.path.join(folder_path, filename)
    if os.path.isfile(file_path):
        pages = extract_text_from_file(Path(file_path))
        for page_number in range(len(pages)):
            # Separator avoids ID collisions between similar filenames
            id_field = f"{filename}_{page_number + 1}"
            action = {"index": {"_index": index_name, "_id": id_field}}
            doc = {
                "id": id_field,
                "author": author,
                "page_number": page_number + 1,
                "filename": filename,
                "category": os.path.basename(folder_path),
                "content": pages[page_number],
                "@timestamp": datetime.datetime.now().replace(microsecond=0).isoformat(),
            }
            actions.append(action)
            actions.append(doc)
            # Flush in batches; each document contributes two bulk lines
            if len(actions) >= 300:
                print(f"Uploading {len(actions) // 2} documents to ES.")
                upload_bulk_to_elasticsearch(es, actions)
                actions = []

# Upload any remaining documents (an empty bulk request would raise an error)
if actions:
    upload_bulk_to_elasticsearch(es, actions)
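Once the pages are indexed, they can be searched with the same client. A minimal sketch of a full-text query against the content field, assuming the index created above ("whale" is a placeholder search term):

# Minimal full-text search over the ingested pages (elasticsearch-py 8.x).
response = es.search(
    index=index_name,
    query={"match": {"content": "whale"}},
    size=5,
)
for hit in response["hits"]["hits"]:
    src = hit["_source"]
    print(f'{src["filename"]} p.{src["page_number"]} (score {hit["_score"]})')

Because each hit is a single page, filename and page_number point straight at the matching location in the source document.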