Skip to content

Instantly share code, notes, and snippets.

@ismdeep
Created December 13, 2020 12:37
Show Gist options
  • Save ismdeep/c2ab8f4f1685cfffb34b6db3080041e8 to your computer and use it in GitHub Desktop.
Save ismdeep/c2ab8f4f1685cfffb34b6db3080041e8 to your computer and use it in GitHub Desktop.
Dumper for converting massive data files to a CSV file
#!/usr/bin/env python3.7
import codecs
import csv
import os
import sys
import threading
import time
class Monitor:
    """Background progress reporter that redraws a ``current / max`` line.

    The owner updates ``current`` (and optionally ``max``) from its own
    thread; a helper thread started by :meth:`start` redraws the progress
    line on standard output until :meth:`done` is called.
    """

    # Seconds between two redraws of the progress line.
    REFRESH_INTERVAL = 0.2

    # Class-level defaults, kept so the attributes exist even on instances
    # that were created without running __init__.
    current = 0
    max = 100
    done_flag = False
    thread_obj = None

    def __init__(self, __current__, __max__):
        """Create a monitor.

        Args:
            __current__: Number of units already processed.
            __max__: Total number of units expected.
        """
        self.current = __current__
        self.max = __max__
        self.done_flag = False
        self.thread_obj = None
        # Lets done() interrupt the pause between redraws immediately.
        self._wake = threading.Event()

    def _status_line(self):
        """Return the progress text for the current counters."""
        # Snapshot both counters: the owning thread updates them concurrently.
        current, total = self.current, self.max
        # A zero total (e.g. an empty input) is reported as complete instead
        # of raising ZeroDivisionError inside the reporter thread.
        percent = current * 100.0 / total if total else 100.0
        return '%d / %d => %.2f%%' % (current, total, percent)

    def done(self):
        """Ask the reporter thread to stop."""
        self.done_flag = True
        self._wake.set()

    def work(self):
        """Redraw the progress line until :meth:`done` is called."""
        while not self.done_flag:
            print('\r' + self._status_line(), end='')
            self._wake.wait(self.REFRESH_INTERVAL)
        # The last periodic redraw may be stale; draw the final state once
        # more and terminate the line.
        print('\r' + self._status_line())

    def start(self):
        """Start the reporter thread."""
        # Daemon thread: a crash in the owner must not leave the process
        # hanging on a reporter that nobody will ever stop.
        self.thread_obj = threading.Thread(target=self.work, daemon=True)
        self.thread_obj.start()

    def wait_done(self):
        """Block until the reporter thread has finished, if it was started."""
        if self.thread_obj is not None:
            self.thread_obj.join()
class Dumper:
    """Convert a delimited text file into CSV, keeping only selected columns.

    Typical use: set ``source_file_path`` and ``rule_func``, call
    :meth:`parse_headers`, then :meth:`dump`.

    Attributes:
        source_file_path: Path of the input text file.
        dest_file_path: Path of the CSV to create; defaults to
            ``source_file_path + ".csv"`` when left as ``None``.
        headers: Column names written as the first CSV row.
        ids: Zero-based positions of the input fields to keep.
        rule_func: Callable invoked as ``rule_func(line, ids)``; returns the
            row to write, or ``None`` when the line cannot be parsed.
        rows: Buffer of parsed rows not yet written to the CSV file.
    """

    # Number of parsed rows buffered before they are flushed to the CSV file.
    BATCH_SIZE = 1000

    source_file_path = None
    dest_file_path = None
    rule_func = None

    def __init__(self):
        self.source_file_path = None
        self.dest_file_path = None
        self.rule_func = None
        # Per-instance lists; nothing mutable is shared through the class.
        self.headers = []
        self.rows = []
        self.ids = []

    def parse_headers(self, __headers_raw__):
        """Parse a comma-separated header specification.

        Empty names mark input columns to skip, so ``"user,,email"`` keeps
        input fields 0 and 2 under the names ``user`` and ``email``.

        Args:
            __headers_raw__: Comma-separated column names.
        """
        self.ids = []
        self.headers = []
        for index, name in enumerate(__headers_raw__.split(',')):
            if name != '':
                self.ids.append(index)
                self.headers.append(name)

    @staticmethod
    def _count_lines(path):
        """Return the number of newline bytes in ``path`` (what ``wc -l`` reports)."""
        total = 0
        with open(path, 'rb') as raw:
            # Fixed-size binary chunks keep memory flat on very large files.
            for chunk in iter(lambda: raw.read(1 << 20), b''):
                total += chunk.count(b'\n')
        return total

    def dump(self):
        """Write the selected columns of the source file to the CSV file.

        Non-blank lines rejected by ``rule_func`` are printed and copied
        verbatim to ``source_file_path + ".err"``. Nothing is written when
        the destination file already exists.

        Raises:
            OSError: If the source cannot be read or an output file cannot
                be created.
        """
        if self.dest_file_path is None:
            self.dest_file_path = self.source_file_path + ".csv"
        if os.path.isfile(self.dest_file_path):
            print("FILE [\"%s\"] Already exists." % self.dest_file_path)
            return

        # Counted in-process: no shell, so unusual characters in the path
        # cannot be interpreted as shell syntax.
        total_line_cnt = self._count_lines(self.source_file_path)
        done_cnt = 0
        monitor = Monitor(done_cnt, total_line_cnt)
        monitor.start()
        try:
            # newline='' stops the text layer from translating line endings:
            # the csv module requires it, and rejected lines stay verbatim.
            with codecs.open(self.source_file_path, 'r', 'utf-8', 'ignore') as source_file:
                with open(self.source_file_path + ".err", 'w',
                          encoding='utf-8', newline='') as error_dump_file:
                    with open(self.dest_file_path, 'w',
                              encoding='utf-8', newline='') as csv_file:
                        csv_file_writer = csv.writer(csv_file)
                        csv_file_writer.writerow(self.headers)
                        for line in source_file:
                            result = self.rule_func(line, self.ids)
                            if result is not None:
                                self.rows.append(result)
                            elif line.strip():
                                print("ERROR: [%s]" % line.strip())
                                error_dump_file.write(line)
                            if len(self.rows) >= self.BATCH_SIZE:
                                csv_file_writer.writerows(self.rows)
                                self.rows = []
                            done_cnt += 1
                            monitor.current = done_cnt
                        if self.rows:
                            csv_file_writer.writerows(self.rows)
                            self.rows = []
        finally:
            # Always stop the reporter thread, even when parsing fails.
            monitor.done()
            monitor.wait_done()
def dump_func_split(__line__, __split_str__, __ids__):
    """Split a line on one separator and pick the requested fields.

    Args:
        __line__: Raw input line; surrounding whitespace is ignored.
        __split_str__: Separator string to split on.
        __ids__: Iterable of zero-based field positions to keep.

    Returns:
        The selected fields, each stripped of surrounding whitespace, or
        ``None`` when the line has too few fields or the separator is empty.
    """
    try:
        items = __line__.strip().split(__split_str__)
        return [items[i].strip() for i in __ids__]
    except (IndexError, ValueError):
        # IndexError: a requested field is missing from this line.
        # ValueError: str.split() rejects an empty separator.
        # Anything else (notably KeyboardInterrupt) must propagate instead of
        # being mistaken for an unparseable line.
        return None
def dump_func(__line__, __ids__):
    """Parse a line by trying each known separator in turn.

    Returns the fields produced by the first separator for which
    ``dump_func_split`` succeeds, or ``None`` when none of them does.
    """
    # Tried in this order; the longer dash run comes before the shorter one.
    # NOTE(review): the last two entries are identical in this copy of the
    # file - confirm whether one was meant to be a different whitespace.
    separators = ('----', '---', ',', '#', ':', '|', '\t', ' ', ' ')
    attempts = (
        dump_func_split(__line__, separator, __ids__)
        for separator in separators
    )
    # The generator is lazy, so splitting stops at the first success.
    return next((fields for fields in attempts if fields is not None), None)
def show_help_msg():
    """Print the command-line usage summary to standard output."""
    usage_lines = [
        'Usage: dumper PATH HEADERS',
        '',
        ' PATH Data file path',
        ' HEADERS E.g. username,password,email',
    ]
    print('\n'.join(usage_lines))
def main():
    """Command-line entry point: ``dumper PATH HEADERS``.

    Reads the source path and the header specification from ``sys.argv``
    and writes the CSV next to the source file.

    Raises:
        SystemExit: With status 0 after ``--help``, or status 2 when the
            required arguments are missing.
    """
    # sys.exit() instead of the bare exit(): the latter is a helper injected
    # by the site module for interactive use and is not always available.
    if '--help' in sys.argv:
        show_help_msg()
        sys.exit(0)
    if len(sys.argv) < 3:
        # A usage error must not report success to the calling shell.
        show_help_msg()
        sys.exit(2)
    dumper = Dumper()
    dumper.source_file_path = sys.argv[1]
    dumper.parse_headers(sys.argv[2])
    dumper.rule_func = dump_func
    dumper.dump()
# Run the command-line entry point only when this file is executed as a
# script, so importing the module has no side effects.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment