Skip to content

Instantly share code, notes, and snippets.

@zdyxry
Created July 7, 2018 08:18
Show Gist options
  • Save zdyxry/e709b34f88a81f7159f640a04bac3c08 to your computer and use it in GitHub Desktop.
Save zdyxry/e709b34f88a81f7159f640a04bac3c08 to your computer and use it in GitHub Desktop.
Convert csv file to nested json file. Reference: https://github.com/chamkank/hone
#!/usr/bin/python
# MIT License
# Copyright (c) 2018 CK
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import click
import copy
import json
from contextlib import contextmanager
class JsonUtils(object):
@staticmethod
def save_json(json_struct, json_file):
with open(json_file, "w") as f:
json.dump(json_struct, f, indent=4)
class CSVUtils(object):
def __init__(self, csv_file):
self._file = csv_file
@contextmanager
def open_csv_file(self, mode="rb"):
with open(self._file, mode) as f:
if f:
yield f
else:
raise NotImplementedError
def parse_row(self, row):
row = row.strip()
cells = []
quote_flag = False
split_index = 0
for index, value in enumerate(row):
if value == '"':
quote_flag = not quote_flag
elif value == ',':
if not quote_flag:
cells.append(row[split_index:index])
split_index = index + 1
if split_index != len(row):
cells.append(row[split_index:])
cells = [cell.strip(' "') for cell in cells]
return cells
def get_column_names(self):
with self.open_csv_file() as f:
cols = self.parse_row(f.readline())
return cols
def get_data_rows(self):
data_rows = []
with self.open_csv_file() as f:
f.readline()
while True:
row = f.readline()
if row != '':
data_rows.append(self.parse_row(row))
else:
break
return data_rows
class CSV2Json(object):
def __init__(self, csv_file):
self.delimit_chars = [",", "_", " "]
self.csv = CSVUtils(csv_file)
def is_valid_prefix(self, prefix, base):
if base.startswith(prefix):
if base[len(prefix)] in self.delimit_chars:
return True
return False
def clean_split(self, split):
i = len(split) - 1
while i >= 0:
c = split[i]
if c not in self.delimit_chars:
return split[0:i + 1]
i -= 1
return split
def get_leaves(self, structure, path="", result={}):
for key, value in structure.items():
if type(value) is dict:
self.get_leaves(value, path + "['" + key + "']", result)
else:
result[value] = path + "['" + key + "']"
return result
def get_split_suffix(self, split, column_name=""):
suffix = column_name[len(split) + 1:]
for index, value in enumerate(suffix):
if value not in self.delimit_chars:
return suffix[index:]
def get_valid_splits(self, column_name):
splits = []
i = len(column_name) - 1
while i >= 0:
c = column_name[i]
if c in self.delimit_chars:
split = self.clean_split(column_name[0:i])
splits.append(split)
i -= 1
return sorted(list(set(splits)))
def generate_structure(self, column_names=None, parent_structure=None):
if parent_structure and not column_names:
column_names = list(parent_structure.keys())
visited = set()
structure = {}
sorted(column_names, reverse=True)
for c1 in column_names:
if c1 in visited:
continue
splits = self.get_valid_splits(c1)
for split in splits:
nodes = {split: {}}
if split in column_names:
continue
for c2 in column_names:
if c2 not in visited and self.is_valid_prefix(split, c2):
if parent_structure:
nodes[split][self.get_split_suffix(split, c2)] = parent_structure[c2]
visited.add(c2)
else:
nodes[split][self.get_split_suffix(split, c2)] = c2
if len(nodes[split].keys()) > 1:
structure[split] = self.generate_structure(parent_structure=nodes[split])
for val in nodes[split].values():
visited.add(val)
if c1 not in visited:
if parent_structure:
structure[c1] = parent_structure[c1]
else:
structure[c1] = c1
return structure
def populate_structure_with_data(self, structure, column_names, data_rows):
json_struct = []
num_columns = len(column_names)
mapping = self.get_leaves(structure)
for row in data_rows:
json_row = copy.deepcopy(structure)
i = 0
while i < num_columns:
cell = row[i]
column_name = column_names[i]
key_path = mapping[column_name]
exec("json_row" + key_path + "=" + "'" + cell + "'")
i += 1
json_struct.append(json_row)
return json_struct
def convert(self):
column_names = self.csv.get_column_names()
data = self.csv.get_data_rows()
column_schema = self.generate_structure(column_names)
json_struct = self.populate_structure_with_data(column_schema, column_names, data)
return json_struct
@click.command()
@click.option('--csv_file', default=None, help='Input csv file abspath')
@click.option('--json_file', default=None, help='Output json file abspath')
def main(csv_file, json_file):
"""Convert csv file to json file."""
try:
csv2json = CSV2Json(csv_file)
JsonUtils.save_json(csv2json.convert(), json_file)
print "Convert csv file success, json file path is %s" % json_file
except Exception as error:
print "Failed to convert csv file with error %s" % error
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment