Created
June 16, 2024 20:48
-
-
Save Jong-Sig/201c087d974f33a2cc6e7653a122db39 to your computer and use it in GitHub Desktop.
Sentiment & Emotion Analysis Using LLM (CUDA)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import pandas as pd | |
| import numpy as np | |
| import os | |
| import re | |
| from tqdm import tqdm | |
| from CreateSQL import * | |
| import torch | |
| from transformers import AutoModelForSequenceClassification | |
| from transformers import TFAutoModelForSequenceClassification | |
| from transformers import AutoTokenizer, AutoConfig | |
| from transformers import RobertaTokenizerFast, TFRobertaForSequenceClassification, pipeline | |
| from optimum.bettertransformer import BetterTransformer | |
| from scipy.special import softmax | |
| ################## | |
| # Use GPU | |
| ################## | |
| # Better to use device_map = 'auto' | |
| device = "cuda" if torch.cuda.is_available() else "cpu" | |
| ################ | |
| # Get comments | |
| ################ | |
def comments(database, table):
    """Load comment rows with a non-null commenter from a MySQL table.

    Parameters
    ----------
    database : str
        Database name handed to the project's ``create_engine`` wrapper
        (from ``CreateSQL``).
    table : str
        Table to read from. NOTE(review): the name is interpolated into the
        SQL text; callers must only pass trusted, code-controlled table names.

    Returns
    -------
    pd.DataFrame
        Columns: FileID, PostYear, PostMonth, CommenterID, CommentText.
    """
    query = f"""
    SELECT FileID, PostYear, PostMonth, CommenterID, CommentText
    FROM {table}
    WHERE CommenterID IS NOT NULL;
    """
    # Create connection to MySQL
    connection = create_engine(db = database)
    try:
        # Import dataframe
        dataframe = pd.read_sql(query, connection)
    finally:
        # Dispose even when read_sql raises, so a failed read does not
        # leak the connection pool (the original only disposed on success).
        connection.dispose()
    return dataframe
| ################## | |
| # Preprocess texts | |
| ################## | |
| # Preprocess texts (username and link placeholders) | |
def preprocess(text):
    """Mask user mentions and links in *text* with placeholder tokens.

    Splits on single spaces; a token beginning with '@' followed by at
    least one more character becomes '@user', a token beginning with
    'http' becomes 'http', and every other token is kept unchanged.
    """
    masked = []
    for token in text.split(" "):
        if token.startswith('@') and len(token) > 1:
            masked.append('@user')
        elif token.startswith('http'):
            masked.append('http')
        else:
            masked.append(token)
    return " ".join(masked)
| #################### | |
| # Sentiment analysis | |
| #################### | |
| # Source: https://huggingface.co/cardiffnlp/twitter-roberta-base-sentiment-latest | |
| ## We used Roberta-based models as Roberta models can tokenize emojis | |
| # # Get model card | |
# Model card for the 3-class (negative/neutral/positive) Twitter sentiment model.
# RoBERTa-based models are used because their BPE tokenizer handles emojis.
MODEL = f"cardiffnlp/twitter-roberta-base-sentiment-latest"
# Get tokenizer and config from the model card.
# NOTE(review): device_map is not a documented kwarg for AutoTokenizer /
# AutoConfig.from_pretrained — it is most likely ignored here; confirm
# against the transformers docs.
tokenizer = AutoTokenizer.from_pretrained(MODEL,
                                          device_map = 'auto')
config = AutoConfig.from_pretrained(MODEL,
                                    device_map = 'auto')
# Get the pretrained model from the model card; device_map='auto' lets
# accelerate place the weights (hence the commented-out manual .to(device)).
model = AutoModelForSequenceClassification.from_pretrained(MODEL,
                                                           device_map = 'auto')
# # Set model on GPU
# model = model.to(device)
# Convert to BetterTransformer for faster inference kernels.
# NOTE(review): BetterTransformer is deprecated in recent optimum/transformers
# releases in favour of native torch SDPA — verify against installed versions.
model = BetterTransformer.transform(model)
| # run sentiment analysis to get scores | |
def sentiment(text):
    """Score one comment with the cardiffnlp Twitter sentiment model.

    Parameters
    ----------
    text : str
        Raw comment text; mentions/links are masked via ``preprocess``.

    Returns
    -------
    tuple
        ``(labels, scores)`` — ``labels`` is ``config.id2label`` (index ->
        label name) and ``scores`` is a softmax-normalised 1-D numpy array
        of class probabilities in label-index order.
    """
    # Preprocess text (mask @mentions and URLs)
    text = preprocess(text)
    # Tokenize; truncate to 512 tokens because the model's position
    # embeddings cap out at 514 (truncation issue observed without this).
    # Renamed from `input`, which shadowed the builtin.
    inputs = tokenizer(text, return_tensors = 'pt',
                       truncation = True,
                       max_length = 512).to(device)
    # Inference only: no_grad avoids building the autograd graph, which
    # otherwise wastes memory across millions of comments.
    with torch.no_grad():
        output = model(**inputs)
    # Logits -> probabilities on CPU
    scores = output[0][0].detach().cpu().numpy()
    scores = softmax(scores)
    labels = config.id2label
    return labels, scores
| ################## | |
| # Emotion analysis | |
| ################## | |
| # Source: https://huggingface.co/SamLowe/roberta-base-go_emotions | |
| ## We used Roberta-based models as Roberta models can tokenize emojis | |
# Model card for the 28-label GoEmotions classifier.
# RoBERTa-based models are used because their BPE tokenizer handles emojis.
MODEL2 = f"SamLowe/roberta-base-go_emotions"
# Get tokenizer and config from the model card.
# NOTE(review): device_map is not a documented kwarg for AutoTokenizer /
# AutoConfig.from_pretrained — likely ignored; confirm against the docs.
tokenizer2 = AutoTokenizer.from_pretrained(MODEL2,
                                           device_map = 'auto')
config2 = AutoConfig.from_pretrained(MODEL2,
                                     device_map = 'auto')
# Get the pretrained model; device_map='auto' handles device placement
# (hence the commented-out manual .to(device)).
model2 = AutoModelForSequenceClassification.from_pretrained(MODEL2,
                                                            device_map = 'auto')
# # Set model on GPU
# model2 = model2.to(device)
# Convert to BetterTransformer for faster inference kernels.
# NOTE(review): BetterTransformer is deprecated in recent optimum releases —
# verify against installed versions.
model2 = BetterTransformer.transform(model2)
| # run model to get scores | |
def emotion(text):
    """Score one comment with the GoEmotions (28-label) emotion model.

    Parameters
    ----------
    text : str
        Raw comment text; mentions/links are masked via ``preprocess``.

    Returns
    -------
    tuple
        ``(labels, scores)`` — ``labels`` is ``config2.id2label`` and
        ``scores`` is a softmax-normalised 1-D numpy array of class
        probabilities in label-index order.
    """
    # Preprocess text (mask @mentions and URLs)
    text = preprocess(text)
    # Tokenize; truncate to 512 tokens (position-embedding limit is 514).
    # Renamed from `input`, which shadowed the builtin.
    inputs = tokenizer2(text, return_tensors = 'pt',
                        truncation = True,
                        max_length = 512).to(device)
    # Inference only: no_grad avoids building the autograd graph.
    with torch.no_grad():
        output = model2(**inputs)
    # Logits -> probabilities on CPU
    scores = output[0][0].detach().cpu().numpy()
    scores = softmax(scores)
    labels = config2.id2label
    return labels, scores
def analysis(dataframe, text_column, model):
    """Run per-comment sentiment or emotion scoring and attach score columns.

    Parameters
    ----------
    dataframe : pd.DataFrame
        Frame to annotate in place (also returned for convenience).
    text_column : str
        Name of the column holding the raw comment text.
    model : str
        Either 'sentiment' (adds CommentSentNeg/Neut/Pos) or 'emotion'
        (adds one 'Comment<Label>' column per GoEmotions label). Any
        other value returns the frame unchanged, as before.

    Returns
    -------
    pd.DataFrame
        The same frame with the score columns filled.
    """
    # Texts are needed by either branch; extract once.
    texts = dataframe[text_column].tolist()
    if model == 'sentiment':
        # Column order mirrors the model's label-index order
        # (negative, neutral, positive).
        cols = ['CommentSentNeg', 'CommentSentNeut', 'CommentSentPos']
        dataframe.loc[:, cols] = None
        # sentiment() returns (labels, scores); keep the scores vector.
        results = [sentiment(ele)[1] for ele in tqdm(texts)]
        # Stack into an (n_rows, 3) array for an explicit bulk assignment.
        dataframe.loc[:, cols] = np.vstack(results)
    elif model == 'emotion':
        # Build one column per GoEmotions label, in label-index order.
        cols = ['Comment' + x.capitalize() for x in config2.id2label.values()]
        dataframe.loc[:, cols] = None
        results = [emotion(ele)[1] for ele in tqdm(texts)]
        dataframe.loc[:, cols] = np.vstack(results)
    return dataframe
| # Create datachunk (when dataframe is too large to process) | |
def chunk(df: pd.DataFrame, chunk_size: int):
    """Yield successive row-slices of *df*, each at most *chunk_size* rows.

    Parameters
    ----------
    df : pd.DataFrame
        Frame to split; row order is preserved across chunks.
    chunk_size : int
        Maximum number of rows per yielded chunk; must be positive.

    Yields
    ------
    pd.DataFrame
        Consecutive row slices; the final slice may be shorter. An empty
        frame is yielded once, so callers always get at least one chunk.

    Raises
    ------
    ValueError
        If chunk_size is not positive. (The previous while-loop version
        looped forever in that case, since `start` never advanced.)
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    length = df.shape[0]
    # Degenerate case: preserve the original contract of yielding the
    # (empty) frame exactly once.
    if length == 0:
        yield df
        return
    # range() replaces the manual while/remainder bookkeeping; slicing
    # past the end is safe, so the last chunk is just shorter.
    for start in range(0, length, chunk_size):
        yield df[start:start + chunk_size]
if __name__ == '__main__':
    ################
    # Get comments
    ################
    # Comment tables to process; two post sets, each split into an
    # April-2018 table and a table for the remaining period.
    tables = ['organicpost1_comments_2018_04',
              'organicpost1_comments_rest',
              'organicpost2_comments_2018_04',
              'organicpost2_comments_rest']
    ################
    # Run analysis
    ################
    # process table by table
    for table in tables:
        # Import table as dataframe from the 'organic' MySQL database
        df_cmts = comments('organic', table)
        # # Test with 1000 files per table
        # df_cmts = df_cmts.head(1000)
        # chunk counter, used in the output filename
        i = 1
        # Process by chunk so very large tables are scored and written
        # incrementally rather than held in memory at once
        for df in chunk(df_cmts, 10000000): # process by 10M chunks
            # Calculate the number of comments to be processed
            total_file = len(df)
            tqdm.write(f'{table} Summary: \n {total_file} comments to be processed.')
            # Specify the model to run ('sentiment' or 'emotion')
            analysis_model = 'sentiment'
            # run analysis on this chunk
            df = analysis(df, 'CommentText', analysis_model)
            # save chunk as gzip-compressed parquet
            # NOTE(review): assumes a 'sentiment/' directory already exists — verify
            df.to_parquet(f'sentiment/{analysis_model}_{table}_{i}.parquet.gzip',
                          compression = 'gzip',
                          index = False)
            # advance the chunk counter
            i += 1
            # drop the scored chunk before loading the next one
            del df
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment