Skip to content

Instantly share code, notes, and snippets.

@TonyWuLihu
Created December 26, 2016 08:59
Show Gist options
  • Save TonyWuLihu/686b77ea1332fcc6fc6b2fd2f50c06d5 to your computer and use it in GitHub Desktop.
Save TonyWuLihu/686b77ea1332fcc6fc6b2fd2f50c06d5 to your computer and use it in GitHub Desktop.
Merge parquet files locally
import sys
from datetime import date,datetime,timedelta
import datetime
import string
from pexpect import *
from os import remove,listdir
import os
import pprint
def chunks(l, n):
    """Split a sequence into consecutive slices of at most ``n`` items.

    Args:
        l: A sliceable sequence (anything supporting ``len`` and slicing).
        n: Maximum number of items per slice; must be at least 1.

    Returns:
        A lazy iterator over ``l[0:n]``, ``l[n:2 * n]``, ... in order. The
        last slice is shorter when ``len(l)`` is not a multiple of ``n``,
        and an empty sequence produces no slices at all.

    Raises:
        ValueError: If ``n`` is smaller than 1. A negative step would
            otherwise produce no slices silently, and a zero step would
            only fail later with an unrelated ``range()`` message.
    """
    if n < 1:
        raise ValueError('chunk size must be at least 1, got %r' % (n,))
    return (l[start:start + n] for start in range(0, len(l), n))
if __name__ == '__main__':
    # Command-line entry point.
    #
    # Usage: <script> YYYYMMDD FILES_PER_MERGE
    #
    # Lists the 'part-*' files in the year=/month=/day= directory for the
    # given date under s3path_prefix, merges them in batches of
    # FILES_PER_MERGE with the parquet-tools 'merge' command into
    # 'userinfo_NNNNN.parquet' files in the same directory, and removes the
    # source files of each batch once that batch has merged successfully.
    s3path_prefix = '/data/s3fs/warehouse/ott_user_info'
    # The command is kept as an argument list so that paths are handed to
    # the child process verbatim instead of being re-split on whitespace.
    merge_argv = ['java', '-jar', '/usr/local/parquet-tools-1.9.1-SNAPSHOT.jar', 'merge']
    merge_timeout = 1000  # seconds to wait for one merge command to finish

    if len(sys.argv) < 3:
        sys.exit('usage: %s YYYYMMDD FILES_PER_MERGE' % sys.argv[0])

    day_stamp = sys.argv[1]
    # Only the first eight characters are used to build the partition path.
    if len(day_stamp) < 8 or not day_stamp[:8].isdigit():
        sys.exit('invalid date %r: expected YYYYMMDD' % day_stamp)
    try:
        batch_size = int(sys.argv[2])
    except ValueError:
        sys.exit('invalid batch size %r: expected an integer' % sys.argv[2])
    if batch_size < 1:
        sys.exit('invalid batch size %d: must be at least 1' % batch_size)

    year, month, day = day_stamp[0:4], day_stamp[4:6], day_stamp[6:8]
    partition_dir = s3path_prefix + '/year=%s/month=%s/day=%s' % (year, month, day)

    # os.listdir() returns entries in arbitrary order; sorting makes the
    # assignment of part files to output files reproducible between runs.
    filelist = [
        os.path.join(partition_dir, filename)
        for filename in sorted(os.listdir(partition_dir))
        if filename.startswith('part-')
    ]

    for i, sublist in enumerate(chunks(filelist, batch_size)):
        dest = os.path.join(partition_dir, 'userinfo_%05d.parquet' % i)
        childp = spawn(merge_argv[0], merge_argv[1:] + sublist + [dest])
        try:
            childp.expect(EOF, timeout=merge_timeout)
        finally:
            # Always release the pty, also when expect() times out.
            childp.close()
        # close() records how the child ended. Anything other than a clean
        # exit means the merged file cannot be trusted, so the source files
        # must be kept.
        if childp.exitstatus != 0:
            sys.exit(
                'merge into %s failed (exit status %r, signal %r); source files kept'
                % (dest, childp.exitstatus, childp.signalstatus)
            )
        for merged_source in sublist:
            os.remove(merged_source)

    print("Job Done!")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment