# -*- coding: utf-8 -*-
# @Author: Sadamori Kojaku
# @Date: 2022-10-05 06:24:53
# @Last Modified by: Sadamori Kojaku
# @Last Modified time: 2023-05-19 13:04:26
import numpy as np
import torch
from cogdl.oag import oagbert
from joblib import Parallel, delayed, parallel_backend
from tqdm.auto import tqdm
class oagbertEmbedder:
    """
    Embeds input text using an OAG-BERT model, optionally across multiple devices.

    Example:
    --------
    data = [{"title": "title 1", "journal": "journal 1"}, {"title": "title 2", "journal": "journal 2"}]
    data_attrs = ["title", "journal"]  # Supported attributes: "title", "abstract", "affiliation", "journal", "concepts", "coauthors", and "author".
    devices = ["cuda:0", "cuda:1"]  # If no GPU is available, specify "cpu" instead.
    embedder = oagbertEmbedder(
        model_file=model_file,  # optional
        data_attrs=data_attrs,
        devices=devices,
        show_progress_bar=True,
    )
    emb = embedder.transform(data)

    Parameters:
    -----------
    model_file: str
        Path to the pretrained model file. If None, "oagbert-v2-sim" is used.
    data_attrs: list of str
        Document attributes to embed.
    devices: Union[str, List[str]]
        Device(s) to use for embedding computation. Default is "cpu".
    show_progress_bar: bool
        Whether to display a progress bar during embedding computation. Default is False.
    **params:
        Additional parameters to pass to the underlying transformer model.

    Attributes:
    -----------
    devices: List[str]
        The device(s) used for embedding computation.
    n_jobs: int
        Number of parallel jobs, one per device.
    show_progress_bar: bool
        Whether to display a progress bar during embedding computation.
    model_file: str
        Path to the pretrained model file.
    data_attrs: list of str
        Document attributes to embed.
    """

    def __init__(
        self, model_file, data_attrs, devices="cpu", show_progress_bar=False, **params
    ):
        if not isinstance(devices, list):
            devices = [devices]
        self.devices = devices
        self.n_jobs = len(devices)
        self.show_progress_bar = show_progress_bar
        self.model_file = model_file
        self.data_attrs = data_attrs

    def transform(self, texts):
        """
        Transforms a list of texts into their embeddings using parallel computing.

        Parameters:
        -----------
        texts : list
            A list of texts to be embedded.

        Returns:
        --------
        numpy.ndarray
            An array of shape (n_samples, n_features) containing the text embeddings.
        """
        def _embed(paper, device, model_file, data_attrs, show_progress_bar, idx):
            model = oagbertModel(
                encoder_model_file=model_file,
                data_attrs=data_attrs,
                device=device,
                show_progress_bar=show_progress_bar,
            )
            emb = model.transform(paper)
            del model
            return [idx, emb]

        # For small inputs, skip the parallel overhead and embed on one device.
        if len(texts) < self.n_jobs * 10:
            return _embed(
                texts,
                self.devices[0],
                self.model_file,
                self.data_attrs,
                self.show_progress_bar,
                0,
            )[1]

        # Split the texts into (at most) n_jobs chunks, one per device.
        n = int(np.ceil(len(texts) / self.n_jobs))
        text_chunks = [texts[i : i + n] for i in range(0, len(texts), n)]
        with parallel_backend("threading", n_jobs=self.n_jobs):
            results = Parallel()(
                delayed(_embed)(
                    paper=text_chunks[i],
                    device=self.devices[i],
                    model_file=self.model_file,
                    data_attrs=self.data_attrs,
                    show_progress_bar=self.show_progress_bar,
                    idx=i,
                )
                for i in range(len(text_chunks))
            )
        # Restore the original chunk ordering before stacking the embeddings.
        order = np.argsort([results[i][0] for i in range(len(results))])
        return np.vstack([results[i][1] for i in order])


class oagbertModel:
    """
    Trait vectorizer based on OAG-BERT.

    Examples:
    ---------
    data = [
        {
            "title": "Higgs boson: large hadron collider",
            "author": "John Smith",
            "journal": "Theory of Physics",
        },
        {
            "concepts": "Theory",
        },
        {
            "concepts": "Experimental",
        },
    ]
    model = oagbertModel(
        data_attrs=["title", "author", "journal", "concepts"],
    )
    emb = model.transform(data)

    Parameters:
    -----------
    encoder_model_file: str, optional
        Path to the encoder model file. If None, "oagbert-v2-sim" is used.
    data_attrs: list of str or str, required
        List of document attributes to use for vectorization.
        Supported attributes are: "title", "abstract", "affiliation", "journal",
        "concepts", "coauthors", and "author".
    show_progress_bar: bool, optional
        Whether to display a progress bar during vectorization. Defaults to False.
    device: str, optional
        Device to use for vectorization. Defaults to "cpu".

    Attributes:
    -----------
    attri_name_mapping: dict
        Dictionary mapping supported data_attrs to the corresponding OAG-BERT input names.
    dim: int
        Dimensionality of the output vectors.
    do_lower_case: dict
        Dictionary indicating whether each OAG-BERT input is lowercased during tokenization.
    encoder_model_file: str
        Path to the encoder model file.
    is_list_type_input: dict
        Dictionary indicating whether each data attribute should be treated as a list of
        strings (True) or a single string (False) during tokenization.
    model: OAGBertForPreTraining
        Pretrained OAG-BERT model loaded from encoder_model_file.
    show_progress_bar: bool
        Whether to display a progress bar during vectorization.
    tokenizer: transformers.PreTrainedTokenizerFast
        Tokenizer to use for encoding text inputs.
    """

    def __init__(
        self,
        data_attrs=None,
        show_progress_bar=False,
        encoder_model_file=None,
        device="cpu",
    ):
        self.show_progress_bar = show_progress_bar
        self.data_attrs = data_attrs
        self.device = device
        self.dim = 768
        # Map the supported attribute names to the input names expected by
        # OAG-BERT ("author" and "coauthors" are both merged into "authors").
        self.attri_name_mapping = {
            "title": "title",
            "abstract": "abstract",
            "affiliation": "affiliations",
            "journal": "venue",
            "concepts": "concepts",
            "coauthors": "authors",
            "author": "authors",
        }
        # Whether each attribute is passed to OAG-BERT as a list of strings.
        self.is_list_type_input = {
            "title": False,
            "abstract": False,
            "affiliation": True,
            "journal": False,
            "concepts": True,
            "coauthors": True,
            "author": True,
        }
        # Whether each OAG-BERT input (keyed by mapped name) is lowercased.
        self.do_lower_case = {
            "title": False,
            "abstract": False,
            "affiliations": True,
            "venue": True,
            "concepts": True,
            "authors": False,
        }
        self.tokenizer, self.model = oagbert(
            "oagbert-v2-sim" if encoder_model_file is None else encoder_model_file
        )
        self.model.eval()
        self.model.to(self.device)

    def save(self, filename):
        pass

    def load(self, filename, device=None):
        pass

    def transform(self, data, concatenate=False):
        """
        Transform the input data into normalized feature vectors.

        For each input record, this retrieves the attributes specified by
        data_attrs, encodes them with OAG-BERT, and stacks the resulting
        embeddings into a single array. (The `concatenate` flag is currently
        unused; one embedding per record is always returned.)

        Parameters:
        -----------
        data : list of dict
            The input data where each element is a dictionary of attributes and their values.
        concatenate : bool (default = False)
            Currently unused.

        Returns:
        --------
        numpy.ndarray
            An array of shape (n_samples, dim) containing one embedding per record.
        """
        trait_vec = []
        for paper in tqdm(data, disable=not self.show_progress_bar):
            trait_vec += [self._encode(paper)]
        trait_vec = np.vstack(trait_vec)
        return trait_vec

    def fit(self, train_data_file, data_attrs, output_file, chunksize=100000):
        pass

    def _encode(self, _data):
        """
        Encodes a single record using the pre-trained OAG-BERT model.

        Parameters
        ----------
        _data : dict
            A dictionary of attributes and their values for one record.

        Returns
        -------
        emb : np.ndarray
            An array of shape (1, dim) containing the embedding, or a zero
            vector if none of the requested attributes are present.
        """
        # Convert the record to the OAG-BERT input format, merging attributes
        # that map to the same input name (e.g., "author" and "coauthors").
        _oagbert_input = {}
        for k in self.data_attrs:
            if k not in _data:
                continue
            v = _data[k]
            if self.is_list_type_input[k] and not isinstance(v, list):
                v = [v]
            if len(v) == 0:
                continue
            _k = self.attri_name_mapping[k]
            if _k in _oagbert_input:
                _oagbert_input[_k] += v if isinstance(v, list) else "_" + v
            else:
                _oagbert_input[_k] = v
        # Lowercase the inputs that OAG-BERT expects in lower case.
        for k, v in _oagbert_input.items():
            if self.do_lower_case[k]:
                _oagbert_input[k] = (
                    [d.lower() for d in v] if isinstance(v, list) else v.lower()
                )
        if len(_oagbert_input) == 0:
            return np.zeros((1, self.dim))
        (
            input_ids,
            input_masks,
            token_type_ids,
            masked_lm_labels,
            position_ids,
            position_ids_second,
            masked_positions,
            num_spans,
        ) = self.model.build_inputs(**_oagbert_input)
        # Run the encoder; the second output is the pooled document embedding.
        _, emb = self.model.bert.forward(
            input_ids=torch.LongTensor(input_ids).unsqueeze(0).to(self.device),
            token_type_ids=torch.LongTensor(token_type_ids)
            .unsqueeze(0)
            .to(self.device),
            attention_mask=torch.LongTensor(input_masks).unsqueeze(0).to(self.device),
            output_all_encoded_layers=False,
            checkpoint_activations=False,
            position_ids=torch.LongTensor(position_ids).unsqueeze(0).to(self.device),
            position_ids_second=torch.LongTensor(position_ids_second)
            .unsqueeze(0)
            .to(self.device),
        )
        # .cpu() is a no-op on CPU tensors, so one path covers both devices.
        emb = emb.detach().cpu().numpy().reshape((1, -1))
        return emb

    def keys(self):
        return [self.get_key_name(attrs) for attrs in self.data_attrs]

    def get_key_name(self, attr):
        if isinstance(attr, list):
            return "_".join(attr)
        return attr

    def get_vector_dims(self):
        return {k: self.dim for k in self.keys()}
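

# ---------------------------------------------------------------------------
# Minimal usage sketch (assumes the cogdl package is installed and that the
# default "oagbert-v2-sim" weights can be downloaded; the paper records below
# are made-up examples; swap "cpu" for a list of CUDA devices such as
# ["cuda:0", "cuda:1"] on multi-GPU machines).
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    papers = [
        {"title": "Higgs boson: large hadron collider", "journal": "Theory of Physics"},
        {"title": "Community detection in networks", "journal": "Physics Reports"},
    ]
    embedder = oagbertEmbedder(
        model_file=None,  # None falls back to "oagbert-v2-sim"
        data_attrs=["title", "journal"],
        devices="cpu",
        show_progress_bar=True,
    )
    emb = embedder.transform(papers)
    print(emb.shape)  # -> (2, 768)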