Skip to content

Instantly share code, notes, and snippets.

@jgrant41475
Created November 7, 2017 17:38
Show Gist options
  • Save jgrant41475/98f3e5cf81b256233f6a6c00c80e406a to your computer and use it in GitHub Desktop.
Save jgrant41475/98f3e5cf81b256233f6a6c00c80e406a to your computer and use it in GitHub Desktop.
A generic solution for filtering, grouping, and sorting a csv file by two fields
import csv
from functools import reduce
from itertools import groupby as by
from operator import itemgetter as get
from sys import argv, maxsize as max_int
class CSVEditor:
    """Filter, group, and sort the rows of a csv file by two fields.

    The result is written to a new csv file in the same directory as
    file_path, named using the template '[filename] - edited.csv'.

    Arguments
    ---------
    @file_path : str
        Complete file path of the csv to parse.
        Only files with the extension '.csv' are expected.

    Attributes
    ----------
    @columns : list of str
        Columns to transfer over to the updated file, in output order
    @group_by : str
        Column to group data by
    @group_by_reversed : bool
        True to order the groups descending by @group_by
    @sort_by : str
        Column to sort the rows of each group by
    @sort_by_reversed : bool
        True to order the rows of a group descending by @sort_by
    @max : int or None
        If the data type of the column @sort_by is a number this should be
        max_int (empty cells are then replaced by it so they sort last in
        ascending order), otherwise it must be None
    @path : str
        File path of the original csv
    @delimiter : str
        Single-character field delimiter of the csv file
    @updated_csv : list of dicts of {key(str) : value(str or int)}
        Container for the grouped and sorted rows
    """

    def __init__(self, file_path):
        """Initialize instance variables with the default configuration."""
        # NOTE(review): "Rank" is listed twice, so the output file carries the
        # rank column twice - confirm this is intended.
        self.columns = ["Keyword", "Search Engine", "Rank", "Rank"]
        self.group_by = "Search Engine"
        self.group_by_reversed = True
        self.sort_by = "Rank"
        self.sort_by_reversed = False
        self.max = max_int
        self.path = file_path
        self.delimiter = ','
        self.updated_csv = []

    def parse(self):
        """Read @path, then group and sort its rows onto @updated_csv.

        The first line of the file is taken as the column headers. Rows are
        ordered by @group_by, each group is ordered by @sort_by, and only the
        columns named in @columns are kept. Blank lines are skipped and rows
        with fewer fields than the header are padded with empty strings.

        :return:
            CSVEditor :
                returns the instance of itself
        :raises:
            SystemExit :
                Unable to locate the file, exits with an error message
            ValueError :
                @max is max_int and a non-empty @sort_by cell is not an integer
        """
        try:
            # newline="" lets the csv module do its own line-ending handling.
            with open(self.path, "r", newline="") as file:
                reader = csv.reader(file, delimiter=self.delimiter)
                header = next(reader, None) or []
                width = len(header)
                rows = [
                    dict(zip(header, fields + [""] * (width - len(fields))))
                    for fields in reader
                    if fields
                ]
        except FileNotFoundError:
            raise SystemExit("Error: File not found!")

        # A numeric sort column is converted to int; empty cells take the
        # sentinel value so they can still be compared with real numbers.
        if self.max == max_int:
            for row in rows:
                raw = row[self.sort_by]
                row[self.sort_by] = self.max if raw == "" else int(raw)

        group_key = get(self.group_by)
        sort_key = get(self.sort_by)
        # groupby only merges adjacent rows, so order by the group key first.
        ordered = sorted(rows, key=group_key, reverse=self.group_by_reversed)
        for _, group in by(ordered, key=group_key):
            # Sort before projecting so @sort_by need not be part of @columns.
            ranked = sorted(group, key=sort_key, reverse=self.sort_by_reversed)
            self.updated_csv.extend(
                {column: row[column] for column in self.columns}
                for row in ranked
            )
        return self

    def make_new(self) -> bool:
        """Write @updated_csv to '[filename] - edited.csv' next to @path.

        Every field is quoted. Cells of the @sort_by column that hold the
        @max sentinel are written back as empty fields.

        :return:
            True :
                Created and wrote to file with no errors
        :raises:
            SystemExit :
                Unable to open the output file for writing (PermissionError)
        """
        # Strip only a trailing '.csv'; searching for the first occurrence
        # would cut paths such as 'reports.csv.d/data.csv' in the wrong place.
        if self.path.lower().endswith(".csv"):
            stem = self.path[:-len(".csv")]
        else:
            stem = self.path

        try:
            with open(stem + " - edited.csv", "w", newline="") as file:
                writer = csv.writer(
                    file,
                    delimiter=self.delimiter,
                    quoting=csv.QUOTE_ALL,
                    lineterminator="\n",
                )
                writer.writerow(self.columns)
                for row in self.updated_csv:
                    writer.writerow(
                        [self._cell(row, column) for column in self.columns]
                    )
        except PermissionError:
            raise SystemExit("Unable create new file.")
        return True

    def _cell(self, row, column):
        """Return the text to write for one cell, blanking the @max sentinel."""
        value = row[column]
        is_sentinel = (
            column == self.sort_by
            and self.max == max_int
            and value == self.max
        )
        return "" if is_sentinel else str(value)
if __name__ == "__main__":
    # Validate the command line up front. Wrapping the whole run in
    # `except IndexError` would report any IndexError raised while parsing
    # as a missing argument.
    if len(argv) < 2:
        raise SystemExit("Missing argument.")
    # Only csv files are accepted; say so instead of silently doing nothing.
    if argv[1][-4:] != ".csv":
        raise SystemExit("Error: expected a path ending in '.csv'.")
    # make_new() either returns True for success or the program exits.
    if CSVEditor(argv[1]).parse().make_new():
        print("Done.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment