# Skip to content
#
# Instantly share code, notes, and snippets.
#
# @luisriverag
# Created May 10, 2023 16:07
# Show Gist options
#   • Save luisriverag/9a4ec2744e4ec95accc4b732e6d2e986 to your computer and use it in GitHub Desktop.
import os
import sys
import datetime
from docx import Document
import textract
import pymongo
from pymongo import MongoClient
import pytz
import nltk
# Download the 'punkt' tokenizer data every time this module is loaded.
# NOTE(review): presumably required by nltk.sent_tokenize; newer NLTK releases
# may expect the 'punkt_tab' resource instead -- confirm against the installed
# version.
nltk.download('punkt')

# MongoDB connection URI: taken from the MONGODB_CONNECTION_STRING environment
# variable, falling back to a server on localhost when it is not set.
MONGODB_CONNECTION_STRING = os.getenv(
    'MONGODB_CONNECTION_STRING',
    'mongodb://localhost:27017/',
)
def get_sections_titles(document):
    """Collect the text of every heading paragraph in a document.

    A paragraph counts as a heading when the name of its style begins with
    'Heading'.

    Args:
        document: Object exposing a ``paragraphs`` iterable whose items carry
            ``text`` and ``style.name`` attributes.

    Returns:
        list: Heading texts, in the order the paragraphs appear.
    """
    return [
        para.text
        for para in document.paragraphs
        if para.style.name.startswith('Heading')
    ]
def process_paragraph(paragraph, current_section, file_name, index):
    """Break a paragraph into sentences and build one record per sentence.

    Args:
        paragraph: Paragraph text to tokenize with ``nltk.sent_tokenize``.
        current_section: Section title stored on every record.
        file_name: File name stored on every record.
        index: Paragraph number stored on every record.

    Returns:
        list[dict]: One dict per sentence that is non-empty after stripping.
        ``sentence`` holds the stripped sentence, while
        ``sentence_character_count`` is the length of the sentence as returned
        by the tokenizer (before stripping) and ``paragraph_character_count``
        is the length of the unmodified paragraph.
    """
    paragraph_length = len(paragraph)
    return [
        {
            'paragraph_number': index,
            'section_title': current_section,
            'file_name': file_name,
            # Evaluated per record, so each sentence gets its own UTC timestamp.
            'upload_date_time': datetime.datetime.now(pytz.utc),
            'sentence_character_count': len(piece),
            'paragraph_character_count': paragraph_length,
            'sentence': piece.strip(),
        }
        for piece in nltk.sent_tokenize(paragraph)
        if piece.strip()
    ]
def process_docx(file_path):
    """Extract sentence records from a .docx file.

    Paragraphs whose style name starts with 'Heading' update the current
    section title; every other non-blank paragraph is split into sentence
    records by ``process_paragraph``.

    Args:
        file_path: Path of the .docx file to open with ``Document``.

    Returns:
        list[dict]: Sentence records in document order. ``paragraph_number``
        is the paragraph's position among all paragraphs of the document,
        including blank ones and headings.
    """
    doc = Document(file_path)
    # The base name is the same for every record, so compute it once.
    file_name = os.path.basename(file_path)
    content = []
    current_section = ''
    for index, paragraph in enumerate(doc.paragraphs):
        text = paragraph.text
        if not text.strip():
            continue
        if paragraph.style.name.startswith('Heading'):
            current_section = text
        else:
            content.extend(
                process_paragraph(text, current_section, file_name, index)
            )
    return content
def process_doc(file_path):
    """Extract sentence records from a legacy .doc file.

    The file's text is obtained through ``textract`` and split into
    paragraphs on blank lines. A paragraph written entirely in upper case is
    taken as a section title; every other non-blank paragraph is split into
    sentence records by ``process_paragraph``.

    Args:
        file_path: Path of the .doc file to read.

    Returns:
        list[dict]: Sentence records in document order. ``paragraph_number``
        is the paragraph's position in the blank-line split, counting blank
        chunks and section titles as well.
    """
    text = textract.process(file_path).decode('utf-8')
    # The base name is the same for every record, so compute it once.
    file_name = os.path.basename(file_path)
    content = []
    current_section = ''
    for index, paragraph in enumerate(text.split('\n\n')):
        stripped = paragraph.strip()
        if not stripped:
            continue
        # str.isupper() requires at least one cased character, so paragraphs
        # made only of digits or punctuation (e.g. "2023", "***") are kept as
        # content instead of silently replacing the section title.
        if stripped.isupper():
            current_section = stripped
        else:
            content.extend(
                process_paragraph(paragraph, current_section, file_name, index)
            )
    return content
def upload_to_mongodb(content):
    """Insert sentence records into the ``word_data.sentences`` collection.

    Args:
        content: List of record dicts to insert. When it is empty nothing is
            uploaded and no connection is opened.

    Returns:
        None.
    """
    # insert_many rejects an empty document list, so a file that produced no
    # sentences (empty, or headings only) must not reach it.
    if not content:
        return
    with MongoClient(MONGODB_CONNECTION_STRING) as client:
        client['word_data']['sentences'].insert_many(content)
def main(file_path):
    """Parse a Word document and upload its sentences to MongoDB.

    The parser is chosen from the file extension, compared
    case-insensitively: '.docx' uses ``process_docx`` and '.doc' uses
    ``process_doc``. Any other extension prints "Unsupported file type" and
    returns without uploading.

    Args:
        file_path: Path of the Word document to process.

    Returns:
        None.
    """
    _, file_extension = os.path.splitext(file_path)
    # Normalise case so names such as "REPORT.DOCX" are accepted as well.
    file_extension = file_extension.lower()
    if file_extension == '.docx':
        content = process_docx(file_path)
    elif file_extension == '.doc':
        content = process_doc(file_path)
    else:
        print("Unsupported file type")
        return
    upload_to_mongodb(content)
if __name__ == '__main__':
    # Exactly one command-line argument (the document path) is expected;
    # anything else just prints the usage line.
    if len(sys.argv) == 2:
        main(sys.argv[1])
    else:
        print("Usage: python script.py <path_to_word_document>")
# Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment