Skip to content

Instantly share code, notes, and snippets.

@wdjwxh
Forked from mrqwer88/cl_clean.yaml
Created October 29, 2019 09:22
Show Gist options
  • Save wdjwxh/37c737867b06fe3abf0250c91767c793 to your computer and use it in GitHub Desktop.
Save wdjwxh/37c737867b06fe3abf0250c91767c793 to your computer and use it in GitHub Desktop.
Clean old partitions in ClickHouse
# Configuration for the ClickHouse partition-cleaning script.
#
# databases: list of one-item mappings, "<database name>: <max days>".
# For every table of a listed database the script selects partitions whose
# newest max_date is older than today() minus <max days> and drops them.
databases:
- logs: 7
- vms: 9
- statistics: 20
# Passed unchanged as the first argument to clickhouse_driver.Client().
connect_line: '127.0.0.1'
# File that receives the script's log records (DEBUG level and above).
log_file: 'cl_clean_partitions.log'
#!/usr/bin/env python
import logging
import yaml
import argparse
import os
import os.path
import time
import datetime
import clickhouse_driver
# Command-line interface.
#   --config PATH   YAML configuration file (checked for existence below).
#   -s/--simulate   boolean switch stored as args.simulate.
#   --debug         boolean switch stored as args.debug.
parser = argparse.ArgumentParser(description='Clean old clickhouse partitions')
parser.add_argument('--config', dest='config_path', help='path to config path')
parser.add_argument('-s', '--simulate', action='store_true')
parser.add_argument('--debug', action='store_true')
args = parser.parse_args()

# Abort early when no usable config file was supplied.  SystemExit is raised
# rather than calling os._exit(): os._exit() terminates the process without
# flushing stdio buffers, so the message printed just before it can be lost
# when stdout is redirected to a file or a pipe.  The exit status stays 1.
if args.config_path is None:
    print("Need config path via --config flag")
    raise SystemExit(1)
if not os.path.isfile(args.config_path):
    print("Config file with name \"%s\" doesnt exist" % args.config_path)
    raise SystemExit(1)
# Load the YAML configuration into `config`.
#
# yaml.safe_load is used instead of yaml.load: it only builds plain Python
# data (dicts, lists, scalars) and does not need an explicit Loader argument,
# which newer PyYAML releases require for yaml.load.
with open(args.config_path, 'r') as stream:
    try:
        config = yaml.safe_load(stream)
    except yaml.YAMLError as exc:
        # Nothing after this point can work without a parsed config, so stop
        # here instead of continuing with `config` undefined.
        print(exc)
        raise SystemExit(1)

# An empty file parses to None and a bare scalar/list is not usable either:
# the rest of the script indexes `config` by key.
if not isinstance(config, dict):
    print("Config file \"%s\" must contain a YAML mapping" % args.config_path)
    raise SystemExit(1)
# Root logger lets every record from DEBUG upwards through; the individual
# handlers below apply their own thresholds.
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')

# File handler: always records DEBUG and above in the configured log file.
file_handler = logging.FileHandler(config['log_file'])
file_handler.setLevel(logging.DEBUG)

# Console handler: DEBUG and above with --debug, otherwise WARNING and above.
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG if args.debug else logging.WARNING)

# Both handlers share one format; the file handler is attached first.
for handler in (file_handler, console_handler):
    handler.setFormatter(formatter)
    logger.addHandler(handler)
# Connect to ClickHouse and collect the names of the existing databases so
# that configured databases which do not exist can be skipped with a warning.
client = clickhouse_driver.Client(config['connect_line'])
all_databases_raw = client.execute("SHOW databases")
all_databases = [row[0] for row in all_databases_raw]

for db in config['databases']:
    # Each config entry is a one-item mapping {database name: max days}.
    # next(iter(...)) takes its first item; dict.keys() cannot be indexed on
    # Python 3, so the former db.keys()[0] raised TypeError there.
    dbname, max_days = next(iter(db.items()))
    logger.info("Try clean db '%s' with %d max days", dbname, max_days)
    if dbname not in all_databases:
        logger.warning("We have not database %s - continue", dbname)
        continue

    # Identifiers are backtick-quoted so that names which are not plain
    # identifiers (for example ".inner.<view>" tables) still yield valid SQL.
    tables = client.execute("SHOW TABLES FROM `%s`" % dbname)
    for tables_t in tables:
        table = tables_t[0]
        logger.debug("we found table %s in db %s", table, dbname)

        # Select partitions whose newest part (max of max_date) is older than
        # today() - max_days, in partition order.
        # NOTE(review): this relies on system.parts.max_date being meaningful
        # for the table; confirm how it is populated for tables whose
        # partition key has no Date column before running without --simulate.
        query = "select partition from (select partition,max(max_date) as m from system.parts where database = '%s' and table = '%s' group by partition) where m < today()-%d order by partition;" % (dbname,table,max_days)
        partitions = client.execute(query)

        for partition_t in partitions:
            partition = partition_t[0]
            partition_delete_command = "ALTER TABLE `%s`.`%s` DROP PARTITION %s" % (dbname, table, partition)
            if args.simulate:
                # Dry run: only show what would have been executed.
                print("Simulate only! Not execute \"%s\"" % partition_delete_command)
            else:
                # Record every destructive statement before running it.
                logger.info("Execute \"%s\"", partition_delete_command)
                client.execute(partition_delete_command)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment