@Big-al · Created May 22, 2023
Python script to ingest PDFs and other text document types to Elasticsearch in a simple schema, page by page.
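Dependencies (a minimal install sketch; version pins are left to you — the script assumes the 8.x Elasticsearch Python client):
pip install elasticsearch PyPDF2 mammoth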
import datetime
import os
from pathlib import Path

from elasticsearch import Elasticsearch
from PyPDF2 import PdfReader
import mammoth

# ====================== Settings ======================
# The folder containing the files to ingest
folder_path = "./pdfs"

# Target index name
index_name = "documents_by_pages"

# Elasticsearch connection details
es_host = "es.host.com"
es_port = 443

# Elasticsearch credentials, if required
es_username = "myUser"
es_password = "myPa$$word"

# Elasticsearch connection setup
es = Elasticsearch(
    hosts=[{"scheme": "https", "host": es_host, "port": es_port}],
    basic_auth=(es_username, es_password) if es_username and es_password else None,
)
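
# Optional: create the index with an explicit mapping before ingesting, so that
# "@timestamp" is indexed as a date and "content" as full text. This is a minimal
# sketch derived from the fields built below; the exact field types are an
# assumption, not part of the original script.
if not es.indices.exists(index=index_name):
    es.indices.create(
        index=index_name,
        mappings={
            "properties": {
                "id": {"type": "keyword"},
                "author": {"type": "keyword"},
                "page_number": {"type": "integer"},
                "filename": {"type": "keyword"},
                "category": {"type": "keyword"},
                "content": {"type": "text"},
                "@timestamp": {"type": "date"},
            }
        },
    )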
# ====================== Script ======================
def upload_bulk_to_elasticsearch(es, actions):
    if not actions:
        return  # es.bulk() raises an error on an empty operations list
    response = es.bulk(index=index_name, operations=actions)
    success = 0
    fails = 0
    for result in response.body["items"]:
        if result["index"]["status"] in (200, 201):
            success += 1
        else:
            print(result["index"]["status"])
            fails += 1
    print("Successful: " + str(success) + " Fails: " + str(fails))
def extract_text_from_pdf(pdf_path):
    pages = []
    # Open the PDF; skip the whole file if it cannot be read
    try:
        pdf = PdfReader(pdf_path)
        print("Ingestion type: PDF, Length: " + str(len(pdf.pages)) + " pages, Path: " + str(pdf_path))
    except Exception:
        print(" == ERROR | BOOK == Skipping book " + str(pdf_path) + ", due to error.")
        return pages
    # Extract text page by page; skip individual pages that fail
    for page_number in range(len(pdf.pages)):
        try:
            pages.append(pdf.pages[page_number].extract_text())
        except Exception:
            print(
                " == ERROR | PAGE == Skipping page "
                + str(page_number)
                + " from book: "
                + str(pdf_path)
                + ", due to error."
            )
    return pages
def extract_text_from_txt(txt_path):
    # Plain text has no page structure, so the whole file becomes a single "page"
    with open(txt_path, "r", encoding="utf-8") as file:
        content = file.read()
    return [content]
def extract_text_from_markdown(md_path):
    # mammoth only reads .docx, so Markdown is read as plain UTF-8 text instead,
    # and, like .txt, the whole file becomes a single "page"
    with open(md_path, "r", encoding="utf-8") as file:
        content = file.read()
    return [content]
def extract_text_from_word(docx_path):
    with open(docx_path, "rb") as file:
        # mammoth.extract_raw_text() takes only the file object and returns
        # its text as a str in result.value, so no content type or decode is needed
        result = mammoth.extract_raw_text(file)
    return [result.value]
def extract_text_from_file(file_path):
    # Dispatch on extension; every extractor returns a list of page strings
    file_extension = file_path.suffix.lower()
    if file_extension == ".pdf":
        return extract_text_from_pdf(file_path)
    elif file_extension == ".txt":
        return extract_text_from_txt(file_path)
    elif file_extension == ".md":
        return extract_text_from_markdown(file_path)
    elif file_extension == ".docx":  # mammoth supports .docx only, not legacy .doc
        return extract_text_from_word(file_path)
    else:
        return []
# ====================== Main ======================
# Iterate over files in the folder
actions = []
for filename in os.listdir(folder_path):
    # Extract the author if possible: anything in the title after " by ",
    # with the file extension stripped
    author = "NA"
    if " by " in filename:
        author = Path(filename).stem.split(" by ")[1]
    file_path = os.path.join(folder_path, filename)
    if os.path.isfile(file_path):
        pages = extract_text_from_file(Path(file_path))
        for page_number in range(len(pages)):
            id_field = filename + str(page_number + 1)
            action = {"index": {"_index": index_name, "_id": id_field}}
            doc = {
                "id": id_field,
                "author": author,
                "page_number": page_number + 1,
                "filename": filename,
                "category": os.path.basename(folder_path),
                "content": pages[page_number],
                "@timestamp": datetime.datetime.now().replace(microsecond=0).isoformat(),
            }
            actions.append(action)
            actions.append(doc)
            # Flush in batches to keep each bulk request at a reasonable size
            if len(actions) >= 300:
                print("Uploading " + str(len(actions)) + " documents to ES.")
                upload_bulk_to_elasticsearch(es, actions)
                actions = []
# Upload whatever is left in the final partial batch
upload_bulk_to_elasticsearch(es, actions)
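
# Once ingested, pages can be searched with a full-text match query. A minimal
# usage sketch; the query string "example search terms" is purely illustrative:
results = es.search(index=index_name, query={"match": {"content": "example search terms"}})
for hit in results["hits"]["hits"]:
    print(hit["_source"]["filename"], "page", hit["_source"]["page_number"])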