Skip to content

Instantly share code, notes, and snippets.

@hartym
Last active October 29, 2017 22:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hartym/e676f1c74a4313db6b108ff31f4e66b7 to your computer and use it in GitHub Desktop.
Save hartym/e676f1c74a4313db6b108ff31f4e66b7 to your computer and use it in GitHub Desktop.
import argparse
import datetime
import os
import time
import bonobo
from atomicwrites import atomic_write
from bonobo.constants import NOT_MODIFIED
from fs.osfs import OSFS
# Keep a reference to the unpatched OSFS.open so that non-write modes can
# still be delegated to the original implementation.
_open = OSFS.open


def _atomic_open(self, path, mode="r", **kwargs):
    """Replacement for ``OSFS.open`` that sends write modes through ``atomic_write``.

    :param path: path of the file to open.
    :param mode: open mode; modes starting with ``'w'`` use ``atomic_write``,
        every other mode is handed to the original ``OSFS.open``.
    :param kwargs: forwarded to the original ``OSFS.open`` for non-write
        modes; ignored on the write path.
    :return: whatever ``atomic_write`` returns for write modes, otherwise the
        result of the original ``OSFS.open``.
    """
    if mode.startswith('w'):
        # Overwriting an existing file is only allowed when the mode ends
        # with '+' (e.g. 'w+'); a plain 'w' passes overwrite=False.
        # NOTE(review): ``path`` is passed to atomic_write as-is, i.e. not
        # resolved against this filesystem's root -- confirm this is intended.
        return atomic_write(path, overwrite=mode.endswith('+'))
    # ``_open`` was taken from the class, not from an instance, so it is not
    # bound: ``self`` has to be passed explicitly.
    return _open(self, path, mode=mode, **kwargs)


# Monkey-patch: from here on, OSFS instances open files via _atomic_open.
OSFS.open = _atomic_open
def is_update_needed():
    """Return True when the ``UPDATE`` environment variable is set and non-empty."""
    flag = os.environ.get('UPDATE', '')
    return len(flag) > 0
# Single seed row for the graph; the timestamp is taken once, when the module is loaded.
input_data = [dict(id=1, timestamp=str(datetime.datetime.now()))]
def take_your_time(*args, **kwargs):
    """Block for five seconds, then return bonobo's ``NOT_MODIFIED`` constant.

    The received arguments are not inspected.
    NOTE(review): presumably ``NOT_MODIFIED`` makes bonobo forward the input
    row unchanged -- confirm against the bonobo documentation.
    """
    pause_seconds = 5
    time.sleep(pause_seconds)
    return NOT_MODIFIED
def get_graph():
    """Build the bonobo graph for this job.

    The graph is left empty unless ``is_update_needed()`` is true; in that
    case a single chain is added: ``input_data`` -> ``take_your_time`` ->
    a CSV writer targeting ``output.csv``.
    """
    result = bonobo.Graph()
    if not is_update_needed():
        # Nothing to do: hand back the empty graph.
        return result

    result.add_chain(
        input_data,
        take_your_time,
        bonobo.CsvWriter('output.csv', ioformat='arg0'),
    )
    return result
def get_services():
    """Return the services mapping passed to ``bonobo.run``; always a new empty dict."""
    services = dict()
    return services
if __name__ == '__main__':
    # Script mode: apply any ``--env KEY=VALUE`` overrides to the process
    # environment *before* building the graph, because get_graph() reads the
    # environment (through is_update_needed) at construction time.
    parser = argparse.ArgumentParser()
    parser.add_argument('--env', '-e', action='append', default=list())
    options = parser.parse_args()
    for env in options.env:
        # Split on the first '=' only, so values may themselves contain '='.
        k, sep, v = env.partition('=')
        if not sep or not k:
            # Report a usage error instead of failing with an unpacking
            # traceback (missing '=') or an OS-level error (empty name).
            parser.error('--env expects KEY=VALUE, got {!r}'.format(env))
        os.environ[k] = v
    bonobo.run(get_graph(), services=get_services())
else:
    # Imported as a module: expose the graph as a module-level ``graph``.
    graph = get_graph()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment