Read the infodump into a dictionary of dataframes, an HDF5 file, or SQLite DB
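To build one of those outputs, run the script from the command line against an extracted Infodump directory (a usage sketch; the filename infodump.py is an assumption, since the gist doesn't fix one):

    python infodump.py ./infodump infodump.h5

Pass an output path ending in .db for a SQLite database, or .pkl for a pickled dictionary of DataFrames, instead.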
import argparse
import os
import pickle
from datetime import datetime
from glob import glob

import numpy as np
import pandas as pd
from tqdm import tqdm


def _mefi_strptime(x):
    try:
        return datetime.strptime(x, '%Y-%m-%d %H:%M:%S:%f%p')
    except ValueError:
        # Fall back to a sentinel date for unparseable timestamps.
        return datetime(1900, 1, 1, 0, 0, 0)


def _int_with_filled_nans(x):
    try:
        return np.int64(x)
    except ValueError:
        # Use -1 as a sentinel for missing or unparseable integers.
        return -1
def _posttitles_to_df(fpath):
    """Necessitated by a wonky, long title in posttitles_askme.txt.

    Args:
        fpath (str): Path to the posttitles file.

    Returns:
        pandas.DataFrame: DataFrame extracted from the file.
    """
    df_dict = {'postid': [], 'title': []}
    with open(fpath, 'r') as infile:
        infile.readline()  # pre-date
        infile.readline()  # header
        for line in infile:
            data = line.strip().split('\t')
            df_dict['postid'].append(_int_with_filled_nans(data[0]))
            if len(data) < 2:
                df_dict['title'].append('')
            elif len(data) == 2:
                df_dict['title'].append(data[1])
            else:
                df_dict['title'].append(' '.join(data[1:]))
    return pd.DataFrame(df_dict)
def _infodump_file_to_df(fpath):
    """Convert an infodump file to a pandas.DataFrame.

    Args:
        fpath (str): Path to the infodump file.

    Returns:
        pandas.DataFrame: DataFrame made from the infodump file.
    """
    if 'posttitles_' in fpath:
        return _posttitles_to_df(fpath)
    df = pd.read_csv(fpath,
                     sep='\t',
                     skiprows=1,
                     converters={
                         'above': _int_with_filled_nans,
                         'below': _int_with_filled_nans,
                         'best answer?': bool,
                         'category': _int_with_filled_nans,
                         'comments': _int_with_filled_nans,
                         'date': _mefi_strptime,
                         'datestamp': _mefi_strptime,
                         'deleted': bool,
                         'favorites': _int_with_filled_nans,
                         'joindate': _mefi_strptime,
                         'link_date': _mefi_strptime,
                         'link_id': _int_with_filled_nans,
                         'name': str,
                         'postid': _int_with_filled_nans,
                         'reason': str,
                         'tag_id': _int_with_filled_nans,
                         'tag_name': str,
                         'title': str,
                         'url': _int_with_filled_nans,
                         'urldesc': _int_with_filled_nans,
                         'userid': _int_with_filled_nans
                     })
    # All the comment data has a "best answer?" column, but it's meaningless
    # except when dealing with AskMe.
    if 'best answer?' in df.columns and 'commentdata_askme.txt' not in fpath:
        del df['best answer?']
    # MetaFilter's post data contains a category column, but it's always 0
    # (except, it seems, on 4 broken posts where it comes up as -1),
    # so it should be dropped from this DataFrame.
    if 'category' in df.columns and 'postdata_mefi.txt' in fpath:
        del df['category']
    return df
def infodump_to_df_dict(infodump_path='.'):
    """Process the infodump directory into a dictionary of DataFrames and
    dictionaries of DataFrames for particular subsites.

    Args:
        infodump_path (str): Path to the infodump directory.

    Returns:
        dict[pandas.DataFrame|dict[pandas.DataFrame]]: Dict of DataFrames and
            dictionaries of DataFrames.
    """
    dfs = dict()
    all_files = sorted(glob(os.path.join(infodump_path, '*.txt')),
                       key=os.path.getsize)
    sizes = [os.path.getsize(x) for x in all_files]
    with tqdm(total=sum(sizes), unit='B', unit_scale=True,
              unit_divisor=1024) as pbar:
        for size, fpath in zip(sizes, all_files):
            fname = os.path.basename(fpath)
            pbar.set_description(fname)
            df = _infodump_file_to_df(fpath)
            if '_' in fname:
                data_type, subsite = fname[:-4].split('_')
                if subsite not in dfs:
                    dfs[subsite] = dict()
                dfs[subsite][data_type] = df
            else:
                dfs[fname[:-4]] = df
            pbar.update(size)
    return dfs
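# Usage sketch for infodump_to_df_dict (the 'askme'/'commentdata' keys below
# are illustrative assumptions; actual keys depend on which .txt files are
# present in the infodump directory):
#
#     dfs = infodump_to_df_dict('./infodump')
#     askme_comments = dfs['askme']['commentdata']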
def infodump_to_hdf5(infodump_path='.', hdf_path='infodump.h5'):
    """Process the infodump directory into an HDF5 table.

    Args:
        infodump_path (str): Path to the infodump directory.
        hdf_path (str): Path to HDF data store (will be created if missing).

    Returns:
        str: Path to HDF data store.
    """
    all_files = sorted(glob(os.path.join(infodump_path, '*.txt')),
                       key=os.path.getsize)
    sizes = [os.path.getsize(x) for x in all_files]
    with tqdm(total=sum(sizes), unit='B', unit_scale=True,
              unit_divisor=1024) as pbar:
        for size, fpath in zip(sizes, all_files):
            fname = os.path.basename(fpath)
            pbar.set_description(fname)
            df = _infodump_file_to_df(fpath)
            with pd.HDFStore(hdf_path) as store:
                if '_' in fname:
                    data_type, subsite = fname[:-4].split('_')
                    store['/'.join([subsite, data_type])] = df
                else:
                    store[fname[:-4]] = df
            pbar.update(size)
    return hdf_path
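# Reading the store back (a sketch; keys follow the '<subsite>/<data_type>'
# layout written above, so 'askme/commentdata' is illustrative):
#
#     with pd.HDFStore('infodump.h5') as store:
#         askme_comments = store['askme/commentdata']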
def infodump_to_sqlite(infodump_path='.', db_path='infodump.db'):
    """Process the infodump directory into a SQLite DB.

    Args:
        infodump_path (str): Path to the infodump directory.
        db_path (str): Path to the SQLite DB (will be created if missing).

    Returns:
        str: Path to the SQLite DB.
    """
    # odo is only needed for the SQLite path, so import it lazily here.
    from odo import odo
    all_files = sorted(glob(os.path.join(infodump_path, '*.txt')),
                       key=os.path.getsize)
    sizes = [os.path.getsize(x) for x in all_files]
    with tqdm(total=sum(sizes), unit='B', unit_scale=True,
              unit_divisor=1024) as pbar:
        for size, fpath in zip(sizes, all_files):
            fname = os.path.basename(fpath)
            pbar.set_description(fname)
            df = _infodump_file_to_df(fpath)
            # Each file becomes a table named after the file, minus '.txt'.
            sql_str = 'sqlite:///{}::{}'.format(db_path, fname[:-4])
            odo(df, sql_str)
            pbar.update(size)
    return db_path
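# Querying the resulting DB (a sketch; table names are the source filenames
# minus '.txt', so 'commentdata_askme' is an illustrative assumption):
#
#     import sqlite3
#     with sqlite3.connect('infodump.db') as conn:
#         sample = pd.read_sql('SELECT * FROM commentdata_askme LIMIT 5', conn)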
def _parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('infodump_dir',
                        help='Path to the directory containing the infodump files')
    parser.add_argument('output_path',
                        help='Path to the output file (.h5, .db, .pkl)')
    return parser.parse_args()


def main():
    args = _parse_args()
    if args.output_path.endswith('.h5'):
        infodump_to_hdf5(args.infodump_dir, args.output_path)
    elif args.output_path.endswith('.db'):
        infodump_to_sqlite(args.infodump_dir, args.output_path)
    elif args.output_path.endswith('.pkl'):
        df_dict = infodump_to_df_dict(args.infodump_dir)
        # Pickle requires a binary-mode file handle.
        with open(args.output_path, 'wb') as outfile:
            pickle.dump(df_dict, outfile)
    else:
        raise ValueError(
            'output_path must end in .h5, .db, or .pkl: {}'.format(
                args.output_path))


if __name__ == "__main__":
    main()