Skip to content

Instantly share code, notes, and snippets.

@skaag
Created June 25, 2024 06:10
Show Gist options
  • Save skaag/3cc16d6addb9fb3bfc71dce052f1d6ab to your computer and use it in GitHub Desktop.
Save skaag/3cc16d6addb9fb3bfc71dce052f1d6ab to your computer and use it in GitHub Desktop.
An example LangFlow "Tool" that fetches data from a URL
import importlib
from langchain.tools import StructuredTool
from pydantic.v1 import BaseModel, Field
import requests
# Define the input schema
class JobDataInput(BaseModel):
    """Argument schema for the job-data tool: a single integer ``job_id``."""

    # Ellipsis marks the field as required (no default value).
    job_id: int = Field(..., description="ID of the job to retrieve")
# Define the function to load job data
def load_job_data(job_id: int):
    """Fetch the data for one job from the local job service.

    Issues ``GET http://localhost:6666/get_job`` with ``job_id`` as a query
    parameter and returns the response body.

    Args:
        job_id: ID of the job to retrieve.

    Returns:
        The response body as text, decoded as UTF-8.

    Raises:
        Exception: If the service answers with any status other than 200.
        requests.RequestException: If the request itself cannot be
            completed (connection failure, timeout, ...).
    """
    base_url = 'http://localhost:6666/get_job'
    # Without a timeout, requests waits indefinitely on an unresponsive
    # server, which would stall whatever agent is calling this tool.
    request_timeout_s = 10
    response = requests.get(
        base_url,
        params={'job_id': job_id},
        timeout=request_timeout_s,
    )
    if response.status_code != 200:
        raise Exception(f"Failed to retrieve data. Status code: {response.status_code}")
    # The body is already in hand, so return it directly. The earlier
    # version re-downloaded response.url through WebBaseLoader and wrapped
    # the pages in Data, but neither name appears in this file's imports,
    # so the success path ended in NameError (and cost a second request).
    response.encoding = "utf-8"  # keep the explicit UTF-8 decoding choice
    return response.text
# Create the StructuredTool
# Expose load_job_data as a structured tool; JobDataInput is the schema
# describing the arguments the tool accepts.
job_data_tool = StructuredTool.from_function(
    name="LoadJobData",
    description="Load job data using a job_id",
    func=load_job_data,
    args_schema=JobDataInput,
    # Per the LangChain docs, False hands the tool's output back to the
    # agent instead of returning it straight to the user.
    return_direct=False,
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment