Skip to content

Instantly share code, notes, and snippets.

@hartym
Forked from cw-andrews/example_job.py
Created November 11, 2017 18:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hartym/c03ed10caa5a439a72b108057209f543 to your computer and use it in GitHub Desktop.
Save hartym/c03ed10caa5a439a72b108057209f543 to your computer and use it in GitHub Desktop.
CSV Sanitizer Bonobo ETL Job with Atomic Writes
import argparse
from time import sleep
import os
import re
import bonobo
from atomicwrites import atomic_write
from fs.osfs import OSFS
from texools.files import new_file_check
from texools.strings import asciify
from unittest.mock import patch
# Values that disable a boolean environment flag (compared case-insensitively,
# after stripping surrounding whitespace).
_FALSE_STRINGS = frozenset({'', '0', 'false', 'no', 'off'})


def env_flag(name: str, default: bool = False) -> bool:
    """Read a boolean flag from the environment variable *name*.

    ``bool(os.getenv(...))`` treats every non-empty string as true, which
    would make ``SHOULD_FORCE=0`` or ``SHOULD_FORCE=false`` *enable* the flag.
    Here the empty string and the usual "off" spellings count as false; any
    other value counts as true.

    :param name: environment variable to read.
    :param default: value returned when the variable is not set at all.
    :return: the parsed flag.
    """
    value = os.getenv(name)
    if value is None:
        return default
    return value.strip().lower() not in _FALSE_STRINGS


# Job settings. They are read once, at import time; later changes to
# os.environ do not update them.
# CSV file read by the job.
SOURCE_FILE = os.getenv('SOURCE_FILE',
                        'dealervault/DealerVaultInventory.csv')
# File the sanitized lines are written to.
DESTINATION_FILE = os.getenv('DESTINATION_FILE',
                             'dealervault/DealerVaultInventory_processed.csv')
# When true, get_graph() builds the processing chain without consulting
# new_file_check().
SHOULD_FORCE = env_flag('SHOULD_FORCE')
# Keep the original implementation so non-write modes can still be delegated
# to it once the patch below is installed.
_open = OSFS.open


def _atomic_open(self, path, mode="r", *args, **kwargs):
    """Open *path* on an OSFS, routing write modes through ``atomic_write``.

    Modes starting with ``'w'`` are served by ``atomicwrites.atomic_write``.
    Per the atomicwrites docs, it writes to a temporary file and only moves
    it over the destination when the ``with`` block exits cleanly. Every
    other mode is delegated unchanged to the original ``OSFS.open``.

    :param self: the OSFS instance whose ``open`` is being called.
    :param path: path within that filesystem.
    :param mode: file mode, as for the built-in ``open``.
    :return: for write modes, the object returned by ``atomic_write``, which
        is a context manager and must be used in a ``with`` statement so the
        write is committed. Otherwise, whatever the original ``OSFS.open``
        returns.
    """
    if mode.startswith('w'):
        # A 'w' mode truncates any existing file, so always allow replacing
        # the destination. A trailing '+' only adds read access; it says
        # nothing about overwriting. The mode is passed through so binary
        # modes ('wb') get a binary temporary file.
        # NOTE(review): extra open() arguments (e.g. encoding) are not
        # forwarded to atomic_write, so the temporary file presumably uses the
        # platform default encoding — confirm this is acceptable.
        return atomic_write(self.getsyspath(path), mode=mode, overwrite=True)
    return _open(self, path, mode, *args, **kwargs)


# Installed process-wide: every OSFS instance now uses _atomic_open.
OSFS.open = _atomic_open
def get_graph():
    """Build the bonobo graph for one run of the sanitizer job.

    When ``SHOULD_FORCE`` is true, or ``new_file_check(SOURCE_FILE,
    DESTINATION_FILE)`` is truthy, the graph chains these nodes:
    ``FileReader(SOURCE_FILE)`` -> ``asciify`` -> ``extra_quote_killer`` ->
    ``FileWriter(DESTINATION_FILE)``. ``new_file_check`` is presumably true
    when the source changed since the destination was written; confirm
    against texools.

    Otherwise, the graph holds a single idle node that sleeps for one second
    when the graph is run.

    :return: a ``bonobo.Graph`` to pass to ``bonobo.run``.
    """
    graph = bonobo.Graph()
    if SHOULD_FORCE or new_file_check(SOURCE_FILE, DESTINATION_FILE):
        graph.add_chain(
            bonobo.FileReader(SOURCE_FILE),
            asciify,
            extra_quote_killer,
            bonobo.FileWriter(DESTINATION_FILE),
        )
    else:
        # Sleep inside a node, not here. Calling sleep() at build time would
        # block graph construction (and therefore every import of this
        # module). It would also register its return value, None, as a node.
        def idle():
            sleep(1)

        # add_chain (rather than add_node) links the node to the graph input
        # so it actually gets executed.
        graph.add_chain(idle)
    return graph
def extra_quote_killer(line: str) -> str:
    """Strip stray double quotes from one quote-delimited CSV line.

    Behavior:

    * ``","`` field separators are kept.
    * A single leading quote on the line is kept.
    * A single trailing quote on the line is kept.
    * Every other ``"`` (for example an unescaped quote inside a field) is
      removed.
    * A single trailing newline is preserved, and a quote immediately before
      it counts as the trailing quote.

    Text such as ``<>`` or ``<~>`` inside a field is left untouched.

    >>> extra_quote_killer('"he said "hi"","ok"')
    '"he said hi","ok"'

    :param line: one line of the CSV file.
    :return: the sanitized line.
    """
    # Set a single trailing newline aside so a quote right before it is
    # treated as the closing quote of the line.
    newline = '\n' if line.endswith('\n') else ''
    body = line[:len(line) - len(newline)]

    # Separators are identified first. A quote that belongs to a '","'
    # therefore never counts as the line's opening or closing quote.
    fields = body.split('","')
    leading = fields[0].startswith('"')
    if leading:
        fields[0] = fields[0][1:]
    trailing = fields[-1].endswith('"')
    if trailing:
        fields[-1] = fields[-1][:-1]

    cleaned = '","'.join(field.replace('"', '') for field in fields)
    return (('"' if leading else '') + cleaned
            + ('"' if trailing else '') + newline)
def get_services():
    """Return the service mapping handed to ``bonobo.run``.

    The job registers no services of its own, so a new, empty mapping is
    built on every call.
    """
    services = dict()
    return services
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--env', '-e', action='append', default=[], metavar='KEY=VALUE',
        help='set an environment variable before the job runs '
             '(may be repeated)')
    options = parser.parse_args()
    for assignment in options.env:
        key, sep, value = assignment.partition('=')
        if not sep or not key:
            # Report malformed items as a usage error, not a traceback.
            parser.error('--env expects KEY=VALUE, got %r' % assignment)
        os.environ[key] = value
    # SOURCE_FILE, DESTINATION_FILE and SHOULD_FORCE were read from the
    # environment at import time, before the overrides above were applied.
    # Re-read them so --env actually affects the graph get_graph() builds.
    SOURCE_FILE = os.getenv('SOURCE_FILE', SOURCE_FILE)
    DESTINATION_FILE = os.getenv('DESTINATION_FILE', DESTINATION_FILE)
    if 'SHOULD_FORCE' in os.environ:
        # Empty or an "off" spelling disables forcing; anything else enables it.
        SHOULD_FORCE = os.environ['SHOULD_FORCE'].strip().lower() not in (
            '', '0', 'false', 'no', 'off')
    bonobo.run(get_graph(), services=get_services())
else:
    # When imported rather than executed, build the graph eagerly and expose
    # it as the module-level name ``graph`` (presumably for tooling that
    # loads this file and looks that name up).
    graph = get_graph()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment