Skip to content

Instantly share code, notes, and snippets.

@brian-pond
Last active September 9, 2022 07:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save brian-pond/514ce0a549aa1950ec82728ecbe04cdf to your computer and use it in GitHub Desktop.
Save brian-pond/514ce0a549aa1950ec82728ecbe04cdf to your computer and use it in GitHub Desktop.
""" sql_db_clean.py """
#
# The original code and design belong to the Frappe Framework project at https://github.com/frappe/frappe, licensed MIT.
# Copyright (c) 2016-2022 Frappe Technologies Pvt. Ltd. <developers@frappe.io>
#
# I kindly thank the contributors and maintainers for their original, open source work.
#
# The modifications and alterations below are to offer the "trim tables" and "trim database" CLI functions
# to users of Frappe Framework v13 (and possibly earlier versions)
#
# Modified and redistributed by Brian Pond <brian@datahenge.com>
#
# NOTE: The functions below use type hints and f-strings, which together require Python 3.6 or later.
#
# NOTE: Please run this in a TEST environment first, and verify the outcome!
# I offer no warranties or guarantees that this code is safe for you to use.
#
import json
import os
import re
import frappe
from frappe.utils.backups import scheduled_backup
from frappe.utils.commands import render_table
# Column-name groups. trim_table() never drops a column listed in
# default_fields, optional_fields or child_table_fields.
child_table_fields = (
    "parent",
    "parentfield",
    "parenttype",
)
default_fields = ("doctype", "name", "owner", "creation", "modified", "modified_by", "docstatus", "idx")
optional_fields = (
    "_user_tags",
    "_comments",
    "_assign",
    "_liked_by",
    "_seen",
)
# NOTE(review): presumably the DocField types that hold child tables; not
# referenced by any function visible in this file.
table_fields = (
    "Table",
    "Table MultiSelect",
)
def trim_table(doctype_name: str, dry_run=True):
    """
    Purpose:
        Given a single DocType's name (e.g. Customer), remove SQL columns that are not DocFields.

    Arguments:
        doctype_name: Name of the DocType; its SQL table is `tab<doctype_name>`.
        dry_run: When True (the default) nothing is altered; the columns that
            would be dropped are only printed.

    Returns:
        A sorted list of the orphaned column names (dropped, or reported on a
        dry run), or None when the table has no orphaned columns.

    Usage:
        bench execute --args "Customer" app_name.module_name.sql_db_clean.trim_table
        bench execute --args "['Customer', False]" app_name.module_name.sql_db_clean.trim_table
    """
    # Delete the cached "table_columns" entry for this table first; presumably
    # this makes get_table_columns() below read the table's current columns.
    frappe.cache().hdel("table_columns", f"tab{doctype_name}")
    columns = frappe.db.get_table_columns(doctype_name)
    fields = frappe.get_meta(doctype_name, cached=False).get_fieldnames_with_value()

    # Framework-managed columns are never dropped, nor is anything starting with "_".
    protected_columns = set(default_fields + optional_fields + child_table_fields)

    def _is_droppable(column):
        return column not in protected_columns and not column.startswith("_")

    # sorted() gives a stable order; a bare set difference does not.
    columns_to_remove = sorted(c for c in set(columns) - set(fields) if _is_droppable(c))
    if not columns_to_remove:
        return None

    # Quote every name individually so the printed list is balanced: 'a', 'b'
    column_names_string = ", ".join(f"'{c}'" for c in columns_to_remove)
    if dry_run:
        print(f"Dry Run. The following columns would be dropped from DocType '{doctype_name}': {column_names_string}")
    else:
        # A single ALTER TABLE statement drops all orphaned columns at once.
        sql_suffix = ", ".join(f"DROP `{c}`" for c in columns_to_remove)
        frappe.db.sql_ddl(f"ALTER TABLE `tab{doctype_name}` {sql_suffix}")
        print(f"Dropped the following columns from DocType '{doctype_name}': {column_names_string}")
    return columns_to_remove
def trim_tables(doctype_name: str=None, dry_run=False, quiet=False):
    """
    Description:
        Removes SQL database columns that are not DocFields (standard or custom)
        Useful because removing a field in a DocType doesn't automatically delete the db field.

    Arguments:
        doctype_name: Restrict the run to this one DocType; when empty/None every
            DocType with issingle = 0 is processed.
        dry_run: Passed through to trim_table(); when True nothing is altered.
        quiet: When True, suppress this function's progress and error output.
            Failing DocTypes are still skipped rather than aborting the run.

    Returns:
        A dict mapping each affected DocType name to the list of columns that
        trim_table() returned for it.

    Usage:
        bench execute --args "['Customer', False, False]" app_name.module_name.sql_db_clean.trim_tables
        bench execute --args "[None, False, False]" app_name.module_name.sql_db_clean.trim_tables
    """
    updated_tables = {}
    filters = {"issingle": 0}
    if doctype_name:
        filters["name"] = doctype_name

    doctype_names = frappe.get_all("DocType", filters=filters, pluck="name", order_by="name")
    for index, each_doctype in enumerate(doctype_names):
        # Progress marker every 100 DocTypes; honours 'quiet' so callers that
        # need machine-readable output are not polluted.
        if not quiet and index % 100 == 0:
            print(f"Iteration #{index}")
        try:
            dropped_columns = trim_table(each_doctype, dry_run=dry_run)
        except frappe.db.TableMissingError:
            if not quiet:
                print(f"Ignoring missing table for DocType: {each_doctype}")
                print(f"Consider removing record in the DocType table for {each_doctype}")
        except Exception as ex:  # pylint: disable=broad-except
            # Deliberately best-effort: one failing DocType must not stop the run.
            if not quiet:
                print(f"Error while trimming DocType '{each_doctype}': {ex}")
        else:
            if dropped_columns:
                updated_tables[each_doctype] = dropped_columns
    return updated_tables
def trim_tables_by_site(site_name: str, dry_run: bool=True, result_format: str="json", no_backup=True):
    """
    Description:
        Just a wrapper around 'trim_tables' offering the ability to name a Site, and create backups.

    Arguments:
        site_name: Site to initialise and connect to.
        dry_run: When True (the default) no column is dropped and no backup is taken.
        result_format: "json" prints the result as JSON keyed by site name; any
            other value renders it as a table.
        no_backup: When True (the default) the backup step is skipped. A backup
            is only taken when both no_backup and dry_run are False.

    Usage:
        bench execute --args "['myerp.mydomain.com']" app_name.module_name.sql_db_clean.trim_tables_by_site
    """
    def _handle_data(data: dict, result_format="json"):
        # Print the {DocType: [columns]} mapping in the requested format.
        if result_format == "json":
            print(json.dumps({frappe.local.site: data}, indent=1, sort_keys=True))
        else:
            rows = [["DocType", "Fields"]] + [[table, ", ".join(columns)] for table, columns in data.items()]
            render_table(rows)

    frappe.init(site=site_name)
    try:
        # Connecting and the backup are inside the try so that a failure in
        # either still reaches frappe.destroy() below.
        frappe.connect()
        if not (no_backup or dry_run):
            print(f"Taking SQL database backup for Site = '{frappe.local.site}'")
            backup_generator = scheduled_backup(ignore_files=False, force=True)
            backup_generator.print_summary()

        print("Removing SQL database columns that are not DocFields...")
        trimmed_data = trim_tables(dry_run=dry_run, quiet=(result_format == "json"))
        if result_format == "table" and not dry_run:
            print(f"The following data have been removed from {frappe.local.site}")
        # Pass the caller's result_format. The original passed the builtin
        # `format`, so the JSON branch could never be selected.
        _handle_data(trimmed_data, result_format=result_format)
    finally:
        frappe.destroy()
def get_standard_tables():
    """
    Return the table names declared in the framework's bundled SQL schema file.

    The file read is ../apps/frappe/frappe/database/<db_type>/framework_<db_type>.sql,
    resolved relative to the current working directory, where <db_type> comes
    from frappe.conf.db_type. Every line containing a CREATE TABLE statement
    contributes the quoted (double-quote or backtick) name that follows it.

    Returns:
        A list of strings, one per CREATE TABLE line, in file order.
    """
    db_type = frappe.conf.db_type
    schema_path = os.path.join("..", "apps", "frappe", "frappe", "database", db_type, f"framework_{db_type}.sql")

    with open(schema_path, encoding='utf-8') as schema_file:
        schema_lines = schema_file.read().splitlines()

    # Group 2 is the table name between the opening and closing quote characters.
    create_table = re.compile(r"""CREATE TABLE ("|`)(.*)?("|`) \(""")
    matches = (create_table.search(each_line) for each_line in schema_lines)
    return [found.group(2) for found in matches if found]
def trim_database(site_name: str, dry_run: bool=True, result_format:str ='text', no_backup=True):
    """
    Purpose:
        Remove SQL tables from the Site database that are not DocTypes.

    Arguments:
        site_name: Site to initialise and connect to.
        dry_run: When True (the default) tables are only listed, never dropped,
            and no backup is taken.
        result_format: "text" prints progress messages; "json" prints a final
            JSON object mapping the site name to the list of ghost tables.
        no_backup: When True (the default) the backup step is skipped. A backup
            of the ghost tables is only taken when both no_backup and dry_run
            are False.

    A table is kept when its name (minus a leading "tab") is a DocType name,
    when it starts with "__", or when get_standard_tables() lists it.
    NOTE(review): tables without the "tab" prefix that match none of these
    rules are also treated as ghost tables -- confirm this is intended for
    databases shared with non-Frappe tables.

    Usage:
        bench execute --args "['myerp.mydomain.com', False, 'text', False]" gauntlet.sql_db_clean.trim_database
    """
    def _strip_tab_prefix(table_name):
        # Only a leading "tab" is the DocType prefix; str.replace("tab", "", 1)
        # would also remove a "tab" found in the middle of a name.
        return table_name[3:] if table_name.startswith("tab") else table_name

    all_data = {}
    frappe.init(site=site_name)
    try:
        # Everything after init runs under try/finally so the site context is
        # always torn down, even when a query, backup or DROP fails.
        frappe.connect()
        standard_tables = set(get_standard_tables())
        query = """ SELECT table_name FROM information_schema.tables
            WHERE table_type = 'BASE TABLE'
            AND table_schema = %(db_name)s
        """
        queried_result = frappe.db.sql(query, values={"db_name": frappe.conf.db_name})
        database_tables = [row[0] for row in queried_result]
        # Sets keep the per-table membership checks cheap.
        doctype_names = set(frappe.get_all("DocType", pluck="name"))

        tables_to_drop = [
            each_table
            for each_table in database_tables
            if not (
                _strip_tab_prefix(each_table) in doctype_names
                or each_table.startswith("__")
                or each_table in standard_tables
            )
        ]

        if not tables_to_drop:
            if result_format == "text":
                print(f"No ghost tables found in {frappe.local.site}")
        else:
            if not (no_backup or dry_run):
                if result_format == "text":
                    print(f"Backing Up Tables: {', '.join(tables_to_drop)}")
                odb = scheduled_backup(
                    ignore_conf=False,
                    include_doctypes=",".join(_strip_tab_prefix(x) for x in tables_to_drop),
                    ignore_files=True,
                    force=True,
                )
                if result_format == "text":
                    odb.print_summary()
                    print("\nTrimming Database")

            for table in tables_to_drop:
                if result_format == "text":
                    print(f"* Dropping Table '{table}'...")
                if not dry_run:
                    frappe.db.sql_ddl(f"drop table `{table}`")

            all_data[frappe.local.site] = tables_to_drop
    finally:
        frappe.destroy()

    if result_format == "json":
        print(json.dumps(all_data, indent=1))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment