Last active
February 22, 2024 05:47
-
-
Save skeptrunedev/01681aaf795c0cf5ddf91580652d6b20 to your computer and use it in GitHub Desktop.
Trieve CSV iterator and uploader
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import io | |
import json | |
import os | |
import re | |
import string | |
import requests | |
import tiktoken | |
import pandas as pd | |
from dotenv import load_dotenv | |
load_dotenv()

# Trieve API credentials and chunking thresholds, all read from the
# environment (a local .env file is honored via load_dotenv above).
api_key = os.environ.get('API_KEY')
dataset_id = os.environ.get('DATASET_ID')
api_url = os.environ.get('API_URL')

# Fail fast with a clear message instead of the opaque
# "int() argument must be ... not 'NoneType'" TypeError that int(None) raises
# when either numeric setting is missing.
_raw_target = os.environ.get('TARGET_SENTENCE_COUNT')
_raw_min = os.environ.get('MIN_SENTENCE_COUNT')
if _raw_target is None or _raw_min is None:
    print("Missing TARGET_SENTENCE_COUNT or MIN_SENTENCE_COUNT environment variable.")
    exit(1)
target_sentence_count = int(_raw_target)
min_sentence_count = int(_raw_min)
class Chunk:
    """One unit of text plus metadata, uploadable to the Trieve /chunk endpoint.

    Attributes:
        chunk_html: the chunk's text/HTML content.
        tag_set: tag string for the chunk (callers in this file pass "email" or "").
        metadata: per-document metadata dict; must contain the
            "Production_Begin_Bates" and "Production_End_Bates" keys.
        tracking_id: "<begin_bates>||<end_bates>" identifier derived from metadata.
    """

    def __init__(self, chunk_html, tag_set, metadata_dict):
        self.chunk_html = chunk_html
        self.tag_set = tag_set
        self.metadata = metadata_dict
        if not self.metadata:
            # Metadata is mandatory: the tracking id below is built from it.
            print("Missing metadata.")
            exit(1)
        # TODO: verify that this is unique
        self.tracking_id = self.metadata["Production_Begin_Bates"] + "||" + self.metadata["Production_End_Bates"]

    def to_json(self):
        """Serialize the chunk to a JSON string.

        NaN and None values anywhere in the payload (pandas rows commonly
        contain NaN for empty cells) are replaced with "" so the API receives
        valid, uniform strings.  Keys are sorted for deterministic output.
        """
        def replace_nan_none(obj):
            # Fixed: the original checked `isinstance(obj, float) and
            # (obj != obj or obj is None)` — a float is never None, so the
            # None test inside that branch was dead code.
            if obj is None:
                return ""
            if isinstance(obj, float) and obj != obj:  # NaN is the only value != itself
                return ""
            if isinstance(obj, dict):
                return {key: replace_nan_none(value) for key, value in obj.items()}
            if isinstance(obj, list):
                return [replace_nan_none(item) for item in obj]
            return obj

        json_dict = {key: replace_nan_none(value) for key, value in self.__dict__.items()}
        # default=str stringifies any remaining non-JSON-serializable values.
        return json.dumps(json_dict, sort_keys=True, default=str)

    def send_post_request(self):
        """POST this chunk to the Trieve API; chunks below the configured
        minimum sentence count are silently skipped.

        Errors are reported by printing the response body; no exception is
        raised so a failed upload does not abort the whole run.
        """
        if count_sentences(self.chunk_html) < min_sentence_count:
            return
        url = f"{api_url}/chunk"
        headers = {
            "Content-Type": "application/json",
            "Authorization": api_key,
            "TR-Dataset": dataset_id
        }
        # NOTE(review): no timeout is set, so a stalled server hangs the run
        # indefinitely — consider requests.post(..., timeout=...).
        req_result = requests.post(url, data=self.to_json(), headers=headers)
        if req_result.status_code != 200:
            print(req_result.text)
def count_sentences(input_string):
    """Estimate how many sentences *input_string* contains.

    Splits on '.', '!' and '?' and counts the non-blank fragments, then caps
    the estimate at one sentence per ten words so punctuation-dense text
    (abbreviations, ellipses, bates numbers) is not over-counted.
    """
    fragments = re.split(r'[.!?]', input_string)
    estimated = len([frag for frag in fragments if frag.strip()])
    total_words = count_words(input_string)
    if estimated * 10 > total_words:
        return total_words // 10
    return estimated
def count_words(input_string):
    """Return the number of whitespace-separated tokens in *input_string*."""
    return len(input_string.split())
def has_only_spaces_special_chars_numbers(input_string):
    """Return True when the string holds no content beyond spaces, digits,
    and ASCII punctuation (an empty string counts as True)."""
    for char in input_string:
        if char == " ":
            continue
        if not (char.isdigit() or char in string.punctuation):
            return False
    return True
def get_total_token_count(input_strings):
    """Return the combined token count of all strings in *input_strings*,
    measured with the module-level tiktoken encoding."""
    return sum(len(encoding.encode(text)) for text in input_strings)
def remove_hex_unicodes(input_string):
    """Strip every non-ASCII character from *input_string*.

    Round-trips through an ASCII encode with errors='ignore', which simply
    drops anything outside the ASCII range.
    """
    ascii_bytes = input_string.encode('ascii', errors='ignore')
    return ascii_bytes.decode('ascii')
# Tokenizer used for every token-count estimate in this file.
encoding = tiktoken.get_encoding("cl100k_base")

# Substrings marking a line as boilerplate/header noise to skip entirely.
# All entries must be lowercase: ignore_line() tests them against
# line.lower(), so the original mixed-case entries ("under HIPAA",
# "outside of the ICR", "disclosure under HIPAA") could never match — they
# are lowercased here.
words_to_trigger_line_ignore = ['to:', 'cc:', 'from:', 'date:', "sent:", "forwarded message", "@", "───", "meeting id:", "password:", "all rights reserved", "ext.", "has invited you", "google llc", "shared with you", "...", "confidentiality notice", "information that is protected from disclosure", "you are not the intended recipient", "without disclosing or using", "wrote:", "tel:", "email:", ", ca", "road,", "ct,", "external email", "facsimile/email", "confidential or privileged", "distribution of this fax", "exclusive use of", "violation of federal", "under hipaa", "for reproduction", "further distribution", "message are private", "unauthorized use", "notify the sender", "unauthorized interception", "outside of the icr", "recognize the sender's email", "docs.google.com", "external sender", "trust this email", "content is safe", "proof of sender", "do not click", "mentioned in this thread", "google docs sends", "www.google.com", "zoom.us", "disclosure under hipaa", "solely for use"]

# Substrings marking the first header line of an email.
words_trigger_email_start = ['to:', 'cc:', 'from:', 'date:', 'sent:', 'subject:', 're:', 'fw:', 'fwd:', 'attachments:', 'attached:', 'wrote:']

# Substrings marking the end of an email body (separators and sign-offs).
# Fixed: the original list read `'───' '--'` with a missing comma, so
# implicit string concatenation fused them into one entry ('───--') that
# never matched either separator.  Duplicate 'regards,' / 'gratitude,'
# entries were also removed (membership tests are unaffected).
words_trigger_email_end = ['forwarded message', 'has invited you', 'open in', 'google llc', 'original message', 'original message follows', '───', '--', '***', '===', 'regards,', 'from,', 'sincerely,', 'yours,', 'gratitude,', 'appreciation,', 'care,', 'cheers,', 'cordially,', 'respectfully,', 'warmly,', 'best,', 'wishes,', 'humbly,', "thanks,"]
def contains_email_start(line):
    """Return True when *line* contains any email-header marker
    (case-insensitive substring match)."""
    lowered = line.lower()
    return any(marker in lowered for marker in words_trigger_email_start)
def contains_email_end(line):
    """Return True when *line* contains any end-of-email marker
    (case-insensitive substring match)."""
    lowered = line.lower()
    return any(marker in lowered for marker in words_trigger_email_end)
def ignore_line(line):
    """Decide whether *line* is boilerplate that should not enter a chunk.

    A line is ignored when it has no letters at all, contains any of the
    configured noise markers, or contains a US-style phone number.
    """
    if has_only_spaces_special_chars_numbers(line):
        return True
    lowered = line.lower()
    if any(word in lowered for word in words_to_trigger_line_ignore):
        return True
    # 3-3-4 digit groups separated by '-', '.', or a space.
    return bool(re.search(r'\b\d{3}[-. ]?\d{3}[-. ]?\d{4}\b', line))
# Returns True to cutoff, False to continue
def default_cutoff_or_continue(cur_chunk_text, line):
    """Decide whether the in-progress chunk must be flushed before *line*
    is appended (non-email chunking policy).

    Cuts off when adding the line would push the chunk past the configured
    target sentence count, or when the combined text reaches 460 tokens.
    """
    chunk_sentences = count_sentences(cur_chunk_text)
    line_sentences = count_sentences(line)
    if line_sentences >= 1 and chunk_sentences + line_sentences > target_sentence_count:
        return True
    # Token ceiling regardless of sentence structure.
    return get_total_token_count([cur_chunk_text, line]) >= 460
# --- | |
def process_file(full_path, metadata_dict):
    """Split the text file at *full_path* into chunks and upload each to Trieve.

    Streams the file line by line, accumulating text into a current chunk.
    Lines that look like email headers ("To:", "From:", ...) switch the
    splitter into email mode: header values are collected into email_dict
    and merged into the chunk metadata, and signature/separator lines end
    the email's chunk.  Outside email mode, chunk boundaries come from
    default_cutoff_or_continue().

    NOTE(review): metadata_dict is mutated in place via update(email_dict),
    so email header fields collected here leak back into the caller's dict.

    :param full_path: path of the plain-text file to chunk.
    :param metadata_dict: per-document metadata attached to every chunk;
        must contain the bates keys required by Chunk.__init__.
    """
    # Most recently collected header values for the email being read.
    email_dict = {
        'email_to': "",
        'email_cc': "",
        'email_from': "",
        'email_sent': "",
        'email_subject': "",
        'email_re': "",
        'email_fw': "",
        'email_attachments': ""
    }

    def clear_email_vars():
        # Reset every collected header value to the empty string.
        nonlocal email_dict
        for key in email_dict:
            email_dict[key] = ""

    def set_email_vars(line):
        # Parse "Header: value" pairs out of *line* into email_dict.
        # Repeated, distinct values for the same header are joined with "|".
        nonlocal email_dict
        line = line.replace("\n", "")
        lowercase_line = line.lower()
        keys = ["to:", "cc:", "from:", "sent:", "date:", "subject:", "re:", "fw:", "fwd:", "attachments:", "attached:"]
        for key in keys:
            # Maps each header prefix to its email_dict slot ("sent:" and
            # "date:" share one slot, as do the fw/fwd and attachment forms).
            key_translation = {
                "to:": "email_to",
                "cc:": "email_cc",
                "from:": "email_from",
                "sent:": "email_sent",
                "date:": "email_sent",
                "subject:": "email_subject",
                "re:": "email_re",
                "fw:": "email_fw",
                "fwd:": "email_fw",
                "attachments:": "email_attachments",
                "attached:": "email_attachments"
            }
            # Case-insensitive search: split the lowercased line on the key...
            split_line = lowercase_line.split(key)
            if len(split_line) == 1:
                continue
            # ...then recover the value with its original casing by slicing
            # the untouched line from the end.
            # NOTE(review): if the key occurs more than once, split() yields
            # >2 parts and this slice no longer matches the full remainder.
            value_len = len(split_line[1])
            value = line[-value_len:].strip()
            if not value:
                continue
            if value != email_dict[key_translation[key]]:
                email_dict[key_translation[key]] = value if not email_dict[key_translation[key]] else email_dict[key_translation[key]] + "|" + value

    # latin-1 maps every byte, so reading never raises UnicodeDecodeError;
    # non-ASCII bytes are stripped below by remove_hex_unicodes().
    with open(full_path, 'r', encoding="latin-1") as file:
        cur_chunk_content = ""  # text accumulated for the chunk in progress
        next_nonsentence = ""  # trailing fragment without end punctuation, carried forward
        is_email = False  # True once any email header has been seen in this file
        found_email_start = False  # True between an email's headers and its end marker
        for line in file:
            line = remove_hex_unicodes(line)
            if contains_email_start(line):
                # New email begins: flush the previous chunk if big enough.
                if count_sentences(cur_chunk_content) >= 4:
                    metadata_dict.update(email_dict)
                    chunk = Chunk(cur_chunk_content, "email" if is_email else "", metadata_dict)
                    chunk.send_post_request()
                    clear_email_vars()
                set_email_vars(line)
                is_email = True
                found_email_start = True
                cur_chunk_content = ""
                next_nonsentence = ""
                continue
            line = line.replace("\n", " ")
            # split line into sentences
            sentences = re.split(r'[.!?]', line)
            if len(sentences) == 1:
                # No sentence-ending punctuation: buffer the fragment for the
                # next line (unless it is boilerplate).
                if line != "\n" and not ignore_line(line):
                    next_nonsentence += line
                continue
            # set next_nonsentence to the last element of sentences
            temp_next_nonsentence = sentences[-1]
            # set line equal to itself minus the last element of sentences
            line = line[:-len(temp_next_nonsentence)]
            line = next_nonsentence + line
            if ignore_line(line):
                # Boilerplate can still carry header info (e.g. quoted
                # "From:" lines), so harvest it before skipping.
                set_email_vars(line)
                continue
            # email cases
            if is_email and not found_email_start:
                # Between an email's end marker and the next header block:
                # discard stray text.
                cur_chunk_content = ""
                next_nonsentence = ""
                continue
            if is_email and contains_email_end(line):
                next_nonsentence = ""
                found_email_start = False
                # Drop email bodies shorter than four sentences.
                if count_sentences(cur_chunk_content) < 4:
                    cur_chunk_content = ""
                    continue
                metadata_dict.update(email_dict)
                chunk = Chunk(cur_chunk_content, "email" if is_email else "", metadata_dict)
                chunk.send_post_request()
                cur_chunk_content = ""
                clear_email_vars()
                continue
            if is_email:
                next_nonsentence = temp_next_nonsentence
                # Hard size limits for email chunks: ~3000 chars or the
                # 8191-token ceiling checked via get_total_token_count.
                if len(cur_chunk_content) > 3000 or get_total_token_count([cur_chunk_content, line]) >= 8191:
                    metadata_dict.update(email_dict)
                    chunk = Chunk(cur_chunk_content, "email" if is_email else "", metadata_dict)
                    chunk.send_post_request()
                    cur_chunk_content = line
                    continue
                cur_chunk_content += line
                continue
            # non-email cases
            cutoff = default_cutoff_or_continue(cur_chunk_content, line)
            if cutoff:
                metadata_dict.update(email_dict)
                chunk = Chunk(cur_chunk_content, "email" if is_email else "", metadata_dict)
                chunk.send_post_request()
                next_nonsentence = temp_next_nonsentence
                cur_chunk_content = line
                continue
            elif not cutoff:  # always true when reached; kept as written
                cur_chunk_content += line
                next_nonsentence = temp_next_nonsentence
                continue
    # Flush whatever remains after the last line of the file.
    metadata_dict.update(email_dict)
    chunk = Chunk(cur_chunk_content, "email" if is_email else "", metadata_dict)
    chunk.send_post_request()
def main():
    """CLI entry point: load the dataframe file and upload chunks per row.

    Reads the file named by --df_path, strips the 'þ' text-qualifier
    characters (as written by Concordance-style load files), parses it with
    pandas, and runs process_file() once per row with that row's fields as
    chunk metadata.
    """
    parser = argparse.ArgumentParser(description='Process files with an option for full path.')
    parser.add_argument('--df_path', help='File path to a dataframe')
    # Fixed: the delimiter was a hard-coded empty string (r''), which
    # pd.read_csv rejects.  It is now a CLI option.
    parser.add_argument('--delimiter', default=',',
                        help='Column delimiter of the dataframe file (default: ",").')
    args = parser.parse_args()
    if args.df_path is None:
        print("Missing dataframe path.")
        exit(1)
    df_path = args.df_path
    with open(df_path, 'r', encoding='utf-8') as file:
        file_content = file.read().replace('þ', '')
    text_buffer = io.StringIO(file_content)
    df = pd.read_csv(text_buffer, delimiter=args.delimiter, engine="python")
    # Underscore column names so row dicts use keys like Production_Begin_Bates.
    df.columns = df.columns.str.replace(' ', '_')
    for _, row in df.iterrows():
        metadata_dict = row.to_dict()
        # TODO: derive the correct TXT file path for this row (e.g. from a
        # bates-number column).  The original placeholder
        # `txt_file_path = unimplemented!` was a syntax error; fail loudly
        # instead until this is filled in.
        txt_file_path = None
        if txt_file_path is None:
            raise NotImplementedError("txt_file_path derivation is not implemented; see TODO.")
        process_file(txt_file_path, metadata_dict)

if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment