Skip to content

Instantly share code, notes, and snippets.

@BonBonSlick
Created January 5, 2022 21:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save BonBonSlick/0a68e39cf2426601342181284f3fd15a to your computer and use it in GitHub Desktop.
Save BonBonSlick/0a68e39cf2426601342181284f3fd15a to your computer and use it in GitHub Desktop.
Multi-threaded async removal of similar files
from PIL import Image
import random
import imagehash
import time
import os
import asyncio
import datetime
# Comma-separated list of file extensions (with leading dot) that the script
# keeps; it is consulted by isAllowedFormat().
allowedFileFormats = '.jpg, .png, .webp, .mp4, .gif'

# Root folder that is walked recursively for files.
folderFailesPath = 'C:/Users/BonBon.DESKTOP-B1B9CUP/Downloads/files'

# Module-level counters, both starting at zero. numberofChecks is incremented
# by the main loop after every pass.
removedFilesForIteration = numberofChecks = 0
def writeLog(Error):
    """Print an exception banner and append a timestamped entry to log.txt.

    Args:
        Error: The exception (or any other object) to record. It is
            converted with ``str()`` before being written.

    The entry is appended to ``log.txt`` in the current working directory,
    one entry per line.
    """
    separator = '======================='
    for _ in range(3):
        print(separator)
    print('======================= EXCEPTION =======================')
    for _ in range(3):
        print(separator)

    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
    # ``with`` closes the handle even when the write fails. UTF-8 is used so
    # that messages containing non-Latin file names cannot raise
    # UnicodeEncodeError while an error is being reported.
    with open("log.txt", "a", encoding="utf-8") as log:
        # The trailing newline keeps consecutive entries on separate lines.
        log.write(f'EXCEPTION! [{timestamp}] : {Error}\n')
def getFileExtension(fileNamePath):
    """Return the extension of *fileNamePath*, including the leading dot.

    The value is the second element of ``os.path.splitext``; it is an empty
    string when the last path component has no extension. Case is preserved.
    """
    _, extension = os.path.splitext(fileNamePath)
    return extension
def isFileExists(fileNamePath):
    """Return True when *fileNamePath* currently exists on disk.

    Thin wrapper around ``os.path.exists``; callers use it to skip paths
    that were deleted earlier in the same pass.
    """
    return os.path.exists(fileNamePath)
def isAllowedFormat(fileExtension, allowedFormats=None):
    """Return True when *fileExtension* is exactly one of the allowed ones.

    Args:
        fileExtension: Extension including the leading dot, e.g. ``'.jpg'``.
        allowedFormats: Optional override, either a comma-separated string
            or an iterable of extensions. When omitted, the module-level
            ``allowedFileFormats`` string is used.

    Returns:
        bool: True for an exact, case-insensitive match against one entry.

    The comparison is done per entry rather than as a substring test, so an
    empty extension, a partial one such as ``'.jp'`` or the separator itself
    is rejected, while ``'.JPG'`` is accepted like ``'.jpg'``.
    """
    if allowedFormats is None:
        allowedFormats = allowedFileFormats
    if isinstance(allowedFormats, str):
        allowedFormats = allowedFormats.split(',')
    allowed = {entry.strip().casefold() for entry in allowedFormats}
    # A stray comma or blank entry must not make "no extension" acceptable.
    allowed.discard('')
    return fileExtension.casefold() in allowed
def getImageHashByPath(fileNamePath):
    """Return the ``imagehash.average_hash`` of the image at *fileNamePath*.

    Any exception raised by ``Image.open`` or by the hashing call propagates
    to the caller.
    """
    # The context manager releases the underlying file handle as soon as the
    # hash is computed. NOTE(review): a handle left open can prevent a later
    # os.remove of the same path on Windows - confirm on the target machine.
    with Image.open(fileNamePath) as image:
        return imagehash.average_hash(image)
def removeFile(fileNamePath):
    """Delete *fileNamePath* if it exists, announcing the removal on stdout.

    A path that does not exist is ignored silently. Returns None.
    """
    if not os.path.exists(fileNamePath):
        return
    print('Removing file with wrong extension: ', fileNamePath)
    try:
        os.remove(fileNamePath)
    except FileNotFoundError:
        # The file vanished between the existence check and the removal;
        # the desired end state is already reached.
        pass
def absoluteFilePaths(sortByDate=True, folderPath=None):
    """Return absolute paths of every file found recursively under a folder.

    Args:
        sortByDate: When truthy, the result is ordered by
            ``os.path.getctime`` ascending; otherwise it is returned in
            ``os.walk`` order.
        folderPath: Folder to scan. Defaults to the module-level
            ``folderFailesPath``.

    Returns:
        list[str]: One absolute path per file. Empty when the folder does
        not exist or contains no files.
    """
    if folderPath is None:
        folderPath = folderFailesPath

    def changeTime(path):
        # A file may disappear between the walk and the sort; give it a
        # neutral key instead of aborting the whole listing.
        try:
            return os.path.getctime(path)
        except OSError:
            return 0.0

    paths = [
        os.path.join(root, name)
        for root, _dirs, names in os.walk(os.path.abspath(folderPath))
        for name in names
    ]
    if sortByDate:
        paths.sort(key=changeTime)
    return paths
async def removeSimilarFiles(sortByDate=True):
    """Delete disallowed files and files whose image hash duplicates another.

    One pass over the listing returned by ``absoluteFilePaths(sortByDate)``:

    * a file whose extension is rejected by ``isAllowedFormat`` is removed;
    * every remaining file is hashed once with ``getImageHashByPath``;
    * among files sharing the same extension and an identical hash (hash
      distance 0), the first one in listing order is kept and the later
      ones are removed.

    A file that cannot be hashed is reported and skipped so that a single
    unreadable file does not abort the pass.

    Args:
        sortByDate: Forwarded to ``absoluteFilePaths``; it decides which
            duplicate comes first and is therefore kept.

    Returns:
        int: Number of duplicate files removed. Files removed because of a
        disallowed extension are not counted.
    """
    files = absoluteFilePaths(sortByDate)
    print('=======================')
    print('========================= Total files to check for current iteration: ', len(files))
    print('=======================')

    removedFilesForIteration = 0
    # (extension, hash text) -> path of the first file seen with that pair.
    # Hashing each file once replaces the nested re-walk and re-hash of the
    # whole folder for every file.
    keptByFingerprint = {}

    for compareFileName in files:
        print('=======================')
        print('========================= Comparing file name: ', compareFileName)
        print('=======================')
        if not isFileExists(compareFileName):
            continue
        fileExtension = getFileExtension(compareFileName)
        if not isAllowedFormat(fileExtension):
            removeFile(compareFileName)
            continue
        try:
            compareFileHash = getImageHashByPath(compareFileName)
        except Exception as hashError:
            # Per-file boundary: allow-listed files that the image library
            # cannot read must not stop the remaining files being checked.
            print('Skipping file that could not be hashed: ', compareFileName, hashError)
            continue

        # Two hashes have distance 0 exactly when they are equal, so equal
        # text form within the same extension marks a duplicate.
        fingerprint = (fileExtension, str(compareFileHash))
        if fingerprint not in keptByFingerprint:
            keptByFingerprint[fingerprint] = compareFileName
            continue

        print('==========')
        print('========== Duplicate of: ', keptByFingerprint[fingerprint])
        print('==========')
        removedFilesForIteration += 1
        removeFile(compareFileName)

    print('Total removed files per check iteration: ', removedFilesForIteration)
    print('=======================')
    print('======================= END CHECK =======================')
    print('=======================')
    return removedFilesForIteration
# Script entry point: run a clean-up pass, then sleep ten minutes, forever.
# The guard keeps the endless loop from starting when the module is imported.
if __name__ == '__main__':
    while True:
        print('=======================')
        print('======================= STARTING CHECK =======================')
        print('=======================')
        try:
            # The sort order (by date or walk order) is chosen at random for
            # each pass.
            asyncio.run(removeSimilarFiles(bool(random.getrandbits(1))))
        except Exception as Error:
            # Top-level boundary: record the failure and keep the loop alive.
            writeLog(Error)
        numberofChecks += 1
        print('=======================')
        print('Total check iterations: ', numberofChecks)
        print('=======================')
        time.sleep(600)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment