Skip to content

Instantly share code, notes, and snippets.

@clayg
Created April 20, 2017 19:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save clayg/33f057fb1d9a9d5cc2fd84812e06a9fd to your computer and use it in GitHub Desktop.
Save clayg/33f057fb1d9a9d5cc2fd84812e06a9fd to your computer and use it in GitHub Desktop.
import sys
import os
import errno
from argparse import ArgumentParser
from collections import defaultdict
import shutil
import logging
import thread
import threading
import random
from Queue import Queue
from swift.common.storage_policy import POLICIES
from swift.obj.diskfile import get_data_dir, DiskFileRouter
from swift.common.utils import remove_file, drop_privileges
# fix monkey-patch lp bug #1380815
# Re-point the logging module's references to the `threading` and `thread`
# modules at the ones imported above, and give it a brand-new module-level
# RLock, before any handler is configured.
# NOTE(review): presumably this works around logging having captured
# threading primitives before they were monkey-patched (e.g. by eventlet via
# the swift imports), as tracked in Launchpad bug #1380815 -- confirm against
# the bug report before changing or removing it.
# Keep these statements in this order and ahead of basicConfig(); this pokes
# private logging internals (`_lock`).
logging.threading = threading
logging.thread = thread
logging._lock = threading.RLock()
# Log everything at DEBUG and above; the workers report per-partition
# progress with logging.debug().
logging.basicConfig(level=logging.DEBUG)
# Command-line interface for the handoff cleanup tool.
#   devices        -- root of the node's devices tree (one subdir per device)
#   --workers      -- number of consumer threads main() starts
#   --dry-run      -- report only: no hashes.pkl reset, no partition removal
#   --do-listdir   -- passed through to the diskfile manager's _get_hashes()
#   --policy-index -- storage policy whose ring/datadir are inspected
#   --limit        -- cap on handoff partitions queued for EACH device
#   --user         -- account to switch to before any partition is touched
parser = ArgumentParser()
parser.add_argument('devices', help='root of devices tree for node',
                    nargs='?', default='/srv/node')
parser.add_argument('--workers', help='number of workers', type=int,
                    default=24)
parser.add_argument('--dry-run',
                    help='report only; do not reset hashes.pkl or remove '
                         'empty partitions',
                    default=False, action='store_true')
parser.add_argument('--do-listdir', help='force a get_hashes do_listdir',
                    default=False, action='store_true')
parser.add_argument('--policy-index', help='the policy index',
                    type=int, default=0)
parser.add_argument('--limit',
                    help='max number of handoff parts to clean per device',
                    default=None, type=int)
parser.add_argument('--user', help='drop privileges to this user',
                    default='swift')
class AtomicStats(object):
    """Named integer counters that many threads may update concurrently.

    Counters spring into existence at 0 the first time they are touched.
    Every read and write of the underlying dict happens under ``self.lock``.
    """

    def __init__(self):
        self.stats = defaultdict(int)
        self.lock = threading.RLock()

    def incr(self, key, amount=1):
        """Add *amount* (default 1) to the counter named *key*."""
        with self.lock:
            self.stats[key] += amount

    def __iter__(self):
        """Iterate over a point-in-time snapshot of ``(key, value)`` pairs.

        The snapshot is copied under the lock, so iterating is safe even while
        other threads keep calling :meth:`incr` (on Python 3 a live
        ``dict.items()`` view would raise ``RuntimeError`` if a new key were
        added mid-iteration).
        """
        with self.lock:
            snapshot = list(self.stats.items())
        return iter(snapshot)


# Module-level counters; the worker threads increment these and main()
# prints them once all workers have been joined.
STATS = AtomicStats()
def delete_partition(path):
    """Best-effort removal of the partition at *path*.

    First tears down the directory tree, ignoring any errors hit along the
    way, then hands *path* to swift's ``remove_file`` in case a
    non-directory entry is sitting there.
    """
    # second positional argument of rmtree is ignore_errors
    shutil.rmtree(path, True)
    remove_file(path)
def consumer(q, df_mgr, args):
    """Worker loop: audit, and optionally clean, partitions taken from *q*.

    Pulls partition directory paths off ``q`` until it receives the ``None``
    sentinel.  For each path:

    * reads ``(hashed, hashes)`` from ``df_mgr._get_hashes``;
    * if ``hashes`` is non-empty but nothing was hashed (``hashed == 0``) --
      presumably meaning the result came from a cached ``hashes.pkl`` -- and
      this is not a dry run, deletes ``hashes.pkl``, recomputes, and records
      whether the cached and fresh results agreed;
    * if ``hashes`` ends up empty, removes the partition (unless dry run).

    Every outcome is tallied in the module-level ``STATS`` counters.  Errors
    for a single partition are logged and counted as ``failed_skipped``; the
    worker then moves on to the next path.

    :param q: queue of partition paths; ``None`` tells this worker to exit
    :param df_mgr: diskfile manager whose ``_get_hashes(path, do_listdir=...)``
        returns a ``(hashed, hashes)`` pair
    :param args: parsed command line; reads ``do_listdir`` and ``dry_run``
    """
    while True:
        part_path = q.get()
        if part_path is None:
            return
        logging.debug('Checking %r', part_path)
        STATS.incr('count')
        try:
            hashed, hashes = df_mgr._get_hashes(
                part_path, do_listdir=args.do_listdir)
        except Exception:
            logging.exception('Unable to read hashes from %r', part_path)
            STATS.incr('failed_skipped')
            continue
        STATS.incr('hashed', hashed)

        if hashes and hashed == 0:
            initial_hashes = hashes
            STATS.incr('cache_hashed')
            logging.warning('Running force re-hash %r', part_path)
            if not args.dry_run:
                try:
                    remove_file(os.path.join(part_path, 'hashes.pkl'))
                    STATS.incr('cache_reset')
                    hashed, hashes = df_mgr._get_hashes(
                        part_path, do_listdir=args.do_listdir)
                except Exception:
                    logging.exception('Unable to re-hash %r', part_path)
                    STATS.incr('failed_skipped')
                    continue
                STATS.incr('hashed', hashed)
                if hashes == initial_hashes:
                    STATS.incr('cache_correct')
                else:
                    STATS.incr('cache_stale')
                    STATS.incr('cache_diff', abs(
                        len(hashes) - len(initial_hashes)))
        else:
            STATS.incr('re_hashed')

        if not hashes:
            # 'empty' counts every removal candidate, including in dry runs.
            STATS.incr('empty')
            logging.info('Removing partition %r', part_path)
            if not args.dry_run:
                try:
                    delete_partition(part_path)
                except Exception:
                    STATS.incr('failed_skipped')
                    logging.exception('Unable to delete %r', part_path)
                else:
                    # Only count partitions whose deletion did not raise.
                    STATS.incr('removed')
        else:
            STATS.incr('not_empty')
        logging.debug('Part %s hashed %r', part_path, hashed)
def _primary_parts_by_device(ring):
    """Return ``{device name: set(partitions)}`` for every primary assignment.

    The map is keyed by each ring device's ``'device'`` name only, not by
    node.  If several nodes reuse a device name (e.g. ``sdb``) their primary
    partitions are merged, which can only make a partition look primary when
    it is not -- so it errs toward skipping a partition, never toward
    cleaning one.
    """
    dev2parts = defaultdict(set)
    for part2dev in ring._replica2part2dev_id:
        for part, device_id in enumerate(part2dev):
            dev2parts[ring.devs[device_id]['device']].add(part)
    return dev2parts


def _find_handoffs(device_root, datadir, dev2parts):
    """Return ``{device dir: set(partitions)}`` of on-disk non-primary parts.

    Scans ``<device_root>/<device>/<datadir>`` for numeric partition
    directories that *dev2parts* does not list for that device.  Entries
    under *device_root* without a usable *datadir* (missing, or the entry is
    not a directory) are skipped; any other listing error propagates.
    """
    handoffs = defaultdict(set)
    for device_dir in os.listdir(device_root):
        parts_dir = os.path.join(device_root, device_dir, datadir)
        try:
            parts = os.listdir(parts_dir)
        except OSError as e:
            # ENOTDIR: a stray regular file sitting directly in device_root.
            if e.errno in (errno.ENOENT, errno.ENOTDIR):
                continue
            raise
        primaries = dev2parts.get(device_dir, ())
        for part in parts:
            if not part.isdigit():
                continue
            part = int(part)
            if part not in primaries:
                handoffs[device_dir].add(part)
    return handoffs


def main():
    """Find handoff partitions on this node and clean up the empty ones.

    Loads the object ring for ``--policy-index`` from ``/etc/swift``, works
    out which on-disk partitions are not primaries for their device, then
    drops privileges to ``--user`` and feeds those partition paths (devices
    in random order, at most ``--limit`` per device) to ``--workers``
    ``consumer`` threads.  Prints the collected ``STATS`` when done.
    """
    args = parser.parse_args()
    # With no workers nothing drains the bounded queue, so the producer loop
    # below would block forever once it filled up.
    if args.workers < 1:
        parser.error('--workers must be at least 1')
    # A negative slice bound would silently mean "all but the last N".
    if args.limit is not None and args.limit < 0:
        parser.error('--limit must not be negative')

    device_root = args.devices
    policy = POLICIES[args.policy_index]
    datadir = get_data_dir(policy)
    policy.load_ring('/etc/swift')
    ring = policy.object_ring
    conf = {
        'devices': args.devices,
        'mount_check': False,
    }
    df_mgr = DiskFileRouter(conf, logging)[policy]

    dev2parts = _primary_parts_by_device(ring)
    handoffs = _find_handoffs(device_root, datadir, dev2parts)
    # list() so shuffle() works on Python 3, where items() is a view.
    device_parts = list(handoffs.items())
    random.shuffle(device_parts)

    # The scan above ran with the starting privileges; everything from here
    # on, including any deletions, runs as args.user.
    drop_privileges(args.user)
    q = Queue(1000)
    workers = []
    try:
        for _ in range(args.workers):
            t = threading.Thread(target=consumer, args=(q, df_mgr, args))
            t.start()
            workers.append(t)
        for device, parts in device_parts:
            if args.limit is not None:
                parts = list(parts)[:args.limit]
            device_path = os.path.join(device_root, device)
            for part in parts:
                q.put(os.path.join(device_path, datadir, str(part)))
    finally:
        # One None sentinel per started worker so every consumer loop exits,
        # then wait for them all before reporting.
        for _ in workers:
            q.put(None)
        for t in workers:
            t.join()
    print('\nSTATS:\n')
    for k, v in sorted(STATS):
        print('%14s: %s' % (k, v))
if __name__ == "__main__":
    # Run the tool and use main()'s return value as the process exit status
    # (None maps to status 0).
    raise SystemExit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment