Skip to content

Instantly share code, notes, and snippets.

@antweiss
Created March 1, 2023 10:23
Show Gist options
  • Save antweiss/28508b5f37a32ad7474d80b8527b7837 to your computer and use it in GitHub Desktop.
Save antweiss/28508b5f37a32ad7474d80b8527b7837 to your computer and use it in GitHub Desktop.
Airflow Zip Operator
class ZipOperator(BaseOperator):
    """
    An operator which takes in a path to a file or folder and zips it to a location you define.

    A single file is stored in the archive under its own name. A folder is
    stored recursively, with every entry named relative to the folder's
    parent directory (so the folder itself is the top-level archive entry).

    The archive is first written next to the destination and then moved into
    place, so a failed run never leaves a partial file at ``path_to_save_zip``.
    The process working directory is never changed.

    :param path_to_file_to_zip: Path to the file or folder you want to Zip
    :type path_to_file_to_zip: string
    :param path_to_save_zip: Path to where you want to save the Zip file
    :type path_to_save_zip: string
    :raises FileNotFoundError: if ``path_to_file_to_zip`` does not exist when the task runs
    """

    template_fields = ('path_to_file_to_zip', 'path_to_save_zip')
    template_ext = []
    ui_color = '#ffffff'  # ZipOperator's Main Color: white  # todo: find better color

    @apply_defaults
    def __init__(
            self,
            path_to_file_to_zip,
            path_to_save_zip,
            *args, **kwargs):
        # The base class must be initialised so the generic operator arguments
        # passed through *args/**kwargs (task_id, dag, ...) are not dropped.
        super().__init__(*args, **kwargs)
        self.path_to_file_to_zip = path_to_file_to_zip
        self.path_to_save_zip = path_to_save_zip

    def execute(self, context):
        """
        Build the zip archive and place it at ``path_to_save_zip``.

        :param context: task execution context (not used by this operator)
        :raises FileNotFoundError: if the source path does not exist
        """
        logging.info("Executing ZipOperator.execute(context)")
        logging.info("Path to the File to Zip provided by the User (path_to_file_to_zip): %s",
                     self.path_to_file_to_zip)
        logging.info("Path to save the Zip File provided by the User (path_to_save_zip) : %s",
                     self.path_to_save_zip)

        # Resolve both paths once, up front, against the current working
        # directory. abspath() also strips a trailing separator, so a folder
        # given as "some/dir/" still yields a usable basename.
        source_path = os.path.abspath(self.path_to_file_to_zip)
        destination_path = os.path.abspath(self.path_to_save_zip)
        if not os.path.exists(source_path):
            raise FileNotFoundError(
                "Path to zip does not exist: '" + str(source_path) + "'")

        source_parent = os.path.dirname(source_path)
        logging.info("Absolute path to the File to Zip: %s", source_parent)
        logging.info("Zip File Name: %s", os.path.basename(destination_path))
        source_name = os.path.basename(source_path)
        logging.info("Name of the File or Folder to be Zipped: %s", source_name)

        # Write to a sibling of the destination so the final move stays on one
        # filesystem and replaces any previous archive in a single step.
        temp_zip_path = destination_path + ".tmp"
        try:
            # ZipFile's default compression (ZIP_STORED) is kept as before.
            with ZipFile(temp_zip_path, 'w') as zip_file:
                logging.info("Created zip file object '%s' with name '%s'", zip_file, temp_zip_path)
                is_file = os.path.isfile(source_path)
                logging.info("Is the File to Zip a File (else its a folder): %s", is_file)
                if is_file:
                    logging.info("Writing '%s' to zip file", source_name)
                    zip_file.write(source_path, arcname=source_name)
                else:  # is folder
                    for dirname, _subdirs, files in os.walk(source_path):
                        # Archive names are relative to the folder's parent,
                        # which is what changing into that directory used to
                        # achieve.
                        archive_dirname = os.path.relpath(dirname, source_parent)
                        logging.info("Writing '%s' to zip file", archive_dirname)
                        zip_file.write(dirname, arcname=archive_dirname)
                        for filename in files:
                            file_path = os.path.join(dirname, filename)
                            if file_path == temp_zip_path:
                                # The destination lives inside the folder being
                                # zipped; do not add the archive to itself.
                                continue
                            archive_name = os.path.join(archive_dirname, filename)
                            logging.info("Writing '%s' to zip file", archive_name)
                            zip_file.write(file_path, arcname=archive_name)
                logging.info("Closing Zip File Object")
            logging.info("Moving '%s' to '%s'", temp_zip_path, destination_path)
            os.replace(temp_zip_path, destination_path)
        finally:
            # After a successful move the temporary file is gone; this only
            # cleans up the partial archive when something above failed.
            if os.path.exists(temp_zip_path):
                os.remove(temp_zip_path)
        logging.info("Finished executing ZipOperator.execute(context)")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment