@fullstackwebdev
Created May 2, 2024 00:54
import json
import os
import queue
import random
import shutil
import tempfile
import threading
import time
import warnings
from typing import List, Union

import pandas as pd
import requests
from tqdm import tqdm

import dspy
from utils.load_documents import DocumentData, load_data

warnings.filterwarnings("ignore", category=DeprecationWarning)
API_BASE = 'https://api.freegpt.today/v1/'
# MODEL_NAME = requests.get(API_BASE + 'models').json()['data'][0]['id']
# MODEL_NAME = 'mixtral-turbo'
MODEL_NAME = 'gpt-3.5-turbo'
# turbo = dspy.OpenAI(model="your model", api_base=API_BASE, api_key='asdf', timeout=200)
turbo = dspy.OpenAI(model_type='chat', top_p=1, presence_penalty=0.1, model=MODEL_NAME, temperature=0.1, max_tokens=6000, api_base=API_BASE, api_key='asdf', timeout=200)
dspy.settings.configure(lm=turbo)
from unstructured.chunking.basic import chunk_elements
from unstructured.partition.auto import partition
from unstructured.chunking.title import chunk_by_title
from unstructured.cleaners.core import group_broken_paragraphs, clean_ordered_bullets, clean_non_ascii_chars, clean_dashes, clean_bullets, replace_unicode_quotes, clean
# unstructured.documents.elements.ElementMetadata
from unstructured.documents.elements import ElementMetadata
def custom_serializer(obj):
    if hasattr(obj, '__dict__'):
        return obj.__dict__  # Serialize objects by turning their __dict__ attribute into a dictionary
    elif isinstance(obj, frozenset):
        return list(obj)  # Convert frozenset to list, which is serializable
    raise TypeError(f'Object of type {obj.__class__.__name__} is not JSON serializable')

def pretty_print(data):
    print(json.dumps(data, indent=4, default=custom_serializer, sort_keys=True))
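# Illustrative sketch (not part of the pipeline): custom_serializer lets
# pretty_print handle values json.dumps cannot serialize on its own, e.g.
# frozensets or simple objects with a __dict__. The _Meta class below is
# made up for the example.
#
# class _Meta:
#     def __init__(self):
#         self.filename = "book.pdf"
#         self.languages = frozenset({"eng"})
#
# pretty_print({"metadata": _Meta(), "page": 1})
# # -> metadata is emitted as {"filename": "book.pdf", "languages": ["eng"]}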
################
#############
from pydantic import BaseModel, Field
from typing import List, Optional

class Document(BaseModel):
    metadata: Optional[Union[ElementMetadata, dict]]
    text: str
    summary: str
    idx: int

    class Config:
        arbitrary_types_allowed = True
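# Illustrative only: metadata accepts either an unstructured ElementMetadata
# instance or a plain dict (or None). The values below are invented for the
# example.
#
# example_doc = Document(
#     metadata={"filename": "book.pdf", "page_number": 3},
#     text="First chunk of text...",
#     summary="",
#     idx=0,
# )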
def process_documents_and_generate_summary(input_directory, output_directory, chunk_size, chunk_overlap, MAX_SUMMARY_LENGTH=4000):
    documents = []

    def elements_by_filename(filename):
        try:
            _input_file_path = os.path.join(input_directory, filename)
            if os.path.isfile(_input_file_path):
                print(f"Processing file: {_input_file_path}")
                elements = partition(filename=_input_file_path, strategy="fast", include_page_breaks=True)
                if elements is None:
                    print(f"No elements found in file: {filename}")
                    return []
                return elements
        except Exception as e:
            print(f"Error processing file {filename}: {e}")
            return []

    for filename in os.listdir(input_directory):
        file_path = os.path.join(input_directory, filename)
        if not os.path.isfile(file_path):
            continue  # skip sub-directories instead of returning early
        elements = elements_by_filename(filename)
        if elements:
            chunks = chunk_by_title(
                elements=elements,
                max_characters=8000,
                overlap=2000,
                # overlap_all=chunk_overlap,
                # new_after_n_chars=1000,  # the “soft” maximum size for a chunk. A chunk that already exceeds this number of characters will not be extended, even if the next element would fit
            )
            idx = 0
            for chunk in chunks:
                text = chunk.text
                metadata = chunk.metadata
                original = chunk.metadata.orig_elements
                # page_numbers = {e.metadata.page_number for e in chunk.metadata.orig_elements}
                page_numbers = [e.metadata.page_number for e in chunk.metadata.orig_elements]
                # print("\n\n".join([str(el) for el in original][5:15]))
                # data.append({"name": filename, "text": text, "metadata": metadata, "page_numbers": page_numbers})
                # input()
                doc = Document(
                    metadata=chunk.metadata,
                    text=chunk.text,
                    summary="",
                    idx=idx
                )
                documents.append(doc)
                idx += 1
        else:
            print(f"Skipping file {filename} due to no elements.")
    return documents
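# Minimal sketch of the same partition -> chunk_by_title step on a single
# file (the path is hypothetical); handy for checking chunk sizes before
# running the whole directory:
#
# elements = partition(filename="/home/fullstack/books/example.pdf", strategy="fast", include_page_breaks=True)
# chunks = chunk_by_title(elements=elements, max_characters=8000, overlap=2000)
# for c in chunks[:3]:
#     print(len(c.text), c.metadata.page_number)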
# file_names = [d['name'] for d in data]  # NOTE: `data` is never defined in this script (presumably built via load_data), so this is left commented out
# pretty_print(file_names)
documents = process_documents_and_generate_summary('/home/fullstack/books', '/home/fullstack/books/output', 1024, 512, 4000)
# if not documents:
# print("No documents chunks to process.")
# exit()
# for doc in documents:
# print(f"Index: {doc.idx}")
# print(f"Document: {doc.metadata.filename}")
# # print(f"Page numbers: {doc.metadata.page_numbers}")
# print(f"Text: {doc.text}")
# print(f"Summary: {doc.summary[:100]}")
# print("\n\n")
##############
class ConciseSummary(dspy.Signature):
    """Summarize this."""
    # previous_summary: Optional[str] = dspy.InputField(desc="The summary of the previous context.", format=str)
    document: str = dspy.InputField(format=lambda x: "\n===\n" + str(x) + "\n===\n")
    previous_summary = dspy.InputField(desc="Sometimes empty", format=str)
    summary: str = dspy.OutputField(format=str)

class Reflect(dspy.Signature):
    """Given the original context and a summary of it, reflect on the summary and provide feedback. How would you rewrite the summary to include more details? Point out the missing details and suggest ways to include them in the summary, but not by repeating the same information."""
    context: str = dspy.InputField(format=lambda x: "\n===\n" + str(x) + "\n===\n")
    summary: str = dspy.InputField(format=lambda x: "\n===\n" + str(x) + "\n===\n")
    feedback: str = dspy.OutputField(desc="Usually less than a few words")

class RewriteSummary(dspy.Signature):
    """Given the context, the current summary, feedback on that summary, and the previous summary, continue the previous summary but use the feedback to rewrite it better. Compress the text and reduce the content by half, but follow the feedback and keep the same level of detail, as a fluid continuation."""
    context: str = dspy.InputField(format=lambda x: "\n===\n" + str(x) + "\n===\n")
    previous_summary: Optional[str] = dspy.InputField(desc="continue from this", format=lambda x: "\n===\n" + str(x) + "\n===\n")
    summary: str = dspy.InputField(format=lambda x: "\n===\n" + str(x) + "\n===\n")
    feedback: str = dspy.InputField(format=lambda x: "\n===\n" + str(x) + "\n===\n")
    rewritten_summary: str = dspy.OutputField(format=str)
class ConciseSummaryModule(dspy.Module):
    def __init__(self):
        super().__init__()
        self.summary = dspy.Predict(ConciseSummary)
        self.reflect = dspy.ChainOfThought(Reflect)
        self.rewrite = dspy.ChainOfThought(RewriteSummary)

    def forward(self, sliding_context: str, previous_summary: str):
        if not sliding_context:
            # raise ValueError("Sliding context cannot be empty.")
            # empty input: return an empty summary instead of raising
            return dspy.Prediction(summary="")
        summary = self.summary(document=sliding_context, previous_summary=previous_summary).summary
        feedback = self.reflect(context=sliding_context, summary=summary).feedback
        rewritten_summary = self.rewrite(context=sliding_context, summary=summary, feedback=feedback, previous_summary=previous_summary).rewritten_summary
        return dspy.Prediction(summary=rewritten_summary)
        # return dspy.Prediction(summary=summary)
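# Illustrative usage of the summarize -> reflect -> rewrite chain on a single
# chunk (the text is made up); each forward() call issues three LM calls:
#
# module = ConciseSummaryModule()
# with dspy.context(lm=turbo):
#     pred = module(sliding_context="Chapter 1 introduces the main characters...", previous_summary="")
# print(pred.summary)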
def main_loop(q, pbar, OUTPUT_FILE):
    while True:
        item = q.get()
        if item is None:
            q.task_done()
            break  # Sentinel value to end thread
        doc, previous_summary = item
        if previous_summary is None:
            previous_summary = ""
        CoT = ConciseSummaryModule()
        # print(f"Processing document {doc.idx} with content length {len(doc.text)}")
        with dspy.context(lm=turbo):
            _summary = CoT(sliding_context=doc.text, previous_summary=previous_summary)
        print(f"Summary: {_summary}")
        doc.summary = _summary.summary
        # NOTE: appends from multiple threads are not synchronized; lines may interleave
        with open(OUTPUT_FILE, "a") as f:
            json.dump({
                "metadata": None,
                "text": doc.text,
                "summary": _summary.summary,
                "idx": doc.idx
            }, f)
            f.write("\n")
        print(f"Document {doc.idx} processed and written to file.")
        q.task_done()
        pbar.update(1)  # Update progress bar
CoT = ConciseSummaryModule()
# lass Document(BaseModel):
# metadata: ElementMetadata
# text: str
# summary: str
# idx: int
# # page_numbers: List[int]
# class Config:
# arbitrary_types_allowed = True
OUTPUT_FILE = 'output.jsonl'
document_pass2 = []
summaries = []
previous_summary = None
for chunk in documents[:3]:
    with dspy.context(lm=turbo):
        _summary = CoT(sliding_context=chunk.text, previous_summary=previous_summary)
    chunk.summary = _summary.summary
    previous_summary = chunk.summary
    doc = Document(
        # metadata=chunk.metadata.to_dict(),
        metadata=None,
        text=chunk.text,
        summary=chunk.summary,
        idx=chunk.idx
    )
    print(f"Document: {doc}")
    document_pass2.append(doc)
    summaries.append(chunk.summary)
    with open(OUTPUT_FILE, "a") as f:
        json.dump({
            "metadata": doc.metadata,
            "text": doc.text,
            "summary": _summary.summary,
            "idx": doc.idx
        }, f)
        f.write("\n")
    print(f"Document {doc.idx} processed and written to file.")
summaries_all_in_one_text = "\n\n".join(summaries)
# Roll the first-pass summaries into one combined summary; this string is reused
# below as the shared previous_summary for the threaded pass
_summary = CoT(sliding_context=summaries_all_in_one_text, previous_summary="").summary
print(f"Summary: {_summary}")
# Function to initiate processing with threading and tqdm
def process_documents_with_threads(document_data_list, num_threads=5, OUTPUT_FILE="output.jsonl"):
    q = queue.Queue()
    threads = []
    # Initialize tqdm progress bar
    pbar = tqdm(total=len(document_data_list[5:]))
    # Start threads
    for _ in range(num_threads):
        t = threading.Thread(target=main_loop, args=(q, pbar, OUTPUT_FILE))
        t.start()
        threads.append(t)
    # Add documents to queue; each document is paired with the combined
    # first-pass summary so a (doc, previous_summary) pair can never be
    # split between two worker threads
    for document in document_data_list[5:]:
        q.put((document, _summary))
    # Add sentinel values to queue to signal threads to stop
    for _ in range(num_threads):
        q.put(None)
    # Wait for all threads to complete
    for t in threads:
        t.join()
    pbar.close()  # Close progress bar
process_documents_with_threads(documents, num_threads=5, OUTPUT_FILE="output.jsonl") #working but no way to save
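# Illustrative follow-up (not executed above): the JSONL output can be loaded
# back for inspection with pandas, which is imported but otherwise unused here:
#
# df = pd.read_json("output.jsonl", lines=True)
# print(df[["idx", "summary"]].head())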