Created
March 19, 2024 19:12
-
-
Save Cdaprod/f4e4556523e6716d1ae9e8ea8ef0ee8c to your computer and use it in GitHub Desktop.
This script integrates the LCEL framework for data enhancement and applies custom runnables for MinIO and Weaviate operations, following the principles outlined in the provided pseudo code. Note that the actual implementation may require adjustments based on your environment and the specific functionalities of the `unstructured` library, which s…
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from minio import Minio | |
from weaviate import Client | |
import io | |
import json | |
import time | |
from concurrent.futures import ThreadPoolExecutor | |
from langchain_core.runnables import Runnable, Chain | |
from langchain_core.prompts import ChatPromptTemplate | |
from langchain_core.output_parsers import StrOutputParser | |
from langchain_core.llms import ChatOpenAI, OpenAI | |
from langchain_anthropic import ChatAnthropic | |
import unstructured | |
# Define custom runnable for MinIO data extraction | |
class MinIOExtraction(Runnable):
    """Runnable that downloads a single MinIO object and returns it as text.

    Args:
        client: MinIO client used to fetch the object (``get_object`` is
            the only method called on it).
        bucket_name: Name of the bucket holding the object.
        object_name: Key of the object inside the bucket.
    """

    def __init__(self, client, bucket_name, object_name):
        self.client = client
        self.bucket_name = bucket_name
        self.object_name = object_name

    def run(self):
        """Fetch the configured object and return its body decoded as UTF-8.

        Raises:
            UnicodeDecodeError: If the object body is not valid UTF-8.
        """
        response = self.client.get_object(self.bucket_name, self.object_name)
        try:
            return response.read().decode("utf-8")
        finally:
            # The response is a streaming HTTP response: close it and hand
            # the connection back to the pool even if read/decode fails.
            response.close()
            response.release_conn()

    def invoke(self, input=None, config=None, **kwargs):
        """Runnable entry point; delegates to :meth:`run`.

        ``Runnable`` declares ``invoke`` as abstract, so without this
        override the class cannot be instantiated. ``input`` and ``config``
        are accepted for interface compatibility and ignored, because the
        target object is fixed at construction time.
        """
        return self.run()
# Define custom runnable for Weaviate data storage | |
class WeaviateStorage(Runnable):
    """Runnable that writes a collection of objects into a Weaviate class.

    Args:
        client: Weaviate client; each item is stored through
            ``client.data_object.create``.
        data: Iterable of objects to store, one ``create`` call per item.
        class_name: Weaviate class the objects are created under.
    """

    def __init__(self, client, data, class_name="Document"):
        self.client = client
        self.data = data
        self.class_name = class_name

    def run(self):
        """Create every item in ``self.data`` and return ``self.data``."""
        for item in self.data:
            self.client.data_object.create(item, class_name=self.class_name)
        return self.data

    def invoke(self, input=None, config=None, **kwargs):
        """Runnable entry point; delegates to :meth:`run`.

        ``Runnable`` declares ``invoke`` as abstract, so without this
        override the class cannot be instantiated. ``input`` and ``config``
        are accepted for interface compatibility and ignored, because the
        data to store is fixed at construction time.
        """
        return self.run()
# Initialize MinIO and Weaviate clients
minio_client = Minio('MINIO_ENDPOINT', access_key='YOUR_ACCESS_KEY', secret_key='YOUR_SECRET_KEY', secure=True)
weaviate_client = Client("http://WEAVIATE_ENDPOINT")

# Setup LangChain components
prompt = ChatPromptTemplate.from_template("Generate an enhanced version of the following text:\n\n{text}")
output_parser = StrOutputParser()
chat_openai = ChatOpenAI(model="gpt-3.5-turbo")
openai = OpenAI(model="gpt-3.5-turbo-instruct")
anthropic = ChatAnthropic(model="claude-2")

# Compose the chain with the LCEL pipe operator: prompt -> chat model
# (falling back to Anthropic if the OpenAI call fails) -> plain string.
enhancement_chain = prompt | chat_openai.with_fallbacks([anthropic]) | output_parser


def process_data(bucket_name, object_name):
    """Extract one object from MinIO, enhance its text, and store it in Weaviate.

    Args:
        bucket_name: MinIO bucket to read from.
        object_name: Key of the object to process.
    """
    # Extract data
    raw_data = MinIOExtraction(minio_client, bucket_name, object_name).run()

    # Process and enhance data
    # NOTE(review): `unstructured` documents `partition_*` helpers (e.g.
    # `unstructured.partition.text.partition_text`); confirm that
    # `process_text`, `.metadata["source"]` and `.embed()` exist in the
    # installed version before relying on this step.
    processed_data = unstructured.process_text(raw_data)
    # LCEL runnables are executed with `invoke`; they have no `run` method.
    enhanced_data = enhancement_chain.invoke({"text": processed_data.text})

    # Store data
    record = {
        "text": enhanced_data,
        "source": processed_data.metadata["source"],
        "embedding": processed_data.embed(),
    }
    WeaviateStorage(weaviate_client, [record]).run()
    print("Data processing and storage completed.")
def main():
    """Run the pipeline once against the placeholder bucket and object."""
    # Example object processing
    process_data('your-bucket-name', 'your-object-name')


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment