import json
import os
import queue
import random
import shutil
import tempfile
import threading
from typing import List, Union

import pandas as pd
import requests
from tqdm import tqdm

from utils.load_documents import DocumentData, load_data
import dspy
import time

import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)
API_BASE = 'https://api.freegpt.today/v1/'
# MODEL_NAME = requests.get(API_BASE + 'models').json()['data'][0]['id']
# MODEL_NAME = 'mixtral-turbo'
MODEL_NAME = 'gpt-3.5-turbo'
# turbo = dspy.OpenAI(model="your model", api_base=API_BASE, api_key='asdf', timeout=200)
turbo = dspy.OpenAI(model_type='chat', top_p=1, presence_penalty=0.1, model=MODEL_NAME,
                    temperature=0.1, max_tokens=6000, api_base=API_BASE, api_key='asdf', timeout=200)
dspy.settings.configure(lm=turbo)
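# Quick sanity check for the LM wiring before running the whole pipeline.
# This is a hypothetical snippet (not in the original gist), left commented
# out so the script's behavior is unchanged:
#
#   with dspy.context(lm=turbo):
#       print(turbo("Reply with 'ok' if you can read this."))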
from unstructured.chunking.basic import chunk_elements
from unstructured.partition.auto import partition
from unstructured.chunking.title import chunk_by_title
from unstructured.cleaners.core import (
    group_broken_paragraphs, clean_ordered_bullets, clean_non_ascii_chars,
    clean_dashes, clean_bullets, replace_unicode_quotes, clean,
)
from unstructured.documents.elements import ElementMetadata
def custom_serializer(obj):
    if hasattr(obj, '__dict__'):
        return obj.__dict__  # serialize arbitrary objects via their __dict__
    elif isinstance(obj, frozenset):
        return list(obj)  # frozenset is not JSON serializable; convert to list
    raise TypeError(f'Object of type {obj.__class__.__name__} is not JSON serializable')


def pretty_print(data):
    print(json.dumps(data, indent=4, default=custom_serializer, sort_keys=True))
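# Illustrative example (not from the original gist): custom_serializer lets
# pretty_print handle values json.dumps would otherwise reject, e.g.
#   pretty_print({"pages": frozenset({1, 2, 3})})   # -> {"pages": [1, 2, 3]} (order may vary)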
################
#############
from pydantic import BaseModel, Field
from typing import List, Optional


class Document(BaseModel):
    metadata: Optional[Union[ElementMetadata, dict]]
    text: str
    summary: str
    idx: int

    class Config:
        arbitrary_types_allowed = True  # ElementMetadata is not a pydantic type
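# Each Document eventually lands in the output JSONL file as one line.
# Illustrative shape (values made up; metadata is written as null below):
#   {"metadata": null, "text": "...chunk text...", "summary": "...", "idx": 0}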
def process_documents_and_generate_summary(input_directory, output_directory, chunk_size, chunk_overlap, MAX_SUMMARY_LENGTH=4000):
    documents = []

    def elements_by_filename(filename):
        try:
            _input_file_path = os.path.join(input_directory, filename)
            if os.path.isfile(_input_file_path):
                print(f"Processing file: {_input_file_path}")
                elements = partition(filename=_input_file_path, strategy="fast", include_page_breaks=True)
                if elements is None:
                    print(f"No elements found in file: {filename}")
                    return []
                return elements
        except Exception as e:
            print(f"Error processing file {filename}: {e}")
            return []

    for filename in os.listdir(input_directory):
        file_path = os.path.join(input_directory, filename)
        if not os.path.isfile(file_path):
            continue  # skip subdirectories instead of returning out of the whole loop
        elements = elements_by_filename(filename)
        if elements:
            chunks = chunk_by_title(
                elements=elements,
                max_characters=8000,
                overlap=2000,
                # overlap_all=chunk_overlap,
                # new_after_n_chars=1000,  # the "soft" maximum size for a chunk: a chunk that
                # already exceeds this number of characters will not be extended, even if the
                # next element would fit
            )
            idx = 0
            for chunk in chunks:
                text = chunk.text
                metadata = chunk.metadata
                original = chunk.metadata.orig_elements
                # collect the page number of each original element in this chunk
                page_numbers = [e.metadata.page_number for e in chunk.metadata.orig_elements]
                doc = Document(
                    metadata=chunk.metadata,
                    text=chunk.text,
                    summary="",
                    idx=idx,
                )
                documents.append(doc)
                idx += 1
        else:
            print(f"Skipping file {filename} due to no elements.")
    return documents
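# Note on the chunking call above: max_characters=8000 is the hard cap per
# chunk, and overlap=2000 applies a text overlap when an oversized element has
# to be split (overlap_all, commented out, would apply it between all chunks).
# The chunk_size / chunk_overlap parameters of the enclosing function are
# currently unused; the values are hard-coded at the call site.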
# The next two lines reference a `data` list that is never defined in this
# script, so they are disabled to avoid a NameError:
# file_names = [d['name'] for d in data]
# pretty_print(file_names)
documents = process_documents_and_generate_summary('/home/fullstack/books', '/home/fullstack/books/output', 1024, 512, 4000)

# if not documents:
#     print("No document chunks to process.")
#     exit()

# for doc in documents:
#     print(f"Index: {doc.idx}")
#     print(f"Document: {doc.metadata.filename}")
#     # print(f"Page numbers: {doc.metadata.page_numbers}")
#     print(f"Text: {doc.text}")
#     print(f"Summary: {doc.summary[:100]}")
#     print("\n\n")
##############
class ConciseSummary(dspy.Signature):
    """Summarize this."""
    # previous_summary: Optional[str] = dspy.InputField(desc="The summary of the previous context.", format=str)
    document: str = dspy.InputField(format=lambda x: "\n===\n" + str(x) + "\n===\n")
    previous_summary = dspy.InputField(desc="Sometimes empty", format=str)
    summary: str = dspy.OutputField(format=str)


class Reflect(dspy.Signature):
    """Given the original context and a summary of it, reflect on the summary and provide feedback.
    How would you rewrite the summary to include more details? Point out the missing details and
    suggest ways to include them in the summary, but not by repeating the same information."""
    context: str = dspy.InputField(format=lambda x: "\n===\n" + str(x) + "\n===\n")
    summary: str = dspy.InputField(format=lambda x: "\n===\n" + str(x) + "\n===\n")
    feedback: str = dspy.OutputField(desc="Usually less than a few words")


class RewriteSummary(dspy.Signature):
    """Given a summary, context, feedback, and a previous summary, continue the previous summary,
    using the feedback to rewrite it better. Compress the text and reduce the content by half,
    but follow the feedback and keep the same level of detail, as a fluid continuation."""
    context: str = dspy.InputField(format=lambda x: "\n===\n" + str(x) + "\n===\n")
    previous_summary: Optional[str] = dspy.InputField(desc="continue from this", format=lambda x: "\n===\n" + str(x) + "\n===\n")
    summary: str = dspy.InputField(format=lambda x: "\n===\n" + str(x) + "\n===\n")
    feedback: str = dspy.InputField(format=lambda x: "\n===\n" + str(x) + "\n===\n")
    rewritten_summary: str = dspy.OutputField(format=str)
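# The three signatures above form a draft -> reflect -> rewrite loop:
# ConciseSummary produces a first draft, Reflect critiques it against the
# original context, and RewriteSummary uses that feedback (plus the running
# previous_summary) to produce the final text. ConciseSummaryModule below
# chains them in exactly that order.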
class ConciseSummaryModule(dspy.Module):
    def __init__(self):
        super().__init__()  # dspy.Module subclasses should initialize the base class
        self.summary = dspy.Predict(ConciseSummary)
        self.reflect = dspy.ChainOfThought(Reflect)
        self.rewrite = dspy.ChainOfThought(RewriteSummary)

    def forward(self, sliding_context: str, previous_summary: str):
        if not sliding_context:
            # empty input: nothing to summarize
            return dspy.Prediction(summary="")
        summary = self.summary(document=sliding_context, previous_summary=previous_summary).summary
        feedback = self.reflect(context=sliding_context, summary=summary).feedback
        rewritten_summary = self.rewrite(context=sliding_context, summary=summary,
                                         feedback=feedback, previous_summary=previous_summary).rewritten_summary
        return dspy.Prediction(summary=rewritten_summary)
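# Minimal usage sketch for the module (hypothetical, not part of the
# original run):
#   module = ConciseSummaryModule()
#   with dspy.context(lm=turbo):
#       pred = module(sliding_context="Some chunk of text...", previous_summary="")
#   print(pred.summary)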
write_lock = threading.Lock()  # serializes appends to the shared output file


def main_loop(q, pbar, OUTPUT_FILE):
    while True:
        # documents and their previous summaries are enqueued in pairs,
        # so every iteration performs two matching q.get() calls
        doc = q.get()
        previous_summary = q.get()
        if doc is None:
            q.task_done()
            q.task_done()
            break  # sentinel value to end this worker thread
        if previous_summary is None:
            previous_summary = ""
        CoT = ConciseSummaryModule()
        with dspy.context(lm=turbo):
            _summary = CoT(sliding_context=doc.text, previous_summary=previous_summary)
        print(f"Summary: {_summary}")
        doc.summary = _summary.summary
        with write_lock:
            with open(OUTPUT_FILE, "a") as f:
                json.dump({
                    "metadata": None,
                    "text": doc.text,
                    "summary": _summary.summary,
                    "idx": doc.idx,
                }, f)
                f.write("\n")
        print(f"Document {doc.idx} processed and written to file.")
        q.task_done()
        q.task_done()  # one task_done per q.get()
        pbar.update(1)  # update progress bar
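# tqdm progress bars keep an internal lock, so calling pbar.update(1) from
# several worker threads is safe; the write_lock above covers the JSONL file.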
CoT = ConciseSummaryModule()
OUTPUT_FILE = 'output.jsonl'
document_pass2 = []
summaries = []
previous_summary = None
# First pass: summarize the first 3 chunks sequentially, threading each
# chunk's summary into the next call as previous_summary.
for chunk in documents[:3]:
    with dspy.context(lm=turbo):
        _summary = CoT(sliding_context=chunk.text, previous_summary=previous_summary)
    chunk.summary = _summary.summary
    previous_summary = chunk.summary
    doc = Document(
        # metadata=chunk.metadata.to_dict(),
        metadata=None,
        text=chunk.text,
        summary=chunk.summary,
        idx=chunk.idx,
    )
    print(f"Document: {doc}")
    document_pass2.append(doc)
    summaries.append(chunk.summary)
    with open(OUTPUT_FILE, "a") as f:
        json.dump({
            "metadata": doc.metadata,
            "text": doc.text,
            "summary": _summary.summary,
            "idx": doc.idx,
        }, f)
        f.write("\n")
    print(f"Document {doc.idx} processed and written to file.")

# Roll the per-chunk summaries up into one combined summary. Note that
# _summary is a plain string from here on; it is reused below as the
# previous_summary for every queued document.
summaries_all_in_one_text = "\n\n".join(summaries)
_summary = CoT(sliding_context=summaries_all_in_one_text, previous_summary="").summary
print(f"Summary: {_summary}")
# Function to initiate processing with threading and tqdm
def process_documents_with_threads(document_data_list, num_threads=5, OUTPUT_FILE="output.jsonl"):
    q = queue.Queue()
    threads = []
    # Initialize tqdm progress bar (the first 5 documents are skipped below,
    # so size the bar to match)
    pbar = tqdm(total=len(document_data_list[5:]))
    # Start worker threads
    for _ in range(num_threads):
        t = threading.Thread(target=main_loop, args=(q, pbar, OUTPUT_FILE))
        t.start()
        threads.append(t)
    # Enqueue documents in pairs: the document itself, then the combined
    # summary from the sequential pass as its previous_summary
    for document in document_data_list[5:]:
        q.put(document)
        q.put(_summary)
    # Add paired sentinel values to signal each worker to stop
    for _ in range(num_threads):
        q.put(None)
        q.put(None)
    # Wait for all threads to complete
    for t in threads:
        t.join()
    pbar.close()  # close progress bar


process_documents_with_threads(documents, num_threads=5, OUTPUT_FILE="output.jsonl")  # working but no way to save
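# To inspect the results afterwards, each line of output.jsonl is one JSON
# record (hypothetical helper, not part of the original gist):
#   import json
#   with open("output.jsonl") as f:
#       rows = [json.loads(line) for line in f]
#   print(len(rows), rows[0]["summary"][:200])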