Skip to content

Instantly share code, notes, and snippets.

@seregasheypak
Created February 9, 2016 17:20
Show Gist options
  • Save seregasheypak/74c8fee735cefc85feb0 to your computer and use it in GitHub Desktop.
Save seregasheypak/74c8fee735cefc85feb0 to your computer and use it in GitHub Desktop.
class UntarOperator(BashOperator):
    """
    Unpack a previously downloaded tar.gz artifact with a bash command.

    The bash command is composed at execution time. Once it has run, the
    unpack path is pushed to XCom under the artifact name.

    :param artifact_tar_gz_name: name of artifact from previous download step;
        also used as the XCom key under which the unpack path is pushed
    :type artifact_tar_gz_name: string
    :param lookup_task_id: an id of task that downloaded artifact
    :type lookup_task_id: string
    :param storage_path_prefix: stored on the operator; not read by the code
        visible here (presumably consumed by ``do_someting`` -- TODO confirm)
    :type storage_path_prefix: string
    """

    @apply_defaults
    def __init__(self, artifact_tar_gz_name, lookup_task_id, storage_path_prefix="", *args, **kwargs):
        # The real command depends on runtime values, so a placeholder is
        # handed to BashOperator here and replaced in execute().
        super(UntarOperator, self).__init__(bash_command="set me later", *args, **kwargs)
        self.storage_path_prefix = storage_path_prefix
        self.lookup_task_id = lookup_task_id
        self.artifact_tar_gz_name = artifact_tar_gz_name

    def execute(self, context):
        """
        Compose the unpack command, run it, and publish the unpack path.

        :param context: Airflow task context passed through to BashOperator
            and to the XCom calls
        :raises ValueError: if no tar.gz path is found in XCom for
            ``lookup_task_id``
        """
        # NOTE(review): do_someting() and compose_cmd_command() are not
        # defined in this excerpt; they are expected to exist on the class.
        unpack_path = self.do_someting()

        # NOTE(review): the original read an unbound name ``path_to_tar_gz``
        # here (NameError). It is assumed the download task pushed the archive
        # path to XCom under the artifact name -- confirm the key against
        # that task.
        path_to_tar_gz = self.xcom_pull(
            context,
            task_ids=self.lookup_task_id,
            key=self.artifact_tar_gz_name,
        )
        if path_to_tar_gz is None:
            raise ValueError(
                "No XCom value found for key {!r} from task {!r}".format(
                    self.artifact_tar_gz_name, self.lookup_task_id
                )
            )

        self.bash_command = self.compose_cmd_command(path_to_tar_gz, unpack_path)
        super(UntarOperator, self).execute(context)

        # Both self.artifact_tar_gz_name and unpack_path are strings here.
        logging.info("Saving path to unpacked artifact here: {}=>{}".format(self.artifact_tar_gz_name, unpack_path))
        # NOTE(review): the original author reported an exception raised by
        # this call; its cause cannot be determined from this excerpt.
        self.xcom_push(context, key=self.artifact_tar_gz_name, value=unpack_path)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment