Created March 19, 2024 19:12
This script integrates the LCEL framework for data enhancement and applies custom runnables for MinIO and Weaviate operations, following the principles outlined in the provided pseudo code. Note that the actual implementation may require adjustments based on your environment and the specific functionalities of the `unstructured` library, which s…
from minio import Minio
from weaviate import Client
import io
import json
import time
from concurrent.futures import ThreadPoolExecutor
from langchain_core.runnables import Runnable, Chain
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.llms import ChatOpenAI, OpenAI
from langchain_anthropic import ChatAnthropic
import unstructured
# Define custom runnable for MinIO data extraction
class MinIOExtraction(Runnable):
    """Runnable that downloads a single object from a MinIO bucket.

    Args:
        client: MinIO client exposing ``get_object(bucket_name, object_name)``.
        bucket_name: Name of the bucket that holds the object.
        object_name: Key of the object to download.
    """

    def __init__(self, client, bucket_name, object_name):
        self.client = client
        self.bucket_name = bucket_name
        self.object_name = object_name

    def run(self):
        """Fetch the configured object and return its content.

        Returns:
            The full body of the object as returned by ``response.read()``
            (raw bytes for the MinIO SDK's HTTP response).
        """
        response = self.client.get_object(self.bucket_name, self.object_name)
        try:
            return response.read()
        finally:
            # The MinIO SDK hands back an open HTTP response; it must be
            # closed and its connection released back to the pool, even
            # when reading fails.
            response.close()
            response.release_conn()

    def invoke(self, input=None, config=None, **kwargs):
        """Runnable entry point; delegates to :meth:`run`.

        ``Runnable`` declares ``invoke`` as abstract, so a subclass without
        it cannot be instantiated. The extraction target is fixed at
        construction time, so ``input`` and ``config`` are ignored.
        """
        return self.run()
# Define custom runnable for Weaviate data storage
class WeaviateStorage(Runnable):
    """Runnable that writes a batch of objects into a Weaviate class.

    Args:
        client: Weaviate client exposing ``data_object.create``.
        data: Iterable of objects to store, one ``create`` call per item.
        class_name: Weaviate class the objects are created under.
    """

    def __init__(self, client, data, class_name="Document"):
        # Keep the client and the payload in separate attributes; chaining
        # them in one assignment would overwrite the client with the data.
        self.client = client
        self.data = data
        self.class_name = class_name

    def run(self):
        """Create one Weaviate object per item in ``self.data``."""
        for item in self.data:
            self.client.data_object.create(item, class_name=self.class_name)

    def invoke(self, input=None, config=None, **kwargs):
        """Runnable entry point; delegates to :meth:`run`.

        ``Runnable`` declares ``invoke`` as abstract, so a subclass without
        it cannot be instantiated. The payload is fixed at construction
        time, so ``input`` and ``config`` are ignored.
        """
        return self.run()
# Initialize MinIO and Weaviate clients
# NOTE: the endpoint and credential strings below are placeholders and must
# be replaced with real values before the script can connect to anything.
minio_client = Minio(
    'MINIO_ENDPOINT',
    access_key='YOUR_ACCESS_KEY',
    secret_key='YOUR_SECRET_KEY',
    secure=True,
)
weaviate_client = Client("http://WEAVIATE_ENDPOINT")
# Setup LangChain components
prompt = ChatPromptTemplate.from_template("Generate an enhanced version of the following text:\n\n{text}")
output_parser = StrOutputParser()
chat_openai = ChatOpenAI(model="gpt-3.5-turbo")
openai = OpenAI(model="gpt-3.5-turbo-instruct")
anthropic = ChatAnthropic(model="claude-2")
# Compose the chain
# LCEL composes runnables with the `|` operator: the prompt is rendered from
# {"text": ...}, sent to the chat model, and the reply is parsed to a string.
# NOTE(review): `openai` and `anthropic` are constructed above but are not
# wired into this chain; confirm whether they were meant as alternatives
# or fallbacks for `chat_openai`.
enhancement_chain = prompt | chat_openai | output_parser
def process_data(bucket_name, object_name):
    """Extract one object from MinIO, enhance its text, and store it in Weaviate.

    Args:
        bucket_name: MinIO bucket to read from.
        object_name: Key of the object inside the bucket.

    The function returns nothing; it prints a confirmation message once the
    record has been handed to Weaviate.
    """
    # Extract data
    raw_data = MinIOExtraction(minio_client, bucket_name, object_name).run()
    # Process and enhance data
    # NOTE(review): `unstructured.process_text` and the `.text`,
    # `.metadata["source"]` and `.embed()` members used below are taken as-is
    # from the original script; confirm they match the installed
    # `unstructured` API.
    processed_data = unstructured.process_text(raw_data)
    enhanced_data = enhancement_chain.invoke({"text": processed_data.text})
    # Store data
    record = {
        "text": enhanced_data,
        "source": processed_data.metadata["source"],
        "embedding": processed_data.embed(),
    }
    WeaviateStorage(weaviate_client, [record]).run()
    print("Data processing and storage completed.")
def main():
    """Run the pipeline once against the example bucket and object names."""
    # Example object processing
    bucket, key = 'your-bucket-name', 'your-object-name'
    process_data(bucket, key)
# Run the example pipeline only when executed as a script, not on import.
if __name__ == "__main__":
    main()