Using GPT right now is very tedious because you have to keep calling the `model.generate()` method. This code simplifies that by making `__call__` first-class and storing results in a searchable history!
# wrapper for using GPT generation first-class
# MIT License, 2021, Yash Bonde
import os
import torch
import pickle
import hashlib
import warnings
import numpy as np
from time import time
from scipy.sparse import vstack
from sklearn.neighbors import NearestNeighbors
from sklearn.feature_extraction.text import TfidfVectorizer

# ------ functions
def folder():
    # __file__ is undefined in interactive sessions, so fall back to cwd
    try:
        return os.path.dirname(os.path.realpath(__file__))
    except NameError:
        return "."

def md5(t):
    return hashlib.md5(t.encode("utf-8")).hexdigest()
# ------ classes
class History():
    """Class that records the responses given by GPT so you don't have to waste
    time copy-pasting things into some notion file. Data is stored as a dictionary
    in a pickle object and each item is a dictionary with keys like this:
    {
        "b10a8db164e0754105b7a99be72e3fe5": {
            "prompt": "Hello world",
            "args": {"n": 10, "r": 2},
            "responses": [
                'Hello world!" "Hello?" "Yeah, I'm in',
                'Hello world" command, that is, the command that'
            ]
        }
        ...
    }
    NOTE: we only create keys based on the prompts and not on generation args, so
    args are cached only the first time a prompt is seen.
    """
    def __init__(self, loc = None):
        self.loc = os.path.join(folder(), 'gpt_history.p') if loc is None else loc
        hist = {}
        keys = []
        if os.path.exists(self.loc):
            # load the history dict from disk
            st = time()
            with open(self.loc, "rb") as f:
                hist = pickle.load(f)
            keys = list(hist.keys())
            warnings.warn(f"Loading took: {time() - st:.2f}s")
        else:
            warnings.warn(f"No file found at: {self.loc}! Will create a new one.")
        self.hist = hist
        self.keys = keys

        # we always build fresh vectoriser and neighbour objects: tf-idf plus
        # fuzzy matching with nearest neighbours, adapted from:
        # https://gist.github.com/audhiaprilliant/a86a4488d5a029cfae27bfa28efc8b7b
        all_r = []  # all responses
        idx_ = []   # (key, position) index of all responses
        for k in self.keys:
            r = self.hist[k]["responses"]
            all_r.extend(r)
            idx_.extend([(k, i) for i in range(len(r))])
        all_r = [x.lower().strip() for x in all_r]
        self.vectorizer = TfidfVectorizer(ngram_range = (1, 4))
        if all_r:
            self.X = self.vectorizer.fit_transform(all_r)
            self.nbrs = NearestNeighbors(n_neighbors = 1, metric = 'cosine').fit(self.X)
        else:
            # empty history: fitting tf-idf on zero documents raises, so defer
            # fitting until the first add_item() call
            self.X = None
            self.nbrs = None
        self.idx_ = idx_
    def __del__(self):
        pass

    def __len__(self):
        return len(self.hist)

    def __getitem__(self, i):
        return self.hist[i]

    def __iter__(self):
        for k in self.keys:
            yield k

    def __repr__(self):
        n = 100  # show at most the last 100 prompts
        m = 70   # separator width
        keys = self.keys[-n:][::-1]
        _t = f"<gpt.History :: {len(self)}>\n"
        _t += "-" * m + "\n"
        for i, x in enumerate(keys):
            _t += f"[{i:03d} :: {len(self.hist[x]['responses']):04d}] {self.hist[x]['prompt'][:60]}\n"
        return _t
    def __call__(self, x, n_return = 10, return_keys = False):
        """search for the closest stored responses to the input `x`"""
        assert isinstance(x, str), "Input needs to be a string"
        if self.nbrs is None:  # nothing stored yet
            return ([], []) if return_keys else []
        # tf-idf nearest-neighbour lookup; never ask for more neighbours than
        # there are stored responses
        input_vec = self.vectorizer.transform([x.lower().strip()])
        n_return = min(n_return, len(self.idx_))
        _, indices = self.nbrs.kneighbors(input_vec, n_neighbors = n_return)
        keys = [self.idx_[i] for i in indices.flatten()]
        items = [self.hist[k]["responses"][i] for k, i in keys]
        if return_keys:
            return items, keys
        return items
    def save(self):
        # save the pickle file
        with open(self.loc, "wb") as f:
            pickle.dump(self.hist, f)
    def add_item(self, x):
        """add a new item to the history, persist it to disk and update the
        nearest-neighbour index by extending the sparse matrix X"""
        assert isinstance(x, Response), "Input needs to be a gpt.Response object"
        key = md5(x.prompt)
        if key in self.keys:
            # this prompt already exists in the history; we don't update the gen
            # kwargs, and we append only the genuinely new responses in a stable
            # order so the rows of X stay aligned with self.idx_
            n = self.hist[key]
            new_r = [d for d in x.decoded if d not in n["responses"]]
            n["responses"] = n["responses"] + new_r
        else:
            new_r = list(dict.fromkeys(x.decoded))  # de-duplicate, keep order
            n = {
                "prompt": x.prompt,
                "responses": new_r,
                "args": x.gen_kwargs
            }
            self.keys.append(key)
            self.hist[key] = n
        # ideally saving would happen in __del__, triggered on SIGINT or SIGTERM,
        # but that does not work in notebooks, so we still save on each call
        self.save()
        # update the nearest neighbours with only the new responses; the new
        # (key, position) pairs are appended to idx_ in the same order as the
        # rows appended to X
        if new_r:
            start = len(n["responses"]) - len(new_r)
            new_l = [y.lower().strip() for y in new_r]
            if self.X is None:
                # vectoriser fitting was deferred until the first responses
                self.X = self.vectorizer.fit_transform(new_l)
            else:
                self.X = vstack((self.X, self.vectorizer.transform(new_l)))
            self.nbrs = NearestNeighbors(n_neighbors = 1, metric = 'cosine').fit(self.X)
            self.idx_.extend([(key, start + i) for i in range(len(new_r))])
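# Example usage of History (a commented sketch; the query and outputs below are
# illustrative, reusing the sample responses from the docstring above):
# >>> h = History()
# >>> h("hello world", n_return = 2)  # fuzzy tf-idf search over past responses
# ['Hello world!" "Hello?" "Yeah, I'm in',
#  'Hello world" command, that is, the command that']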
class Response():
    """Class that makes getting generated results chill, simply `print(out)`"""
    def __init__(self, prompt, out, t, gen_kwargs):
        self.prompt = prompt          # needed for history
        self.gen_kwargs = gen_kwargs  # needed for history
        self.t = t
        # get the generated hidden_states/attentions/strings
        self.sequences = out.sequences.cpu().tolist()
        self.scores = [x.cpu().numpy() for x in out.scores] if out.scores is not None else None
        self.hidden_states = [
            [y.cpu().numpy() for y in x]
            for x in out.hidden_states
        ] if out.hidden_states is not None else None
        self.attentions = [
            [y.cpu().numpy() for y in x]
            for x in out.attentions
        ] if out.attentions is not None else None
        self.decoded = self.t.batch_decode(self.sequences, skip_special_tokens = True)

    def __repr__(self):
        str_ = ""
        for x in self.decoded:
            str_ += x + "\n"
            str_ += "-" * 70 + "\n"
        return str_

    def __len__(self):
        return len(self.decoded)

    def __getitem__(self, i):
        return self.decoded[i]

    def __iter__(self):
        for x in self.decoded:
            yield x
class GPT():
    """Make GPT a first-class object and keep using it as simple as possible.
    First define the model and tokenizer
    >>> device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    >>> tokenizer = AutoTokenizer.from_pretrained(name)
    >>> model = AutoModelForCausalLM.from_pretrained(name, cache_dir = "../hf-cache/").eval().to(device)
    Then make the GPT wrapper: the output is a `Response`, a class with __repr__
    overloaded so printing it gives the generations
    >>> gpt = GPT(model, tokenizer)
    >>> out = gpt("Hello world", n = 10, r = 2)
    >>> out
    ... Hello world!" "Hello?" "Yeah, I'm in
    ----------------------------------------------------------------------
    Hello world" command, that is, the command that
    ----------------------------------------------------------------------
    """
    def __init__(self, model, tokenizer, history_loc = None):
        self.model = model
        self.tokenizer = tokenizer
        self.eot_id = tokenizer.eos_token_id
        self.device = self.model.device
        self.history = History(history_loc)
    @torch.no_grad()
    def __call__(
        self,
        prompt: str,
        n: int = 16,  # number of tokens
        r: int = 1,   # number of sequences
        do_sample = True,
        temp = 0.9,
        top_p = 0.9,
        top_k = None,
        output_scores = None,
        output_hidden_states = None,
        output_attentions = None,
        stop_sequence = None,
        return_response = True,
        **gen_kwargs
    ):
        """__call__ overloader that wraps the model.generate() function. The most
        powerful arguments are exposed directly, but you can always pass custom
        kwargs through `gen_kwargs`. Note that we have not added many beam-search
        related arguments.
        Args:
            prompt (str): prompt string, tokens will be generated in continuation
            n (int, optional): number of tokens to generate
            r (int, optional): number of sequences to return
            temp (float, optional): sampling temperature
            top_p (float, optional): tokens whose cumulative probability adds up to this are considered
            top_k (int, optional): top-k tokens to consider for each distribution
            output_scores (bool, optional): output scores for each generated token, returns shape `[r, n]`
            output_hidden_states (bool, optional): output the hidden states of the generation, returns shape `[r, n+1, ...]`
            output_attentions (bool, optional): whether or not to return the attention tensors of all attention layers
            stop_sequence (str, optional): stop generation once the first token of this string is reached
            return_response (bool, optional): whether to parse the generated dictionary into a `Response` object
            gen_kwargs (dict, optional): any extra arguments to pass to the model.generate() method
        Returns:
            a Response instance if return_response, else the raw model.generate() output
        """
        t = self.tokenizer
        m = self.model
        # tokenize the input prompt and the stop token if provided
        input_ids = t(prompt, return_tensors = "pt")["input_ids"].to(self.device)
        if stop_sequence is not None:
            # only the first token of the stop sequence is used as EOS
            eos_token_id = t(stop_sequence)["input_ids"][0]
        else:
            eos_token_id = self.eot_id
        # generate the sequences
        out = m.generate(
            input_ids,
            max_length = len(input_ids[0]) + n,
            temperature = temp,
            top_p = top_p,
            top_k = top_k,
            num_return_sequences = r,
            pad_token_id = self.eot_id,
            output_scores = output_scores,
            output_hidden_states = output_hidden_states,
            output_attentions = output_attentions,
            do_sample = do_sample,
            return_dict_in_generate = True,
            eos_token_id = eos_token_id,
            **gen_kwargs
        )
        # either parse into a Response (and record it in history) or return raw
        if return_response:
            x = Response(prompt, out, t, {
                "n": n, "r": r,
                "do_sample": do_sample,
                "temp": temp,
                "top_p": top_p,
                "top_k": top_k,
                **gen_kwargs
            })
            self.history.add_item(x)
            return x
        else:
            return out
    def classify(
        self,
        prompt: str,
        labels: list,
        softmax_temp = 0.9,
        add_unknown = False,
        **gen_kwargs,
    ) -> dict:
        """Perform classification directly.
        NOTE: ensure that the first tokens of the labels are not the same.
        Args:
            prompt (str): prompt string to be given as input
            labels (list): list of strings that are labels
            softmax_temp (float, optional): temperature for scoring labels. Defaults to 0.9.
            add_unknown (bool, optional): adds an extra "Unknown" label. Defaults to False.
            gen_kwargs (dict, optional): extra arguments to be passed for generation
        Returns:
            dict: label -> probability; values are 0. if the model returns 'nan'
        """
        # we use the same format that OpenAI uses for GPT-3
        # read: https://beta.openai.com/docs/guides/classifications
        # "We normalize all labels by `label.strip().lower().capitalize()` at the
        # API backend. Thus corresponding output labels are always capitalized."
        unq_options = set([x.strip().lower().capitalize() for x in labels])
        unq_options = sorted(list(unq_options))
        # each label must have a distinct first token, because classification
        # works by looking only one step ahead. Also encode the labels with an
        # extra whitespace prepended.
        label_ids = [self.tokenizer.encode(" " + x)[0] for x in unq_options]
        # since the labels are always prepended with a " ", we don't need to
        # add that to the prompt, thus .strip()
        prompt = prompt.strip()
        # call the model
        out = self(prompt, n = 1, r = 1, output_scores = True, return_response = False, **gen_kwargs)
        logits = out.scores[0][0]
        logits = (logits / softmax_temp)[label_ids].softmax(-1).cpu().numpy()
        scores = {o: i for o, i in zip(unq_options, logits)}
        # NaN check
        scores = {k: 0. if np.isnan(l) else l for k, l in scores.items()}
        if add_unknown:
            # fill the remaining probability mass for the special "Unknown" label
            scores["Unknown"] = 1 - sum(scores.values())
        return scores
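# Example usage of classify (a commented sketch; the prompt, labels and numbers
# shown are illustrative, not from the original gist). The probability of each
# label is read off the logits of the single next token:
# >>> gpt.classify("Tweet: I loved the new movie!\nSentiment:", ["positive", "negative"])
# {'Negative': 0.08, 'Positive': 0.92}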
class Prompt():
    """Prompts are ideas and language, and thus they sit at the core of this
    project, not GPT(); GPT is merely a tool and Prompt the force that runs it.
    Add your own custom operations inside the prompts instead of strings.
    >>> p = Prompt("Hello world", n = 10, r = 2)
    >>> p(gpt)
    ... Hello world!" "Hello?" "Yeah, I'm in
    ----------------------------------------------------------------------
    Hello world" command, that is, the command that
    ----------------------------------------------------------------------
    """
    def __init__(self, x: str, **gen_kwargs):
        self.gen_kwargs = gen_kwargs
        self.x = x

    def __call__(self, gpt, **gen_kwargs):
        # dict.update() returns None, so the original one-liner silently dropped
        # all kwargs; merge instead, with the stored kwargs taking precedence
        kwargs = {**gen_kwargs, **self.gen_kwargs}
        return gpt(prompt = self.x, **kwargs)
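# ------ usage: a minimal end-to-end sketch. The checkpoint name below is an
# assumption for illustration; any Hugging Face causal LM should work.
if __name__ == "__main__":
    from transformers import AutoTokenizer, AutoModelForCausalLM

    name = "gpt2"  # illustrative checkpoint, not prescribed by the gist
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    tokenizer = AutoTokenizer.from_pretrained(name)
    model = AutoModelForCausalLM.from_pretrained(name).eval().to(device)

    gpt = GPT(model, tokenizer)
    out = gpt("Hello world", n = 10, r = 2)  # two 10-token continuations
    print(out)

    # every call is cached, so past generations are searchable
    print(gpt.history("hello world", n_return = 2))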
The objective of this script is just like a सूत्र (pronounced "Sutra"):

- `Prompt`: a prompt can be programmed for a specific macro; like a formula, it stays unattached to any `gpt`
- `GPT`: the wrapper for `gpt.generate()` that auto-manages device handling and tokenization
- `Response`: the wrapper for output that makes going over generations simpler
- `History`: caches the output, so you don't copy-paste prompts here and there

In consideration:
- onnxruntime model

TODO:
- docs, urls
- a `History` that can automatically cluster the documents by ideas / words, so you don't keep seeing near-duplicate results (see the sketch after this list)
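One possible direction for that TODO (a minimal sketch, not part of the gist): cluster the stored responses with k-means over the same tf-idf matrix that `History` already builds, so near-duplicates land in the same bucket. The helper name `cluster_history` and the `n_clusters` default are assumptions for illustration.

```python
from sklearn.cluster import KMeans

def cluster_history(h, n_clusters = 8):
    # group stored responses by tf-idf similarity; rows of h.X line up with
    # the (key, position) pairs in h.idx_
    if h.X is None:
        return {}
    km = KMeans(n_clusters = min(n_clusters, h.X.shape[0])).fit(h.X)
    buckets = {}
    for (key, i), label in zip(h.idx_, km.labels_):
        buckets.setdefault(int(label), []).append(h.hist[key]["responses"][i])
    return buckets
```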