Created
June 16, 2024 22:41
-
-
Save Jong-Sig/78b8faeaa305dd59871e104372dc71ea to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # Basic lib | |
| import pandas as pd | |
| import numpy as np | |
| import os | |
| import re | |
| import glob | |
| import csv | |
| import json | |
| from tqdm import tqdm | |
| from ast import literal_eval | |
| from lingua import Language, LanguageDetectorBuilder | |
| from CreateSQL import * | |
| # ChatGPT | |
| import openai | |
| import requests | |
| import time | |
| import docx | |
| from tenacity import ( | |
| retry, | |
| stop_after_attempt, | |
| wait_random_exponential, | |
| ) # for exponential backoff | |
| ############################ | |
| # Set text-davinci-003 model | |
| ############################ | |
| # Add retry behavior to avoid rate limit error | |
@retry(wait = wait_random_exponential(min = 1, max = 60), stop = stop_after_attempt(6))
def davinci(texts, request_limit = 3000):
    """Query text-davinci-003 with a batch of advertising texts.

    Wrapped in tenacity's exponential-backoff retry (up to 6 attempts) to
    survive transient rate-limit errors from the OpenAI API.

    Parameters
    ----------
    texts : list[str]
        Advertising post texts, one prompt is built per text.
    request_limit : int, default 3000
        Assumed requests-per-minute quota; used to pace calls.

    Returns
    -------
    list[str]
        The raw completion text for each input, in the same order.
    """
    # Pace ourselves: spread calls evenly across the per-minute quota.
    time.sleep(60.0 / request_limit)

    def _build_prompt(text):
        # One self-contained brand-identification prompt per post text.
        return f"""
Title: Brand Identification in Advertising Text
Text:
You are a language model that excels at understanding advertising texts and identifying the brands being endorsed.
Please read the following advertising text and answer the questions that follow step-by-step:
{text}
Questions:
1. Identify the name of the brand(s) being endorsed in the given advertising text, as well as the brand's associated ID (starting with @) mentioned in the text. Note that the brand's associated ID does not start with #. 
2. Provide a list the name of the brand(s) identified in the first question, formatted as a single list: []
3. Provide a list of the associated IDs (e.g., @mentions) identified in the first question, formatted as a single list: []
"""

    batch_prompts = [_build_prompt(t) for t in texts]

    # Legacy Completions endpoint accepts a list of prompts in one request.
    response = openai.Completion.create(
        model="text-davinci-003",
        temperature=0.5,
        max_tokens=512,
        top_p=1,
        frequency_penalty=0,
        presence_penalty=0,
        prompt=batch_prompts,
    )
    # One choice per prompt; pull out just the generated text.
    return [choice['text'] for choice in response['choices']]
| ############################## | |
| # Format output from the model | |
| ############################## | |
def davinci_format(response):
    """Split a model reply into its three numbered answers.

    The LLM is prompted to answer three numbered questions ("1.", "2.",
    "3." — occasionally "1:", etc.), so the reply is carved up at those
    markers.

    Fix: the original patterns used ``(?:.|:)`` where the unescaped ``.``
    matches ANY character, so the split also fired on e.g. "10" or "2a"
    inside an answer.  ``[.:]`` matches only a literal dot or colon.

    Parameters
    ----------
    response : str
        Raw completion text returned by :func:`davinci`.

    Returns
    -------
    tuple[str, str | None, str | None]
        (brand names answer, brand-name list, @-mention list).  The second
        and third elements are ``None`` when the model omitted that section
        (observed in <0.1% of replies).  Raises ``IndexError`` if even the
        first answer marker is missing.
    """
    # Answer 1: text between the "1." and "2." markers.
    a1 = re.split(r'2[.:]',
                  re.split(r'1[.:]', response)[1])[0].strip()
    # Answer 2: text between the "2." and "3." markers.
    try:
        a2 = re.split(r'3[.:]',
                      re.split(r'2[.:]', response)[1])[0].strip()
    except IndexError:  # "2." marker absent (rare, <0.1% of observations)
        a2 = None
    # Answer 3: everything after the "3." marker.
    try:
        a3 = re.split(r'3[.:]', response)[1].strip()
    except IndexError:  # "3." marker absent (rare, <0.1% of observations)
        a3 = None
    return a1, a2, a3
if __name__ == "__main__":
    # ------------------------------------------------------------------
    # Build the sample of *direct* advertisement posts and label the
    # endorsed brands with the LLM.  Cleaning procedure:
    #   1. Keep English posts only.
    #   2. Keep advertisement posts (official OR unofficial).
    #   3. Keep brand-direct posts, then drop those also flagged official
    #      (see the sanity check below).
    # Results are written back to MySQL (table `llmcoding`) and to CSV.
    # ------------------------------------------------------------------
    # Connect to MySQL - DB: Instagram (create_engine comes from CreateSQL)
    connection = create_engine(db = 'Instagram')
    # List the tables in the DB (kept for interactive inspection)
    table_names = pd.read_sql("""
    SHOW tables;
    """, connection)
    # Pull the full advertisements table into memory
    table_name = 'advertisements'
    df_ad = pd.read_sql(f"""
    SELECT * FROM {table_name}
    """, connection)

    ###############
    # English posts
    ###############
    df_ad = df_ad.loc[df_ad['EnglishTextDum'] == 1, :].\
        copy().\
        reset_index(drop = True)

    ######################################
    # Either official or unofficial posts
    ######################################
    df_ad = df_ad.loc[(df_ad['OfficialAD'] == 1) | (df_ad['UnofficialAD'] == 1)].\
        copy().\
        reset_index(drop = True)

    ######################################
    # Brand's direct posts
    ######################################
    df_ad = df_ad.loc[(df_ad['PostDirect2'] == 1)].\
        copy().\
        reset_index(drop = True)

    # Sanity check: a stricter definition of "direct" would require the
    # poster's ID to equal the mentioned sponsor's @-handle.  In this data
    # no row satisfies it (count below is 0), so every direct post that is
    # also classified as official is removed next.
    # (The original code built this boolean mask but discarded the result.)
    n_sponsor_direct = (('@' + df_ad['PostSpon']) == df_ad['PosterID']).sum()
    df_ad = df_ad.loc[~((df_ad['PostDirect2'] == 1) & (df_ad['OfficialAD'] == 1))].\
        copy().\
        reset_index(drop = True)

    ############################
    # Run text-davinci-003 model
    ############################
    # NOTE(review): hard-coded placeholder — replace with your OpenAI API
    # key (better: load it from an environment variable, never commit it).
    openai.api_key = 'APIKEY'
    # Column to store the raw LLM reply for each post
    df_ad['LLMResult'] = None
    # Batch prompts (10 posts per request) to speed up the analysis.
    # Build the groupby once and reuse it for both iteration and the
    # progress-bar total (the original computed it twice).
    BATCH_SIZE = 10
    batches = df_ad.groupby(np.arange(len(df_ad)) // BATCH_SIZE)
    for batch_number, batch_df in tqdm(batches, total = batches.ngroups):
        # Texts of the current batch
        texts = batch_df['PostText'].tolist()
        # One reply per text, in order
        response = davinci(texts)
        # Write each reply back to its originating row
        for reply, idx in zip(response, batch_df.index):
            df_ad.loc[idx, 'LLMResult'] = reply

    ##############################
    # Derive variables from LLMResult
    ##############################
    # Split each raw reply into the three numbered answers (A1, A2, A3);
    # result_type='expand' creates the columns, so no pre-initialization
    # is needed.
    df_ad[['A1', 'A2', 'A3']] = df_ad.apply(
        lambda row: davinci_format(row['LLMResult']),
        axis = 1,
        result_type = 'expand')

    # Store results in MySQL and as a CSV backup
    connection = create_engine(db = 'Instagram')
    df_ad.to_sql(con = connection,
                 name = 'llmcoding',  # Don't forget to replace table name!
                 if_exists = 'replace',
                 chunksize = 50000,
                 index = False)
    df_ad.to_csv('llm_result.csv', index = False)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment