Created
June 11, 2021 20:50
-
-
Save syedarehaq/0c459b0b198025acfab096a36483d0e9 to your computer and use it in GitHub Desktop.
A slightly modified script that converts a MySQL dump to CSV. I use it for the SQL dumps of Wikipedia interwiki links and pagelinks found here: https://dumps.wikimedia.org/enwiki/latest/ . It differs from its original source (https://github.com/jamesmishra/mysqldump-to-csv) only in small changes that accommodate some special Unicode characters.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
## Source: https://github.com/jamesmishra/mysqldump-to-csv | |
import fileinput | |
import csv | |
import sys | |
# This prevents prematurely closed pipes from raising
# an exception in Python (e.g. when piping into `head`).
try:
    from signal import signal, SIGPIPE, SIG_DFL
except ImportError:
    # SIGPIPE is not defined on every platform (notably Windows);
    # there is nothing to reset in that case.
    pass
else:
    signal(SIGPIPE, SIG_DFL)

# Allow large content in the dump. sys.maxsize may not fit in the C long
# used by the csv module on some platforms, which raises OverflowError,
# so shrink the limit until it is accepted.
_field_size_limit = sys.maxsize
while True:
    try:
        csv.field_size_limit(_field_size_limit)
        break
    except OverflowError:
        _field_size_limit //= 2
def is_insert(line):
    """
    Returns True if the line begins a SQL insert statement.

    The match is case-sensitive and anchored at the very first character,
    so leading whitespace or a lower-case `insert into` is not recognised.
    """
    # str.startswith already yields a bool; no `or False` needed.
    return line.startswith('INSERT INTO')
def get_values(line):
    """
    Returns the portion of an INSERT statement containing values

    Everything after the first "` VALUES " separator is returned, i.e. the
    text following a backtick-quoted table name. When that separator is
    absent (unquoted table name, or an explicit column list before
    VALUES), the first plain " VALUES " is used instead. An empty string
    is returned when the line contains neither.
    """
    values = line.partition('` VALUES ')[2]
    if not values:
        # Fallback for `INSERT INTO tbl VALUES ...` and
        # `INSERT INTO `tbl` (`a`, `b`) VALUES ...`.
        values = line.partition(' VALUES ')[2]
    return values
def values_sanity_check(values):
    """
    Ensures that values from the INSERT statement meet basic checks.

    Returns True when `values` is non-empty and starts with an opening
    parenthesis. Raises AssertionError otherwise. The error is raised
    explicitly rather than through `assert` statements so that the check
    still runs when Python is started with -O.
    """
    if not values:
        raise AssertionError("INSERT statement has no VALUES payload")
    if values[0] != '(':
        raise AssertionError("VALUES payload does not start with '('")
    # No check failed
    return True
def parse_values(values, outfile): | |
""" | |
Given a file handle and the raw values from a MySQL INSERT | |
statement, write the equivalent CSV to the file | |
""" | |
latest_row = [] | |
reader = csv.reader([values], delimiter=',', | |
doublequote=False, | |
escapechar='\\', | |
quotechar="'", | |
strict=True | |
) | |
writer = csv.writer(outfile, quoting=csv.QUOTE_MINIMAL) | |
for reader_row in reader: | |
for column in reader_row: | |
# If our current string is empty... | |
if len(column) == 0 or column == 'NULL': | |
latest_row.append(chr(0)) | |
continue | |
# If our string starts with an open paren | |
if column[0] == "(": | |
# Assume that this column does not begin | |
# a new row. | |
new_row = False | |
# If we've been filling out a row | |
if len(latest_row) > 0: | |
# Check if the previous entry ended in | |
# a close paren. If so, the row we've | |
# been filling out has been COMPLETED | |
# as: | |
# 1) the previous entry ended in a ) | |
# 2) the current entry starts with a ( | |
if latest_row[-1][-1] == ")": | |
# Remove the close paren. | |
latest_row[-1] = latest_row[-1][:-1] | |
new_row = True | |
# If we've found a new row, write it out | |
# and begin our new one | |
if new_row: | |
writer.writerow(latest_row) | |
latest_row = [] | |
# If we're beginning a new row, eliminate the | |
# opening parentheses. | |
if len(latest_row) == 0: | |
column = column[1:] | |
# Add our column to the row we're working on. | |
latest_row.append(column) | |
# At the end of an INSERT statement, we'll | |
# have the semicolon. | |
# Make sure to remove the semicolon and | |
# the close paren. | |
if latest_row[-1][-2:] == ");": | |
latest_row[-1] = latest_row[-1][:-2] | |
writer.writerow(latest_row) | |
def main():
    """
    Entry point: convert every INSERT statement read from the input
    files (or stdin when no file arguments are given) to CSV on stdout.
    """
    try:
        # fileinput walks the files named in sys.argv[1:]; files are
        # decoded as UTF-8 with undecodable bytes dropped.
        lines = fileinput.input(
            openhook=fileinput.hook_encoded("utf-8", errors="ignore")
        )
        for line in lines:
            # Only INSERT statements carry row data.
            if not is_insert(line):
                continue
            values = get_values(line)
            if values_sanity_check(values):
                parse_values(values, sys.stdout)
    except KeyboardInterrupt:
        # Ctrl-C is a normal way to stop a long conversion.
        sys.exit(0)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment