@yashbonde
Last active August 14, 2021 12:13
Using GPT right now is tedious because you have to keep calling the `model.generate()` method. This code simplifies that by making `__call__` first class and storing results in a searchable history!
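For orientation, here is a minimal usage sketch of what the code below provides. The checkpoint name and the `transformers` loading calls are illustrative assumptions; `GPT` (and its `history` attribute) are defined in the gist that follows:

from transformers import AutoTokenizer, AutoModelForCausalLM

tokenizer = AutoTokenizer.from_pretrained("gpt2")              # illustrative checkpoint
model = AutoModelForCausalLM.from_pretrained("gpt2").eval()

gpt = GPT(model, tokenizer)             # __call__ wraps model.generate()
out = gpt("Hello world", n=10, r=2)     # Response object; print(out) shows both generations
hits = gpt.history("hello world", n_return=2)  # fuzzy tf-idf search over every cached generation
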
# wrapper for using GPT generation first-class
# MIT License, 2021, Yash Bonde
import os
import torch
import pickle
import hashlib
import warnings
import numpy as np
from time import time
from scipy.sparse import vstack
from sklearn.neighbors import NearestNeighbors
from sklearn.feature_extraction.text import TfidfVectorizer


# ------ functions

def folder():
    # folder of this file; fall back to cwd when __file__ is not defined (e.g. in notebooks)
    try:
        return os.path.dirname(os.path.realpath(__file__))
    except NameError:
        return "."


def md5(t):
    # stable key for a prompt string
    return hashlib.md5(t.encode("utf-8")).hexdigest()
# ------ classes

class History():
    """Class that records the responses given by GPT so you don't have to waste
    time copy-pasting things into some Notion file. Data is stored as a dictionary in
    a pickle object and each item is a dictionary with keys like this:

        {
            "b10a8db164e0754105b7a99be72e3fe5": {
                "prompt": "Hello world",
                "args": {"n": 10, "r": 2},
                "responses": [
                    'Hello world!" "Hello?" "Yeah, I'm in',
                    'Hello world" command, that is, the command that'
                ]
            },
            ...
        }

    NOTE: keys are created only from the prompt and not from the generation args,
    so args are cached only the first time a prompt is seen.
    """

    def __init__(self, loc=None):
        self.loc = os.path.join(folder(), 'gpt_history.p') if loc is None else loc
        hist = {}
        keys = []
        if os.path.exists(self.loc):
            # load the history dict from the given location
            st = time()
            with open(self.loc, "rb") as f:
                hist = pickle.load(f)
            keys = list(hist.keys())
            warnings.warn(f"Loading took: {time() - st:.2f}s")
        else:
            warnings.warn(f"No file found at: {self.loc}! Will create a new one.")
        self.hist = hist
        self.keys = keys

        # we always build fresh vectoriser and neighbours objects:
        # tf-idf + fuzzy matching with nearest neighbours
        # from: https://gist.github.com/audhiaprilliant/a86a4488d5a029cfae27bfa28efc8b7b
        all_r = []  # all responses
        idx_ = []   # (key, index) of every response
        for k in self.keys:
            r = self.hist[k]["responses"]
            all_r.extend(r)
            idx_.extend([(k, i) for i in range(len(r))])
        all_r = [x.lower().strip() for x in all_r]
        vectorizer = TfidfVectorizer(ngram_range=(1, 4))
        if all_r:
            X = vectorizer.fit_transform(all_r)
            nbrs = NearestNeighbors(n_neighbors=1, metric='cosine').fit(X)
        else:
            # empty history: defer fitting until the first add_item() call
            X = None
            nbrs = None
        self.vectorizer = vectorizer
        self.nbrs = nbrs
        self.X = X
        self.idx_ = idx_

    def __del__(self):
        pass

    def __len__(self):
        return len(self.hist)

    def __getitem__(self, i):
        return self.hist[i]

    def __iter__(self):
        for k in self.keys:
            yield k

    def __repr__(self):
        # show (up to) the last 100 prompts, newest first
        keys = self.keys[-100:][::-1]
        _t = f"<gpt.History :: {len(self)} >\n"
        _t += "-" * 70 + "\n"
        for i, x in enumerate(keys):
            _t += f"[{i:03d} :: {len(self.hist[x]['responses']):04d}] {self.hist[x]['prompt'][:60]}" + "\n"
        return _t

    def __call__(self, x, n_return=10, return_keys=False):
        """search for the responses closest to the input (x)"""
        assert isinstance(x, str), "Input needs to be a string"
        if self.nbrs is None:
            # nothing has been stored yet
            return ([], []) if return_keys else []
        # perform tf-idf nearest-neighbour lookup
        input_vec = self.vectorizer.transform([x.lower().strip()])
        n_return = min(n_return, self.X.shape[0])  # cannot ask for more neighbours than samples
        _, indices = self.nbrs.kneighbors(input_vec, n_neighbors=n_return)
        indices = indices.flatten()
        keys = [self.idx_[i] for i in indices]
        items = []
        for k, i in keys:
            items.append(self.hist[k]["responses"][i])
        if return_keys:
            return items, keys
        return items

    def save(self):
        # save the pickle file
        with open(self.loc, "wb") as f:
            pickle.dump(self.hist, f)

    def add_item(self, x):
        """add a new item to the history, save it to the pickle file and finally
        re-build the nearest neighbours by updating the sparse matrix X"""
        assert isinstance(x, Response), "Input needs to be a gpt.Response object"
        key = md5(x.prompt)
        if key in self.keys:
            # this prompt already exists in the history, we don't update the gen kwargs
            n = self.hist[key]
            n["responses"] = list(set(self.hist[key]['responses'] + x.decoded))
        else:
            n = {
                "prompt": x.prompt,
                "responses": x.decoded,
                "args": x.gen_kwargs
            }
            self.keys.append(key)
        self.hist[key] = n

        # ideally this would live in the __del__ method, called when the process
        # receives a SIGINT or SIGTERM signal, but that does not work in notebooks,
        # so we still save on every call
        self.save()

        # update the nearest neighbours; note that only x.decoded is added since
        # some of n["responses"] might already be in X
        new_r = [y.lower().strip() for y in x.decoded]
        if self.X is None:
            # first ever item: fit the vectoriser from scratch
            self.X = self.vectorizer.fit_transform(new_r)
        else:
            X_new = self.vectorizer.transform(new_r)
            self.X = vstack((self.X, X_new))
        self.nbrs = NearestNeighbors(n_neighbors=1, metric='cosine').fit(self.X)

        # rebuild the (key, index) lookup
        idx_ = []
        for k in self.keys:
            r = self.hist[k]["responses"]
            idx_.extend([(k, i) for i in range(len(r))])
        self.idx_ = idx_
class Response():
    """Class that makes getting generated results chill, simply `print(out)`"""

    def __init__(self, prompt, out, t, gen_kwargs):
        self.prompt = prompt          # needed for history
        self.gen_kwargs = gen_kwargs  # needed for history
        self.t = t

        # get the generated sequences/scores/hidden_states/attentions as host objects
        self.sequences = out.sequences.cpu().tolist()
        self.scores = [x.cpu().numpy() for x in out.scores] if out.scores is not None else None
        self.hidden_states = [
            [y.cpu().numpy() for y in x]
            for x in out.hidden_states
        ] if out.hidden_states is not None else None
        self.attentions = [
            [y.cpu().numpy() for y in x]
            for x in out.attentions
        ] if out.attentions is not None else None
        self.decoded = self.t.batch_decode(self.sequences, skip_special_tokens=True)

    def __repr__(self):
        str_ = ""
        for x in self.decoded:
            str_ += x + "\n"
            str_ += "-" * 70 + "\n"
        return str_

    def __len__(self):
        return len(self.decoded)

    def __getitem__(self, i):
        return self.decoded[i]

    def __iter__(self):
        for x in self.decoded:
            yield x
class GPT():
    """Make GPT a first-class object and make using it as simple as possible.

    First define the model and tokenizer

    >>> device = torch.device("cuda:0") if torch.cuda.is_available() else "cpu"
    >>> tokenizer = AutoTokenizer.from_pretrained(name)
    >>> model = AutoModelForCausalLM.from_pretrained(name, cache_dir = "../hf-cache/").eval().to(device)

    Make the GPT wrapper: the output is a `Response`, a class with __repr__ overloaded so print gives the generations

    >>> gpt = GPT(model, tokenizer)
    >>> out = gpt("Hello world", n = 10, r = 2)
    >>> out
    ... Hello world!" "Hello?" "Yeah, I'm in
    ----------------------------------------------------------------------
    Hello world" command, that is, the command that
    ----------------------------------------------------------------------
    """

    def __init__(self, model, tokenizer, history_loc=None):
        self.model = model
        self.tokenizer = tokenizer
        self.eot_id = tokenizer.eos_token_id
        self.device = self.model.device
        self.history = History(history_loc)

    @torch.no_grad()
    def __call__(
        self,
        prompt: str,
        n: int = 16,  # number of tokens
        r: int = 1,   # number of sequences
        do_sample=True,
        temp=0.9,
        top_p=0.9,
        top_k=None,
        output_scores=None,
        output_hidden_states=None,
        output_attentions=None,
        stop_sequence=None,
        return_response=True,
        **gen_kwargs
    ):
        """__call__ overloader that wraps the model.generate() function. The most
        powerful arguments are exposed directly, but you can always pass custom kwargs
        through `gen_kwargs`. Note that not many beam-search related arguments are added.

        Args:
            prompt (str): prompt string, tokens will be generated in continuation
            n (int, optional): number of tokens to generate
            r (int, optional): number of sequences to return
            do_sample (bool, optional): whether to sample instead of decoding greedily
            temp (float, optional): sampling temperature
            top_p (float, optional): tokens whose cumulative probability adds up to this are considered
            top_k (int, optional): top-k tokens to consider for each distribution
            output_scores (bool, optional): output scores for each generated token, returns shape `[r, n]`
            output_hidden_states (bool, optional): output the hidden states of the generation, returns shape `[r, n+1, ...]`
            output_attentions (bool, optional): whether or not to return the attention tensors of all attention layers
            stop_sequence (str, optional): stop generation once the first token of this string is produced
            return_response (bool, optional): whether to parse the generated dictionary into a `Response` object
            gen_kwargs (dict, optional): any extra arguments to pass to the model.generate() method

        Returns:
            if return_response:
                Response instance
            else:
                model.generate() output
        """
        t = self.tokenizer
        m = self.model

        # tokenize the input prompt and the stop token if provided
        input_ids = t(prompt, return_tensors="pt")["input_ids"].to(self.device)
        if stop_sequence is not None:
            eos_token_id = t(stop_sequence)["input_ids"][0]
        else:
            eos_token_id = self.eot_id

        # generate the sequences
        out = m.generate(
            input_ids,
            max_length=len(input_ids[0]) + n,
            temperature=temp,
            top_p=top_p,
            top_k=top_k,
            num_return_sequences=r,
            pad_token_id=self.eot_id,
            output_scores=output_scores,
            output_hidden_states=output_hidden_states,
            output_attentions=output_attentions,
            do_sample=do_sample,
            return_dict_in_generate=True,
            eos_token_id=eos_token_id,
            **gen_kwargs
        )

        # parse into a Response (and cache it in the history) or return the raw output
        if return_response:
            x = Response(prompt, out, t, {
                "n": n, "r": r,
                "do_sample": do_sample,
                "temp": temp,
                "top_p": top_p,
                "top_k": top_k,
                **gen_kwargs
            })
            self.history.add_item(x)
            return x
        else:
            return out

    def classify(
        self,
        prompt: str,
        labels: list,
        softmax_temp=0.9,
        add_unknown=False,
        **gen_kwargs,
    ) -> dict:
        """Perform classification directly.

        NOTE: ensure that the first tokens of the labels are not the same.

        Args:
            prompt (str): prompt string to be given as input
            labels (list): list of strings that are the labels
            softmax_temp (float, optional): temperature for scoring labels. Defaults to 0.9.
            add_unknown (bool, optional): adds an extra "Unknown" label. Defaults to False.
            gen_kwargs (dict, optional): extra arguments to be passed for generation

        Returns:
            dict: label -> probability; values are 0. if the model returns 'nan'
        """
        # we will use the same format that OpenAI uses for GPT-3
        # read: https://beta.openai.com/docs/guides/classifications
        # "We normalize all labels by `label.strip().lower().capitalize()` at the API
        #  backend. Thus corresponding output labels are always capitalized."
        unq_options = set([x.strip().lower().capitalize() for x in labels])
        unq_options = sorted(list(unq_options))

        # each label must have a distinct first token, because classification
        # works by looking only one step ahead. Also encode the labels with an
        # extra white space prepended.
        label_ids = [self.tokenizer.encode(" " + x)[0] for x in unq_options]

        # since the labels are always prepended with a " ", we don't need to
        # add that to the prompt, thus .strip()
        prompt = prompt.strip()

        # call the model for a single step and keep the scores
        out = self(prompt, n=1, r=1, output_scores=True, return_response=False, **gen_kwargs)
        logits = out.scores[0][0]
        logits = (logits / softmax_temp)[label_ids].softmax(-1).cpu()
        logits = logits.numpy()
        scores = {o: i for o, i in zip(unq_options, logits)}

        # naaaaan - check
        scores = {k: 0. if np.isnan(l) else l for k, l in scores.items()}
        if add_unknown:
            # fill the probability mass for the special "Unknown" label
            scores["Unknown"] = 1 - sum(scores.values())
        return scores
class Prompt():
    """Prompts are ideas and language, and thus they sit at the core of this project, not GPT();
    GPT is merely a tool and Prompt is the force that runs it. Add your own custom operations inside
    prompts instead of passing raw strings around.

    >>> p = Prompt("Hello world", n = 10, r = 2)
    >>> p(gpt)
    ... Hello world!" "Hello?" "Yeah, I'm in
    ----------------------------------------------------------------------
    Hello world" command, that is, the command that
    ----------------------------------------------------------------------
    """

    def __init__(self, x: str, **gen_kwargs):
        self.gen_kwargs = gen_kwargs
        self.x = x

    def __call__(self, gpt, **gen_kwargs):
        # merge call-time kwargs with the stored ones; the stored prompt kwargs win
        gen_kwargs = {**gen_kwargs, **self.gen_kwargs}
        return gpt(prompt=self.x, **gen_kwargs)
yashbonde commented Jun 21, 2021

The objective of this script is to be just like a सूत्र (pronounced "Sutra"):

  • Minimum Code
  • Unambiguous
  • Concise
  • Non-redundant

In Consideration

  • Load the onnxruntime model (a rough sketch follows this list)
  • Should I add the macros I use here or should that be a different file?
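
For the onnxruntime item, a rough sketch of what a single forward pass might look like. It assumes a decoder has already been exported to ONNX and reuses the `tokenizer` from the GPT wrapper; the file path and the input/output names depend entirely on how the export was done:

import numpy as np
import onnxruntime as ort

sess = ort.InferenceSession("gpt2.onnx", providers=["CPUExecutionProvider"])  # placeholder path
enc = tokenizer("Hello world", return_tensors="np")
feeds = {
    "input_ids": enc["input_ids"].astype(np.int64),
    "attention_mask": enc["attention_mask"].astype(np.int64),
}
logits = sess.run(None, feeds)[0]            # output order also depends on the export
next_token_id = int(logits[0, -1].argmax())  # greedy choice for one step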

Docs

  • Prompt: a prompt can be programmed for a specific macro; like a formula, it stays unattached to any particular GPT
  • GPT: the wrapper around model.generate() that auto-manages device handling and tokenization
  • Response: the wrapper for the output that makes going over generations simpler
  • History: caches the outputs so you don't have to copy-paste prompts here and there (a short composition sketch follows)
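
A short sketch of how these pieces compose, assuming a `gpt = GPT(model, tokenizer)` wrapper has already been built; the prompts and labels are purely illustrative:

p = Prompt("Hello world", n=10, r=2)   # a reusable prompt carrying its own gen kwargs
out = p(gpt)                           # returns a Response and caches it in gpt.history
print(out[0])                          # responses are indexable and iterable

# single-step classification over a fixed label set
scores = gpt.classify("Tweet: I loved the new Batman movie!\nSentiment:",
                      labels=["positive", "negative"], add_unknown=True)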

urls

  • This page has very useful instructions on how to get good GPT generations.

TODO

  • Standardise the cached `args` to the latest call instead of the first call that created the prompt.
  • Build a better data store inside History that can automatically cluster the documents by ideas / words (a rough sketch follows the example), so you don't see things like this:
>>> hist
... <gpt.History :: 14 >
    ----------------------------------------------------------------------
    [000 :: 0010] I am writing different narratives for my cloud developement 
    [001 :: 0026] I am writing different narratives for my cloud developement 
    [002 :: 0001] I am writing different narratives for my cloud developement 
    [003 :: 0001] I am writing different narratives for my cloud developement 
    .... more things ....
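
One possible direction for that clustering TODO, as a minimal sketch: group the stored prompts with tf-idf + KMeans so near-duplicate prompts collapse into one group. It assumes `hist` is a populated History instance (e.g. `gpt.history`) with at least a handful of prompts, and the `n_clusters` choice is arbitrary:

from sklearn.cluster import KMeans
from sklearn.feature_extraction.text import TfidfVectorizer

prompts = [hist[k]["prompt"].lower().strip() for k in hist.keys]
vec = TfidfVectorizer(ngram_range=(1, 2))
P = vec.fit_transform(prompts)
km = KMeans(n_clusters=min(5, len(prompts))).fit(P)
for label, key in sorted(zip(km.labels_, hist.keys)):
    print(f"[cluster {label}] {hist[key]['prompt'][:60]}")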
