Created
December 13, 2020 12:37
-
-
Save ismdeep/c2ab8f4f1685cfffb34b6db3080041e8 to your computer and use it in GitHub Desktop.
Dumper that converts massive delimited-text data files to a CSV file
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3.7 | |
import codecs | |
import csv | |
import os | |
import sys | |
import threading | |
import time | |
class Monitor:
    """Background progress reporter that prints ``current / max => percent``.

    A caller creates the monitor, calls :meth:`start`, keeps assigning the
    number of processed items to ``current`` and finally calls :meth:`done`
    followed by :meth:`wait_done`.

    Attributes:
        current: Number of items processed so far (written by the caller).
        max: Total number of items expected; may be 0 for empty input.
        done_flag: Set to True by :meth:`done` to stop the reporting loop.
        thread_obj: The reporting thread, or None before :meth:`start`.
    """

    current = 0
    max = 100
    done_flag = False
    thread_obj = None

    # Seconds between two refreshes of the progress line.
    REFRESH_INTERVAL = 0.2

    def __init__(self, __current__, __max__):
        self.current = __current__
        self.max = __max__
        self.done_flag = False
        self.thread_obj = None
        # Lets done() wake the reporting loop immediately instead of
        # waiting for the current refresh interval to elapse.
        self._wake = threading.Event()

    def _status_line(self):
        """Return the progress line, starting with a carriage return."""
        # A total of 0 (e.g. an empty input file) has no defined ratio;
        # report 0% instead of raising ZeroDivisionError in the thread.
        percent = self.current * 100.0 / self.max if self.max else 0.0
        return '\r%d / %d => %.2f%%' % (self.current, self.max, percent)

    def done(self):
        """Ask the reporting loop to stop."""
        self.done_flag = True
        self._wake.set()

    def work(self):
        """Print the progress line repeatedly until :meth:`done` is called."""
        while not self.done_flag:
            # flush is required: the line has no trailing newline, so a
            # line-buffered stdout would otherwise never display it.
            print(self._status_line(), end='', flush=True)
            self._wake.wait(self.REFRESH_INTERVAL)
        # Print the final state so the last visible line is not stale,
        # then terminate the line.
        print(self._status_line(), flush=True)

    def start(self):
        """Start the reporting loop in a background thread."""
        # Daemon thread: a crash in the main thread must not leave the
        # process hanging on a monitor that will never be told to stop.
        self.thread_obj = threading.Thread(target=self.work, daemon=True)
        self.thread_obj.start()

    def wait_done(self):
        """Block until the reporting thread has finished (no-op if unstarted)."""
        if self.thread_obj is not None:
            self.thread_obj.join()
class Dumper:
    """Convert a delimited text file into a CSV file, line by line.

    Attributes:
        source_file_path: Path of the text file to read.
        dest_file_path: Path of the CSV file to create; defaults to
            ``source_file_path + ".csv"`` when left as None.
        headers: Column names written as the first CSV row.
        ids: Positions of the kept columns, passed to ``rule_func``.
        rule_func: Callable ``(line, ids)`` returning the list of cell
            values for a line, or None when the line cannot be parsed.
        rows: Buffer of parsed rows not yet written to the CSV file.
    """

    source_file_path = None
    dest_file_path = None
    rule_func = None

    # Number of parsed rows buffered before they are written out.
    BATCH_SIZE = 1000

    def __init__(self):
        self.source_file_path = None
        self.dest_file_path = None
        self.headers = None
        self.rule_func = None
        # Per-instance lists: class-level lists would be shared between
        # every Dumper instance.
        self.rows = []
        self.ids = []

    def parse_headers(self, __headers_raw__):
        """Fill ``headers`` and ``ids`` from a comma-separated header string.

        Empty names mark columns to skip, e.g. ``"user,,mail"`` keeps
        columns 0 and 2.
        """
        self.ids = []
        self.headers = []
        # Use the argument, not sys.argv, so the method works for any caller.
        for index, name in enumerate(__headers_raw__.split(',')):
            if name != '':
                self.ids.append(index)
                self.headers.append(name)

    @staticmethod
    def _count_lines(path, chunk_size=1 << 20):
        """Return the number of lines in *path*, counted in binary chunks."""
        count = 0
        last_chunk = b''
        with open(path, 'rb') as source:
            for chunk in iter(lambda: source.read(chunk_size), b''):
                count += chunk.count(b'\n')
                last_chunk = chunk
        # A final line without a trailing newline is still a line.
        if last_chunk and not last_chunk.endswith(b'\n'):
            count += 1
        return count

    def dump(self):
        """Write every parseable line of the source file to the CSV file.

        Lines for which ``rule_func`` returns None are, when not blank,
        printed and copied to ``source_file_path + ".err"``. Nothing is
        done if the destination file already exists.

        Raises:
            ValueError: If ``source_file_path`` or ``rule_func`` is unset.
        """
        if self.source_file_path is None:
            raise ValueError("source_file_path must be set before dump()")
        if self.rule_func is None:
            raise ValueError("rule_func must be set before dump()")
        if self.dest_file_path is None:
            self.dest_file_path = self.source_file_path + ".csv"
        if os.path.isfile(self.dest_file_path):
            print("FILE [\"%s\"] Already exists." % self.dest_file_path)
            return

        # Counted in-process: no dependency on an external `wc` binary and
        # no shell command built from a file name.
        total_line_cnt = self._count_lines(self.source_file_path)
        done_cnt = 0
        monitor = Monitor(done_cnt, total_line_cnt)
        monitor.start()
        try:
            # newline='' is what the csv module requires for its output
            # file; for the error file it keeps rejected lines unchanged.
            with codecs.open(self.source_file_path, 'r', 'utf-8', 'ignore') as source_file, \
                    open(self.dest_file_path, 'w', encoding='utf-8', newline='') as csv_file, \
                    open(self.source_file_path + ".err", 'w', encoding='utf-8', newline='') as error_dump_file:
                csv_file_writer = csv.writer(csv_file)
                csv_file_writer.writerow(self.headers or [])
                for line in source_file:
                    result = self.rule_func(line, self.ids)
                    if result is not None:
                        self.rows.append(result)
                    elif line.strip():
                        print("ERROR: [%s]" % line.strip())
                        error_dump_file.write(line)
                    if len(self.rows) >= self.BATCH_SIZE:
                        csv_file_writer.writerows(self.rows)
                        self.rows = []
                    done_cnt += 1
                    monitor.current = done_cnt
                if self.rows:
                    csv_file_writer.writerows(self.rows)
                    self.rows = []
        finally:
            # Always stop the progress thread, even when parsing or I/O fails.
            monitor.done()
            monitor.wait_done()
def dump_func_split(__line__, __split_str__, __ids__):
    """Split a line on one separator and pick the requested columns.

    Args:
        __line__: Raw text line; surrounding whitespace is stripped.
        __split_str__: Separator string the line is split on.
        __ids__: Iterable of column positions to extract.

    Returns:
        The stripped values at the requested positions, in the order of
        ``__ids__``, or None when the line does not fit (for example a
        requested column is missing).
    """
    try:
        items = __line__.strip().split(__split_str__)
        return [items[i].strip() for i in __ids__]
    except (IndexError, TypeError, ValueError):
        # Only "this line does not fit" errors mean None; KeyboardInterrupt
        # and SystemExit must propagate instead of being swallowed.
        return None
def dump_func(__line__, __ids__):
    """Parse a line by trying a fixed list of separators in order.

    Returns the result of the first separator for which
    ``dump_func_split`` yields a non-None value, or None when no
    separator works.
    """
    # NOTE(review): the last two separators are identical as they appear
    # here; they may originally have differed in whitespace - confirm.
    separators = ('----', '---', ',', '#', ':', '|', '\t', ' ', ' ')
    attempts = (
        dump_func_split(__line__, separator, __ids__)
        for separator in separators
    )
    # The generator is lazy, so later separators are only tried while the
    # earlier ones keep returning None.
    return next((parsed for parsed in attempts if parsed is not None), None)
def show_help_msg():
    """Print the command-line usage text to standard output."""
    usage_lines = (
        'Usage: dumper PATH HEADERS',
        '',
        ' PATH Data file path',
        ' HEADERS E.g. username,password,email',
    )
    print('\n'.join(usage_lines))
def main():
    """Command-line entry point: ``dumper PATH HEADERS``.

    Reads the source path from ``sys.argv[1]`` and the comma-separated
    header list from ``sys.argv[2]``, then runs the dump. Prints the help
    text and exits when ``--help`` is given or arguments are missing.
    """
    wants_help = '--help' in sys.argv
    if wants_help or len(sys.argv) < 3:
        show_help_msg()
        # sys.exit() instead of the interactive-only exit() helper, which
        # the site module may not provide. An explicit help request
        # succeeds; missing arguments are a usage error (status 2).
        sys.exit(0 if wants_help else 2)
    dumper = Dumper()
    dumper.source_file_path = sys.argv[1]
    dumper.parse_headers(sys.argv[2])
    dumper.rule_func = dump_func
    dumper.dump()


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment