Open WebUI search tool
""" | |
title: Web Search using SearXNG and Scrape first N Pages | |
author: constLiakos with enhancements by justinh-rahb and ther3zz | |
funding_url: https://github.com/open-webui | |
version: 0.1.12 | |
license: MIT | |
requirements: libmagic,pdfminer.six | |
""" | |
# pyright: basic
import os

# Install the libmagic system library required by python-magic; use -y so the
# install runs non-interactively instead of hanging on the confirmation prompt.
os.system("apt update && apt install -y libmagic1")
import concurrent.futures
import json
import re
import time
import unicodedata
from typing import Any, Callable
from urllib.parse import urlparse

import magic
import requests
from bs4 import BeautifulSoup
from langchain.document_loaders.parsers import BS4HTMLParser, PDFMinerParser
from langchain.document_loaders.parsers.generic import MimeTypeBasedParser
from langchain.document_loaders.parsers.txt import TextParser
from langchain_community.document_loaders import Blob
from pydantic import BaseModel, Field

# Map MIME types to the LangChain parser used to extract text from downloaded content.
HANDLERS = {
    "application/pdf": PDFMinerParser(),
    "text/plain": TextParser(),
    "text/html": BS4HTMLParser(),
}

MIMETYPE_BASED_PARSER = MimeTypeBasedParser(
    handlers=HANDLERS,
    fallback_parser=None,
)


class HelpFunctions:
    def __init__(self):
        pass

    def get_base_url(self, url):
        parsed_url = urlparse(url)
        base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
        return base_url

    def generate_excerpt(self, content, max_length=200):
        return content[:max_length] + "..." if len(content) > max_length else content

    def format_text(self, original_text):
        soup = BeautifulSoup(original_text, "html.parser")
        formatted_text = soup.get_text(separator=" ", strip=True)
        formatted_text = unicodedata.normalize("NFKC", formatted_text)
        formatted_text = re.sub(r"\s+", " ", formatted_text)
        formatted_text = formatted_text.strip()
        formatted_text = self.remove_emojis(formatted_text)
        return formatted_text

    def remove_emojis(self, text):
        return "".join(c for c in text if not unicodedata.category(c).startswith("So"))

    def process_search_result(self, result, valves):
        title_site = self.remove_emojis(result["title"])
        url_site = result["url"]
        snippet = result.get("content", "")

        # Check if the website is in the ignored list, but only if IGNORED_WEBSITES is not empty
        if valves.IGNORED_WEBSITES:
            base_url = self.get_base_url(url_site)
            if any(
                ignored_site.strip() in base_url
                for ignored_site in valves.IGNORED_WEBSITES.split(",")
            ):
                return None

        try:
            response_site = requests.get(url_site, timeout=20)
            response_site.raise_for_status()
            html_content = response_site.text

            soup = BeautifulSoup(html_content, "html.parser")
            content_site = self.format_text(soup.get_text(separator=" ", strip=True))

            truncated_content = self.truncate_to_n_words(
                content_site, valves.PAGE_CONTENT_WORDS_LIMIT
            )

            return {
                "title": title_site,
                "url": url_site,
                "content": truncated_content,
                "snippet": self.remove_emojis(snippet),
            }
        except requests.exceptions.RequestException:
            return None

    def truncate_to_n_words(self, text, token_limit):
        tokens = text.split()
        truncated_tokens = tokens[:token_limit]
        return " ".join(truncated_tokens)


class EventEmitter:
    def __init__(self, event_emitter: Callable[[dict], Any] | None = None):
        self.event_emitter = event_emitter

    async def emit(
        self,
        description="Unknown State",
        status="in_progress",
        searchQuery=None,
        urls=None,
        action=None,
        done=False,
    ):
        if self.event_emitter:
            await self.event_emitter(
                {
                    "type": "status",
                    "data": {
                        "action": action,
                        "status": status,
                        "description": description,
                        "query": searchQuery,
                        "urls": urls,
                        "done": done,
                    },
                }
            )


class Tools:
    class Valves(BaseModel):
        SEARXNG_ENGINE_API_BASE_URL: str = Field(
            default="https://example.com/search",
            description="The base URL of the SearXNG search API",
        )
        IGNORED_WEBSITES: str = Field(
            default="",
            description="Comma-separated list of websites to ignore",
        )
        RETURNED_SCRAPPED_PAGES_NO: int = Field(
            default=3,
            description="The number of scraped search results to return",
        )
        SCRAPPED_PAGES_NO: int = Field(
            default=5,
            description="Total number of pages to scrape. Should be greater than or equal to the number of returned pages",
        )
        PAGE_CONTENT_WORDS_LIMIT: int = Field(
            default=5000,
            description="Maximum number of words kept from each page",
        )
        CITATION_LINKS: bool = Field(
            default=False,
            description="If True, send custom citations with links",
        )

    def __init__(self):
        self.valves = self.Valves()
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"
        }
    async def search_web(
        self,
        query: str,
        __event_emitter__: Callable[[dict], Any] | None = None,
    ) -> str:
        """
        Search the web and get the content of the relevant pages. Search for unknown knowledge, news, info, public contact info, weather, etc.
        :param query: Web query used in the search engine.
        :return: The content of the pages in JSON format.
        """
        start = time.time()
        functions = HelpFunctions()
        emitter = EventEmitter(__event_emitter__)

        await emitter.emit(f"Initiating web search for: {query}")

        search_engine_url = self.valves.SEARXNG_ENGINE_API_BASE_URL

        # Ensure RETURNED_SCRAPPED_PAGES_NO does not exceed SCRAPPED_PAGES_NO
        if self.valves.RETURNED_SCRAPPED_PAGES_NO > self.valves.SCRAPPED_PAGES_NO:
            self.valves.RETURNED_SCRAPPED_PAGES_NO = self.valves.SCRAPPED_PAGES_NO

        params = {
            "q": query,
            "format": "json",
            "number_of_results": self.valves.RETURNED_SCRAPPED_PAGES_NO,
        }

        try:
            duration = time.time() - start
            await emitter.emit(
                f"Sending request to search engine. Spent {duration:.1f} seconds."
            )
            resp = requests.get(
                search_engine_url, params=params, headers=self.headers, timeout=120
            )
            resp.raise_for_status()
            data = resp.json()

            results = data.get("results", [])
            limited_results = results[: self.valves.SCRAPPED_PAGES_NO]
            duration = time.time() - start
            await emitter.emit(
                f"Retrieved {len(limited_results)} search results. Spent {duration:.1f} seconds."
            )
        except requests.exceptions.RequestException as e:
            await emitter.emit(
                status="error",
                description=f"Error during search: {str(e)}",
                done=True,
            )
            return json.dumps({"error": str(e)})

        results_json = []
        urls = []
        if limited_results:
            duration = time.time() - start
            await emitter.emit(
                f"Processing search results. Spent {duration:.1f} seconds."
            )
            # Scrape the result pages in parallel and keep only the first
            # RETURNED_SCRAPPED_PAGES_NO pages that could be fetched and serialized.
            with concurrent.futures.ProcessPoolExecutor() as executor:
                futures = [
                    executor.submit(
                        functions.process_search_result, result, self.valves
                    )
                    for result in limited_results
                ]
                for future in concurrent.futures.as_completed(futures):
                    result_json = future.result()
                    if result_json:
                        try:
                            json.dumps(result_json)
                            results_json.append(result_json)
                        except (TypeError, ValueError):
                            continue
                    if len(results_json) >= self.valves.RETURNED_SCRAPPED_PAGES_NO:
                        break

            results_json = results_json[: self.valves.RETURNED_SCRAPPED_PAGES_NO]

            for result in results_json:
                if self.valves.CITATION_LINKS and __event_emitter__:
                    await __event_emitter__(
                        {
                            "type": "citation",
                            "data": {
                                "document": [result["content"]],
                                "metadata": [{"source": result["url"]}],
                                "source": {"name": result["title"]},
                            },
                        }
                    )
                urls.append(result["url"])

        duration = time.time() - start
        await emitter.emit(
            status="complete",
            description=f"Retrieved content from {len(results_json)} pages in {duration:.1f} seconds.",
            searchQuery=query,
            action="web_search",
            urls=urls,
            done=True,
        )

        return json.dumps(results_json, ensure_ascii=False)
    async def get_website(
        self, url: str, __event_emitter__: Callable[[dict], Any] | None = None
    ) -> str:
        """
        Web scrape the website provided and get its content.
        :param url: The URL of the website.
        :return: The content of the website in JSON format.
        """
        start = time.time()
        functions = HelpFunctions()
        emitter = EventEmitter(__event_emitter__)

        await emitter.emit(f"Fetching content from URL: {url}")

        results_json = []

        try:
            response_site = requests.get(url, headers=self.headers, timeout=120)
            response_site.raise_for_status()
            data = response_site.content

            # Detect the MIME type of the response and pick the matching parser
            # (PDF, plain text, or HTML).
            mime = magic.Magic(mime=True)
            mime_type = mime.from_buffer(data)
            blob = Blob.from_data(
                data=data,
                mime_type=mime_type,
            )
            parser = HANDLERS[mime_type]
            documents = parser.parse(blob=blob)

            title_site = url
            content_site = documents[0].page_content

            truncated_content = functions.truncate_to_n_words(
                content_site, self.valves.PAGE_CONTENT_WORDS_LIMIT
            )

            result_site = {
                "title": url,
                "url": url,
                "content": truncated_content,
                "excerpt": functions.generate_excerpt(content_site),
            }
            results_json.append(result_site)

            if self.valves.CITATION_LINKS and __event_emitter__:
                await __event_emitter__(
                    {
                        "type": "citation",
                        "data": {
                            "document": [truncated_content],
                            "metadata": [{"source": url}],
                            "source": {"name": title_site},
                            "distance": 0,
                        },
                    }
                )

            duration = time.time() - start
            await emitter.emit(
                status="complete",
                description=f"Website content retrieved and processed successfully in {duration:.1f} seconds",
                done=True,
            )
        except requests.exceptions.RequestException as e:
            results_json.append(
                {
                    "url": url,
                    "content": f"Failed to retrieve the page. Error: {str(e)}",
                }
            )
            await emitter.emit(
                status="error",
                description=f"Error fetching website content: {str(e)}",
                done=True,
            )

        return json.dumps(results_json, ensure_ascii=False)
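

# --- Local usage sketch (illustrative only, not part of the Open WebUI tool contract) ---
# A minimal way to exercise both tools outside Open WebUI. The SearXNG URL below is an
# assumed local instance with the JSON output format enabled, not a real endpoint;
# replace it with your own before running.
if __name__ == "__main__":
    import asyncio

    async def _demo():
        tools = Tools()
        # Assumption: a SearXNG instance is reachable at this address.
        tools.valves.SEARXNG_ENGINE_API_BASE_URL = "http://localhost:8080/search"
        print(await tools.search_web("open webui searxng search tool"))
        print(await tools.get_website("https://example.com"))

    asyncio.run(_demo())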