Last active
February 18, 2019 22:06
-
-
Save clayg/7a975ef3b34828c5ac7db05a519b6e8a to your computer and use it in GitHub Desktop.
This is mostly to deal with https://bugs.launchpad.net/swift/+bug/1651530
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import sys | |
import os | |
import errno | |
from argparse import ArgumentParser | |
from collections import defaultdict | |
import shutil | |
import logging | |
import thread | |
import threading | |
import random | |
from Queue import Queue | |
from swift.common.storage_policy import POLICIES | |
from swift.obj.diskfile import get_data_dir, DiskFileRouter | |
from swift.common.utils import remove_file, drop_privileges | |
# fix monkey-patch lp bug #1380815
# Swift/eventlet monkey-patching can leave the logging module holding
# green-thread primitives; re-point logging at the real thread modules and
# give it a fresh native RLock so the worker threads below log safely.
logging.threading = threading
logging.thread = thread
logging._lock = threading.RLock()
# DEBUG so the per-partition 'Checking'/'hashed' messages are visible.
logging.basicConfig(level=logging.DEBUG)
# Command-line interface: tunables for the handoff-partition cleanup run.
parser = ArgumentParser()
parser.add_argument('devices', help='root of devices tree for node',
                    nargs='?', default='/srv/node')
parser.add_argument('--workers', help='number of workers', type=int,
                    default=24)
parser.add_argument('--dry-run', help='do not remove empty partitions',
                    default=False, action='store_true')
parser.add_argument('--do-listdir', help='force a get_hashes do_listdir',
                    default=False, action='store_true')
parser.add_argument('--policy-index', help='the policy index',
                    type=int, default=0)
parser.add_argument('--limit', help='max number of handoff parts to clean',
                    default=None, type=int)
# Fixed typo in user-visible help text: "privledges" -> "privileges".
parser.add_argument('--user', help='drop privileges to this user',
                    default='swift')
class AtomicStats(object):
    """Thread-safe map of named integer counters.

    Workers from many threads call :meth:`incr`; iteration yields
    ``(name, value)`` pairs for reporting at the end of the run.
    """

    def __init__(self):
        # RLock so a holder could re-enter without deadlocking.
        self.lock = threading.RLock()
        self.stats = defaultdict(int)

    def incr(self, key, amount=1):
        """Atomically add *amount* (default 1) to the counter *key*."""
        with self.lock:
            self.stats[key] = self.stats[key] + amount

    def __iter__(self):
        """Iterate over ``(key, value)`` pairs of all counters."""
        return iter(self.stats.items())
# Module-global counters shared by all worker threads; AtomicStats guards
# updates with its own lock, so no external synchronization is needed.
STATS = AtomicStats()
def delete_partition(path):
    """Remove a partition directory tree at *path*.

    ``rmtree`` with ``ignore_errors`` clears the directory contents without
    raising; the follow-up ``remove_file`` (swift util) sweeps up whatever
    remains — presumably covering the case where *path* is a plain file or
    a leftover entry rmtree could not handle (TODO confirm against
    swift.common.utils.remove_file semantics).
    """
    shutil.rmtree(path, ignore_errors=True)
    remove_file(path)
def consumer(q, df_mgr, args):
    """Worker loop: pull ``(device, part, full_part_path)`` entries off *q*,
    re-hash each handoff partition, and remove it when it turns out empty.

    A ``None`` entry is the shutdown sentinel. All outcomes are tallied in
    the module-global STATS. Honors ``args.dry_run`` (report only) and
    ``args.do_listdir`` (force a listdir during hashing).
    """
    while True:
        entry = q.get()
        if entry is None:
            return
        logging.debug('Checking %r', entry)
        device, part, full_part_path = entry
        STATS.incr('count')
        try:
            hashed, hashes = df_mgr._get_hashes(
                device, part, df_mgr.policy, do_listdir=args.do_listdir)
        except Exception:
            logging.exception('Unable to read hashes from %r', full_part_path)
            STATS.incr('failed_skipped')
            continue
        STATS.incr('hashed', hashed)
        if hashes and hashed == 0:
            # Nothing was re-hashed, so this came straight from hashes.pkl;
            # blow the cache away and re-hash to validate it.
            initial_hashes = hashes
            STATS.incr('cache_hashed')
            logging.warning('Running force re-hash %r', full_part_path)
            if not args.dry_run:
                try:
                    remove_file(os.path.join(full_part_path, 'hashes.pkl'))
                    STATS.incr('cache_reset')
                    hashed, hashes = df_mgr._get_hashes(
                        device, part, df_mgr.policy,
                        do_listdir=args.do_listdir)
                except Exception:
                    logging.exception('Unable to re-hash %r', full_part_path)
                    STATS.incr('failed_skipped')
                    continue
                STATS.incr('hashed', hashed)
                if hashes == initial_hashes:
                    STATS.incr('cache_correct')
                else:
                    STATS.incr('cache_stale')
                    STATS.incr('cache_diff', abs(
                        len(hashes) - len(initial_hashes)))
        else:
            STATS.incr('re_hashed')
        if not hashes:
            STATS.incr('empty')
            # lazy %-args instead of eager string formatting
            logging.info('Removing partition %r', full_part_path)
            if not args.dry_run:
                try:
                    delete_partition(full_part_path)
                except Exception:
                    STATS.incr('failed_skipped')
                    logging.exception('Unable to delete %r', full_part_path)
                    # BUG FIX: without this continue a failed delete was
                    # counted as both 'failed_skipped' AND 'removed'.
                    continue
                STATS.incr('removed')
        else:
            STATS.incr('not_empty')
        logging.debug('Part %s hashed %r', full_part_path, hashed)
def main(): | |
args = parser.parse_args() | |
device_root = args.devices | |
policy = POLICIES[args.policy_index] | |
datadir = get_data_dir(policy) | |
policy.load_ring('/etc/swift') | |
ring = policy.object_ring | |
conf = { | |
'devices': args.devices, | |
'mount_check': False, | |
} | |
df_mgr = DiskFileRouter(conf, logging)[policy] | |
df_mgr.policy = policy | |
dev2parts = defaultdict(set) | |
for replica, part2dev in enumerate(ring._replica2part2dev_id): | |
for part, device_id in enumerate(part2dev): | |
dev2parts[ring.devs[device_id]['device']].add(part) | |
# print dev2parts | |
handoffs = defaultdict(set) | |
device_dirs = os.listdir(device_root) | |
for device_dir in device_dirs: | |
parts_dir = os.path.join(device_root, device_dir, datadir) | |
try: | |
parts = os.listdir(parts_dir) | |
except OSError as e: | |
if e.errno == errno.ENOENT: | |
continue | |
else: | |
raise | |
for part in parts: | |
if not part.isdigit(): | |
continue | |
part = int(part) | |
if part in dev2parts[device_dir]: | |
continue | |
handoffs[device_dir].add(part) | |
device_parts = handoffs.items() | |
random.shuffle(device_parts) | |
drop_privileges(args.user) | |
q = Queue(1000) | |
workers = [] | |
try: | |
for i in range(args.workers): | |
t = threading.Thread(target=consumer, args=(q, df_mgr, args)) | |
t.start() | |
workers.append(t) | |
for device, parts in device_parts: | |
if args.limit is not None: | |
parts = list(parts)[:args.limit] | |
device_path = os.path.join(device_root, device) | |
for part in parts: | |
full_part_path = os.path.join(device_path, datadir, str(part)) | |
entry = os.path.join((device, part, full_part_path)) | |
q.put(entry) | |
finally: | |
for t in workers: | |
q.put(None) | |
for t in workers: | |
t.join() | |
print '\nSTATS:\n' | |
for k, v in sorted(STATS): | |
print '%14s: %s' % (k, v) | |
if __name__ == "__main__": | |
sys.exit(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment