OAGBERT
# -*- coding: utf-8 -*-
# @Author: Sadamori Kojaku
# @Date: 2022-10-05 06:24:53
# @Last Modified by: Sadamori Kojaku
# @Last Modified time: 2023-05-19 13:04:26
import numpy as np
import torch
from cogdl.oag import oagbert
from joblib import Parallel, delayed, parallel_backend
from tqdm.auto import tqdm
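# Requires cogdl (which provides the OAG-BERT models), torch, joblib, numpy, and tqdm.
# Note (assumption): cogdl downloads the named checkpoint on first use if it is not
# already cached locally.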
class oagbertEmbedder:
"""
Embeds input text using an OAG-BERT model.
Example:
----------
data = [{"title":"title 1", "journal": "journal 1" }, {"title":"title 2", "journal": "journal 2" }]
data_attrs = ["title", "journal"] # Supported attributes are: "title", "abstract", "affiliation", "journal", "concepts", "coauthors", and "author".
devices = ["cuda:0", "cuda:1"] # If GPU is not available, specify "cpu" instead.
embedder = oagbertEmbedder(
        model_file=None,  # optional; None falls back to the default "oagbert-v2-sim"
data_attrs=data_attrs,
devices=devices,
show_progress_bar=True,
)
emb = embedder.transform(data)
Parameters:
-----------
    model_file: str or None
        Path to the pretrained model file. If None, the default "oagbert-v2-sim" checkpoint is used.
    data_attrs: list of str
        Document attributes to embed (see the supported attributes above).
devices: Union[str, List[str]]
Device(s) to use for embedding computation. Default is "cpu".
show_progress_bar: bool
Whether to display a progress bar during embedding computation. Default is False.
    **params:
        Additional keyword arguments (currently unused).
Attributes:
-----------
devices: List[str]
The device(s) used for embedding computation.
n_jobs: int
Number of parallel jobs to use.
show_progress_bar: bool
Whether to display a progress bar during embedding computation.
model_file: str
Path to the pretrained model file.
    data_attrs: list of str
        Document attributes to embed.
"""
def __init__(
self, model_file, data_attrs, devices="cpu", show_progress_bar=False, **params
):
if not isinstance(devices, list):
devices = [devices]
self.devices = devices
self.n_jobs = len(devices)
self.show_progress_bar = show_progress_bar
self.model_file = model_file
self.data_attrs = data_attrs
def transform(self, texts):
"""
Transforms a list of texts into their embeddings using parallel computing.
Parameters:
-----------
texts : list
A list of texts to be embedded.
Attributes:
-----------
n_jobs : int
Number of parallel jobs to run.
model_file : str
Path to the model file.
        data_attrs : list of str
            Document attributes to embed.
devices : list
List of devices to use for parallel processing.
show_progress_bar : bool
Whether to show progress bar for the embedding process.
Returns:
--------
numpy.ndarray
An array of shape (n_samples, n_features) containing the text embeddings.
"""
def _embed(paper, device, model_file, data_attrs, show_progress_bar, idx):
model = oagbertModel(
encoder_model_file=model_file,
data_attrs=data_attrs,
device=device,
show_progress_bar=show_progress_bar,
)
emb = model.transform(paper)
del model
return [idx, emb]
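        # For small inputs, loading one model per device costs more than it
        # saves, so embed everything on the first device in a single pass.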
        if len(texts) < self.n_jobs * 10:
            return _embed(
                texts,
                self.devices[0],
                self.model_file,
                self.data_attrs,
                self.show_progress_bar,
                0,
            )[1]
n = int(np.ceil(len(texts) / self.n_jobs))
text_chunks = [texts[i : i + n] for i in range(0, len(texts), n)]
with parallel_backend("threading", n_jobs=self.n_jobs):
results = Parallel()(
delayed(_embed)(
paper=text_chunks[i],
device=self.devices[i],
model_file=self.model_file,
data_attrs=self.data_attrs,
show_progress_bar=self.show_progress_bar,
idx=i,
)
for i in range(self.n_jobs)
)
order = np.argsort([results[i][0] for i in range(len(results))])
return np.vstack([results[i][1] for i in order])
class oagbertModel:
"""
Trait vectorizer based on OAGBERT.
Examples:
-----------
data = [
{
"title": "Higgs boson: large hadron collider",
"author": "John Smith",
"journal": "Theory of Physics",
},
{
"concepts": "Theory",
},
{
"concepts": "Experimental",
},
]
    model = oagbertModel(
data_attrs=["title", "author", "journal", "concepts"],
)
emb = model.transform(data)
Parameters:
-----------
encoder_model_file: str, optional
Path to the encoder model file. If None, "oagbert-v2-sim" is used.
data_attrs: list of str or str, required
List of document attributes to use for vectorization.
Supported attributes are: "title", "abstract", "affiliation", "journal",
"concepts", "coauthors", and "author".
show_progress_bar: bool, optional
Whether to display a progress bar during vectorization. Defaults to False.
device: str, optional
Device to use for vectorization. Defaults to "cpu".
Attributes:
-----------
attri_name_mapping: dict
Dictionary mapping supported data_attrs to corresponding attribute names.
dim: int
Dimensionality of the output vectors.
do_lower_case: dict
Dictionary indicating whether to lowercase each data attribute during tokenization.
encoder_model_file: str
Path to the encoder model file.
is_list_type_input: dict
Dictionary indicating whether each data attribute should be treated as a list of
strings (True) or a single string (False) during tokenization.
model: OAGBertForPreTraining
Pretrained OAGBERT model loaded from encoder_model_file.
show_progress_bar: bool
Whether to display a progress bar during vectorization.
tokenizer: transformers.PreTrainedTokenizerFast
Tokenizer to use for encoding text inputs.
"""
def __init__(
self,
data_attrs=None,
show_progress_bar=False,
encoder_model_file=None,
device="cpu",
):
self.show_progress_bar = show_progress_bar
self.data_attrs = data_attrs
self.device = device
self.dim = 768
self.attri_name_mapping = {
"title": "title",
"abstract": "abstract",
"affiliation": "affiliations",
"journal": "venue",
"concepts": "concepts",
"coauthors": "authors",
"author": "authors",
}
self.is_list_type_input = {
"title": False,
"abstract": False,
"affiliation": True,
"journal": False,
"concepts": True,
"coauthors": True,
"author": True,
}
self.do_lower_case = {
"title": False,
"abstract": False,
"affiliations": True,
"venue": True,
"concepts": True,
"authors": False,
}
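        # Illustration (hypothetical record): {"journal": "Nature", "author": "Ada Lovelace"}
        # becomes {"venue": "nature", "authors": ["Ada Lovelace"]} in _encode below,
        # after key mapping, list wrapping, and lower-casing.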
self.tokenizer, self.model = oagbert(
"oagbert-v2-sim" if encoder_model_file is None else encoder_model_file
)
self.model.eval()
self.model.to(self.device)
def save(self, filename):
pass
def load(self, filename, device=None):
pass
def transform(self, data, concatenate=False):
"""
Transform the input data to a set of normalized feature vectors grouped by attributes.
This function will retrieve the traits in data specified by the keys.
Then, for each key, it will generate an embedding vector. Once the embedding for all keys
are generated, they are concatenated into one single vector as a vector representation
of the given data ('if concatenate = False'. Otherwise return a dict containing the subvectors).
Parameters:
-----------
data : list of dict
The input data where each element is a dictionary of attributes and their values.
concatenate : bool (default = False)
Whether to concatenate all subvectors into one vector or return a dictionary of subvectors grouped by attributes.
Returns:
--------
subvecs : dict or ndarray
A dictionary of subvectors grouped by attributes, or the concatenated subvectors if `concatenate` is True.
"""
trait_vec = []
for i, paper in tqdm(
enumerate(data), disable=not self.show_progress_bar, total=len(data)
):
trait_vec += [self._encode(paper)]
trait_vec = np.vstack(trait_vec)
return trait_vec
def fit(self, train_data_file, data_attrs, output_file, chunksize=100000):
pass
def _encode(self, _data):
"""
Encodes a list of dictionaries using a pre-trained BERT model.
Parameters
----------
dict_list : List[Dict]
A list of dictionaries representing input data for BERT model.
Attributes
----------
emb_list : List[np.ndarray]
A list of numpy arrays representing the encoded output embeddings.
Returns
-------
emb : np.ndarray
A two-dimensional numpy array containing all the encoded embeddings.
"""
        # Convert the record into OAG-BERT's expected keyword arguments.
_oagbert_input = {}
for k in self.data_attrs:
if k not in _data:
continue
v = _data[k]
            if self.is_list_type_input[k] and not isinstance(v, list):
v = [v]
if len(v) == 0:
continue
_k = self.attri_name_mapping[k]
if _k in _oagbert_input:
_oagbert_input[_k] += v if isinstance(v, list) else "_" + v
else:
_oagbert_input[_k] = v
        # Lower-case the attribute values flagged in do_lower_case.
for k, v in _oagbert_input.items():
if self.do_lower_case[k]:
_oagbert_input[k] = (
[d.lower() for d in v] if isinstance(v, list) else v.lower()
)
if len(_oagbert_input) == 0:
            return np.zeros((1, self.dim))
(
input_ids,
input_masks,
token_type_ids,
masked_lm_labels,
position_ids,
position_ids_second,
masked_positions,
num_spans,
) = self.model.build_inputs(**_oagbert_input)
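        # Only the ids, masks, and position tensors are consumed by the forward
        # pass below; the masked-LM outputs from build_inputs are pretraining
        # artifacts and are unused here.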
_, emb = self.model.bert.forward(
input_ids=torch.LongTensor(input_ids).unsqueeze(0).to(self.device),
token_type_ids=torch.LongTensor(token_type_ids)
.unsqueeze(0)
.to(self.device),
attention_mask=torch.LongTensor(input_masks).unsqueeze(0).to(self.device),
output_all_encoded_layers=False,
checkpoint_activations=False,
position_ids=torch.LongTensor(position_ids).unsqueeze(0).to(self.device),
position_ids_second=torch.LongTensor(position_ids_second)
.unsqueeze(0)
.to(self.device),
)
if self.device != "cpu":
emb = np.array(emb.cpu().detach().numpy()).reshape((1, -1))
else:
emb = np.array(emb.detach().numpy()).reshape((1, -1))
return emb.reshape((1, -1))
def keys(self):
return [self.get_key_name(attrs) for attrs in self.data_attrs]
def get_key_name(self, attr):
if isinstance(attr, list):
return "_".join(attr)
return attr
def get_vector_dims(self):
return {k: self.dim for k in self.keys()}
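# A minimal usage sketch (assumes a CPU-only run; the toy records and the
# expected (2, 768) output shape are illustrative, not from the original gist).
if __name__ == "__main__":
    data = [
        {"title": "Higgs boson: large hadron collider", "journal": "Theory of Physics"},
        {"title": "Community detection in networks", "journal": "Network Science"},
    ]
    embedder = oagbertEmbedder(
        model_file=None,  # None falls back to the default "oagbert-v2-sim"
        data_attrs=["title", "journal"],
        devices="cpu",
        show_progress_bar=True,
    )
    emb = embedder.transform(data)
    print(emb.shape)  # expected: (2, 768)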