Instagram Comment Metadata Parser
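"""
Parse Instagram post-metadata JSON (.info) files, extract the comment edges
into a Polars dataframe, join them with post-level fields pulled from MySQL,
and write the results to gzip-compressed parquet files in chunks.
"""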
import csv
import glob
import json
import os
import re
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
import time
import numpy as np
import math
import pandas as pd
import polars as pl
from tqdm import tqdm
from CreateSQL import *
from sqlalchemy.orm import sessionmaker


def comment_parser(data):
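    """
    Extract comment-related fields from one post's parsed JSON metadata.

    Returns a pandas DataFrame with one row per comment edge (or a single
    row with CommentExist = False when the post has no comments), plus a
    CommentClosed flag indicating whether commenting was disabled.
    """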
    # extract comment-relevant information from the parsed JSON file
    # can users comment on the post?
    try:
        comment_disabled = data['comments_disabled']
    except KeyError:
        comment_disabled = None
    # get comment json
    try:
        cmts_json = data['edge_media_to_comment']['edges']
    except KeyError:
        cmts_json = data['edge_media_to_parent_comment']['edges']
    # normalize json data into a dataframe
    df_cmts = pd.json_normalize(cmts_json)
    # the dataframe may be empty (post without comments), so flag that case
    if df_cmts.empty:
        df_cmts.loc[0, 'CommentExist'] = False
    else:
        df_cmts['CommentExist'] = True
    # append comment_disabled as a new column
    df_cmts['CommentClosed'] = comment_disabled
    return df_cmts
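
# Example usage of comment_parser (the file name below is hypothetical,
# shown for illustration only; real .info files follow the same pattern):
#     with open('academy-1234567890.info', encoding = 'utf-8') as f:
#         post = json.loads(f.read())
#     df = comment_parser(post)   # one row per comment, or CommentExist = False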

def data_merge(path, files, pbar):
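    """
    Parse every JSON file in `files` (located under `path`) with
    comment_parser and stack the results into a single Polars dataframe.
    `pbar` is a shared tqdm progress bar updated once per file.
    """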
    # specify the directory
    zip_path = path
    # Create a dataframe to save the result
    df_sub = pl.DataFrame()
    # parse each JSON file
    for file in files:
        try:
            with open(zip_path + '/' + file, 'r', encoding = 'utf-8') as f:
                data = json.loads(f.read())
            result = comment_parser(data)
            # Add the file ID as a column
            result['FileID'] = file
            # Convert pandas to polars to speed up
            result = pl.from_pandas(result,
                                    schema_overrides = {'node.created_at' : pl.Int64,
                                                        'node.did_report_as_spam' : pl.Boolean,
                                                        'node.edge_liked_by.count' : pl.Int64,
                                                        'node.id' : pl.Utf8,
                                                        'node.owner.id' : pl.Utf8,
                                                        'node.owner.is_verified' : pl.Boolean,
                                                        'node.owner.profile_pic_url' : pl.Utf8,
                                                        'node.owner.username' : pl.Utf8,
                                                        'node.text' : pl.Utf8,
                                                        'node.viewer_has_liked' : pl.Boolean,
                                                        'CommentExist' : pl.Boolean,
                                                        'CommentClosed' : pl.Boolean,
                                                        'FileID' : pl.Utf8,
                                                        'node.edge_threaded_comments.count' : pl.Int64,
                                                        'node.edge_threaded_comments.page_info.end_cursor' : pl.Utf8,
                                                        'node.edge_threaded_comments.page_info.has_next_page' : pl.Boolean,
                                                        'node.edge_threaded_comments.edges' : pl.List(pl.Utf8)},
                                    rechunk = True)
            # Append the parsed chunk to the running dataframe
            df_sub = pl.concat([df_sub, result],
                               how = 'diagonal',
                               rechunk = True)
        except Exception:
            # some JSON files are empty (e.g., academy-1704600271848757741.info)
            tqdm.write(f'{file} raised an error (e.g., the file contains no data). Skipping this file.')
        # Update progress
        pbar.update(1)
    return df_sub


# Convert UNIX time to local time
def unix_date(unix):
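    """Convert a UNIX timestamp (seconds) to a local datetime; return None if the timestamp is missing."""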
    if unix is not None:
        res = datetime.fromtimestamp(unix)
    else:
        res = None
    return res


def column_cleaning(dataframe):
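    """
    Rename the raw `node.*` columns to readable names, convert the UNIX
    comment timestamp to a local datetime (plus year/month/day columns),
    and reorder the columns.
    """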
    # Specify column headers of dataframe
    col_list = ['node.created_at',
                'node.did_report_as_spam',
                'node.edge_liked_by.count',
                'node.id',
                'node.owner.id',
                'node.owner.is_verified',
                'node.owner.profile_pic_url',
                'node.owner.username',
                'node.text',
                'node.viewer_has_liked',
                'CommentExist',
                'CommentClosed',
                'FileID'
                ]
    new_col_list = ['CommentDate',
                    'CommentSpam',
                    'CommentLikeNo',
                    'CommentID',
                    'CommenterIDNumeric',
                    'CommenterVerified',
                    'CommenterProfilePic',
                    'CommenterID',
                    'CommentText',
                    'CommentLikedByViewer',
                    'CommentExist',
                    'CommentClosed',
                    'FileID'
                    ]
    # Rename columns
    dataframe = dataframe.rename(dict(zip(col_list, new_col_list)))
    # Convert UNIX time to local time
    dataframe = dataframe.with_columns(pl.struct(['CommentDate']).apply(lambda x: unix_date(x['CommentDate'])))
    dataframe = dataframe.with_columns(pl.col("CommentDate").dt.year().alias('CommentYear'))
    dataframe = dataframe.with_columns(pl.col("CommentDate").dt.month().alias('CommentMonth'))
    dataframe = dataframe.with_columns(pl.col("CommentDate").dt.day().alias('CommentDay'))
    # Change the order of columns
    dataframe = dataframe.select(['FileID',
                                  'CommentClosed',
                                  'CommentExist',
                                  'CommenterID',
                                  'CommenterIDNumeric',
                                  'CommenterVerified',
                                  'CommenterProfilePic',
                                  'CommentID',
                                  'CommentSpam',
                                  'CommentDate',
                                  'CommentYear',
                                  'CommentMonth',
                                  'CommentDay',
                                  'CommentText',
                                  'CommentLikeNo',
                                  'CommentLikedByViewer'
                                  ])
    return dataframe


def main(path, file_list):
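    """
    Split `file_list` into chunks, parse each chunk in a thread pool with
    data_merge, concatenate the per-chunk results, and return the cleaned
    Polars dataframe.
    """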
    # specify the directory
    tqdm.write('path finding...')
    # list all files to parse
    files = file_list
    tqdm.write(f'total files: {len(files)}')
    # Create containers to save the results
    final_result = []
    dataframe = pl.DataFrame()
    # Determine chunksize
    n_workers = 50
    chunksize = max(1, round(len(files) / n_workers))  # guard against a zero step for small file lists
    tqdm.write(f'chunk size: {chunksize}')
    # Multithread using chunks
    with tqdm(total = len(files), colour = 'GREEN') as pbar:
        with ThreadPoolExecutor(n_workers) as exe:
            # Split the parsing work into chunks
            for i in range(0, len(files), chunksize):
                # select a chunk of filenames
                filenames = files[i:(i + chunksize)]
                # submit the batch parsing task
                future = exe.submit(data_merge, path, filenames, pbar)
                # Append futures from separate threads into a single list
                final_result.append(future)
            # Concat results from final_result
            for f in as_completed(final_result):
                dataframe = pl.concat([dataframe, f.result()],
                                      how = 'diagonal',
                                      rechunk = True)
    # Clean the names and order of the columns
    dataframe = column_cleaning(dataframe)
    # Summarize result
    tqdm.write(f'Summary: \n # of JSON files: {len(files)}')
    tqdm.write(f' # of comment rows extracted: {dataframe.shape[0]}')
    return dataframe


if __name__ == '__main__':
    ##########################################
    # Get the list of files and identify paths
    ##########################################
    # Identify the path where raw data exists
    path = r'D:/Data/Influencer dataset/Post metadata/info'
    # Identify the table to be processed
    table = 'organicpost1'
    # Get the list of file names that contain comments
    ## Connect to MySQL - DB: Instagram
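    # NOTE: create_engine here is assumed to come from the local CreateSQL
    # module (star-imported above), not sqlalchemy directly, since
    # sqlalchemy.create_engine does not accept a `db` keyword argument.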
    connection = create_engine(db = 'organic')
    ## Get all the table names in the DB
    tables = """
             SHOW tables;
             """
    table_names = pd.read_sql(tables, connection)
    ## Get the total number of posts to be processed
    table_name = f'{table}_update'
    query = f"""
            SELECT COUNT(FileID)
            FROM {table_name};
            """
    total_posts = pd.read_sql(query, connection)
    total_posts = total_posts.iloc[0, 0].item()
    ## Extract the posts table as a dataframe
    query = f"""
            SELECT *
            FROM {table_name};
            """
    ## Extract the influencer table as a dataframe
    df_inf = pd.read_csv(r'F:\Projects\Brokerage\Insta_Analysis_Ver2\data\dataframe\influencers\influencers_2018-04.csv')

    ## Process in chunks from here on to reduce the memory burden
    # Specify chunksize
    chunksize = 200000
    total_chunk = math.ceil(total_posts / chunksize)
    # Count chunk number
    i = 1
    ## Start processing
    # start_time = time.time()
    for df_chunk in pd.read_sql(query, connection, chunksize = chunksize):
        connection.dispose()  # close the engine after the query has been issued to avoid connection errors
        tqdm.write(f'Processing chunk {i} / {total_chunk}.')
        ## Confine the sample to influencers who have posts since at least 2018-04
        df_chunk = df_chunk.\
            loc[df_chunk['PosterID'].\
            isin(df_inf['PosterID'])].\
            reset_index(drop = True).copy()
        ## Get the list of FileIDs where comments are identifiable
        file_list = df_chunk.loc[df_chunk['PostCommentText'] == 1, 'FileID'].values.tolist()

        ##############
        # Run function
        ##############
        # Run the main function to get the comment dataframe
        df_fin = main(path, file_list)
        # Append the post date columns to the comment dataframe
        df_chunk = pl.from_pandas(df_chunk.loc[:, ['FileID', 'PostDate', 'PostYear', 'PostMonth', 'PostDay']])
        df_fin = df_fin.join(df_chunk,
                             on = 'FileID',
                             how = 'left')
        # convert to a pandas dataframe to write the parquet file
        df_fin = df_fin.to_pandas()
        # save
        df_fin.to_parquet(f'comments/{table}_comments{i}.parquet.gzip',
                          compression = 'gzip',
                          index = False)  # read back with: pd.read_parquet(f'comments/organicpost1_comments{i}.parquet.gzip')
        # Delete the processed chunk and update the counter
        del df_fin
        i += 1