Skip to content

Instantly share code, notes, and snippets.

@cw-andrews
Created November 11, 2017 18:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save cw-andrews/2b7a2f98475f90b52857f9b6cc75db5d to your computer and use it in GitHub Desktop.
Save cw-andrews/2b7a2f98475f90b52857f9b6cc75db5d to your computer and use it in GitHub Desktop.
CSV Sanitizer Bonobo ETL Job with Atomic Writes
import argparse
from time import sleep
import os
import re
import bonobo
from atomicwrites import atomic_write
from fs.osfs import OSFS
from texools.files import new_file_check
from texools.strings import asciify
from unittest.mock import patch
# CSV file to sanitize; override with the SOURCE_FILE environment variable.
SOURCE_FILE = os.getenv(
    'SOURCE_FILE', 'dealervault/DealerVaultInventory.csv')

# Where the sanitized CSV is written; override with DESTINATION_FILE.
DESTINATION_FILE = os.getenv(
    'DESTINATION_FILE', 'dealervault/DealerVaultInventory_processed.csv')

# When true, the job runs even if the new-file check says there is nothing to
# do.  Environment values are strings and bool() of any non-empty string is
# True, so "0" or "false" must be recognised explicitly as "off".
SHOULD_FORCE = os.getenv('SHOULD_FORCE', '').strip().lower() not in (
    '', '0', 'false', 'no', 'off')
# Keep a handle on the original (unbound) method so non-write opens can be
# delegated to it after OSFS.open is replaced below.
_open = OSFS.open


def _atomic_open(self, path, mode="r", **kwargs):
    """Replacement for ``OSFS.open`` that routes write modes to atomic_write.

    :param path: path of the file, relative to this filesystem.
    :param mode: file mode.  Modes starting with ``w`` go through
        ``atomic_write``; everything else is handled by the original
        ``OSFS.open``.
    :param kwargs: forwarded to the original ``OSFS.open`` for non-write
        modes.  They are not passed to ``atomic_write``.
    :return: for write modes, whatever ``atomic_write`` returns (meant to be
        used in a ``with`` statement); otherwise the original method's result.
    """
    if mode.startswith('w'):
        # atomic_write works on real OS paths, so resolve the FS-relative path
        # against this filesystem's root instead of relying on the process
        # working directory happening to match it.
        # Overwriting an existing target is only enabled for '+' modes
        # (e.g. 'w+'); a plain 'w' passes overwrite=False.
        return atomic_write(
            self.getsyspath(path), overwrite=mode.endswith('+'))
    # _open is the unbound original, so the instance must be passed
    # explicitly; omitting it made `path` bind to `self` and raised TypeError.
    return _open(self, path, mode=mode, **kwargs)


# Install the patch globally: every OSFS instance now opens files through
# _atomic_open.
OSFS.open = _atomic_open
def get_graph():
    """Build the Bonobo graph for the CSV sanitizing job.

    When ``SHOULD_FORCE`` is set or ``new_file_check(SOURCE_FILE,
    DESTINATION_FILE)`` is truthy (presumably meaning the source is newer
    than the destination - confirm against texools), the graph reads
    ``SOURCE_FILE`` line by line, passes each line through ``asciify`` and
    ``extra_quote_killer``, and writes the result to ``DESTINATION_FILE``.

    Otherwise the graph contains a single placeholder step that pauses for
    one second when the graph is executed and produces no output.

    :return: the configured ``bonobo.Graph``.
    """
    graph = bonobo.Graph()
    if SHOULD_FORCE or new_file_check(SOURCE_FILE, DESTINATION_FILE):
        graph.add_chain(
            bonobo.FileReader(SOURCE_FILE),
            asciify,
            extra_quote_killer,
            bonobo.FileWriter(DESTINATION_FILE),
        )
    else:
        def idle():
            """Pause briefly and emit nothing."""
            sleep(1)

        # The node must be a callable.  The previous `add_node(sleep(1))`
        # slept while the graph was being built and then registered the
        # call's return value (None) as a node.
        graph.add_chain(idle)
    return graph
# Alternatives are tried left to right at each position, so a quote that is
# part of a `","` field separator, or that opens or closes the line, is
# captured in the `keep` group before the catch-all bare `"` can claim it.
# `$` also matches just before a single trailing newline.
_QUOTE_TOKENS = re.compile(r'(?P<keep>","|^"|"$)|"')


def extra_quote_killer(line: str) -> str:
    """Strip stray double quotes from one line of a quoted CSV file.

    Quotes are kept only where they are structural: the ``","`` sequence
    between fields, a quote at the very start of the line, and a quote at the
    very end of the line (or just before its trailing newline).  Every other
    double quote is removed.

    The work is done in a single substitution pass rather than by swapping
    structural quotes for ``<~>`` / ``<>`` placeholders and back, because
    field data containing those placeholder sequences would otherwise be
    rewritten into quotes.

    :param line: one line of text, with or without a trailing newline.
    :return: the line with non-structural double quotes removed.
    """
    return _QUOTE_TOKENS.sub(lambda match: match.group('keep') or '', line)
def get_services():
    """Return the services mapping handed to ``bonobo.run``.

    This job defines no services, so a new empty dict is returned on each
    call.
    """
    services = dict()
    return services
if __name__ == '__main__':
    # Script entry point: accept repeated `--env NAME=VALUE` options, export
    # them to the process environment, then run the job.
    parser = argparse.ArgumentParser()
    parser.add_argument('--env', '-e', action='append', default=list())
    options = parser.parse_args()

    overridden = set()
    for env in options.env:
        k, sep, v = env.partition('=')
        if not sep or not k:
            # Report a usage error instead of crashing on tuple unpacking.
            parser.error('--env expects NAME=VALUE, got %r' % env)
        os.environ[k] = v
        overridden.add(k)

    # The module-level settings were read from the environment when the file
    # was loaded, i.e. before the overrides above existed.  Refresh the ones
    # supplied on the command line so get_graph() actually sees them.
    if 'SOURCE_FILE' in overridden:
        SOURCE_FILE = os.environ['SOURCE_FILE']
    if 'DESTINATION_FILE' in overridden:
        DESTINATION_FILE = os.environ['DESTINATION_FILE']
    if 'SHOULD_FORCE' in overridden:
        # Explicit "off" spellings: any non-empty string is truthy.
        SHOULD_FORCE = os.environ['SHOULD_FORCE'].strip().lower() not in (
            '', '0', 'false', 'no', 'off')

    bonobo.run(get_graph(), services=get_services())
else:
    # Imported rather than executed: build the graph eagerly and expose it
    # as the module-level name `graph`.
    graph = get_graph()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment