Skip to content

Instantly share code, notes, and snippets.

@clayg
Last active February 18, 2019 22:06
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save clayg/7a975ef3b34828c5ac7db05a519b6e8a to your computer and use it in GitHub Desktop.
Save clayg/7a975ef3b34828c5ac7db05a519b6e8a to your computer and use it in GitHub Desktop.
This is mostly to deal with https://bugs.launchpad.net/swift/+bug/1651530
#!/usr/bin/env python
import sys
import os
import errno
from argparse import ArgumentParser
from collections import defaultdict
import shutil
import logging
import thread
import threading
import random
from Queue import Queue
from swift.common.storage_policy import POLICIES
from swift.obj.diskfile import get_data_dir, DiskFileRouter
from swift.common.utils import remove_file, drop_privileges
# fix monkey-patch lp bug #1380815
# The logging module is handed the ``threading``/``thread`` modules imported
# by this script, plus a freshly created RLock for its module-level lock.
# NOTE(review): presumably the swift imports above leave logging holding
# monkey-patched primitives that do not work with the real worker threads
# started in main() -- confirm against the bug report.
logging.threading = threading
logging.thread = thread
logging._lock = threading.RLock()
# Configure the root logger so that DEBUG and above are emitted.
logging.basicConfig(level=logging.DEBUG)
# Command line interface; the parsed namespace is handed to main() and to
# every consumer() worker.
parser = ArgumentParser()
# Optional positional: directory that contains one sub-directory per device.
parser.add_argument('devices', help='root of devices tree for node',
                    nargs='?', default='/srv/node')
# Number of consumer threads started by main().
parser.add_argument('--workers', help='number of workers', type=int,
                    default=24)
# With --dry-run consumer() neither resets hashes.pkl nor deletes partitions.
parser.add_argument('--dry-run', help='do not remove empty partitions',
                    default=False, action='store_true')
# Passed straight through as the do_listdir argument of _get_hashes.
parser.add_argument('--do-listdir', help='force a get_hashes do_listdir',
                    default=False, action='store_true')
# Index into POLICIES selecting the storage policy (and ring) to inspect.
parser.add_argument('--policy-index', help='the policy index',
                    type=int, default=0)
# main() applies this cap to each device's handoff partitions separately.
parser.add_argument('--limit', help='max number of handoff parts to clean',
                    default=None, type=int)
# User handed to drop_privileges() before the workers are started.
parser.add_argument('--user', help='drop privileges to this user',
                    default='swift')
class AtomicStats(object):
    """
    Named integer counters that may be incremented from several threads.

    A counter springs into existence at zero the first time it is
    incremented.  Iterating an instance yields ``(key, value)`` pairs.
    """

    def __init__(self):
        self.stats = defaultdict(int)
        # re-entrant lock guarding every access to self.stats
        self.lock = threading.RLock()

    def incr(self, key, amount=1):
        """
        Add ``amount`` to the counter named ``key``.

        :param key: name of the counter
        :param amount: value to add, defaults to 1
        """
        with self.lock:
            self.stats[key] += amount

    def __iter__(self):
        """
        Iterate over a snapshot of the ``(key, value)`` pairs.

        The pairs are copied while holding the lock, so counters incremented
        after the iterator was created neither show up in it nor break it.

        :returns: iterator of ``(key, value)`` tuples
        """
        with self.lock:
            snapshot = list(self.stats.items())
        return iter(snapshot)


# Process-wide counters updated by the consumer() workers and printed by
# main() once all workers have finished.
STATS = AtomicStats()
def delete_partition(path):
    """
    Remove the partition found at ``path``.

    The directory tree rooted at ``path`` is removed first, ignoring any
    errors, and ``path`` itself is then handed to ``remove_file``.

    :param path: full path of the partition to remove
    """
    # Best effort: errors while walking/removing the tree are swallowed here.
    shutil.rmtree(path, ignore_errors=True)
    # NOTE(review): presumably remove_file tolerates a path that is already
    # gone and raises for other failures -- confirm in swift.common.utils.
    remove_file(path)
def consumer(q, df_mgr, args):
    """
    Worker loop: check handoff partitions taken from ``q`` until told to stop.

    Every queue entry is a ``(device, part, full_part_path)`` tuple; a
    ``None`` entry makes the worker return.  For each partition:

    1. ``df_mgr._get_hashes`` is asked for ``(hashed, hashes)``.
    2. If ``hashes`` is non-empty but ``hashed`` is 0 and this is not a dry
       run, ``hashes.pkl`` is removed, the hashes are computed again, and the
       two results are compared.
    3. If the final ``hashes`` is empty the partition is removed, unless this
       is a dry run.

    Outcomes are recorded in the module-level ``STATS`` counters; failures
    are logged and counted as ``failed_skipped`` instead of stopping the
    worker.

    :param q: queue of work entries; ``None`` is the shutdown sentinel
    :param df_mgr: diskfile manager exposing ``_get_hashes`` and ``policy``
    :param args: parsed command line options (``do_listdir`` and ``dry_run``
                 are read)
    """
    while True:
        entry = q.get()
        if entry is None:
            return
        logging.debug('Checking %r', entry)
        device, part, full_part_path = entry
        STATS.incr('count')
        try:
            hashed, hashes = df_mgr._get_hashes(
                device, part, df_mgr.policy, do_listdir=args.do_listdir)
        except Exception:
            logging.exception('Unable to read hashes from %r', full_part_path)
            STATS.incr('failed_skipped')
            continue
        STATS.incr('hashed', hashed)
        if hashes and hashed == 0:
            # Hashes were returned without anything being hashed.
            # NOTE(review): presumably this means they came from the cached
            # hashes.pkl -- confirm against swift's _get_hashes.
            initial_hashes = hashes
            STATS.incr('cache_hashed')
            logging.warning('Running force re-hash %r', full_part_path)
            if not args.dry_run:
                try:
                    remove_file(os.path.join(full_part_path, 'hashes.pkl'))
                    STATS.incr('cache_reset')
                    hashed, hashes = df_mgr._get_hashes(
                        device, part, df_mgr.policy,
                        do_listdir=args.do_listdir)
                except Exception:
                    logging.exception('Unable to re-hash %r', full_part_path)
                    STATS.incr('failed_skipped')
                    continue
                STATS.incr('hashed', hashed)
                # Only a real re-hash can tell whether the first result was
                # accurate; a dry run recomputes nothing.
                if hashes == initial_hashes:
                    STATS.incr('cache_correct')
                else:
                    STATS.incr('cache_stale')
                    STATS.incr('cache_diff', abs(
                        len(hashes) - len(initial_hashes)))
        else:
            STATS.incr('re_hashed')
        if not hashes:
            STATS.incr('empty')
            logging.info('Removing partition %r', full_part_path)
            if not args.dry_run:
                try:
                    delete_partition(full_part_path)
                except Exception:
                    STATS.incr('failed_skipped')
                    logging.exception('Unable to delete %r', full_part_path)
                else:
                    # count a removal only when the delete really succeeded
                    STATS.incr('removed')
        else:
            STATS.incr('not_empty')
        logging.debug('Part %s hashed %r', full_part_path, hashed)
def main():
    """
    Find handoff partitions on this node and feed them to worker threads.

    A numeric directory under ``<devices>/<device>/<datadir>`` is treated as
    a handoff when the selected policy's object ring does not assign that
    partition to the device.  Each handoff is queued as a
    ``(device, part, full_part_path)`` tuple for the ``consumer`` workers,
    and the collected ``STATS`` are printed once every worker has finished.

    :returns: None, so ``sys.exit(main())`` exits with status 0
    """
    args = parser.parse_args()
    device_root = args.devices
    policy = POLICIES[args.policy_index]
    datadir = get_data_dir(policy)
    policy.load_ring('/etc/swift')
    ring = policy.object_ring
    conf = {
        'devices': args.devices,
        'mount_check': False,
    }
    df_mgr = DiskFileRouter(conf, logging)[policy]
    # consumer() reads the policy back from the manager
    df_mgr.policy = policy

    # device name -> every partition the ring assigns to it (any replica)
    dev2parts = defaultdict(set)
    for part2dev in ring._replica2part2dev_id:
        for part, device_id in enumerate(part2dev):
            dev2parts[ring.devs[device_id]['device']].add(part)

    # device name -> partitions present on disk but not assigned by the ring
    handoffs = defaultdict(set)
    for device_dir in os.listdir(device_root):
        parts_dir = os.path.join(device_root, device_dir, datadir)
        try:
            parts = os.listdir(parts_dir)
        except OSError as e:
            # Device without a data dir for this policy, or a stray
            # non-directory entry under the devices root: nothing to check.
            if e.errno in (errno.ENOENT, errno.ENOTDIR):
                continue
            raise
        for part in parts:
            if not part.isdigit():
                continue
            part = int(part)
            if part not in dev2parts[device_dir]:
                handoffs[device_dir].add(part)

    # Materialise the items so they can be shuffled in place.
    device_parts = list(handoffs.items())
    random.shuffle(device_parts)
    drop_privileges(args.user)

    q = Queue(1000)
    workers = []
    try:
        for _ in range(args.workers):
            t = threading.Thread(target=consumer, args=(q, df_mgr, args))
            t.start()
            workers.append(t)
        for device, parts in device_parts:
            if args.limit is not None:
                # the limit applies to each device separately
                parts = list(parts)[:args.limit]
            device_path = os.path.join(device_root, device)
            for part in parts:
                full_part_path = os.path.join(device_path, datadir, str(part))
                # consumer() unpacks exactly this 3-tuple
                q.put((device, part, full_part_path))
    finally:
        # One sentinel per started worker, then wait for all of them, even
        # when queueing the work failed part-way through.
        for _ in workers:
            q.put(None)
        for t in workers:
            t.join()

    print('\nSTATS:\n')
    for k, v in sorted(STATS):
        print('%14s: %s' % (k, v))
if __name__ == "__main__":
    # Run only when executed as a script; main()'s return value becomes the
    # process exit status.
    exit_status = main()
    sys.exit(exit_status)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment