Skip to content

Instantly share code, notes, and snippets.

@Miopas
Created December 9, 2020 20:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Miopas/32f904d1afc5ebe246cfb26505bac606 to your computer and use it in GitHub Desktop.
Save Miopas/32f904d1afc5ebe246cfb26505bac606 to your computer and use it in GitHub Desktop.
Process compressed files in Python.
'''
python parse_bz2.py *.bz2 ${dest}
'''
import sys
from bz2 import BZ2File as bzopen
import json
import pandas as pd


def main(infile, outpath):
    """Extract the 'body' field of every JSON line in a bz2 archive.

    Writes the texts to ``{outpath}/{batch_no}.csv`` in batches of
    100,000 rows (single 'text' column, header included).

    Parameters:
        infile: path to a bz2 file containing one JSON object per line,
                each with a 'body' key (assumed present — TODO confirm).
        outpath: existing directory the numbered CSV files are written to.
    """
    batch_size = 100000
    batch = {'text': []}
    count = 0
    # bz2 file handle iterates line by line, decompressing lazily.
    with bzopen(infile, "r") as bzfin:
        for line in bzfin:
            doc = json.loads(line.strip())
            batch['text'].append(str(doc['body']))
            count += 1
            if count % batch_size == 0:
                suffix = count // batch_size
                pd.DataFrame(batch).to_csv(
                    '{}/{}.csv'.format(outpath, suffix),
                    header=['text'], index=False)
                batch = {'text': []}
    # Bug fix: flush the trailing partial batch — the original silently
    # dropped the final < 100,000 rows.
    if batch['text']:
        pd.DataFrame(batch).to_csv(
            '{}/{}.csv'.format(outpath, count // batch_size + 1),
            header=['text'], index=False)


if __name__ == "__main__":
    main(sys.argv[1], sys.argv[2])
'''
python parse_zst.py raw/${infile}.zst ${dest}
'''
import zstandard
import pathlib
import shutil
import os
import math
from preprocesstwitter import process_reddit
import pandas as pd
import sys
def decompress_zstandard_to_folder(input_file, destination_dir):
    """Decompress a .zst archive into *destination_dir*.

    The output file is named after the input file's stem, i.e.
    'raw/RC_2019-12.zst' decompresses to '{destination_dir}/RC_2019-12'.
    """
    source = pathlib.Path(input_file)
    target = pathlib.Path(destination_dir) / source.stem
    with open(source, 'rb') as compressed, open(target, 'wb') as destination:
        zstandard.ZstdDecompressor().copy_stream(compressed, destination)
#decompress_zstandard_to_folder('raw/RC_2019-12.zst', 'raw')
import json
import io

infile = sys.argv[1]
outpath = sys.argv[2]


def _flush_batch(rows, dest, batch_no):
    """Write one batch of texts to {dest}/reddit_comments.text.{batch_no}.csv."""
    pd.DataFrame(rows).to_csv(
        '{}/reddit_comments.text.{}.csv'.format(dest, batch_no),
        header=['text'], index=False)


# Stream-decompress a zstandard archive of newline-delimited JSON and
# collect each record's 'body' text into CSV batches of 100,000 rows.
with open(infile, 'rb') as fh:
    dctx = zstandard.ZstdDecompressor()
    new_df = {'text': []}
    count = 0
    with dctx.stream_reader(fh) as reader:
        # Bug fixes vs. the manual chunk/split version:
        # - TextIOWrapper never splits a multi-byte UTF-8 sequence at a
        #   read boundary (the old chunk.decode('utf-8') could crash);
        # - it yields every complete line, including a final line that
        #   lacks a trailing newline (the old code dropped it).
        for line in io.TextIOWrapper(reader, encoding='utf-8'):
            doc = json.loads(line)
            # 'body' is assumed to be a string field — TODO confirm schema.
            text = str(doc['body'].strip())
            if len(text) == 0:
                continue
            new_df['text'].append(text)
            count += 1
            if count % 10000 == 0:
                sys.stderr.write('processed {}...\n'.format(count))
            if count % 100000 == 0:
                _flush_batch(new_df, outpath, count // 100000)
                new_df = {'text': []}
    # Bug fix: flush the trailing partial batch — previously lost.
    if new_df['text']:
        _flush_batch(new_df, outpath, count // 100000 + 1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment