Skip to content

Instantly share code, notes, and snippets.

@shibacow
Last active September 17, 2017 05:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save shibacow/574f0c247e5a99a04cef426f4c860173 to your computer and use it in GitHub Desktop.
Save shibacow/574f0c247e5a99a04cef426f4c860173 to your computer and use it in GitHub Desktop.
[{"mode": "NULLABLE",
"name": "video_id",
"type": "STRING"},
{"mode": "NULLABLE",
"name": "date",
"type": "TIMESTAMP"},
{"mode": "NULLABLE",
"name": "content",
"type": "STRING"},
{"mode": "NULLABLE",
"name": "command",
"type": "STRING"},
{"mode": "NULLABLE",
"name": "vpos",
"type": "INTEGER"}]
3
# -*- coding:utf-8 -*-
from glob import glob
import logging
log_fmt = '%(asctime)s- %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(level=logging.DEBUG,format=log_fmt)
from zipfile import ZipFile
import json
from pprint import pprint
from multiprocessing import Pool
import gzip
import shutil
# Glob pattern selecting the zip archives that main() hands to conv_f().
src='/home/foobar/comment/*.zip'
# Directory prefix for processed archives. conv_f() builds the destination by
# plain string concatenation with the archive's file name, so the trailing
# slash is required.
donedir='/home/foobar/comment_done/'
def readzip(f):
    """Yield ``(line, member_name)`` for every text line in a zip archive.

    Args:
        f: Path (or binary file object) of the zip archive to read.

    Yields:
        Tuples of one UTF-8 decoded line, with its line ending normalised to
        a single newline character, and the name of the archive member the
        line was read from.
    """
    # ZipFile.open() rejects the 'U'/'rU' modes on Python 3.6+ (ValueError).
    # The zipfile documentation recommends io.TextIOWrapper for reading
    # members in universal-newlines mode instead.
    import io

    with ZipFile(f) as zipf:
        for zi in zipf.infolist():
            with zipf.open(zi) as raw:
                with io.TextIOWrapper(raw, encoding='utf-8') as text:
                    # Stream line by line rather than loading the whole
                    # member with readlines().
                    for line in text:
                        yield line, zi.filename
class ConvDict(object):
    """A JSON record tagged with an id derived from its archive member name.

    Attributes are ``d`` (the parsed record, with ``'video_id'`` added) and
    ``zf`` (the derived id).
    """

    def __conv_zf(self, zf):
        """Return the second '/'-separated part of *zf*, cut at its first '.'.

        Raises IndexError when *zf* contains no '/'.
        """
        second_part = zf.split('/')[1]
        return second_part.split('.')[0]

    def __init__(self, l, zf):
        """Parse JSON text *l* and store the id derived from *zf* in it."""
        record = json.loads(l)
        video_id = self.__conv_zf(zf)
        record['video_id'] = video_id
        self.d = record
        self.zf = video_id

    def conv(self):
        """Serialise the tagged record back to a JSON string."""
        serialised = json.dumps(self.d)
        return serialised

    def __str__(self):
        template = "zf={}"
        return template.format(self.zf)
def conv_f(f):
    """Convert one zip archive of JSON lines into a gzip file, then archive it.

    Each line yielded by ``readzip(f)`` is wrapped in ``ConvDict`` (which adds
    the ``video_id`` key), re-serialised, and written as one JSON document
    per line to ``gzip/<stem>.gz``, where ``<stem>`` is the archive's file
    name up to its first dot. After the gzip file is closed, the source
    archive is moved into ``donedir``.

    Args:
        f: Path of the source zip archive.
    """
    import os  # function-scope import keeps this block self-contained

    name = os.path.basename(f)
    stem = name.split('.')[0]
    gff = "gzip/{}.gz".format(stem)
    logging.info("src=%s dst=%s", f, gff)

    # Create both target directories up front; otherwise the first run fails
    # with FileNotFoundError. exist_ok makes this safe when several pool
    # workers get here at the same time.
    os.makedirs(os.path.dirname(gff), exist_ok=True)
    os.makedirs(donedir, exist_ok=True)

    with gzip.open(gff, 'wb') as gfile:
        for line, member_name in readzip(f):
            record = ConvDict(line, member_name)
            gfile.write((record.conv() + '\n').encode('utf-8'))

    # Move the archive only after the output is completely written, so an
    # archive that failed mid-conversion stays in the source directory.
    doned = os.path.join(donedir, name)
    shutil.move(f, doned)
    logging.info("done=%s", doned)
def main():
    """Run conv_f over every archive matching ``src`` with 20 processes."""
    archives = glob(src)
    archives.sort()
    with Pool(processes=20) as workers:
        workers.map(conv_f, archives)


if __name__ == '__main__':
    main()
[{"name": "length",
"type": "INTEGER",
"mode": "NULLABLE"},
{"name": "watch_num",
"type": "INTEGER",
"mode": "NULLABLE"},
{"name": "size_low",
"type": "INTEGER",
"mode": "NULLABLE"},
{"name": "comment_num",
"type": "INTEGER",
"mode": "NULLABLE"},
{"name": "description",
"type": "STRING",
"mode": "NULLABLE"},
{"name": "size_high",
"type": "INTEGER",
"mode": "NULLABLE"},
{"name": "tags",
"type": "STRING",
"mode": "REPEATED"},
{"name": "title",
"type": "STRING",
"mode": "NULLABLE"},
{"name": "mylist_num",
"type": "INTEGER",
"mode": "NULLABLE"},
{"name": "video_id",
"type": "STRING",
"mode": "NULLABLE"},
{"name": "upload_time",
"type":"TIMESTAMP",
"mode": "NULLABLE"},
{"name": "category",
"type": "STRING",
"mode": "NULLABLE"},
{"name": "file_type",
"type": "STRING",
"mode": "NULLABLE"}]
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
from glob import glob
from zipfile import ZipFile
from gzip import GzipFile
import gzip
# Glob pattern selecting the zip archives that main() repacks into gzip files.
src="/home/foobar/video/*.zip"
def readzip(f):
    """Yield the UTF-8 decoded content of every member of a zip archive.

    Args:
        f: Path (or binary file object) of the zip archive to read.

    Yields:
        The full text of each archive member, one string per member, with
        line endings left exactly as stored.
    """
    with ZipFile(f) as zipf:
        for zi in zipf.infolist():
            # The default binary read mode is used: ZipFile.open() raises
            # ValueError for 'U'/'rU' on Python 3.6+, and read() never did
            # newline translation anyway.
            with zipf.open(zi) as readf:
                yield readf.read().decode('utf-8')
def main():
    """Repack the zips matching ``src`` into gzip files of 1000 archives each.

    Archives are processed in sorted order. The archive at position ``i``
    goes to ``gzip/<i // 1000, zero-padded to 3 digits>.gz``, and the text of
    each of its members is appended to that file as UTF-8.
    """
    import os  # function-scope import keeps this block self-contained

    # Without this the first gzip.open() fails with FileNotFoundError when
    # the output directory has not been created by hand.
    os.makedirs('gzip', exist_ok=True)

    for i, f in enumerate(sorted(glob(src))):
        ii = i // 1000  # bucket number: 1000 source archives per output file
        gff = "gzip/{0:03d}.gz".format(ii)
        print("i={0:04d} gff={1} f={2}".format(i, gff, f))
        # Append mode: each archive adds to its bucket file. Re-running the
        # script therefore appends to any bucket files already present.
        with gzip.open(gff, 'ab') as gfile:
            for ll in readzip(f):
                gfile.write(ll.encode('utf-8'))


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment