import logging
from typing import List

import torch
from tqdm import tqdm
from datasets import Dataset
from tokenizers import ByteLevelBPETokenizer
from transformers import (
    DataCollatorForLanguageModeling,
    RobertaConfig,
    RobertaForMaskedLM,
    RobertaTokenizerFast,
    Trainer,
    TrainingArguments,
)

# Silence verbose logging from transformers.
logger = logging.getLogger("transformers")
logger.setLevel(logging.ERROR)
"""
`StringListIterator` is a class that provides an iterator for a list of strings.
Attributes:
data (List[str]): A list of strings to be iterated over.
This class is designed to be used where an iterator over a list of strings is needed. It could be useful in scenarios where the list is large and only one string needs to be in memory at a time.
Methods:
__iter__: This method returns an iterator that yields each string in the data list one at a time.
Example:
iterator = StringListIterator(['apple', 'banana', 'cherry'])
for fruit in iterator:
print(fruit) # Prints each fruit one by one.
"""
class StringListIterator:
def __init__(self, data: List[str]):
self.data = data
def __iter__(self):
for text in self.data:
yield text
"""
The `create_tokenizer` function trains a Byte-Level BPE (Byte-Pair Encoding) tokenizer on a given list of HTTP data
and saves it to the specified model path.
Parameters:
model_path (str): The path where the trained tokenizer will be saved.
str_data (List[str]): The list of HTTP data strings on which the tokenizer will be trained.
This function works by first converting the HTTP data into an iterator using the `StringListIterator` class.
This iterator is then passed to the tokenizer's training function.
The tokenizer is trained to have a vocabulary size of 30,000 tokens, and it includes a set of special tokens
commonly used in transformer models. Tokens that appear less than twice in the data are not included in the vocabulary.
After training, the tokenizer is saved to the specified model path and then reloaded using HuggingFace's
RobertaTokenizerFast class, which is a fast version of the Roberta tokenizer.
Returns:
tokenizer (RobertaTokenizerFast): The trained tokenizer.
Example:
tokenizer = create_tokenizer("tokenizer_model", str_data_list)
"""
def create_tokenizer(model_path, str_data):
string_iterator = StringListIterator(str_data)
tokenizer = ByteLevelBPETokenizer()
tokenizer.train_from_iterator(
iterator=string_iterator,
vocab_size=30000,
min_frequency=2,
special_tokens=[
"<s>",
"<pad>",
"</s>",
"<unk>",
"<mask>",
],
)
tokenizer.save_model(model_path)
tokenizer = RobertaTokenizerFast.from_pretrained(model_path, max_len=512)
return tokenizer
"""
The `create_model_from_scratch` function builds a new RoBERTa model using the HuggingFace Transformers library.
Parameters:
model_path (str): The path where the newly created model will be saved.
tokenizer (RobertaTokenizerFast): A pre-trained tokenizer that will be used to configure the new model.
The function begins by creating a configuration object (`RobertaConfig`) for the new RoBERTa model. The configuration
specifies the following parameters:
- `vocab_size` set to the size of the vocabulary of the tokenizer.
- `max_position_embeddings` set to 514, which is the maximum sequence length the model can accept.
- `num_attention_heads` set to 12, which is the number of attention heads in the self-attention mechanism.
- `num_hidden_layers` set to 6, which is the number of hidden layers in the transformer model.
- `type_vocab_size` set to 1, which is the size of the token type vocabulary.
Using this configuration, a new RoBERTa model for masked language modeling (`RobertaForMaskedLM`) is created.
Finally, the newly created model is saved to the specified model path and returned.
Returns:
model (RobertaForMaskedLM): The newly created RoBERTa model.
Example:
model = create_model_from_scratch("model_directory", tokenizer)
"""
def create_model_from_scratch(model_path, tokenizer):
config = RobertaConfig(
vocab_size=tokenizer.vocab_size,
max_position_embeddings=514,
num_attention_heads=12,
num_hidden_layers=6,
type_vocab_size=1,
)
model = RobertaForMaskedLM(config)
model.save_pretrained(model_path)
return model
"""
The `tokenize_data` function tokenizes a given list of text data using a specific tokenizer and prepares it in a format suitable for model training.
Parameters:
str_data (list): A list of texts (strings) that need to be tokenized.
tokenizer (transformers.tokenization_utils_base.PreTrainedTokenizerBase): The tokenizer used for processing the text data.
The function first defines an inner function, `tokenize_function`, that tokenizes a batch of examples, pads them to the maximum length, and truncates any that exceed the maximum length.
It then constructs a Dataset object from the str_data, using the 'text' field to hold the data.
The function applies `tokenize_function` to the Dataset, which tokenizes all the data in a batched manner, and removes the original 'text' field from the Dataset.
Returns:
Dataset: A Dataset object containing the tokenized text data.
Example:
tokenized_dataset = tokenize_data(str_data, tokenizer)
"""
def tokenize_data(str_data, tokenizer):
def tokenize_function(examples):
return tokenizer(examples["text"], padding="max_length", truncation=True)
dataset = Dataset.from_dict({'text': str_data})
tokenized_dataset = dataset.map(tokenize_function, batched=True, remove_columns=["text"])
return tokenized_dataset
"""
The `train_model` function trains a language model on a given dataset of HTTP data.
Parameters:
tokenizer (transformers.tokenization_utils_base.PreTrainedTokenizerBase): The tokenizer used for processing the text data.
model (transformers.modeling_roberta.RobertaForMaskedLM): The pre-trained language model to be fine-tuned.
model_path (str): The directory path where the trained model will be saved.
str_data (list): The HTTP data to be tokenized and used for training, where each entry in the list is a string.
The function begins by tokenizing the str_data using the provided tokenizer. It then prepares a `DataCollatorForLanguageModeling`
which will be used to collate the samples into batch during training. The MLM probability is set to 0.15.
The `TrainingArguments` are specified with parameters like the output directory for model, number of training epochs,
batch size and the frequency of saving the model during training.
Finally, a `Trainer` is created with the provided model, training arguments, data collator and training dataset.
The model is then trained using the trainer's `train` method and the final trained model is saved to the `model_path`.
Returns:
transformers.modeling_roberta.RobertaForMaskedLM: The fine-tuned language model.
Example:
fine_tuned_model = train_model(tokenizer, model, "model_directory", str_data)
"""
def train_model(tokenizer, model, model_path, str_data):
tokenized_dataset = tokenize_data(str_data, tokenizer)
data_collator = DataCollatorForLanguageModeling(
tokenizer=tokenizer, mlm=True, mlm_probability=0.15
)
training_args = TrainingArguments(
output_dir=model_path,
overwrite_output_dir=True,
num_train_epochs=1,
per_device_train_batch_size=12,
save_steps=10_000,
save_total_limit=2
)
trainer = Trainer(
model=model,
args=training_args,
data_collator=data_collator,
train_dataset=tokenized_dataset,
)
train_output = trainer.train()
model.save_pretrained(model_path)
print(train_output)
return model
"""
The `get_embeddings` function generates embeddings for a list of texts using a specified language model and tokenizer.
Args:
texts (list): A list of texts to generate embeddings for.
model (transformers.modeling_roberta.RobertaForMaskedLM): The language model used to generate embeddings.
tokenizer (transformers.tokenization_utils_base.PreTrainedTokenizerBase): The tokenizer used for processing the text data.
max_length (int, optional): The maximum length to which the texts should be truncated or padded. Defaults to 512.
batch_size (int, optional): The number of texts processed at each iteration. Defaults to 32.
The function first determines the device to use for computations, defaulting to CUDA if available and falling back to CPU otherwise.
It moves the model to the chosen device.
Then, it iteratively processes the texts in batches, tokenizing each batch and moving the
encoded inputs to the device.
With gradient calculations turned off (for efficiency), it feeds the inputs to the model and extracts
the embeddings from the output.
The embeddings corresponding to the first token (typically "[CLS]") of each input are extracted
and added to a list.
This process is repeated for all batches until all texts have been processed.
Returns:
list: A list of numpy arrays representing the embeddings of the input texts.
Example:
embeddings = get_embeddings(texts, model, tokenizer, max_length=512, batch_size=32)
"""
def get_embeddings(texts, model, tokenizer, max_length=512, batch_size=32):
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") # Determine the available device
model.to(device)
embeddings = []
for i in tqdm(range(0, len(texts), batch_size)):
batch_texts = texts[i:i + batch_size]
encoded_inputs = tokenizer(batch_texts, return_tensors='pt', padding=True, truncation=True, max_length=max_length)
input_ids = encoded_inputs["input_ids"].to(device)
with torch.no_grad():
outputs = model.base_model(**encoded_inputs.to(device))
batch_embeddings = outputs.last_hidden_state[:, 0, :].cpu().detach().numpy()
embeddings.extend(batch_embeddings)
return embeddings


## Example use:
# model_path = "."
#
# ## Load your own list of `str_data`.
#
# ## Only run these the first time; on later runs, load the saved files instead:
# # tokenizer = create_tokenizer(model_path, str_data[0:50000])
# # model = create_model_from_scratch(model_path, tokenizer)
#
# tokenizer = RobertaTokenizerFast.from_pretrained(model_path, model_max_length=512)
# model = RobertaForMaskedLM.from_pretrained(model_path)
# model = train_model(tokenizer, model, model_path, str_data[0:20000])
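#
# The example above stops after training. As a minimal sketch (not part of the
# original example), embeddings could then be generated with `get_embeddings`;
# the slice size below is illustrative, and the 768-dimensional output assumes
# the default RobertaConfig hidden size used in `create_model_from_scratch`.
# # sample_embeddings = get_embeddings(str_data[0:1000], model, tokenizer, max_length=512, batch_size=32)
# # print(len(sample_embeddings), sample_embeddings[0].shape)  # one 768-dim vector per input text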