@ji1kang
Created March 2, 2023 05:44
HuggingFace VisualBERT Demo with DataLoader
import os

import numpy as np
import pandas as pd

# torch
import torch
from torch.utils.data import DataLoader, Dataset
from transformers import BertTokenizer

# ours
from helper import label_cols  # preprocessing labels

# visualbert - you need to download the huggingface visualbert example code from
# https://github.com/huggingface/transformers/tree/main/examples/research_projects/visual_bert
from utils import Config
from modeling_frcnn import GeneralizedRCNN
from processing_image import Preprocess

class ImageProcessor:
    def __init__(self, device='cuda'):
        frcnn_cfg = Config.from_pretrained("unc-nlp/frcnn-vg-finetuned")
        frcnn_cfg.MODEL.DEVICE = device
        self.device = device
        self.frcnn = GeneralizedRCNN.from_pretrained("unc-nlp/frcnn-vg-finetuned", config=frcnn_cfg)
        self.frcnn_cfg = frcnn_cfg
        self.image_preprocess = Preprocess(frcnn_cfg)

    def get_visual_embeddings(self, image_path):
        # run frcnn
        images, sizes, scales_yx = self.image_preprocess(image_path)
        output_dict = self.frcnn(
            images,
            sizes,
            scales_yx=scales_yx,
            padding="max_detections",
            max_detections=self.frcnn_cfg.max_detections,
            return_tensors="pt",
        )
        features = output_dict.get("roi_features").detach().cpu()
        return features

class TrainDataset(Dataset):
    def __init__(self, base_path, data_path, max_len, word2idx=None, device='cuda:1'):
        """Dataset for training steps

        Args:
            base_path: the directory for data
            data_path: a .csv file with columns as follows:
                ['path', 'preprocessed_title', 'preprocessed_text', 'majority_vote']
            max_len: maximum length of texts
            word2idx: a dictionary mapping a word to its index in a word embedding
                (only needed if you call preprocess_text)
        """
        self.data_path = data_path
        self.data = pd.read_csv(data_path).values
        self.base_path = base_path
        self.max_len = max_len
        self.word2idx = word2idx
        # feature extractors
        self.tokenizer = BertTokenizer.from_pretrained(
            "bert-base-uncased", cache_dir="cache")
        self.visual_extractor = ImageProcessor(device=device)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        """
        Args:
            idx: the index of the sample
        Return:
            training data
        """
        # load meta data (customize this code for your dataset)
        file_path, title_text, ocr_text, label = self.data[idx]

        # load image
        image_path = os.path.join(self.base_path, file_path)
        visual_embeds = self.visual_extractor.get_visual_embeddings(image_path)
        # drop the batch dimension added by the frcnn so the DataLoader can add its own
        visual_embeds = visual_embeds.squeeze(0)
        visual_token_type_ids = torch.ones(
            visual_embeds.shape[:-1], dtype=torch.long)
        visual_attention_mask = torch.ones(
            visual_embeds.shape[:-1], dtype=torch.float)

        # load text (customize this code for your dataset)
        # NOTE: the BERT tokenizer works on the raw strings directly;
        # preprocess_text below is only needed for a word-embedding pipeline.
        text = f'title: {title_text} body: {ocr_text}'
        # pad/truncate to a fixed length so the default collate function can
        # stack samples into a batch
        inputs = self.tokenizer(
            text, padding="max_length", max_length=self.max_len,
            truncation=True, return_tensors="pt")
        input_ids = inputs["input_ids"].squeeze(0)
        token_type_ids = inputs["token_type_ids"].squeeze(0)
        attention_mask = inputs["attention_mask"].squeeze(0)

        # load label (customize this code for your dataset)
        label_index = label_cols.index(label)
        label_index = torch.tensor(label_index).long()

        return (input_ids, token_type_ids, attention_mask,
                visual_embeds, visual_token_type_ids, visual_attention_mask,
                label_index)

    def preprocess_text(self, text):
        """Map words to word-embedding indices (customize this function for your dataset)

        Args:
            text: a string of text
        Return:
            (indices, length): a fixed-size array of word indices and the
            number of words found in word2idx
        """
        words = [x.lower() for x in str(text).split()]
        idx = np.zeros(self.max_len)
        start = 0
        for word in words:
            try:
                idx[start] = self.word2idx[word]
                start += 1
                if start == self.max_len:
                    break
            except (KeyError, TypeError):
                continue
        return idx, start

# run visualbert with the train dataloader
seed = 2022
max_len = 100
image_path = 'your-path'
train_path = 'your-path'
train_path = train_path + 'your-path.csv'

train_dataset = TrainDataset(
    base_path=image_path,
    data_path=train_path,
    max_len=max_len)

batch_size = 32
train_dataloader = DataLoader(
    train_dataset, batch_size=batch_size, shuffle=True)

for data in train_dataloader:
    (input_ids, token_type_ids, attention_mask,
     visual_embeds, visual_token_type_ids, visual_attention_mask,
     train_labels) = data
    # `model` is a VisualBERT model (see the sketch below)
    outputs = model(
        input_ids=input_ids,
        token_type_ids=token_type_ids,
        attention_mask=attention_mask,
        visual_embeds=visual_embeds,
        visual_token_type_ids=visual_token_type_ids,
        visual_attention_mask=visual_attention_mask,
    )
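
The snippet above leaves `model` undefined. A minimal sketch of one way to instantiate it, assuming the `uclanlp/visualbert-vqa-coco-pre` checkpoint and a separate classification head (both assumptions, not part of the original gist):

from transformers import VisualBertModel

# assumed checkpoint; swap in whichever VisualBERT weights fit your task
model = VisualBertModel.from_pretrained("uclanlp/visualbert-vqa-coco-pre")
# a task head, e.g. torch.nn.Linear(model.config.hidden_size, len(label_cols))
# applied to outputs.pooler_output, would turn this into a classifier over label_cols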
@guanhdrmq
Hi Jiwon, how are you? I am trying to reproduce the HuggingFace VisualBERT VQA demo, but I get lower validation accuracy. Here is my code:
import os

from attacks import fgsm

os.environ["CUDA_VISIBLE_DEVICES"] = "0,1"
os.environ["CURL_CA_BUNDLE"] = ""

import torch

torch.cuda.empty_cache()
from torch import nn, optim
from torch.utils.data import DataLoader
from torch.utils.data.dataset import Dataset

print("Use GPU", torch.cuda.is_available())  # check whether a GPU is available
print("GPU:", torch.cuda.device_count())  # number of GPUs
torch.cuda.current_device()  # index of the current GPU, 0
torch.cuda.get_device_name(0)  # print the GPU name
torch.cuda.memory_summary(device=None, abbreviated=False)

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# https://huggingface.co/models?other=visual_bert


# ====================== VQA: read data ======================

# Read questions
import json

# Check the path and open the JSON file
f_read_questions = open(
    "./multimodal_data/vqa2/v2_OpenEnded_mscoco_val2014_questions.json"
)

# Return the JSON object as a dictionary
data_questions = json.load(f_read_questions)
print(data_questions.keys())

questions = data_questions["questions"]
print("Number of questions:", len(questions))

from os import listdir
from os.path import isfile, join

# root directory where all images are stored
root = "./multimodal_data/vqa2/val2014"
file_names = [f for f in listdir(root) if isfile(join(root, f))]

import re
from typing import Optional

filename_re = re.compile(r".*(\d{12}).((jpg)|(png))")

# source: https://github.com/allenai/allennlp-models/blob/a36aed540e605c4293c25f73d6674071ca9edfc3/allennlp_models/vision/dataset_readers/vqav2.py#L141
def id_from_filename(filename: str) -> Optional[int]:
    match = filename_re.fullmatch(filename)
    if match is None:
        return None
    return int(match.group(1))

filename_to_id = {root + "/" + file: id_from_filename(file) for file in file_names}
id_to_filename = {v: k for k, v in filename_to_id.items()}


# Read annotations
f_read_annotations = open("./multimodal_data/vqa2/v2_mscoco_val2014_annotations.json")

# Return the JSON object as a dictionary
data_annotations = json.load(f_read_annotations)
print(data_annotations.keys())

# Show answers
annotations = data_annotations["annotations"]
print("Number of annotations:", len(annotations))

from transformers import VisualBertConfig

config = VisualBertConfig.from_pretrained(
    "./pretrained/visualBERT/uclanlp-visualbert-vqa"
)

from tqdm.notebook import tqdm

def get_score(count: int) -> float:
    return min(1.0, count / 3)

# turn each annotation's answers into soft labels over config.label2id
for annotation in tqdm(annotations):
    answers = annotation["answers"]
    answer_count = {}
    for answer in answers:
        answer_ = answer["answer"]
        answer_count[answer_] = answer_count.get(answer_, 0) + 1
    labels = []
    scores = []
    for answer in answer_count:
        if answer not in config.label2id:
            continue
        labels.append(config.label2id[answer])
        score = get_score(answer_count[answer])
        scores.append(score)
    annotation["labels"] = labels
    annotation["scores"] = scores


# ===================================================================

class VQADataset(torch.utils.data.Dataset):
    """VQA (v2) dataset."""

    def __init__(
        self, questions, annotations, tokenizer, image_preprocess, frcnn, frcnn_cfg
    ):
        self.questions = questions
        self.annotations = annotations
        self.tokenizer = tokenizer
        self.image_preprocess = image_preprocess
        self.frcnn = frcnn
        self.frcnn_cfg = frcnn_cfg

    def __len__(self):
        return len(self.annotations)

    def __getitem__(self, idx):
        # answer
        annotation = self.annotations[idx]
        # question
        questions = self.questions[idx]
        image_path = id_to_filename[annotation["image_id"]]
        image_path = image_path.replace("./multimodal_data/vqa2/val2014/.", "", 1)
        text = questions["question"]

        print("question", text)

        images, sizes, scales_yx = self.image_preprocess(image_path)
        output_dict = self.frcnn(
            images,
            sizes,
            scales_yx=scales_yx,
            padding="max_detections",
            max_detections=self.frcnn_cfg.max_detections,
            return_tensors="pt",
        )

        # Very important that the boxes are normalized
        feature = output_dict.get("roi_features")
        normalized_boxes = output_dict.get("normalized_boxes")

        inputs = self.tokenizer(
            text,
            padding="max_length",
            max_length=25,
            truncation=True,
            return_token_type_ids=True,
            return_attention_mask=True,
            add_special_tokens=True,
            return_tensors="pt",
        )

        inputs.update(
            {
                "visual_embeds": feature,
                "visual_attention_mask": torch.ones(
                    feature.shape[:-1], dtype=torch.float
                ),
                "visual_token_type_ids": torch.ones(
                    feature.shape[:-1], dtype=torch.long
                ),
                # "output_attentions": False
            }
        )

        # remove batch dimension
        for k, v in inputs.items():
            if isinstance(v, torch.Tensor):
                inputs[k] = v.squeeze()

        # add labels
        labels = annotation["labels"]
        # print("label candidate:", labels)
        # scores = torch.tensor(annotation['scores'])
        scores = annotation["scores"]

        targets = torch.zeros(len(config.id2label), dtype=torch.float)
        for label, score in zip(labels, scores):
            # print(f"Setting target at index {label} to {score}")
            targets[label] = score
        inputs["labels"] = targets

        inputs["text"] = text
        return inputs

if __name__ == "__main__":
    # image input
    from visualbert.processing_image import Preprocess
    from visualbert.visualizing_image import SingleImageViz
    from visualbert.modeling_frcnn import GeneralizedRCNN
    from visualbert.utils import Config

    frcnn_cfg = Config.from_pretrained("unc-nlp/frcnn-vg-finetuned")
    frcnn = GeneralizedRCNN.from_pretrained(
        "unc-nlp/frcnn-vg-finetuned", config=frcnn_cfg
    )
    image_preprocess = Preprocess(frcnn_cfg)

    # text input
    from transformers import (
        VisualBertForQuestionAnswering,
        AutoTokenizer,
        BertTokenizerFast,
    )

    tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

    model = VisualBertForQuestionAnswering.from_pretrained(
        "uclanlp/visualbert-vqa",
        num_labels=len(config.id2label),
        id2label=config.id2label,
        label2id=config.label2id,
        output_hidden_states=True,
    )

    # if cfg.use_multi_gpu:
    # model = nn.DataParallel(model)
    # model = model.to(device=device)
    model.to(device)
    model.eval()

    dataset = VQADataset(
        questions=questions[:100],
        annotations=annotations[:100],
        tokenizer=tokenizer,
        image_preprocess=image_preprocess,
        frcnn=frcnn,
        frcnn_cfg=frcnn_cfg,
    )

    test_dataloader = DataLoader(dataset, batch_size=1, shuffle=False)
    correct = 0.0
    # counter
    total = 0

    # loss_function = nn.CrossEntropyLoss()
    # optimizer = optim.Adam(model.classifier.parameters(), lr=0.001)

    # from visualbert import utils
    # VQA_URL = "https://dl.fbaipublicfiles.com/pythia/data/answers_vqa.txt"
    # vqa_answers = utils.get_data(VQA_URL)

    for batch in tqdm(test_dataloader):
        batch_text = batch.copy()
        if "text" in batch:
            del batch["text"]

        batch = {k: v.to(device) for k, v in batch.items()}

        outputs = model(**batch)
        logits = outputs.logits  # [batch_size, 3129]
        _, pre = torch.max(logits, 1)
        _, target = torch.max(batch["labels"], 1)

        print("prediction:", pre)
        print("target:", target)

        # print("prediction from VisualBert VQA:", vqa_answers[pre])
        # print("Predicted answer:", model.config.id2label[pre.item()])
        # TODO label not right
        # print("Target answer:", model.config.id2label[target.item()])

        correct += (pre == target).sum()
        total = total + 1
        print(total)
        print("==============================================================")

    final_acc = correct / float(len(test_dataloader.dataset))
    print("Accuracy of test: %f %%" % (100 * float(final_acc)))

The only thing I changed is label2id and id2label in config.json, which I copied from the LXMERT demo. Do you have any opinion about this problem? Much appreciated.
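
One thing worth checking (a sketch, not a verified fix): the logits of the uclanlp/visualbert-vqa head only line up with your targets if label2id/id2label follow the same answer ordering the checkpoint was trained with, so a mapping copied from the LXMERT demo may not match. Building the mapping from the VQA answer vocabulary already referenced in the commented-out code above is one way to test that assumption:

# Assumption: the ordering of answers_vqa.txt matches the checkpoint's output head.
# If accuracy changes with this mapping, the copied config was the problem.
from visualbert import utils

VQA_URL = "https://dl.fbaipublicfiles.com/pythia/data/answers_vqa.txt"
vqa_answers = utils.get_data(VQA_URL)  # list of answer strings
id2label = {i: ans for i, ans in enumerate(vqa_answers)}
label2id = {ans: i for i, ans in enumerate(vqa_answers)}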
