Created
January 5, 2022 21:55
-
-
Save BonBonSlick/0a68e39cf2426601342181284f3fd15a to your computer and use it in GitHub Desktop.
Multi-threaded async removal of similar files
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from PIL import Image | |
import random | |
import imagehash | |
import time | |
import os | |
import asyncio | |
import datetime | |
# Root folder whose tree is walked when collecting files to check.
folderFailesPath = 'C:/Users/BonBon.DESKTOP-B1B9CUP/Downloads/files'
# Comma-separated list of file extensions (with leading dot) that are kept;
# files whose extension is rejected by isAllowedFormat() are deleted.
allowedFileFormats = '.jpg, .png, .webp, .mp4, .gif'
# Number of completed passes of the main loop.
numberofChecks = 0
# Module-level counter; removeSimilarFiles() keeps its own local counter of
# the same name, so this value is never updated by that function.
removedFilesForIteration = 0
def writeLog(Error):
    """Print an exception banner and append the error to ``log.txt``.

    One entry is appended to ``log.txt`` in the current working directory in
    the form ``EXCEPTION! [YYYY-MM-DD HH:MM] : <str(Error)>``, terminated by a
    newline so consecutive entries do not run together on a single line.

    Args:
        Error: The exception (or any other object) to record; it is converted
            with ``str`` before being written.
    """
    banner = (
        '=======================',
        '=======================',
        '=======================',
        '======================= EXCEPTION =======================',
        '=======================',
        '=======================',
        '=======================',
    )
    for bannerLine in banner:
        print(bannerLine)

    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
    # ``with`` guarantees the handle is closed even when the write fails.
    # UTF-8 is used explicitly so a message containing non-ASCII characters
    # (e.g. a file path) cannot raise an encoding error while logging.
    with open("log.txt", "a", encoding="utf-8") as log:
        log.write('EXCEPTION! ' + '[' + timestamp + '] : ' + str(Error) + '\n')
def getFileExtension(fileNamePath):
    """Return the extension of ``fileNamePath`` as split off by ``os.path.splitext``.

    The result includes the leading dot (``'.jpg'``) and is the empty string
    when the final path component has no extension.
    """
    _stem, extension = os.path.splitext(fileNamePath)
    return extension
def isFileExists(fileNamePath):
    """Tell whether ``fileNamePath`` currently exists, per ``os.path.exists``."""
    pathIsPresent = os.path.exists(fileNamePath)
    return pathIsPresent
def isAllowedFormat(fileExtension):
    """Return True when ``fileExtension`` is one of the allowed extensions.

    ``allowedFileFormats`` is a comma-separated string, so a plain ``in`` test
    against it is a substring search: it accepts ``''`` (no extension),
    fragments such as ``'.p'`` and even ``', '``. The string is therefore
    split into individual entries and the extension must equal one of them.
    The comparison ignores case, so ``'.JPG'`` is treated like ``'.jpg'``.

    Args:
        fileExtension: Extension including the leading dot, e.g. ``'.png'``.

    Returns:
        bool: True for an exact (case-insensitive) match, otherwise False.
        An empty extension is never allowed.
    """
    allowedExtensions = {
        entry.strip().lower() for entry in allowedFileFormats.split(',')
    }
    # Guard against empty entries produced by stray or trailing commas.
    allowedExtensions.discard('')
    return fileExtension.lower() in allowedExtensions
def getImageHashByPath(fileNamePath):
    """Compute the average hash of the image stored at ``fileNamePath``.

    The image is opened in a ``with`` block so it is closed as soon as the
    hash has been computed, rather than being left open until garbage
    collection; an open handle can prevent the file from being deleted
    afterwards on Windows.

    Args:
        fileNamePath: Path of the image file to hash.

    Returns:
        The value returned by ``imagehash.average_hash`` for the image.

    Raises:
        Whatever ``Image.open`` or ``imagehash.average_hash`` raise for a
        missing, unreadable or unsupported file.
    """
    with Image.open(fileNamePath) as image:
        return imagehash.average_hash(image)
def removeFile(fileNamePath):
    """Delete ``fileNamePath`` if it exists and report whether it was removed.

    Args:
        fileNamePath: Path of the file to delete.

    Returns:
        bool: True when the file was deleted by this call, False when it was
        already absent. (Earlier versions returned None; callers that ignore
        the result are unaffected.)

    Raises:
        OSError: For failures other than the file being absent, e.g. the path
            is a directory or permission is denied.
    """
    if not os.path.exists(fileNamePath):
        return False
    # NOTE: this message is printed for every removal, including files removed
    # as duplicates, not only for files with a disallowed extension.
    print('Removing file with wrong extension: ', fileNamePath)
    try:
        os.remove(fileNamePath)
    except FileNotFoundError:
        # The file vanished between the existence check and the removal;
        # the desired end state is reached, so this is not an error.
        return False
    return True
def _creationTimeOrZero(path):
    """Return ``os.path.getctime(path)``, or 0.0 if the file cannot be stat'ed."""
    try:
        return os.path.getctime(path)
    except OSError:
        # The file disappeared (or became inaccessible) after it was listed.
        return 0.0


def absoluteFilePaths(sortByDate = True, folder = None):
    """List the absolute paths of all files below a folder, recursively.

    Args:
        sortByDate: When truthy, the result is sorted ascending by
            ``os.path.getctime``. Otherwise the order is that of ``os.walk``.
        folder: Root folder to scan. Defaults to the module-level
            ``folderFailesPath`` when omitted or None.

    Returns:
        list[str]: One absolute path per file found; empty when the folder
        does not exist or contains no files.
    """
    if folder is None:
        folder = folderFailesPath
    paths = [
        os.path.join(root, file)
        for root, _dirs, files in os.walk(os.path.abspath(folder))
        for file in files
    ]
    if sortByDate:
        # A file may be deleted between listing and sorting; the tolerant key
        # keeps the sort from raising in that case (such paths sort first).
        paths.sort(key=_creationTimeOrZero)
    return paths
async def removeSimilarFiles(sortByDate = True):
    """Run one de-duplication pass over the files from ``absoluteFilePaths``.

    Every file is compared with every other file that has the same extension.
    When the two image hashes do not differ, the second file of the pair is
    removed. Files whose extension is rejected by ``isAllowedFormat`` are
    removed as they are encountered.

    A file that cannot be hashed (for example a format the image library
    cannot decode) is skipped instead of aborting the whole pass. Hashes are
    cached for the duration of the pass so each file is decoded at most once;
    this assumes files are not rewritten in place while the pass is running.

    Args:
        sortByDate: Forwarded to ``absoluteFilePaths`` to choose the order in
            which files are visited.

    Returns:
        int: Number of files removed as duplicates during this pass. Files
        removed because of their extension are not counted.
    """
    files = absoluteFilePaths(sortByDate)
    print('=======================')
    print('========================= Total files to check for current iteration: ', len(files))
    print('=======================')

    # path -> hash, or None when the file could not be hashed.
    hashByPath = {}

    def hashOrNone(path):
        # Hash ``path`` once per pass; remember failures as None.
        if path not in hashByPath:
            try:
                hashByPath[path] = getImageHashByPath(path)
            except (OSError, ValueError) as hashError:
                print('Skipping file that cannot be hashed: ', path, hashError)
                hashByPath[path] = None
        return hashByPath[path]

    # Largest hash difference at which two files still count as duplicates;
    # 0 means only files with identical hashes are removed.
    maxHashDifference = 0
    removedFilesForIteration = 0

    for compareFileName in files:
        print('=======================')
        print('========================= Comparing file name: ', compareFileName)
        print('=======================')
        # The file may have been removed earlier in this pass.
        if not isFileExists(compareFileName):
            continue
        fileExtension = getFileExtension(compareFileName)
        if not isAllowedFormat(fileExtension):
            removeFile(compareFileName)
            continue
        compareFileHash = hashOrNone(compareFileName)
        if compareFileHash is None:
            continue

        # The folder is listed again for every reference file, so files that
        # appear while the pass is running are still compared.
        for withFileName in absoluteFilePaths(sortByDate):
            if not isFileExists(withFileName):
                continue
            fileWithExtension = getFileExtension(withFileName)
            if not isAllowedFormat(fileWithExtension):
                removeFile(withFileName)
                continue
            # Never compare a file with itself, and only compare files that
            # share the same extension.
            if compareFileName == withFileName or fileExtension != fileWithExtension:
                continue
            print('==========')
            print('========== with file name: ', withFileName)
            print('==========')
            compareWithFileHash = hashOrNone(withFileName)
            if compareWithFileHash is None:
                continue
            if compareFileHash - compareWithFileHash > maxHashDifference:
                continue
            removedFilesForIteration += 1
            removeFile(withFileName)

    print('Total removed files per check iteration: ', removedFilesForIteration)
    print('=======================')
    print('======================= END CHECK =======================')
    print('=======================')
    return removedFilesForIteration
# Script body: run a de-duplication pass, log any failure, count the pass,
# then wait ten minutes before starting the next one. Runs until interrupted.
while True:
    print(
        '=======================',
        '======================= STARTING CHECK =======================',
        '=======================',
        sep='\n',
    )
    try:
        # A random bit decides per pass whether files are visited sorted by
        # date or in directory-walk order.
        asyncio.run(removeSimilarFiles(bool(random.getrandbits(1))))
    except Exception as caught:
        # A failed pass is recorded and the loop carries on with the next one.
        writeLog(caught)
    numberofChecks += 1
    print('=======================')
    print('Total check iterations: ', numberofChecks)
    print('=======================')
    time.sleep(600)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment