Extract partial info from a table in a mysqldump backup SQL file. The script pulls a single table's section of the dump into `<table>.sql`, then writes the selected columns to `<table>.csv`.
import argparse
import codecs
import csv


def extract_table_sql(table_name, source_dir, target_dir):
    TABLE_STRUCTURE_START = '-- Table structure for table'
    # include the closing backtick so `users` does not also match `users_archive`
    TARGET_TABLE_STRUCTURE_START = f'{TABLE_STRUCTURE_START} `{table_name}`'
    with codecs.open(f'{source_dir}/backup.sql', 'r', encoding='utf-8', errors='replace') as backup_sql, \
            open(f'{target_dir}/{table_name}.sql', 'w') as ext_table_sql:
        start_write = False
        for line in backup_sql:
            if start_write and line.startswith(TABLE_STRUCTURE_START):
                # break when the next table starts: extract one table at a time
                break
            if not start_write and line.startswith(TARGET_TABLE_STRUCTURE_START):
                # start writing once the target table is reached
                start_write = True
            if start_write and not line.startswith('--'):
                # skip comment lines
                ext_table_sql.write(line)
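
# For reference, mysqldump marks each table's section with a header comment,
# roughly like this (the table name `users` here is a hypothetical example):
#   -- Table structure for table `users`
# extract_table_sql copies everything from that header up to the next one.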


def parse_table_field_list(table_name, target_dir):
    CREATE_TABLE_START = f'CREATE TABLE `{table_name}`'
    with open(f'{target_dir}/{table_name}.sql', 'r') as ext_table_sql:
        field_list = []
        start_add = False
        for line in ext_table_sql:
            if start_add and line.startswith(')'):
                # break when the CREATE TABLE statement ends
                break
            if not start_add and line.startswith(CREATE_TABLE_START):
                # start recording once CREATE TABLE starts
                start_add = True
            strip_line = line.strip()
            if start_add and strip_line.startswith('`'):
                field_list.append(strip_line.split(' ')[0].replace('`', ''))
    return field_list
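
# Example (hypothetical schema): given an extracted users.sql containing
#   CREATE TABLE `users` (
#     `id` int(11) NOT NULL AUTO_INCREMENT,
#     `email` varchar(255) DEFAULT NULL,
#     PRIMARY KEY (`id`)
#   ) ENGINE=InnoDB;
# parse_table_field_list('users', '.') would return ['id', 'email']; the
# PRIMARY KEY line is skipped because it does not start with a backtick.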


def yield_inline_rows(row_len, line):
    buffer = ''
    field_pos = 0
    take_next_one_and_continue = False
    is_in_str = False
    for c in line:
        if take_next_one_and_continue:
            # previous char was a backslash: keep this char verbatim
            buffer += c
            take_next_one_and_continue = False
            continue
        if c == '\\':
            buffer += c
            take_next_one_and_continue = True
            continue
        if c == '\'':
            buffer += c
            is_in_str = not is_in_str
            continue
        if not is_in_str and c in (',', ';'):
            field_pos += 1
            if field_pos == row_len:
                # row_len - 1 commas separate fields inside a row, so the
                # row_len-th separator is the one between rows: emit the row
                yield buffer
                buffer = ''
                field_pos = 0
                continue
        buffer += c
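
# A sketch of what yield_inline_rows produces (values are hypothetical):
# for the input "(1,'alice'),(2,'b,ob');" with row_len=2, it yields the
# strings "(1,'alice')" and "(2,'b,ob')"; the comma inside the quoted
# string is not counted as a separator because is_in_str tracks quote state.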


def yield_row(target_dir, table_name, row_len):
    INSERT_START = f'INSERT INTO `{table_name}` VALUES '
    with open(f'{target_dir}/{table_name}.sql', 'r') as ext_table_sql:
        for line in ext_table_sql:
            if line.startswith(INSERT_START):
                for row_str in yield_inline_rows(
                    row_len, line.replace(INSERT_START, '')
                ):
                    # each row_str reads as a Python tuple literal; eval it
                    # after mapping SQL NULL to None (assumes a trusted dump)
                    yield eval(row_str.replace('NULL', 'None'))
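
# Note: eval() trusts the dump's contents. For untrusted backups, a safer
# variant could use ast.literal_eval instead (a sketch, assuming rows hold
# only numeric, string, and NULL literals):
#   import ast
#   ast.literal_eval(row_str.replace('NULL', 'None'))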


def extract_table_values(table_name, target_dir, fields=None):
    field_list = parse_table_field_list(table_name, target_dir)
    fields = field_list if not fields else fields
    field_index_map = {f: idx for idx, f in enumerate(field_list)}
    with open(f'{target_dir}/{table_name}.csv', 'w', newline='\n') as csv_file:
        writer = csv.writer(csv_file)
        writer.writerow(fields)
        for row in yield_row(target_dir, table_name, len(field_list)):
            writer.writerow([row[field_index_map[f]] for f in fields])
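
# The resulting CSV starts with a header row of the selected field names,
# followed by one row per extracted tuple, e.g. (hypothetical values):
#   id,email
#   1,alice@example.com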


if __name__ == '__main__':
    # e.g. table_name -f field_name_1 -f field_name_2 -f field_name_3
    parser = argparse.ArgumentParser()
    parser.add_argument('table', help='the name of the table to extract')
    parser.add_argument('-f', '--fields', help='the fields to extract', action='append')
    parser.add_argument('-s', '--source-dir', help='the directory to read backup data from', default='.')
    parser.add_argument('-t', '--target-dir', help='the directory to store extracted data in', default='.')
    args = parser.parse_args()
    extract_table_sql(args.table, args.source_dir, args.target_dir)
    extract_table_values(args.table, args.target_dir, args.fields)
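
A typical invocation (assuming the script is saved as extract_table.py next to a backup.sql, with a hypothetical `users` table) might look like:

    python extract_table.py users -f id -f email -s /path/to/backups -t ./out

This writes users.sql and users.csv into ./out; omitting -f exports every column.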