Skip to content

Instantly share code, notes, and snippets.

@Jong-Sig
Created June 16, 2024 20:48
Show Gist options
  • Select an option

  • Save Jong-Sig/201c087d974f33a2cc6e7653a122db39 to your computer and use it in GitHub Desktop.

Select an option

Save Jong-Sig/201c087d974f33a2cc6e7653a122db39 to your computer and use it in GitHub Desktop.
Sentiment & Emotion Analysis Using LLM (CUDA)
import pandas as pd
import numpy as np
import os
import re
from tqdm import tqdm
from CreateSQL import *
import torch
from transformers import AutoModelForSequenceClassification
from transformers import TFAutoModelForSequenceClassification
from transformers import AutoTokenizer, AutoConfig
from transformers import RobertaTokenizerFast, TFRobertaForSequenceClassification, pipeline
from optimum.bettertransformer import BetterTransformer
from scipy.special import softmax
##################
# Device selection
##################
# Prefer the GPU when CUDA is available, otherwise fall back to CPU.
# This string is later used to move tokenized inputs onto the same
# device as the model (the models themselves use device_map='auto').
device = "cpu"
if torch.cuda.is_available():
    device = "cuda"
################
# Get comments
################
def comments(database, table):
    """Load all comment rows with a non-null CommenterID from `table`.

    Connects to the given MySQL database via the project's
    `create_engine` helper, reads the rows into a DataFrame, and
    disposes of the connection before returning.
    """
    sql = f"""
SELECT FileID, PostYear, PostMonth, CommenterID, CommentText
FROM {table}
WHERE CommenterID IS NOT NULL;
"""
    # Open a connection, pull the rows, then release the engine.
    engine = create_engine(db=database)
    frame = pd.read_sql(sql, engine)
    engine.dispose()
    return frame
##################
# Preprocess texts
##################
def preprocess(text):
    """Replace user mentions and links with placeholder tokens.

    Tokens starting with '@' (longer than the bare '@') become '@user';
    tokens starting with 'http' become 'http'. Everything else is kept,
    and tokens are re-joined with single spaces.
    """
    def _mask(token):
        if token.startswith('@') and len(token) > 1:
            return '@user'
        if token.startswith('http'):
            return 'http'
        return token

    return " ".join(_mask(token) for token in text.split(" "))
####################
# Sentiment analysis
####################
# Source: https://huggingface.co/cardiffnlp/twitter-roberta-base-sentiment-latest
# A RoBERTa-based model is used because its BPE tokenizer can handle emojis.
# Hugging Face model id for the 3-class (negative/neutral/positive) model.
MODEL = f"cardiffnlp/twitter-roberta-base-sentiment-latest"
# Tokenizer and config; config.id2label is read later to name the labels.
# NOTE(review): device_map='auto' is a model-loading (accelerate) option;
# it is presumably ignored by the tokenizer/config loaders — confirm.
tokenizer = AutoTokenizer.from_pretrained(MODEL,
device_map = 'auto')
config = AutoConfig.from_pretrained(MODEL,
device_map = 'auto')
# Model weights; device_map='auto' lets accelerate place them on the GPU.
model = AutoModelForSequenceClassification.from_pretrained(MODEL,
device_map = 'auto')
# Explicit .to(device) was replaced by device_map='auto' above:
# model = model.to(device)
# Convert to the BetterTransformer fastpath for faster inference.
# NOTE(review): optimum's BetterTransformer is deprecated in newer
# transformers releases (native SDPA) — verify against pinned versions.
model = BetterTransformer.transform(model)
# run sentiment analysis to get scores
def sentiment(text):
    """Score `text` with the Twitter sentiment model.

    Returns a tuple ``(labels, scores)`` where ``labels`` is the model
    config's id2label mapping and ``scores`` is a numpy array of
    softmax probabilities aligned with those label ids.
    """
    # Mask mentions/links the way the model was trained to expect.
    text = preprocess(text)
    # Model position embeddings top out at 514, so truncate to 512.
    # (renamed from `input`, which shadowed the builtin)
    inputs = tokenizer(text, return_tensors='pt',
                       truncation=True,
                       max_length=512).to(device)
    # Inference only: no_grad skips the autograd graph (saves memory/time).
    with torch.no_grad():
        output = model(**inputs)
    # First element of the output tuple is the logits; take the single row.
    logits = output[0][0].detach().cpu().numpy()
    # Single-label classification head: softmax over the 3 classes.
    scores = softmax(logits)
    return config.id2label, scores
##################
# Emotion analysis
##################
# Source: https://huggingface.co/SamLowe/roberta-base-go_emotions
# A RoBERTa-based model is used because its BPE tokenizer can handle emojis.
# Hugging Face model id for the GoEmotions emotion classifier.
# NOTE(review): per its model card this model is multi-label (one sigmoid
# score per emotion), not single-label — confirm downstream scoring matches.
MODEL2 = f"SamLowe/roberta-base-go_emotions"
# Tokenizer and config; config2.id2label is read later to name the columns.
# NOTE(review): device_map='auto' is a model-loading (accelerate) option;
# it is presumably ignored by the tokenizer/config loaders — confirm.
tokenizer2 = AutoTokenizer.from_pretrained(MODEL2,
device_map = 'auto')
config2 = AutoConfig.from_pretrained(MODEL2,
device_map = 'auto')
# Model weights; device_map='auto' lets accelerate place them on the GPU.
model2 = AutoModelForSequenceClassification.from_pretrained(MODEL2,
device_map = 'auto')
# Explicit .to(device) was replaced by device_map='auto' above:
# model2 = model2.to(device)
# Convert to the BetterTransformer fastpath for faster inference.
model2 = BetterTransformer.transform(model2)
# run model to get scores
def emotion(text):
    """Score `text` with the GoEmotions emotion model.

    Returns a tuple ``(labels, scores)`` where ``labels`` is the model
    config's id2label mapping and ``scores`` is a numpy array of
    per-emotion probabilities aligned with those label ids.
    """
    # Mask mentions/links the way the model was trained to expect.
    text = preprocess(text)
    # (renamed from `input`, which shadowed the builtin)
    inputs = tokenizer2(text, return_tensors='pt',
                        truncation=True,
                        max_length=512).to(device)
    # Inference only: no_grad skips the autograd graph (saves memory/time).
    with torch.no_grad():
        output = model2(**inputs)
    logits = output[0][0].detach().cpu().numpy()
    # BUG FIX: go_emotions is a multi-label model trained with a sigmoid
    # per label (see model card); softmax forced the 28 emotions to
    # compete and distorted the scores. Apply an element-wise sigmoid.
    scores = 1.0 / (1.0 + np.exp(-logits))
    return config2.id2label, scores
def analysis(dataframe, text_column, model):
    """Append per-row model scores to `dataframe` as new columns.

    Parameters
    ----------
    dataframe : pd.DataFrame with the comments to score.
    text_column : name of the column holding the raw comment text.
    model : 'sentiment' or 'emotion'; any other value returns the
        dataframe unchanged (original behavior, preserved).

    Returns the same dataframe with score columns filled in.
    """
    # Hoisted out of the branches: both scorers read the same column.
    texts = dataframe[text_column].tolist()
    if model == 'sentiment':
        # Columns assume id2label order 0=negative, 1=neutral, 2=positive
        # for cardiffnlp/twitter-roberta-base-sentiment-latest.
        cols = ['CommentSentNeg', 'CommentSentNeut', 'CommentSentPos']
        dataframe.loc[:, cols] = None
        results = [sentiment(text)[1] for text in tqdm(texts)]
        dataframe.loc[:, cols] = results
    elif model == 'emotion':
        # One column per GoEmotions label, e.g. 'CommentJoy'.
        cols = ['Comment' + label.capitalize()
                for label in config2.id2label.values()]
        dataframe.loc[:, cols] = None
        results = [emotion(text)[1] for text in tqdm(texts)]
        dataframe.loc[:, cols] = results
    return dataframe
# Create datachunk (when dataframe is too large to process)
def chunk(df: pd.DataFrame, chunk_size: int):
    """Yield successive row slices of `df`, each at most `chunk_size` rows.

    An empty dataframe yields a single empty slice (matching the
    original behavior). Raises ValueError for chunk_size < 1, which
    previously caused an infinite loop because the cursor never advanced.
    """
    if chunk_size < 1:
        raise ValueError("chunk_size must be >= 1")
    total = df.shape[0]
    # max(total, 1) makes the empty-frame case yield one empty slice.
    for start in range(0, max(total, 1), chunk_size):
        yield df[start:start + chunk_size]
if __name__ == '__main__':
    ################
    # Get comments
    ################
    # Comment tables of the 'organic' MySQL database to score.
    tables = ['organicpost1_comments_2018_04',
              'organicpost1_comments_rest',
              'organicpost2_comments_2018_04',
              'organicpost2_comments_rest']
    ################
    # Run analysis
    ################
    # Which scorer to run: 'sentiment' or 'emotion'.
    analysis_model = 'sentiment'
    # ROBUSTNESS: to_parquet fails if the target directory is missing.
    os.makedirs('sentiment', exist_ok=True)
    for table in tables:
        # Import the whole table as a dataframe.
        df_cmts = comments('organic', table)
        # # Test with 1000 files per table
        # df_cmts = df_cmts.head(1000)
        # Process in 10M-row chunks so memory stays bounded;
        # `i` numbers the output files per table, starting at 1.
        for i, df in enumerate(chunk(df_cmts, 10000000), start=1):
            tqdm.write(f'{table} Summary: \n {len(df)} comments to be processed.')
            # Run the selected analysis and persist the scored chunk.
            df = analysis(df, 'CommentText', analysis_model)
            df.to_parquet(f'sentiment/{analysis_model}_{table}_{i}.parquet.gzip',
                          compression='gzip',
                          index=False)
            # Free the chunk before loading the next one.
            del df
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment