See https://loppsided.blog/posts/2024-12-26-a-rag-to-chat-with-your-favorite-blogger/
Code for creating a RAG chatbot based on theradavist. The gist contains two scripts: chat_with_radavist.py, which builds (or reloads) a Chroma vector store of blog posts using NVIDIA NIM embedding and chat endpoints and answers a question over it, and a scraper that downloads review posts from theradavist.com into local markdown files.
# chat_with_radavist.py
import os
# from langchain_community.vectorstores.chroma import Chroma
from langchain_chroma import Chroma
from langchain_nvidia_ai_endpoints import NVIDIAEmbeddings
from langchain_nvidia_ai_endpoints import ChatNVIDIA
from langchain_community.document_loaders import (
    DirectoryLoader,
    UnstructuredMarkdownLoader,
)
from langchain.text_splitter import CharacterTextSplitter
from langchain.chains import ConversationalRetrievalChain
from langchain.memory import ConversationBufferMemory
from pathlib import Path
from pprint import pprint
import argparse

# Sample invocation:
# python chat_with_radavist.py --text "what is the best material to build a hardtail out of?"
parser = argparse.ArgumentParser(description="Chat with the radavist.")
parser.add_argument("--rebuild", action="store_true", help="rebuild embeddings or not")
parser.add_argument("--text", type=str, help="your question")
args = parser.parse_args()

API_KEY = "your-api-key"  # get one here: https://www.nvidia.com/en-us/ai/

# Code adapted from https://amaarora.github.io/posts/2023-07-27_Document_Question_Answering_with_LangChain.html
# with adjustments to use NVIDIA NIMs instead of the OpenAI API.

# Load or create the vector database.
persist_directory = "./db/"
embeddings = NVIDIAEmbeddings(
    model="nvidia/llama-3.2-nv-embedqa-1b-v2", api_key=API_KEY
)

if args.rebuild:
    # Embed every scraped markdown post into a fresh Chroma store.
    print("Creating document embeddings")
    loader = DirectoryLoader(
        "./radavist_posts/", glob="**/*.md", loader_cls=UnstructuredMarkdownLoader
    )
    docs = loader.load()
    print(f"Found {len(docs)} docs to embed")
    text_splitter = CharacterTextSplitter(chunk_size=1024, chunk_overlap=128)
    chunks = text_splitter.split_documents(docs)
    print(f"Found {len(chunks)} chunks to embed via API calls!")
    db = Chroma.from_documents(
        documents=chunks, embedding=embeddings, persist_directory=persist_directory
    )
else:
    # Reuse the embeddings persisted in ./db/ from a previous --rebuild run.
    print("Loading existing vector database of Radavist embeddings")
    db = Chroma(persist_directory=persist_directory, embedding_function=embeddings)

print(f"Answers based on RAG with {db._collection.count()} potential embeddings")

memory = ConversationBufferMemory(
    memory_key="chat_history", output_key="answer", return_messages=False
)

# Create the QA chain with `langchain`; the Chroma database serves as the vector store
# retriever that finds "context" chunks via similarity search.
# Note: this uses an outdated form of the langchain API, so it will need to be updated in the future.
qa = ConversationalRetrievalChain.from_llm(
    llm=ChatNVIDIA(
        api_key=API_KEY,
        model="meta/llama-3.3-70b-instruct",
        temperature=0.2,
        top_p=0.7,
        max_tokens=1024,
    ),
    chain_type="stuff",
    retriever=db.as_retriever(),
    get_chat_history=lambda o: o,
    memory=memory,
    return_generated_question=True,
    verbose=False,
    return_source_documents=True,
)

response = qa(
    {"question": f"{args.text}"},
)
pprint(response)
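The comment above flags that ConversationalRetrievalChain is an older LangChain interface. As a rough, untested sketch of what an update might look like (assuming a langchain version that ships create_retrieval_chain and create_stuff_documents_chain, and reusing the db, API_KEY, and args objects from the script above; the prompt wording and the rag_chain name are illustrative, not from the gist), the retrieval step could be rewritten roughly as:

# Hypothetical sketch of the newer LangChain API; not part of the original gist.
from langchain.chains import create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain_core.prompts import ChatPromptTemplate
from langchain_nvidia_ai_endpoints import ChatNVIDIA

llm = ChatNVIDIA(api_key=API_KEY, model="meta/llama-3.3-70b-instruct", temperature=0.2)

prompt = ChatPromptTemplate.from_messages(
    [
        ("system", "Answer the question using only the provided context:\n\n{context}"),
        ("human", "{input}"),
    ]
)

# "Stuff" the retrieved chunks into the prompt, then wrap that chain with the retriever.
combine_docs_chain = create_stuff_documents_chain(llm, prompt)
rag_chain = create_retrieval_chain(db.as_retriever(), combine_docs_chain)

response = rag_chain.invoke({"input": args.text})
print(response["answer"])

This sketch drops the conversational memory used in the original; carrying chat history forward would additionally need a history-aware retriever, which is omitted here.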
# Scraper script (filename not given in the gist): downloads review posts from
# theradavist.com and saves each one as a markdown file.
import os
import re
import random
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import List

import bs4
import requests
from bs4 import BeautifulSoup

# Define the base URL and output directory.
BASE_URL = "https://theradavist.com"
OUTPUT_DIR = "radavist_posts"

# Create the output directory if it doesn't exist.
os.makedirs(OUTPUT_DIR, exist_ok=True)


def get_soup(url) -> BeautifulSoup | None:
    """Fetches a URL and parses it into BeautifulSoup; returns None if parsing fails."""
    response = requests.get(url)
    response.raise_for_status()
    try:
        return BeautifulSoup(response.text, "html.parser")
    except bs4.builder.ParserRejectedMarkup:
        return None


def clean_filename(title):
    """Cleans and formats the title for a valid filename."""
    return re.sub(r'[\\/*?:"<>|]', "_", title)


def save_post_to_markdown(title, content, slug):
    """Saves the blog post as a markdown file."""
    filename = f"{clean_filename(slug)}.md"
    filepath = os.path.join(OUTPUT_DIR, filename)
    with open(filepath, "w", encoding="utf-8") as file:
        file.write(f"# {title}\n\n")
        file.write(content)
    print(f"Saved: {filepath}")


def check_exists(slug):
    """Returns True if the post has already been saved to disk."""
    filename = f"{clean_filename(slug)}.md"
    filepath = Path(OUTPUT_DIR, filename)
    return filepath.is_file()


def scrape_post(post_url):
    """Scrapes an individual blog post, keeping only review posts."""
    slug = post_url.rstrip("/").split("/")[-1]
    if check_exists(slug):
        print(f"Skipping {slug} since it was already saved")
        return slug
    if "review" not in slug.split("-"):
        print(f"Skipping: {slug} since not a review")
        return slug
    try:
        soup = get_soup(post_url)
    except requests.exceptions.HTTPError:
        print(f"Skipping {slug} due to HTTP error")
        return "skipped"
    if soup is None:
        print(f"Skipping {slug} due to soup html parsing error")
        return "skipped"
    try:
        title = soup.find("h1").get_text(strip=True)
    except AttributeError:
        print(f"Skipping {slug} since it has no title")
        return "skipped"
    content_section = soup.find("section", class_="article-content")
    if not content_section:
        print(f"Skipping: {post_url} (content not found)")
        return "skipped"
    # Join the post's paragraphs into a single markdown body.
    content = "\n\n".join(
        [p.get_text(strip=True) for p in content_section.find_all("p")]
    )
    save_post_to_markdown(title, content, slug)
    return slug


def get_radavist_sitemap(year) -> List[str]:
    """Returns every post URL listed in the sitemap for the given year."""
    sitemap_url = f"https://theradavist.com/sitemap-posttype-post.{year}.xml"
    response = requests.get(sitemap_url)
    if response.status_code == 200:
        # The "xml" parser requires lxml to be installed.
        soup = BeautifulSoup(response.content, "xml")
        urls = [loc.get_text() for loc in soup.find_all("loc")]
        print(f"Found example url for year {year}: {urls[0]}")
        return urls
    else:
        print(f"Sitemap not found at {sitemap_url}")
        return []


def get_urls():
    """Collects post URLs per year and scrapes each one."""
    print("Getting URLs from sitemap")
    with ThreadPoolExecutor(max_workers=16) as executor:
        # list() forces all sitemap fetches to complete.
        urls = list(executor.map(get_radavist_sitemap, range(2024, 2025)))

    print("Saving articles from URLs for each year")
    for year in urls:
        year = [url for url in year if url.lower().endswith("/")]
        print(f"Got {len(year)} urls to parse")
        random.shuffle(year)
        with ThreadPoolExecutor(max_workers=4) as executor:
            # list() forces all scrape_post futures to complete.
            list(executor.map(scrape_post, year))


# scrape_post("https://theradavist.com/top-ten-bike-reviews-2024/")
# scrape_post("https://theradavist.com/moots-womble-29er-review/")
# scrape_post("https://theradavist.com/radavist-mosaic-gt2x-review/")
get_urls()
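Taken together, the two scripts form a small pipeline: the scraper fills ./radavist_posts/ with one markdown file per review, chat_with_radavist.py embeds those files into the Chroma store under ./db/ on a --rebuild run, and later runs without --rebuild reuse the persisted store. A possible invocation sequence, written in the same comment style as the gist's own sample invocation (the scraper's filename is not given in the gist, so the one below is hypothetical):

# Assumed workflow; scrape_radavist.py is a placeholder name for the scraper above.
# python scrape_radavist.py
# python chat_with_radavist.py --rebuild --text "what is the best material to build a hardtail out of?"
# python chat_with_radavist.py --text "your next question"   # reuses the persisted ./db/ store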