Created
March 1, 2023 10:23
-
-
Save antweiss/28508b5f37a32ad7474d80b8527b7837 to your computer and use it in GitHub Desktop.
Airflow Zip Operator
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class ZipOperator(BaseOperator):
    """
    An operator which takes in a path to a file and zips the contents to a location you define.

    If the path points to a folder, the folder and everything below it are
    added to the archive. Names inside the archive are relative to the parent
    directory of the zipped file or folder, so the archive always contains a
    single top-level entry named after it.

    :param path_to_file_to_zip: Full path to the file you want to Zip
    :type path_to_file_to_zip: string
    :param path_to_save_zip: Full path to where you want to save the Zip file
    :type path_to_save_zip: string
    """

    template_fields = ('path_to_file_to_zip', 'path_to_save_zip')
    template_ext = []
    ui_color = '#ffffff'  # ZipOperator's Main Color: white  # todo: find better color

    @apply_defaults
    def __init__(
            self,
            path_to_file_to_zip,
            path_to_save_zip,
            *args, **kwargs):
        # Forward the remaining arguments to the base class; without this call
        # the operator-level arguments passed by the caller are discarded.
        super(ZipOperator, self).__init__(*args, **kwargs)
        self.path_to_file_to_zip = path_to_file_to_zip
        self.path_to_save_zip = path_to_save_zip

    def execute(self, context):
        """
        Zip ``path_to_file_to_zip`` into the archive at ``path_to_save_zip``.

        :param context: execution context handed in by the caller (unused)
        :raises IOError: if ``path_to_file_to_zip`` does not exist
        """
        logging.info("Executing ZipOperator.execute(context)")
        logging.info("Path to the File to Zip provided by the User (path_to_file_to_zip): %s",
                     self.path_to_file_to_zip)
        logging.info("Path to save the Zip File provided by the User (path_to_save_zip) : %s",
                     self.path_to_save_zip)

        # Resolve both paths up front. abspath() also strips a trailing
        # separator, so "/data/folder/" still yields the base name "folder".
        source_path = os.path.abspath(self.path_to_file_to_zip)
        target_path = os.path.abspath(self.path_to_save_zip)

        source_parent = os.path.dirname(source_path)
        logging.info("Absolute path to the File to Zip: %s", source_parent)

        zip_file_name = os.path.basename(target_path)
        logging.info("Zip File Name: %s", zip_file_name)

        file_to_zip_name = os.path.basename(source_path)
        logging.info("Name of the File or Folder to be Zipped: %s", file_to_zip_name)

        # Fail loudly instead of silently producing an empty archive.
        if not os.path.exists(source_path):
            raise IOError("Path to zip does not exist: " + str(source_path))

        is_file = os.path.isfile(source_path)

        # The archive is written straight to its final location and member
        # names are passed explicitly via ``arcname``. This avoids changing
        # the process-wide working directory and avoids a rename that cannot
        # cross filesystem boundaries.
        with ZipFile(target_path, 'w') as zip_file:
            logging.info("Created zip file object '%s' with name '%s'", zip_file, zip_file_name)
            logging.info("Is the File to Zip a File (else its a folder): %s", is_file)
            if is_file:
                logging.info("Writing '%s' to zip file", file_to_zip_name)
                zip_file.write(source_path, arcname=file_to_zip_name)
            else:  # is folder
                for dirname, subdirs, files in os.walk(source_path):
                    dir_arcname = os.path.relpath(dirname, source_parent)
                    logging.info("Writing '%s' to zip file", dir_arcname)
                    # Directories get their own entry so empty folders survive.
                    zip_file.write(dirname, arcname=dir_arcname)
                    for filename in files:
                        file_path = os.path.join(dirname, filename)
                        # Never add the archive to itself when it is being
                        # saved inside the folder that is being zipped.
                        if os.path.abspath(file_path) == target_path:
                            continue
                        file_arcname = os.path.join(dir_arcname, filename)
                        logging.info("Writing '%s' to zip file", file_arcname)
                        zip_file.write(file_path, arcname=file_arcname)
            logging.info("Closing Zip File Object")
        # Leaving the ``with`` block closes and finalises the archive.

        logging.info("Finished executing ZipOperator.execute(context)")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment