Instagram Comment Metadata Parser
import csv
import glob
import json
import os
import re
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
import time
import numpy as np
import math
import pandas as pd
import polars as pl
from tqdm import tqdm
from CreateSQL import *
from sqlalchemy.orm import sessionmaker

def comment_parser(data):
    # extract comment-relevant information from a parsed JSON file
    # can users comment on the post?
    try:
        comment_disabled = data['comments_disabled']
    except KeyError:
        comment_disabled = None

    # get comment json
    try:
        cmts_json = data['edge_media_to_comment']['edges']
    except KeyError:
        cmts_json = data['edge_media_to_parent_comment']['edges']

    # normalize json data into a dataframe
    df_cmts = pd.json_normalize(cmts_json)

    # the dataframe may be empty, so flag whether any comments exist
    if df_cmts.empty:
        df_cmts.loc[0, 'CommentExist'] = False
    else:
        df_cmts['CommentExist'] = True

    # append comment_disabled as a new column
    df_cmts['CommentClosed'] = comment_disabled

    return df_cmts
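
# Illustrative only: a minimal sketch of the post JSON shape comment_parser expects.
# Field names follow the schema overrides used below; the values here are made up.
# sample = {
#     'comments_disabled': False,
#     'edge_media_to_comment': {'edges': [
#         {'node': {'id': '1', 'created_at': 1522540800, 'text': 'nice shot!',
#                   'owner': {'id': '42', 'username': 'someone', 'is_verified': False}}}
#     ]}
# }
# comment_parser(sample)  # -> one-row DataFrame with CommentExist = True, CommentClosed = False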

def data_merge(path, files, pbar):
    # specify the directory
    zip_path = path

    # create a dataframe to collect the results
    df_sub = pl.DataFrame()

    # parse each JSON file
    for file in files:
        try:
            with open(zip_path + '/' + file, 'r', encoding = 'utf-8') as f:
                data = json.loads(f.read())
            result = comment_parser(data)

            # add the file ID as a column
            result['FileID'] = file

            # convert pandas to polars to speed up downstream operations
            result = pl.from_pandas(result,
                                    schema_overrides = {'node.created_at' : pl.Int64,
                                                        'node.did_report_as_spam' : pl.Boolean,
                                                        'node.edge_liked_by.count' : pl.Int64,
                                                        'node.id' : pl.Utf8,
                                                        'node.owner.id' : pl.Utf8,
                                                        'node.owner.is_verified' : pl.Boolean,
                                                        'node.owner.profile_pic_url' : pl.Utf8,
                                                        'node.owner.username' : pl.Utf8,
                                                        'node.text' : pl.Utf8,
                                                        'node.viewer_has_liked' : pl.Boolean,
                                                        'CommentExist' : pl.Boolean,
                                                        'CommentClosed' : pl.Boolean,
                                                        'FileID' : pl.Utf8,
                                                        'node.edge_threaded_comments.count' : pl.Int64,
                                                        'node.edge_threaded_comments.page_info.end_cursor' : pl.Utf8,
                                                        'node.edge_threaded_comments.page_info.has_next_page' : pl.Boolean,
                                                        'node.edge_threaded_comments.edges' : pl.List(pl.Utf8)},
                                    rechunk = True)

            # append the data chunk to the running dataframe
            df_sub = pl.concat([df_sub, result],
                               how = 'diagonal',
                               rechunk = True)
        except Exception:
            # there are some empty json files (e.g., academy-1704600271848757741.info)
            tqdm.write(f'{file} raised an error (e.g., the file contains no data). Skipping this file.')

        # update progress
        pbar.update(1)

    return df_sub

# Convert UNIX time to local time
def unix_date(unix):
    if unix is not None:
        res = datetime.fromtimestamp(unix)
    else:
        res = None
    return res
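
# Illustrative only: unix_date(1522540800) corresponds to 2018-04-01 00:00:00 UTC,
# but the returned datetime is local time, so the exact value depends on the
# timezone of the machine running this script.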

def column_cleaning(dataframe):
    # Specify the current column headers of the dataframe
    col_list = ['node.created_at',
                'node.did_report_as_spam',
                'node.edge_liked_by.count',
                'node.id',
                'node.owner.id',
                'node.owner.is_verified',
                'node.owner.profile_pic_url',
                'node.owner.username',
                'node.text',
                'node.viewer_has_liked',
                'CommentExist',
                'CommentClosed',
                'FileID'
                ]
    new_col_list = ['CommentDate',
                    'CommentSpam',
                    'CommentLikeNo',
                    'CommentID',
                    'CommenterIDNumeric',
                    'CommenterVerified',
                    'CommenterProfilePic',
                    'CommenterID',
                    'CommentText',
                    'CommentLikedByViewer',
                    'CommentExist',
                    'CommentClosed',
                    'FileID'
                    ]

    # Rename columns
    dataframe = dataframe.rename(dict(zip(col_list, new_col_list)))

    # Convert UNIX time to local time
    dataframe = dataframe.with_columns(pl.struct(['CommentDate']).apply(lambda x: unix_date(x['CommentDate'])))
    dataframe = dataframe.with_columns(pl.col("CommentDate").dt.year().alias('CommentYear'))
    dataframe = dataframe.with_columns(pl.col("CommentDate").dt.month().alias('CommentMonth'))
    dataframe = dataframe.with_columns(pl.col("CommentDate").dt.day().alias('CommentDay'))

    # Change the order of columns
    dataframe = dataframe.select(['FileID',
                                  'CommentClosed',
                                  'CommentExist',
                                  'CommenterID',
                                  'CommenterIDNumeric',
                                  'CommenterVerified',
                                  'CommenterProfilePic',
                                  'CommentID',
                                  'CommentSpam',
                                  'CommentDate',
                                  'CommentYear',
                                  'CommentMonth',
                                  'CommentDay',
                                  'CommentText',
                                  'CommentLikeNo',
                                  'CommentLikedByViewer'
                                  ])

    return dataframe
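
# Note: the pl.struct(...).apply(...) call above follows the older polars expression API;
# in newer polars releases (0.19+) Expr.apply was renamed to Expr.map_elements, so that
# line may need adjusting depending on the installed polars version.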

def main(path, file_list):
    tqdm.write('path finding...')

    # list all files to parse
    files = file_list
    tqdm.write(f'total files: {len(files)}')

    # create containers to save the results
    final_result = []
    dataframe = pl.DataFrame()

    # determine the chunk size (at least 1 to avoid a zero step below)
    n_workers = 50
    chunksize = max(1, round(len(files) / n_workers))
    tqdm.write(f'chunk size: {chunksize}')

    # multithread using chunks
    with tqdm(total = len(files), colour = 'GREEN') as pbar:
        with ThreadPoolExecutor(n_workers) as exe:
            # split the parsing work into chunks
            for i in range(0, len(files), chunksize):
                # select a chunk of filenames
                filenames = files[i:(i + chunksize)]

                # submit the batch parsing task
                future = exe.submit(data_merge, path, filenames, pbar)

                # collect futures from separate threads into a single list
                final_result.append(future)

            # concatenate the results from final_result
            for f in as_completed(final_result):
                dataframe = pl.concat([dataframe, f.result()],
                                      how = 'diagonal',
                                      rechunk = True)

    # clean the names and order of the columns
    dataframe = column_cleaning(dataframe)

    # summarize the result
    tqdm.write(f'Summary: \n # of JSON files: {len(files)}')
    tqdm.write(f' # of parsed comment rows: {dataframe.shape[0]}')

    return dataframe
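
# Illustrative only: given a directory of *.info JSON files, main() returns a single polars
# DataFrame with one row per comment, e.g.
# df = main(r'D:/Data/Influencer dataset/Post metadata/info',
#           ['academy-1704600271848757741.info'])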

if __name__ == '__main__':
    ##########################################
    # Get the list of files and identify paths
    ##########################################
    # Identify the path where the raw data exists
    path = r'D:/Data/Influencer dataset/Post metadata/info'

    # Identify the table to be processed
    table = 'organicpost1'

    # Get the list of file names that contain comments
    ## Connect to MySQL - DB: Instagram
    connection = create_engine(db = 'organic')

    ## Get all the table names in the DB
    tables = """
             SHOW tables;
             """
    table_names = pd.read_sql(tables, connection)

    ## Get the total number of posts to be processed
    table_name = f'{table}_update'
    query = f"""
            SELECT COUNT(FileID)
            FROM {table_name};
            """
    total_posts = pd.read_sql(query, connection)
    total_posts = total_posts.iloc[0, 0].item()

    ## Extract the posts table as a dataframe
    query = f"""
            SELECT *
            FROM {table_name};
            """

    ## Extract the influencer table as a dataframe
    df_inf = pd.read_csv(r'F:\Projects\Brokerage\Insta_Analysis_Ver2\data\dataframe\influencers\influencers_2018-04.csv')

    ## Process in chunks from here to reduce the memory burden
    # Specify the chunksize
    chunksize = 200000
    total_chunk = math.ceil(total_posts / chunksize)
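
    # Illustrative only: with total_posts = 1_000_000 and chunksize = 200_000,
    # total_chunk = math.ceil(1_000_000 / 200_000) = 5 chunks of posts to process.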

    # Count the chunk number
    i = 1

    ## Start processing
    # start_time = time.time()
    for df_chunk in pd.read_sql(query, connection, chunksize = chunksize):
        connection.dispose()  # close the engine after finishing the task to avoid errors
        tqdm.write(f'Processing chunk {i} / {total_chunk}.')

        ## Confine the data to influencer samples that have posts from at least 2018-04
        df_chunk = df_chunk.\
                   loc[df_chunk['PosterID'].\
                   isin(df_inf['PosterID'])].\
                   reset_index(drop = True).copy()

        ## Get the FileID list where comments are identifiable
        file_list = df_chunk.loc[df_chunk['PostCommentText'] == 1, 'FileID'].values.tolist()

        ##############
        # Run function
        ##############
        # Run the main function to get the comment dataframe
        df_fin = main(path, file_list)

        # Append the post date to the dataframe
        df_chunk = pl.from_pandas(df_chunk.loc[:, ['FileID', 'PostDate', 'PostYear', 'PostMonth', 'PostDay']])
        df_fin = df_fin.join(df_chunk,
                             on = 'FileID',
                             how = 'left')

        # Convert to a pandas dataframe to save with to_parquet
        df_fin = df_fin.to_pandas()

        # Save
        df_fin.to_parquet(f'comments/{table}_comments{i}.parquet.gzip',
                          compression = 'gzip',
                          index = False)  # Import files like: pd.read_parquet(f'comments/organicpost1_comments{i}.parquet.gzip')

        # Delete the last chunk's result and update the count
        del df_fin
        i += 1