Skip to content

Instantly share code, notes, and snippets.

@driskell
Last active November 29, 2020 11:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save driskell/270467628fea76a653b7e3756ab88984 to your computer and use it in GitHub Desktop.
Save driskell/270467628fea76a653b7e3756ab88984 to your computer and use it in GitHub Desktop.
Clean unused catalog images from the Magento 1 media folder. Run inside the media folder. Supports scanning attributes with custom backends.
# yum install mysql-connector-python
import ConfigParser
from contextlib import contextmanager
import getopt
import getpass
import io
from mysql.connector import connection
import os
import re
import sys
def CursorContextManager(connection):
    """Build a context-manager factory that wraps a cursor in a transaction.

    The returned callable takes no arguments. Used in a ``with`` statement it
    opens a cursor on ``connection`` and yields it. When the body finishes
    normally the cursor is closed and the connection committed. When the body
    raises, the error is printed, the cursor is closed, the connection is
    rolled back and the original exception is re-raised.
    """
    @contextmanager
    def manager():
        cursor = connection.cursor()
        try:
            yield cursor
        except BaseException as err:
            print('Rolling back transaction due to exception')
            # str(err) works for every exception type; ``err.message`` is
            # deprecated and missing on many exceptions, which would raise
            # AttributeError here and hide the real failure.
            print(err)
            try:
                cursor.close()
            finally:
                # Roll back even if closing the cursor itself fails.
                connection.rollback()
            raise
        else:
            cursor.close()
            connection.commit()
    return manager
def usage():
    """Print the command-line synopsis and exit with status 1.

    Lists every option the script's getopt specification accepts, including
    the database, custom backend and delete switches.
    """
    print(
        'Usage: clean_images.py [-h|--host HOST] [-u|--user USER]'
        ' [-p|--password PASSWORD] [-P|--prompt-password]'
        ' [-d|--database DATABASE] [-b|--backend MODEL[,MODEL...]]'
        ' [-c|--category-backend MODEL[,MODEL...]] [-D|--delete]'
    )
    sys.exit(1)
# Built-in fallback configuration, loaded first so that anything read from the
# user's ~/.my.cnf afterwards takes precedence over it.
defaults = '''
[client]
host = localhost
'''
config_parser = ConfigParser.RawConfigParser()
_defaults_stream = io.BytesIO(defaults)
config_parser.readfp(_defaults_stream)
_user_config_path = os.path.expanduser('~/.my.cnf')
config_parser.read([_user_config_path])
def get_config(option):
    """Look up ``option`` in the parsed MySQL client configuration.

    The ``client`` section is consulted first and then ``mysql``; a value
    found in the later section replaces one found in the earlier section.
    Returns None when neither section provides the option.
    """
    found = None
    for section_name in ('client', 'mysql'):
        try:
            candidate = config_parser.get(section_name, option)
        except (ConfigParser.NoOptionError, ConfigParser.NoSectionError):
            # Section or option missing: keep whatever was found so far.
            continue
        if candidate is not None:
            found = candidate
    return found
# Files are only reported, never removed, unless deletion is switched on.
delete_allowed = False
# Connection settings start from the client configuration values.
mysql_host, mysql_user, mysql_password, mysql_database = map(
    get_config, ('host', 'user', 'password', 'database')
)
mysql_prompt_password = False
# Extra attribute backend models to scan, keyed by entity type.
custom_backends = {'product': [], 'category': []}
# Parse the command line; recognised options override the configuration
# values read earlier.
try:
    opts, args = getopt.getopt(
        sys.argv[1:],
        'u:h:p:d:Pb:c:D',
        ['user=', 'password=', 'host=', 'database=', 'prompt-password',
         'backend=', 'category-backend=', 'delete']
    )
except getopt.GetoptError as err:
    # Unknown option or missing argument: report it and show the synopsis
    # (usage() exits) instead of dying with a traceback.
    print(err)
    usage()
for opt, value in opts:
    if opt in ('-u', '--user'):
        mysql_user = value
    elif opt in ('-p', '--password'):
        mysql_password = value
    elif opt in ('-h', '--host'):
        mysql_host = value
    elif opt in ('-d', '--database'):
        mysql_database = value
    elif opt in ('-P', '--prompt-password'):
        mysql_prompt_password = True
    elif opt in ('-b', '--backend'):
        # Comma-separated list of product attribute backend models.
        custom_backends['product'].extend(value.split(','))
    elif opt in ('-c', '--category-backend'):
        # Comma-separated list of category attribute backend models. Each
        # individual model is stored, not the whole unsplit argument.
        custom_backends['category'].extend(value.split(','))
    elif opt in ('-D', '--delete'):
        delete_allowed = True
    else:
        usage()
# Optionally ask for the password interactively, replacing any configured one.
if mysql_prompt_password:
    mysql_password = getpass.getpass('Password:')
_connect_kwargs = {
    'user': mysql_user,
    'password': mysql_password,
    'database': mysql_database,
}
# "localhost" connects through the local unix socket; any other host is
# passed to the connector as a host name.
if mysql_host == 'localhost':
    _connect_kwargs['unix_socket'] = '/var/lib/mysql/mysql.sock'
else:
    _connect_kwargs['host'] = mysql_host
# Note: this rebinds the imported ``connection`` module name to the live
# connection object, exactly as the rest of the script expects.
connection = connection.MySQLConnection(**_connect_kwargs)
cursorContext = CursorContextManager(connection)
# Cache of load_attributes() results, keyed by
# "<entity_type_id>-<filter_by>-<filter_value>".
_load_attributes = {}


def load_attributes(entity_type, filter_by, filter_value):
    """Return attribute ids of an entity type grouped by backend type.

    ``entity_type`` is 'product' or 'category'. ``filter_by`` names the
    ``eav_attribute`` column to filter on and ``filter_value`` is the value
    that column must equal. The result maps each ``backend_type`` to the
    list of matching ``attribute_id`` values and is cached per argument
    combination.

    ``filter_by`` is interpolated into the SQL text as a column name, so it
    must come from trusted code and never from user input.

    Raises ValueError for an unsupported ``entity_type``.
    """
    if entity_type == 'product':
        entity_type_id = 4
    elif entity_type == 'category':
        entity_type_id = 3
    else:
        # A bare string cannot be raised; use a real exception type.
        raise ValueError('Invalid entity_type: %s' % entity_type)
    cache_key = '%s-%s-%s' % (entity_type_id, filter_by, filter_value)
    if cache_key in _load_attributes:
        return _load_attributes[cache_key]
    attributes = {}
    with cursorContext() as cursor:
        cursor.execute(
            'SELECT attribute_id, backend_type'
            ' FROM eav_attribute'
            ' WHERE entity_type_id=%%s AND %s=%%s' % filter_by,
            (entity_type_id, filter_value)
        )
        for (attribute_id, backend_type) in cursor:
            attributes.setdefault(backend_type, []).append(attribute_id)
    _load_attributes[cache_key] = attributes
    return attributes
def prepare_paths(prefix, path, filenames):
    """Yield ``(key, value)`` pairs for the files of one directory.

    Each name in ``filenames`` is joined onto ``path``. Results that start
    with ``prefix`` have that prefix removed, and the remainder is yielded
    twice as a pair so the output can be fed straight to ``dict()``. Joined
    paths that do not start with ``prefix`` are skipped.
    """
    prefix_length = len(prefix)
    for name in filenames:
        joined = os.path.join(path, name)
        if not joined.startswith(prefix):
            continue
        relative = joined[prefix_length:]
        yield (relative, relative)
def _select_used_values(cursor, table, values, attribute_ids=None):
    """Return the distinct ``value`` entries of ``table`` found in ``values``.

    When ``attribute_ids`` is given, rows are additionally restricted to
    those attribute ids. The result set is read completely before returning
    so the caller is free to modify its own collections afterwards.
    """
    value_marks = ','.join(['%s'] * len(values))
    if attribute_ids:
        id_marks = ','.join(['%s'] * len(attribute_ids))
        where = 'attribute_id IN (%s) AND value IN (%s)' % (id_marks, value_marks)
        parameters = list(attribute_ids) + list(values)
    else:
        where = 'value IN (%s)' % value_marks
        parameters = list(values)
    # TABLE SCAN
    cursor.execute(
        'SELECT DISTINCT value FROM %s WHERE %s' % (table, where),
        tuple(parameters)
    )
    return [value for (value,) in cursor]


def check_files(entity_type, filepaths):
    """Work out which of ``filepaths`` are referenced in the database.

    ``filepaths`` is a dict keyed by file path relative to the entity's
    catalog folder (with a leading slash); it is not modified. Paths are
    checked, in order, against the product media gallery (products only),
    attributes whose frontend input is ``media_image``, attributes whose
    frontend input is ``image`` (compared without the leading slash), and
    attributes using one of the configured custom backend models (substring
    match on the stored value).

    Returns ``(used_count, unused_count, unused)`` where ``unused`` is a
    copy of ``filepaths`` holding only the entries that were not found.
    """
    current_used = 0
    filepaths = filepaths.copy()
    with cursorContext() as cursor:
        # NOTE(review): ``del filepaths[value]`` below raises KeyError if the
        # database returns a value that is not byte-for-byte one of our keys
        # (for example through a case-insensitive comparison). That aborts
        # the run rather than risking a wrong deletion; confirm this is the
        # desired outcome.
        if entity_type == 'product' and filepaths:
            found = _select_used_values(
                cursor, 'catalog_product_entity_media_gallery', list(filepaths)
            )
            for value in found:
                current_used += 1
                del filepaths[value]

        media_attributes = load_attributes(entity_type, 'frontend_input', 'media_image')
        for backend_type, attribute_ids in media_attributes.items():
            if not filepaths:
                break
            table = 'catalog_%s_entity_%s' % (entity_type, backend_type)
            for value in _select_used_values(cursor, table, list(filepaths), attribute_ids):
                current_used += 1
                del filepaths[value]

        image_attributes = load_attributes(entity_type, 'frontend_input', 'image')
        for backend_type, attribute_ids in image_attributes.items():
            if not filepaths:
                break
            table = 'catalog_%s_entity_%s' % (entity_type, backend_type)
            # These values are compared without the leading slash.
            names = [filename[1:] for filename in filepaths]
            for value in _select_used_values(cursor, table, names, attribute_ids):
                current_used += 1
                del filepaths['/%s' % value]

        for backend_model in custom_backends[entity_type]:
            custom_attributes = load_attributes(entity_type, 'backend_model', backend_model)
            for backend_type, attribute_ids in custom_attributes.items():
                table = 'catalog_%s_entity_%s' % (entity_type, backend_type)
                id_marks = ','.join(['%s'] * len(attribute_ids))
                # For simpler code, happy to match _ from the filename as any
                # character: rather keep too much than lose what we need.
                # Query per file since we need to know WHICH file matched.
                for filename in list(filepaths):
                    # TABLE SCAN
                    cursor.execute(
                        'SELECT DISTINCT %%s FROM %s'
                        ' WHERE attribute_id IN (%s) AND value LIKE (%%s)'
                        % (table, id_marks),
                        tuple([filename] + list(attribute_ids) + ['%%%s%%' % filename[1:]])
                    )
                    # Any returned row means this file is referenced. Use the
                    # loop variable as the key instead of the echoed column,
                    # so the result does not depend on how the driver returns
                    # the parameter.
                    if cursor.fetchall():
                        current_used += 1
                        del filepaths[filename]
    return (current_used, len(filepaths), filepaths)
def check_cache(entity_type, filepaths):
    """Work out which cached images still have an original file on disk.

    ``filepaths`` is a dict keyed by path relative to the entity's catalog
    folder (for example ``/cache/...``); it is not modified. For every key
    the trailing part captured by the entity-specific pattern is looked up as
    ``catalog/<entity_type>/<captured part>`` relative to the current working
    directory. Keys that do not match the pattern are reported and dropped,
    counting as neither used nor unused.

    Returns ``(used_count, unused_count, unused)`` where ``unused`` is a
    copy of ``filepaths`` holding only entries whose original is missing.

    Raises ValueError for an unsupported ``entity_type``.
    """
    if entity_type == 'product':
        pattern = r'/cache/[0-9]+/[^/]+/[^/]+/[^/]+/(.*)'
    elif entity_type == 'category':
        pattern = r'/cache/[^/]+/(.*)'
    else:
        # A bare string cannot be raised; use a real exception type.
        raise ValueError('Unknown entity_type: %s' % entity_type)
    current_used = 0
    remaining = filepaths.copy()
    # Iterate over a snapshot because entries are removed along the way.
    for entry in list(remaining):
        match = re.match(pattern, entry)
        if match is None:
            print(' Skipping unknown cache entry: %s' % entry)
            del remaining[entry]
            continue
        originalfile = 'catalog/%s/%s' % (entity_type, match.group(1))
        if os.path.isfile(originalfile):
            current_used += 1
            del remaining[entry]
    return (current_used, len(remaining), remaining)
def delete_file(filepath):
    """Remove ``filepath`` when deletion is enabled, otherwise only report it.

    Reads the module-level ``delete_allowed`` switch and prints one line
    stating what was done or what would have been done.
    """
    action = 'Would delete'
    if delete_allowed:
        os.unlink(filepath)
        action = 'Deleted'
    print(' %s: %s' % (action, filepath))
def scan_entity_type(entity_type):
    """Walk ``catalog/<entity_type>`` and process its files in batches of 100.

    Directories whose path starts with ``catalog/<entity_type>/cache`` are
    checked with ``check_cache``; all others with ``check_files``. Every
    file a checker reports as unused is handed to ``delete_file``.

    Returns ``(used_count, unused_count)`` summed over all batches.
    """
    prefix = 'catalog/%s' % entity_type
    cache_root = '%s/cache' % prefix
    used_total = 0
    unused_total = 0
    for path, _dirnames, filenames in os.walk(prefix):
        print('Verifying: %s' % path)
        start = 0
        while start < len(filenames):
            batch = filenames[start:start + 100]
            start += 100
            filepaths = dict(prepare_paths(prefix, path, batch))
            if path.startswith(cache_root):
                result = check_cache(entity_type, filepaths)
            else:
                result = check_files(entity_type, filepaths)
            chunk_used, chunk_unused, chunk_todelete = result
            used_total += chunk_used
            unused_total += chunk_unused
            print(' Chunk of %d: %d used / %d unused' % (len(filepaths), chunk_used, chunk_unused))
            for relative in chunk_todelete:
                delete_file('%s%s' % (prefix, relative))
    return used_total, unused_total
# Scan categories first, then products, and report the combined counts.
total_used, total_unused = 0, 0
for _entity_type in ('category', 'product'):
    entity_used, entity_unused = scan_entity_type(_entity_type)
    total_used += entity_used
    total_unused += entity_unused
print('Totals: %d used, %d unused' % (total_used, total_unused))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment