Created
December 13, 2022 17:24
-
-
Save mgrddsj/35f1339941ff1b3eabb15e99c34e3ae0 to your computer and use it in GitHub Desktop.
POC Auto Photo Compressor For Files With "[towebp]"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This Python snippet watches for file changes in a specified directory.
# If a file's path contains "[towebp]", it runs ffmpeg to convert the file to WebP,
# then deletes the original file.
# File watcher base code credits to:
# https://towardsdatascience.com/implementing-a-file-watcher-in-python-73f8356a425d
import os | |
import subprocess | |
import time | |
watch_directory = r'/home/ubuntu/Files/Photos'  # Root directory to watch; its subdirectories are scanned too
poll_time = 5  # Seconds to sleep between two scans of the watched directory
# function to return files in a directory | |
def list_files(directory: str):
    """Return the paths of all files found under *directory*, recursively.

    Each returned path is the walked directory joined with the file name,
    so it keeps whatever prefix *directory* was given with. Directories
    themselves are not listed.
    """
    # os.walk yields (dirpath, dirnames, filenames); only the files matter here.
    return [
        os.path.join(root, entry)
        for root, _dirs, entries in os.walk(directory)
        for entry in entries
    ]
# function comparing two lists | |
def list_comparison(original_list: list, new_list: list):
    """Return the items of *new_list* that do not occur in *original_list*.

    The result keeps the order (and any duplicates) of *new_list*.
    Items that are only in *original_list* -- e.g. files that were deleted
    between two scans -- are not reported.
    """
    try:
        # A set makes each membership test O(1), so the whole diff is linear
        # instead of quadratic in the number of files.
        known = set(original_list)
        return [item for item in new_list if item not in known]
    except TypeError:
        # Unhashable items cannot be put in (or looked up in) a set; fall
        # back to the plain list scan so such inputs keep working.
        return [item for item in new_list if item not in original_list]
def convert_to_webp(new_files: list):
    """Convert every path in *new_files* that contains "[towebp]" to WebP.

    For each matching path, ffmpeg is run with the original file as input.
    The output path is the input path without its extension and without
    the "[towebp]" marker, plus ".webp". The original file is deleted only
    when ffmpeg exits with status 0. Paths without the marker are ignored.

    Failures (ffmpeg missing, ffmpeg returning non-zero) are reported on
    stdout and do not stop the processing of the remaining files.
    """
    print(f'New files detected: {new_files}')
    for filename in new_files:
        if "[towebp]" not in filename:
            continue

        new_file_name = os.path.splitext(filename)[0].replace("[towebp]", "") + ".webp"
        # Pass the arguments as a list and run without a shell: file names
        # come from the watched directory and may contain quotes, `$(...)`
        # or backticks, which a shell command string would interpret.
        ffmpeg_command = ["ffmpeg", "-i", filename, new_file_name]
        print(f'I will compress this: "{filename}", and it will become "{new_file_name}"')
        print(f'ffmpeg command: {ffmpeg_command}')

        try:
            proc = subprocess.run(ffmpeg_command)
        except OSError as err:
            # Without a shell, a missing/non-executable ffmpeg raises instead
            # of yielding exit status 127; keep treating it as a soft error.
            print("An error occured when running ffmpeg!")
            print(f"Could not start ffmpeg: {err}")
            continue

        if proc.returncode != 0:
            print("An error occured when running ffmpeg!")
            continue

        print("Converted successfully! Deleting original file.")
        try:
            os.remove(filename)
        except FileNotFoundError:
            # The original vanished in the meantime; nothing left to delete.
            pass
def file_watcher(directory: str, poll_time: int):
    """Poll *directory* forever and convert newly appeared files.

    A first snapshot of the files under *directory* is taken immediately.
    Afterwards, every *poll_time* seconds, the directory is listed again;
    paths that were not in the previous snapshot are handed to
    ``convert_to_webp``. Deleted files are not reported. This function
    never returns.
    """
    # Baseline snapshot: files already present at start-up are not "new".
    previous_file_list = list_files(directory)
    while True:
        time.sleep(poll_time)
        new_file_list = list_files(directory)
        file_diff = list_comparison(previous_file_list, new_file_list)
        previous_file_list = new_file_list
        if file_diff:
            convert_to_webp(file_diff)
# Script entry point: start watching with the module-level settings.
if __name__ == "__main__":
    file_watcher(directory=watch_directory, poll_time=poll_time)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment