Skip to content

Instantly share code, notes, and snippets.

@Cdaprod
Created March 19, 2024 19:12
Show Gist options
  • Save Cdaprod/f4e4556523e6716d1ae9e8ea8ef0ee8c to your computer and use it in GitHub Desktop.
Save Cdaprod/f4e4556523e6716d1ae9e8ea8ef0ee8c to your computer and use it in GitHub Desktop.
This script integrates the LCEL framework for data enhancement and applies custom runnables for MinIO and Weaviate operations, following the principles outlined in the provided pseudocode. Note that the actual implementation may require adjustments based on your environment and the specific functionality of the `unstructured` library.
from minio import Minio
from weaviate import Client
import io
import json
import time
from concurrent.futures import ThreadPoolExecutor
from langchain_core.runnables import Runnable, Chain
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.llms import ChatOpenAI, OpenAI
from langchain_anthropic import ChatAnthropic
import unstructured
# Define custom runnable for MinIO data extraction
class MinIOExtraction(Runnable):
    """Runnable that downloads one MinIO object and returns it as text.

    Args:
        client: MinIO client; only ``get_object(bucket, name)`` is called.
        bucket_name: Bucket that holds the object.
        object_name: Key of the object to download.
    """

    def __init__(self, client, bucket_name, object_name):
        self.client = client
        self.bucket_name = bucket_name
        self.object_name = object_name

    def run(self):
        """Fetch the configured object and return its body decoded as UTF-8.

        Raises:
            UnicodeDecodeError: If the object body is not valid UTF-8.
        """
        response = self.client.get_object(self.bucket_name, self.object_name)
        try:
            return response.read().decode("utf-8")
        finally:
            # get_object returns a live HTTP response; it must be closed and
            # its connection released, otherwise the pooled connection leaks.
            response.close()
            response.release_conn()

    def invoke(self, input=None, config=None, **kwargs):
        """Runnable entry point; ignores its arguments and delegates to run().

        ``Runnable`` declares ``invoke`` as abstract, so without this method
        the class cannot be instantiated. The parameter names mirror the
        ``Runnable.invoke`` signature (hence ``input``).
        """
        return self.run()
# Define custom runnable for Weaviate data storage
class WeaviateStorage(Runnable):
    """Runnable that writes a collection of objects into one Weaviate class.

    Args:
        client: Weaviate client; only ``data_object.create`` is called.
        data: Iterable of objects, each passed unchanged to ``create``.
        class_name: Target Weaviate class name. Defaults to ``"Document"``.
    """

    def __init__(self, client, data, class_name="Document"):
        self.client = client
        self.data = data
        self.class_name = class_name

    def run(self):
        """Create one Weaviate object per item in ``data`` and return ``data``."""
        for item in self.data:
            self.client.data_object.create(item, class_name=self.class_name)
        return self.data

    def invoke(self, input=None, config=None, **kwargs):
        """Runnable entry point; ignores its arguments and delegates to run().

        ``Runnable`` declares ``invoke`` as abstract, so without this method
        the class cannot be instantiated. The parameter names mirror the
        ``Runnable.invoke`` signature (hence ``input``).
        """
        return self.run()
# Initialize MinIO and Weaviate clients.
# The endpoint and credential strings below are placeholders; replace them
# with real values before running.
minio_client = Minio(
    'MINIO_ENDPOINT',
    access_key='YOUR_ACCESS_KEY',
    secret_key='YOUR_SECRET_KEY',
    secure=True,
)
weaviate_client = Client("http://WEAVIATE_ENDPOINT")
# Setup LangChain components
prompt = ChatPromptTemplate.from_template(
    "Generate an enhanced version of the following text:\n\n{text}"
)
output_parser = StrOutputParser()
chat_openai = ChatOpenAI(model="gpt-3.5-turbo")
# Not used by the chain below; kept as a module-level name.
openai = OpenAI(model="gpt-3.5-turbo-instruct")
anthropic = ChatAnthropic(model="claude-2")

# Compose the chain.
# LCEL pipelines are built with the `|` operator rather than a `Chain([...])`
# constructor: prompt -> chat model (falling back to Anthropic if the OpenAI
# call fails) -> plain-string output.
enhancement_chain = (
    prompt
    | chat_openai.with_fallbacks([anthropic])
    | output_parser
)
def process_data(bucket_name, object_name):
    """Extract one MinIO object, enhance its text, and store it in Weaviate.

    Args:
        bucket_name: MinIO bucket to read from.
        object_name: Key of the object inside that bucket.

    Returns:
        None. Prints a completion message once the record has been stored.
    """
    # Extract data
    raw_data = MinIOExtraction(minio_client, bucket_name, object_name).run()

    # Process and enhance data
    # NOTE(review): `unstructured.process_text`, `.metadata["source"]` and
    # `.embed()` are kept exactly as written; confirm they exist in the
    # installed `unstructured` version before relying on this function.
    processed_data = unstructured.process_text(raw_data)
    # LCEL runnables are executed with `invoke`, not `run`.
    enhanced_data = enhancement_chain.invoke({"text": processed_data.text})

    # Store data
    record = {
        "text": enhanced_data,
        "source": processed_data.metadata["source"],
        "embedding": processed_data.embed(),
    }
    WeaviateStorage(weaviate_client, [record]).run()
    print("Data processing and storage completed.")
def main():
    """Run the pipeline once against the placeholder bucket and object."""
    # Example object processing
    process_data(
        bucket_name='your-bucket-name',
        object_name='your-object-name',
    )


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment