import requests
import re
import urllib.request
from bs4 import BeautifulSoup
from collections import deque
from html.parser import HTMLParser
from urllib.parse import urlparse
import os
# Regex pattern used to recognise an absolute http/https URL.
# NOTE: `[s]*` accepts any number of "s" characters, so "http://", "https://"
# and also "httpss://" all match.
HTTP_URL_PATTERN = r'^http[s]*://.+'
domain = "" # <- host name of the site to crawl; empty here, fill in before running
full_url = "" # <- the same site as a full URL, including the http:// or https:// scheme
# Create a class to parse the HTML and get the hyperlinks
class HyperlinkParser(HTMLParser):
    """Collect the ``href`` value of every anchor tag fed to the parser.

    After ``feed()`` has been called, ``hyperlinks`` holds the raw ``href``
    strings in the order the anchors were encountered. Values are stored
    exactly as written in the markup (no de-duplication or normalisation).
    """

    def __init__(self):
        # HTMLParser builds its internal buffers in __init__; skipping this
        # call makes feed() fail on the missing parser state.
        super().__init__()
        # Create a list to store the hyperlinks
        self.hyperlinks = []

    # Override the HTMLParser's handle_starttag method to get the hyperlinks
    def handle_starttag(self, tag, attrs):
        """Record the ``href`` of an ``<a>`` start tag.

        Args:
            tag: Lower-cased tag name supplied by HTMLParser.
            attrs: List of ``(name, value)`` pairs for the tag's attributes.
        """
        href = dict(attrs).get("href")
        # A valueless attribute (``<a href>``) is reported with value None;
        # skip it so callers only ever receive strings.
        if tag == "a" and href is not None:
            self.hyperlinks.append(href)
# Function to get the hyperlinks from a URL
def get_hyperlinks(url):
    """Return the raw ``href`` values of all anchor tags on the page at *url*.

    Args:
        url: Address of the page to download.

    Returns:
        A list of ``href`` strings as they appear in the markup. The list is
        empty when the page cannot be fetched, is not served as
        ``text/html``, or cannot be decoded as UTF-8.
    """
    # Try to open the URL and read the HTML
    try:
        with urllib.request.urlopen(url) as response:
            # If the response is not HTML, return an empty list.
            # A missing Content-Type header is treated as "not HTML".
            content_type = response.info().get('Content-Type') or ""
            if not content_type.startswith("text/html"):
                return []

            # Decode the HTML
            html = response.read().decode('utf-8')
    except Exception as e:
        # The crawl is best-effort: one unreachable or malformed page must
        # not abort it, so report the problem and carry on with no links.
        print(e)
        return []

    # Create the HTML Parser and then Parse the HTML to get hyperlinks
    parser = HyperlinkParser()
    parser.feed(html)

    return parser.hyperlinks
# Function to get the hyperlinks from a URL that are within the same domain
def get_domain_hyperlinks(local_domain, url):
    """Return the de-duplicated links on *url* that stay within *local_domain*.

    Absolute http(s) links are kept only when their host equals
    *local_domain*. Every other link is treated as relative and rebuilt as
    ``https://<local_domain>/<link>``; in-page anchors (``#...``) and
    ``mailto:`` links are skipped. A single trailing ``/`` is removed so the
    same page is not listed under two spellings.

    Args:
        local_domain: Host name the crawl is restricted to.
        url: Page whose links are inspected.

    Returns:
        A list of unique absolute URLs (order is not defined).
    """
    clean_links = []
    for link in set(get_hyperlinks(url)):
        clean_link = None

        # If the link is a URL, check if it is within the same domain
        if re.search(HTTP_URL_PATTERN, link):
            # Parse the URL and check if the domain is the same
            url_obj = urlparse(link)
            if url_obj.netloc == local_domain:
                clean_link = link

        # If the link is not a URL, check if it is a relative link
        else:
            if link.startswith("/"):
                link = link[1:]
            elif link.startswith(("#", "mailto:")):
                # Fragments and e-mail addresses are not crawlable pages
                continue
            clean_link = "https://" + local_domain + "/" + link

        if clean_link is not None:
            if clean_link.endswith("/"):
                clean_link = clean_link[:-1]
            clean_links.append(clean_link)

    # Return the list of hyperlinks that are within the same domain
    return list(set(clean_links))
def crawl(url):
    """Crawl every same-domain page reachable from *url* and save its text.

    The visible text of each visited page is written to
    ``text/<domain>/<url without its first 8 characters, "/" -> "_">.txt``.
    The ``text/<domain>/`` and ``processed`` directories are created when
    missing.

    Args:
        url: Start page; its host name defines the domain being crawled.
    """
    # Parse the URL and get the domain
    local_domain = urlparse(url).netloc

    # Create a queue to store the URLs to crawl
    queue = deque([url])

    # Create a set to store the URLs that have already been seen (no duplicates)
    seen = {url}

    # Create a directory to store the text files
    text_dir = "text/" + local_domain + "/"
    os.makedirs(text_dir, exist_ok=True)

    # Create a directory to store the csv files
    os.makedirs("processed", exist_ok=True)

    # While the queue is not empty, continue crawling
    while queue:

        # Get the next URL from the queue (pop() takes the most recently
        # added entry, so pages are visited depth-first)
        url = queue.pop()
        print(url) # for debugging and to see the progress

        # Get the text from the URL using BeautifulSoup
        r = requests.get(url)
        r.encoding = 'utf-8'
        soup = BeautifulSoup(r.text, "html.parser")

        # Get the text but remove the tags
        text = soup.get_text()

        # If the page requires JavaScript, report it; the crawl carries on
        # and the placeholder text is still saved
        if "You need to enable JavaScript to run this app." in text:
            print("Unable to parse page " + url + " due to JavaScript being required")

        # Save text from the url to a <url>.txt file
        with open(text_dir + url[8:].replace("/", "_") + ".txt", "w", encoding="UTF-8") as f:
            f.write(text)

        # Get the hyperlinks from the URL and add them to the queue
        for link in get_domain_hyperlinks(local_domain, url):
            if link not in seen:
                queue.append(link)
                seen.add(link)
"cells": [
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [],
"source": [
"def remove_newlines(serie):\n",
" serie = serie.str.replace('\\n', ' ')\n",
" serie = serie.str.replace('\\\\n', ' ')\n",
" serie = serie.str.replace(' ', ' ')\n",
" serie = serie.str.replace(' ', ' ')\n",
" return serie"
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import pandas as pd\n",
"domain = \"\"\n",
"full_url = \"\"\n",
"# Create a list to store the text files\n",
"# Get all the text files in the text directory\n",
"for file in os.listdir(\"text/\" + domain + \"/\"):\n",
" # Open the file and read the text\n",
" with open(\"text/\" + domain + \"/\" + file, \"r\", encoding=\"UTF-8\") as f:\n",
" text =\n",
" # Omit the first 11 characters and the last 4 characters of the file name, then replace - and _ with spaces and remove #update.\n",
" texts.append((file[11:-4].replace('-',' ').replace('_', ' ').replace('#update',''), text))\n",
"# Create a dataframe from the list of texts\n",
"df = pd.DataFrame(texts, columns = ['fname', 'text'])\n",
"# Set the text column to be the raw text with the newlines removed\n",
"df['text'] = df.fname + \". \" + remove_newlines(df.text)\n",
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%pip install tiktoken matplotlib"
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import tiktoken\n",
"import pandas as pd\n",
"# Load the cl100k_base tokenizer which is designed to work with the ada-002 model\n",
"tokenizer = tiktoken.get_encoding(\"cl100k_base\")\n",
"df = pd.read_csv('processed/scraped.csv', index_col=0)\n",
"df.columns = ['title', 'text']\n",
"# Tokenize the text and save the number of tokens to a new column\n",
"df['n_tokens'] = df.text.apply(lambda x: len(tokenizer.encode(x)))\n",
"# Visualize the distribution of the number of tokens per row using a histogram\n",
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [],
"source": [
"max_tokens = 500\n",
"# Function to split the text into chunks of a maximum number of tokens\n",
"def split_into_many(text, max_tokens = max_tokens):\n",
"    \"\"\"Split text into sentence-aligned chunks of at most max_tokens tokens.\n",
"\n",
"    Sentences are delimited by '. '. A sentence that on its own exceeds\n",
"    max_tokens is dropped rather than truncated. Returns a list of strings.\n",
"    \"\"\"\n",
"    # Split the text into sentences\n",
"    sentences = text.split('. ')\n",
"\n",
"    # Get the number of tokens for each sentence\n",
"    n_tokens = [len(tokenizer.encode(\" \" + sentence)) for sentence in sentences]\n",
"\n",
"    chunks = []\n",
"    tokens_so_far = 0\n",
"    chunk = []\n",
"\n",
"    # Loop through the sentences and tokens joined together in a tuple\n",
"    for sentence, token in zip(sentences, n_tokens):\n",
"\n",
"        # If the number of tokens so far plus the number of tokens in the current sentence is greater\n",
"        # than the max number of tokens, then add the chunk to the list of chunks and reset\n",
"        # the chunk and tokens so far\n",
"        if tokens_so_far + token > max_tokens:\n",
"            # Skip the flush when nothing has been collected, otherwise a bare \".\" chunk is emitted\n",
"            if chunk:\n",
"                chunks.append(\". \".join(chunk) + \".\")\n",
"            chunk = []\n",
"            tokens_so_far = 0\n",
"\n",
"        # If the number of tokens in the current sentence is greater than the max number of\n",
"        # tokens, go to the next sentence\n",
"        if token > max_tokens:\n",
"            continue\n",
"\n",
"        # Otherwise, add the sentence to the chunk and add the number of tokens to the total\n",
"        chunk.append(sentence)\n",
"        tokens_so_far += token + 1\n",
"\n",
"    # Flush the sentences collected after the last reset; without this the tail of the text is lost\n",
"    if chunk:\n",
"        chunks.append(\". \".join(chunk) + \".\")\n",
"\n",
"    return chunks\n",
" \n",
"shortened = []\n",
"# Loop through the dataframe\n",
"for row in df.iterrows():\n",
" # If the text is None, go to the next row\n",
" if row[1]['text'] is None:\n",
" continue\n",
" # If the number of tokens is greater than the max number of tokens, split the text into chunks\n",
" if row[1]['n_tokens'] > max_tokens:\n",
" shortened += split_into_many(row[1]['text'])\n",
" \n",
" # Otherwise, add the text to the list of shortened texts\n",
" else:\n",
" shortened.append( row[1]['text'] )"
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"df = pd.DataFrame(shortened, columns = ['text'])\n",
"df['n_tokens'] = df.text.apply(lambda x: len(tokenizer.encode(x)))\n",
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import openai\n",
"df['embeddings'] = df.text.apply(lambda x: openai.Embedding.create(input=x, engine='text-embedding-ada-002')['data'][0]['embedding'])\n",
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%pip install numpy"
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"import pandas as pd\n",
"df=pd.read_csv('processed/embeddings.csv', index_col=0)\n",
"df['embeddings'] = df['embeddings'].apply(eval).apply(np.array)\n",
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%pip install plotly scipy scikit-learn -i --trusted-host"
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"from openai.embeddings_utils import distances_from_embeddings\n",
"def create_context(question, df, max_len=1800):\n",
" \"\"\"\n",
" Create a context for a question by finding the most similar context from the dataframe\n",
" \"\"\"\n",
" # Get the embeddings for the question\n",
" q_embeddings = openai.Embedding.create(input=question, engine='text-embedding-ada-002')['data'][0]['embedding']\n",
" # Get the distances from the embeddings\n",
" df['distances'] = distances_from_embeddings(q_embeddings, df['embeddings'].values, distance_metric='cosine')\n",
" returns = []\n",
" cur_len = 0\n",
" # Sort by distance and add the text to the context until the context is too long\n",
" for i, row in df.sort_values('distances', ascending=True).iterrows():\n",
" \n",
" # Add the length of the text to the current length\n",
" cur_len += row['n_tokens'] + 4\n",
" \n",
" # If the context is too long, break\n",
" if cur_len > max_len:\n",
" break\n",
" \n",
" # Else add it to the text that is being returned\n",
" returns.append(row[\"text\"])\n",
" # Return the context\n",
" return \"\\n\\n###\\n\\n\".join(returns)"
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"import numpy as np\n",
"df = pd.read_csv('processed/embeddings.csv', index_col=0)\n",
"df['embeddings'] = df['embeddings'].apply(eval).apply(np.array)\n",
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import openai\n",
"def answer_question(\n",
" df,\n",
" question=\"Kubernetes是什么?\",\n",
" max_len=1800,\n",
" debug=False,\n",
" max_tokens=150,\n",
" stop_sequence=None\n",
" \"\"\"\n",
" 根据 dataframe texts 中最相似的上下文回答问题\n",
" \"\"\"\n",
" context = create_context(\n",
" question,\n",
" df,\n",
" max_len=max_len,\n",
" )\n",
" if debug:\n",
" print(\"Context:\\n\" + context)\n",
" print(\"\\n\\n\")\n",
" try:\n",
" response = openai.ChatCompletion.create(\n",
" model=\"gpt-3.5-turbo\",\n",
" temperature=0,\n",
" max_tokens=max_tokens,\n",
" top_p=1,\n",
" frequency_penalty=0,\n",
" presence_penalty=0,\n",
" stop=stop_sequence,\n",
" messages=[\n",
" {\"role\": \"system\", \"content\": \"You are youdianzhishi's official knowledge base AI robot assistant\"},\n",
" {\"role\": \"user\", \"content\": f\"Answer the question based on the context below with Chinese, and if the question can't be answered based on the context, say \\\"I don't know\\\"\\n\\nContext: {context}\\n\\n---\\n\\nQuestion: {question}\\nAnswer:\"},\n",
" ]\n",
" )\n",
" result = ''\n",
" for choice in response.choices:\n",
" result += choice.message.content\n",
" return result\n",
" except Exception as e:\n",
" print(e)\n",
" return \"\"\n",
"answer_question(df, question=\"如何使用Prometheus Operator?\", debug=True)\n"
