Created
April 14, 2023 15:56
-
-
Save avelican/34a9e0edfb96617b3369682d4c190dda to your computer and use it in GitHub Desktop.
Accio (GPT powered text file search with PDF support)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import openai | |
import sys | |
from pdfminer.high_level import extract_text # pip install pdfminer.six | |
openai.api_key = os.getenv("OPENAI_API_KEY") | |
# Legacy completion-model name (only referenced by the commented-out gpt() helper).
OPENAI_MODEL = "text-davinci-003"
# OPENAI_MODEL = "text-curie-001"
# OPENAI_MODEL = "text-babbage-001"
# OPENAI_MODEL = "text-ada-001"

# Directory that save() appends result files into.
work_dir = "output"

# Max completion tokens for the legacy gpt() helper.
MAX_OUTPUT_LENGTH = 100
# def gpt(prompt, max_tokens=MAX_OUTPUT_LENGTH): | |
# response = openai.Completion.create( | |
# # model="gpt-3.5-turbo", # Alas, chat-only... same for gpt-4. TODO ? | |
# model=OPENAI_MODEL, | |
# prompt=prompt, | |
# temperature=0, | |
# max_tokens=max_tokens, | |
# top_p=1.0, | |
# frequency_penalty=0.0, | |
# presence_penalty=0.0 | |
# ) | |
# # todo catch error | |
# return response.choices[0].text.strip() | |
# # chat models | |
# Chat model used for all live requests below.
OPENAI_CHAT_MODEL = "gpt-3.5-turbo"
# OPENAI_CHAT_MODEL = "gpt-4"
def gpt_chat_search_contains(text, question) -> str:
    """Ask the chat model whether *text* contains the answer to *question*.

    Returns the raw model reply, which the caller expects to be exactly
    "Yes." or "No." (any other reply is treated as unknown by find()).
    """
    system_prompt = 'You are a search engine. The user will provide a text excerpt, and a question, separated by "###". Your task: determine if the text contains the answer to the question or not. Respond ONLY with "Yes." or "No." Please say yes only if you are 100% sure, to minimize false positives.'
    user_prompt = f'Text:{text}\n\n\n###\n\n\nQuestion:{question} -- (To eliminate false positives, please only say "Yes." if the text contains the answer to the question!)'
    # TODO see if system message is enough, or if we need to adjust user message
    response = openai.ChatCompletion.create(
        model=OPENAI_CHAT_MODEL,
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        temperature=0.5,
        max_tokens=500,
    )
    return response.choices[0].message["content"]
def gpt_chat_search_get_relevant_sentence(text, question) -> str:
    """Extract from *text* the single sentence answering *question*.

    Returns the model's reply: the sentence itself, or the literal marker
    "ERROR_NOTFOUND" when the model finds no relevant sentence.
    """
    system_prompt = 'You are a search engine. The user will provide a text excerpt, and a question, separated by "###". Your task: Find the sentence which answers the user\'s question. Respond ONLY with this sentence. If there is no relevant sentence, respond with "ERROR_NOTFOUND".'
    user_prompt = f'Text:{text}\n\n\n###\n\n\nQuestion:{question}'
    # TODO see if system message is enough, or if we need to adjust user message
    response = openai.ChatCompletion.create(
        model=OPENAI_CHAT_MODEL,
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        temperature=0.5,
        max_tokens=500,
    )
    return response.choices[0].message["content"]
def count_tokens(text):
    """Cheap token estimate: ~3 characters per token, padded by 5% for safety.

    Deliberately avoids a tiktoken dependency; over-estimates slightly so
    chunking stays inside the model's context window.
    """
    approx = len(text) / 3
    return int(approx * 1.05)
def split_text_ch(text, chunk_len):
    """Split *text* into consecutive chunks of at most *chunk_len* characters.

    The final chunk may be shorter; an empty string yields an empty list.
    """
    return [text[start:start + chunk_len] for start in range(0, len(text), chunk_len)]
# def get_task_prompt(text, task): | |
# return f"""{text} | |
# ### | |
# Using the text above, perform the following task. | |
# Task:{task} | |
# Output:""" | |
# def get_task_prompt(text, task): | |
# return f"""{text} | |
# ### | |
# Using the text above, answer the following question. | |
# Question:{task} | |
# Answer:""" | |
def get_contains_question_prompt(text, task):
    """Build the legacy completion-API prompt asking for a bare Yes/No.

    Only used by the commented-out completion-model path; the live code path
    uses the chat helpers above instead.
    """
    instruction = 'Does the text above contain the answer to the following question? Answer only with "Yes" or "No" (no punctuation).'
    return text + "\n###\n" + instruction + "\nQuestion:" + task + "\nAnswer:"
# def process_chunk(text, task): | |
# prompt = get_task_prompt(text, task) | |
# return gpt(prompt) # todo catch GPT error | |
# def process(text, task): | |
# MAX_INPUT_LENGTH = 2049 - 500 # for cheap and fast GPT-3 models. Subtract 500 for the prompt and output (half-assing it here...) | |
# if(count_tokens(text) > MAX_INPUT_LENGTH): | |
# chunks = split_text_ch(text, MAX_INPUT_LENGTH) | |
# else: | |
# chunks = [text] | |
# output = '' | |
# for chunk in chunks: | |
# output += process_chunk(chunk, task) + '\n\n' | |
# return output | |
####### | |
def safe_join(base, *paths):
    """Join one or more path components, refusing to escape *base*.

    Returns the normalized joined path.
    Raises ValueError if the result would fall outside *base* (including
    via ".." traversal or an absolute component).

    BUG FIX: the previous check used os.path.commonprefix, which compares
    *strings*, not path components — e.g. joining "../output_secret/f" onto
    base "output" normalizes to "output_secret/f", whose string prefix is
    "output", so the traversal slipped through. os.path.commonpath compares
    whole components and closes that hole. (It also raises ValueError when
    absolute and relative paths are mixed, which is the behavior we want.)
    """
    base = os.path.normpath(base)
    candidate = os.path.normpath(os.path.join(base, *paths))
    if candidate != base and os.path.commonpath([base, candidate]) != base:
        raise ValueError("Attempted to access outside of working directory.")
    return candidate
def save(name, text):
    """Append *text* plus a blank line to work_dir/name.

    Returns True on success, False on any failure; failures are printed
    rather than raised so a bad write never aborts a long search run.
    """
    # TODO: make a new folder for each run? Else prefix output w timestamp
    try:
        out_path = safe_join(work_dir, name)
        with open(out_path, "a") as out_file:
            out_file.write(text + "\n\n")
        return True
    except Exception as e:
        print("ERROR: Could not save file " + name + " because: " + str(e))
        return False
#### | |
def find(text, question):
    """Search *text* for chunks containing the answer to *question*.

    Splits the text into model-sized chunks, asks the model per chunk whether
    it answers the question, and for each hit: appends the chunk and its most
    relevant sentence to the global output file *filename_out*, prints them,
    and pauses for the user to decide whether to continue.

    Side effects: prints progress, writes via save(), reads stdin, and may
    call exit(). Reads the module-level *filename_out*.
    """
    MAX_INPUT_LENGTH = 2049 - 500  # for cheap and fast GPT-3 models. Subtract 500 for the prompt and output (half-assing it here...)
    # NOTE(review): MAX_INPUT_LENGTH is a *token* budget but split_text_ch
    # slices by characters, so chunks are ~3x smaller than needed — safe,
    # just inefficient. Confirm before "fixing".
    if count_tokens(text) > MAX_INPUT_LENGTH:
        chunks = split_text_ch(text, MAX_INPUT_LENGTH)
    else:
        chunks = [text]
    counter = 0
    for chunk in chunks:
        counter += 1
        print(f'Searching chunk {counter}/{len(chunks)}')
        # BUG FIX: previously passed the *global* `task` instead of the
        # `question` parameter (worked only by accident at script scope).
        response = find_chunk(chunk, question)
        if response == 'Yes.':
            append_txt = chunk + '\n\n\n###\n\n\n'
            save(filename_out, append_txt)
            print('#######')
            print(chunk)
            print("----")
            sentence = find_sentence(chunk, question)  # BUG FIX: was global `task`
            print(sentence)
            print('#######')
            save(filename_out, sentence + "\n\n\n######\n\n\n")
            needMore = input("Continue? ENTER means yes (don't type anything else)")
            if needMore != '':
                print('Quitting')
                exit()
        elif response == 'No.':
            pass  # chunk has no answer; move on silently
        else:
            print(f'Unknown response: {response}')
def find_chunk(text, question):
    """Return the model's verdict ("Yes."/"No.") on whether *text* answers *question*."""
    # Delegates to the Chat API helper; the legacy completion-prompt path
    # (get_contains_question_prompt + gpt) is retired.
    return gpt_chat_search_contains(text=text, question=question)
def find_sentence(text, question):
    """Return the single sentence of *text* answering *question* (or "ERROR_NOTFOUND")."""
    return gpt_chat_search_get_relevant_sentence(text=text, question=question)
#### | |
# todo move to main function | |
# if len(sys.argv) != 2: | |
# print("No file given") | |
# exit() | |
# filename = sys.argv[1] | |
# Prompt for the input file (looked up under input/) and the question to search for.
filename = input('File: ')
file_path = 'input/' + filename
task = input('Question: ')

text = ''
if filename.lower().endswith('.pdf'):
    # PDFs go through pdfminer; everything else is read as plain text.
    print('Loading text from pdf...')
    text = extract_text(file_path)
    if not text:
        print('Could not load pdf')
        exit()
else:  # assume plain text
    with open(file_path) as f:
        text = f.read()
if not text:
    print("No text")
    exit()

# Hits are appended incrementally to this file (read as a global by find()).
filename_out = filename + ".ANSWER.txt"
find(text, task)  # find() saves results as it goes, so nothing to save here
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment