@avelican
Created April 14, 2023 15:56
Accio (GPT powered text file search with PDF support)
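# Usage sketch (inferred from the code below; the script name accio.py is an assumption):
#   1. Create input/ and output/ folders next to the script and put the document to search in input/.
#   2. Set the OPENAI_API_KEY environment variable.
#   3. Run `python accio.py`, then answer the "File:" and "Question:" prompts.
# Matching chunks and the extracted answer sentences are appended to <filename>.ANSWER.txt in output/.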
import os
import openai
import sys
from pdfminer.high_level import extract_text # pip install pdfminer.six
openai.api_key = os.getenv("OPENAI_API_KEY")
OPENAI_MODEL="text-davinci-003"
# OPENAI_MODEL="text-curie-001"
# OPENAI_MODEL="text-babbage-001"
# OPENAI_MODEL="text-ada-001"
work_dir = "output"
MAX_OUTPUT_LENGTH=100
# def gpt(prompt, max_tokens=MAX_OUTPUT_LENGTH):
#     response = openai.Completion.create(
#         # model="gpt-3.5-turbo", # Alas, chat-only... same for gpt-4. TODO ?
#         model=OPENAI_MODEL,
#         prompt=prompt,
#         temperature=0,
#         max_tokens=max_tokens,
#         top_p=1.0,
#         frequency_penalty=0.0,
#         presence_penalty=0.0
#     )
#     # todo catch error
#     return response.choices[0].text.strip()
# chat models
OPENAI_CHAT_MODEL="gpt-3.5-turbo"
# OPENAI_CHAT_MODEL="gpt-4"
def gpt_chat_search_contains(text, question)->str:
    temperature = 0.5
    max_tokens = 500
    messages = [
        {"role": "system", "content": 'You are a search engine. The user will provide a text excerpt, and a question, separated by "###". Your task: determine if the text contains the answer to the question or not. Respond ONLY with "Yes." or "No." Please say yes only if you are 100% sure, to minimize false positives.'},
        # {'role': 'user', 'content': f'Text:{text}\n\n\n###\n\n\nQuestion:{question}'}
        {'role': 'user', 'content': f'Text:{text}\n\n\n###\n\n\nQuestion:{question} -- (To eliminate false positives, please only say "Yes." if the text contains the answer to the question!)'}
    ]
    # TODO see if system message is enough, or if we need to adjust user message
    response = openai.ChatCompletion.create(
        model=OPENAI_CHAT_MODEL,
        messages=messages,
        temperature=temperature,
        max_tokens=max_tokens
    )
    return response.choices[0].message["content"]
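# The chat calls above do not catch API errors. A minimal retry sketch (an assumption, not part
# of the original gist; relies on the openai.error exception classes from the 0.27-era SDK):
# import time
# def chat_with_retry(messages, model=OPENAI_CHAT_MODEL, temperature=0.5, max_tokens=500, retries=3):
#     for attempt in range(retries):
#         try:
#             return openai.ChatCompletion.create(model=model, messages=messages,
#                                                 temperature=temperature, max_tokens=max_tokens)
#         except openai.error.OpenAIError:
#             time.sleep(2 ** attempt)  # simple exponential backoff before retrying
#     raise RuntimeError("Chat completion failed after retries")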
def gpt_chat_search_get_relevant_sentence(text, question)->str:
    temperature = 0.5
    max_tokens = 500
    messages = [
        {"role": "system", "content": 'You are a search engine. The user will provide a text excerpt, and a question, separated by "###". Your task: Find the sentence which answers the user\'s question. Respond ONLY with this sentence. If there is no relevant sentence, respond with "ERROR_NOTFOUND".'},
        {'role': 'user', 'content': f'Text:{text}\n\n\n###\n\n\nQuestion:{question}'}
    ]
    # TODO see if system message is enough, or if we need to adjust user message
    response = openai.ChatCompletion.create(
        model=OPENAI_CHAT_MODEL,
        messages=messages,
        temperature=temperature,
        max_tokens=max_tokens
    )
    return response.choices[0].message["content"]
def count_tokens(text):
    # who needs tiktoken amirite?
    tokens = len(text) / 3
    return int(tokens*1.05) # margin of error
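# count_tokens above is a rough character-based estimate. A more accurate sketch, assuming
# tiktoken is installed (pip install tiktoken); not used anywhere in this script:
# import tiktoken
# def count_tokens_tiktoken(text, model=OPENAI_CHAT_MODEL):
#     enc = tiktoken.encoding_for_model(model)
#     return len(enc.encode(text))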
def split_text_ch(text, chunk_len):
    res = []
    while(len(text) > 0):
        res.append(text[0:chunk_len])
        text = text[chunk_len:]
    return res
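# split_text_ch cuts on fixed character boundaries, so an answer can be split across two chunks.
# One possible mitigation (a sketch, not used by this script): overlap consecutive chunks.
# def split_text_overlap(text, chunk_len, overlap=200):
#     step = max(1, chunk_len - overlap)
#     return [text[start:start + chunk_len] for start in range(0, len(text), step)]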
# def get_task_prompt(text, task):
#     return f"""{text}
# ###
# Using the text above, perform the following task.
# Task:{task}
# Output:"""
# def get_task_prompt(text, task):
#     return f"""{text}
# ###
# Using the text above, answer the following question.
# Question:{task}
# Answer:"""
def get_contains_question_prompt(text, task):
    return f"""{text}
###
Does the text above contain the answer to the following question? Answer only with "Yes" or "No" (no punctuation).
Question:{task}
Answer:"""
# def process_chunk(text, task):
#     prompt = get_task_prompt(text, task)
#     return gpt(prompt) # todo catch GPT error
# def process(text, task):
#     MAX_INPUT_LENGTH = 2049 - 500 # for cheap and fast GPT-3 models. Subtract 500 for the prompt and output (half-assing it here...)
#     if(count_tokens(text) > MAX_INPUT_LENGTH):
#         chunks = split_text_ch(text, MAX_INPUT_LENGTH)
#     else:
#         chunks = [text]
#     output = ''
#     for chunk in chunks:
#         output += process_chunk(chunk, task) + '\n\n'
#     return output
#######
def safe_join(base, *paths):
    """Join one or more path components intelligently."""
    new_path = os.path.join(base, *paths)
    norm_new_path = os.path.normpath(new_path)
    if os.path.commonprefix([base, norm_new_path]) != base:
        raise ValueError("Attempted to access outside of working directory.")
    return norm_new_path
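# Illustrative behaviour: safe_join("output", "report.ANSWER.txt") returns a path inside output/,
# while safe_join("output", "../secrets.txt") normalizes to "secrets.txt" and raises ValueError.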
def save(name, text):
    # TODO: make a new folder for each run? Else prefix output w timestamp
    output_data = text + "\n\n"
    try:
        filepath = safe_join(work_dir, name)
        with open(filepath, "a") as f:
            f.write(output_data)
        return True
        # return "Text appended successfully."
    except Exception as e:
        print("ERROR: Could not save file " + name + " because: " + str(e))
        return False
        # return "Error: " + str(e)
####
def find(text, question):
    MAX_INPUT_LENGTH = 2049 - 500 # for cheap and fast GPT-3 models (conservative for gpt-3.5-turbo's 4096-token window). Subtract 500 for the prompt and output (half-assing it here...)
    if(count_tokens(text) > MAX_INPUT_LENGTH):
        chunks = split_text_ch(text, MAX_INPUT_LENGTH) # chunk_len is in characters here, so chunks stay well under the token limit
    else:
        chunks = [text]
    # output = ''
    counter = 0
    for chunk in chunks:
        counter += 1
        print(f'Searching chunk {counter}/{len(chunks)}')
        response = find_chunk(chunk, question)
        if response == 'Yes.':
            # output += chunk + '\n\n\n###\n\n\n'
            append_txt = chunk + '\n\n\n###\n\n\n'
            save(filename_out, append_txt) # filename_out is a module-level global set before find() is called
            print('#######')
            print(chunk)
            print("----")
            sentence = find_sentence(chunk, question)
            print(sentence)
            print('#######')
            save(filename_out, sentence + "\n\n\n######\n\n\n")
            needMore = input("Continue? ENTER means yes (don't type anything else)")
            if needMore != '':
                print('Quitting')
                exit()
        elif response == 'No.':
            pass
        else:
            print(f'Unknown response: {response}')
    # return output
def find_chunk(text, question):
    # prompt = get_contains_question_prompt(text, task)
    # return gpt(prompt) # todo catch GPT error
    return gpt_chat_search_contains(text, question) # updated to use Chat API and system message
def find_sentence(text, question):
    return gpt_chat_search_get_relevant_sentence(text, question)
####
# todo move to main function
# if len(sys.argv) != 2:
#     print("No file given")
#     exit()
# filename = sys.argv[1]
filename = input('File: ')
file_path = 'input/' + filename
task = input('Question: ')
text = ''
if filename.lower().endswith('.pdf'):
    print('Loading text from pdf...')
    text = extract_text(file_path)
    if not text:
        print('Could not load pdf')
        exit()
else: # assume plain text
    with open(file_path) as f:
        text = f.read()
if not text:
    print("No text")
    exit()
# output = find(text, task)
filename_out = filename + ".ANSWER.txt"
# save(filename_out, output)
find(text, task) # find updated to save incrementally