Last active
April 3, 2019 05:51
-
-
Save clayg/1f5e15a8c31ca467b94d85d25ae6f0fa to your computer and use it in GitHub Desktop.
Poke around at expired objects
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# | |
# This looks at what's expired in the expiring objects queue, asks the object
# servers for the current real state of the objects, and then logs its
# findings. If the object has been deleted but the tombstone timestamp is
# within a small delta of the queue entry because of lp bug #1741371 - we'll
# clean up those queue entries.
import argparse | |
import logging | |
import sys | |
import itertools | |
import time | |
import eventlet | |
from swift.common.storage_policy import POLICIES | |
from swift.common.direct_client import direct_head_object, \ | |
DirectClientException | |
from swift.common.internal_client import InternalClient, UnexpectedResponse | |
from swift.container.sync import ic_conf_body | |
from swift.container.reconciler import direct_delete_container_entry | |
from swift.common.wsgi import ConfigString | |
# Wall-clock time captured once at import; it is the default "now" used when
# deciding whether a queue entry is due yet.
START = time.time()

# Account whose containers are listed to walk the expirer queue.
EXP_ACCOUNT = ".expiring_objects"

# Directory handed to POLICIES.get_object_ring() when loading object rings.
SWIFT_DIR = "/etc/swift"

# Remembers the storage policy name of each container already looked up,
# keyed by (account, container).
STORAGE_POLICY_CACHE = {}
def get_storage_policy(swift, acc, con):
    """
    Figure out the storage policy of container ``acc/con`` so the object
    servers can be asked about its objects.

    The policy name comes from the container's ``x-storage-policy``
    metadata and is remembered in STORAGE_POLICY_CACHE, so the container
    metadata is only fetched on a cache miss.

    :param swift: internal client used to fetch the container metadata
    :param acc: account name
    :param con: container name
    :returns: the result of ``POLICIES.get_by_name`` for the policy name
    """
    cache_key = (acc, con)
    if cache_key in STORAGE_POLICY_CACHE:
        policy_name = STORAGE_POLICY_CACHE[cache_key]
    else:
        # Lower-case the metadata keys so the lookup below does not depend
        # on how the header names were capitalized.
        lowered = dict(
            (name.lower(), value)
            for name, value in swift.get_container_metadata(acc, con).items())
        policy_name = lowered["x-storage-policy"]
        STORAGE_POLICY_CACHE[cache_key] = policy_name
    return POLICIES.get_by_name(policy_name)
def pop_queue(swift, con, obj):
    """
    Remove entry ``con/obj`` from the expirer queue.

    :param swift: internal client; its ``container_ring`` is used to reach
                  the container servers
    :param con: name of the queue container holding the entry
    :param obj: name of the queue entry to delete
    """
    logging.info('Removing queue entry %s/%s', con, obj)
    container_ring = swift.container_ring
    direct_delete_container_entry(container_ring, EXP_ACCOUNT, con, obj)
def handle_object(swift, exp_con_name, exp_obj_name,
                  delta=5, extra_nodes=0, now=START):
    """
    Check one expirer queue entry against the object servers.

    Nodes are asked one at a time (primaries first, then up to
    ``extra_nodes`` handoffs) until one of them gives a definite answer.
    When the answer is that the entry is stale, the entry is removed from
    the queue.

    :param swift: internal client
    :param exp_con_name: queue container the entry was found in
    :param exp_obj_name: queue entry name, as understood by parse_name()
    :param delta: passed through to inspect_object()
    :param extra_nodes: number of handoff nodes to ask after the primaries
    :param now: entries whose timestamp is later than this are skipped
    """
    ts, acc, con, obj = parse_name(exp_obj_name)
    if int(ts) > now:
        # Not due yet; nothing to check.
        return

    logging.info("looking at %s/%s/%s (%s)" % (acc, con, obj, ts))
    try:
        policy = get_storage_policy(swift, acc, con)
    except UnexpectedResponse as err:
        logging.info("Container %s/%s: got error %s; skipping",
                     acc, con, err)
        return

    ring = POLICIES.get_object_ring(policy.idx, SWIFT_DIR)
    partition = ring.get_part(acc, con, obj)
    primaries = ring.get_part_nodes(partition)
    handoffs = itertools.islice(ring.get_more_nodes(partition), extra_nodes)

    for node_idx, node in enumerate(itertools.chain(primaries, handoffs)):
        verdict = inspect_object(node_idx, node, partition,
                                 acc, con, obj, policy, ts, delta=delta)
        if verdict is None:
            # This node could not settle it; try the next one.
            continue
        if verdict:
            pop_queue(swift, exp_con_name, exp_obj_name)
        break
def inspect_object(node_idx, node, partition, acc, con, obj, policy, ts,
                   delta=5):
    """
    Make a request directly to one object server. If we have high
    confidence that the expirer queue row for the object has already been
    handled, return True to indicate the row should be deleted.

    :param node_idx: position of ``node`` in the search, used for logging
    :param node: ring node dict; ``ip``, ``port`` and ``device`` are logged
    :param partition: ring partition of the object
    :param acc: account name
    :param con: container name
    :param obj: object name
    :param policy: storage policy; its ``idx`` is sent as the
                   X-Backend-Storage-Policy-Index header
    :param ts: timestamp of the queue entry, as a string
    :param delta: a 404 whose X-Backend-Timestamp is less than this many
                  seconds away from ``ts`` counts as an already-handled row
    :returns: a tri-state boolean, False means do not pop the queue and
              quit, True means pop the queue and quit, None means keep
              searching.
    """
    req_headers = {
        # tell the object server not to check X-Delete-At and just show me
        # what it's got
        "X-Backend-Replication": "true",
        # look in the right place for the data
        "X-Backend-Storage-Policy-Index": str(policy.idx),
        # be filterable out of the logs
        "User-Agent": "expirer-check.py",
    }
    logging.info("====================")
    logging.info("Asking about %s/%s/%s node %d (%s:%d/%s)",
                 acc, con, obj, node_idx,
                 node["ip"], node["port"], node["device"])
    try:
        resp_headers = direct_head_object(node, partition, acc, con, obj,
                                          headers=req_headers)
    except DirectClientException as err:
        backend_timestamp = err.http_headers.get('x-backend-timestamp')
        logging.info("%s/%s/%s gave status %d (%s)", acc, con, obj,
                     err.http_status, backend_timestamp)
        if err.http_status != 404 or not backend_timestamp:
            return None
        try:
            skew = abs(float(backend_timestamp) - float(ts))
        except ValueError:
            # A failure here would escape the sibling "except Exception"
            # clause and kill the caller, so treat a timestamp we cannot
            # parse as "no answer from this node".
            # NOTE(review): presumably this can happen for timestamps that
            # carry an offset suffix - confirm against Swift's Timestamp.
            logging.info("Unable to compare timestamps (queue: %r node: %r)",
                         ts, backend_timestamp)
            return None
        if skew < delta:
            return True
    except eventlet.Timeout:
        logging.info("Timeout talking to node")
    except Exception as err:
        logging.info("Unexpected exception: %s", err)
    else:
        for key, value in sorted(resp_headers.items()):
            logging.info("%s: %s", key, value)
        object_delete_at = resp_headers.get('X-Delete-At')
        if ts == object_delete_at:
            logging.info("\nX-Delete-At matches")
            return False
        logging.info("\nX-Delete-At does not match (queue: %s object: %r)",
                     ts, object_delete_at)
    # No definite answer from this node; the caller should keep searching.
    return None
def parse_name(obj_name):
    """
    Split an expirer queue entry name into its parts.

    A queue entry name looks like this:

        1515008817-AUTH_test/test/slow_chunked_upload.py

    Only the first "-" and the first two "/" are treated as separators, so
    later ones stay part of the object name.

    :param obj_name: the queue entry name
    :returns: a 4-tuple of strings (timestamp, account, container, object)
    :raises ValueError: if the name lacks the "-" or the two "/" separators
    """
    timestamp, remainder = obj_name.split("-", 1)
    account, container, obj = remainder.split("/", 2)
    return timestamp, account, container, obj
def list_expiring_objects(swift, now=START):
    """
    List the entries in the expirer queue.

    Containers of the expiring-objects account are walked in listing
    order. Iteration ends at the first container whose name, read as an
    integer, is greater than ``now``.

    :param swift: internal client used to list containers and objects
    :param now: cut-off compared against each queue container name
    :returns: a generator of 2-tuples (queue container name, queue entry
              name). It's all just strings; feed the entry name to
              parse_name() to get the timestamp, account, container and
              object.
    """
    for con_item in swift.iter_containers(EXP_ACCOUNT):
        con_name = con_item["name"]
        if int(con_name) > now:
            # Stop at the first container that is past the cut-off.
            return
        for obj_item in swift.iter_objects(EXP_ACCOUNT, con_name):
            obj_name = obj_item["name"]
            logging.info("found %s/%s/%s", EXP_ACCOUNT, con_name, obj_name)
            yield con_name, obj_name
def main():
    """
    Command-line entry point.

    Parses the options, then walks the expirer queue and hands every due
    entry to handle_object() on a green thread pool, optionally stopping
    after ``-n`` entries. Waits for all spawned checks before returning.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-n', type=int, default=0,
        help="Number of objects to inspect")
    parser.add_argument(
        '--verbose', action='store_true',
        help='make debug noise')
    parser.add_argument(
        '--concurrency', type=int, default=4,
        help="How much of this do you want to do at once?")
    parser.add_argument(
        '--extra-nodes', type=int, default=0,
        help="Number of handoffs (non-primary) nodes to inspect")
    parser.add_argument(
        '--delta', type=float, default=5.0,
        help="How much clock skew do we allow for expired objects")
    args = parser.parse_args()

    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO)

    swift = InternalClient(ConfigString(ic_conf_body), 'test', 1)
    limit = args.n
    now = time.time()
    pool = eventlet.GreenPool(args.concurrency)

    queue_entries = list_expiring_objects(swift, now=now)
    for inspected, (exp_con_name, exp_obj_name) in enumerate(queue_entries, 1):
        pool.spawn_n(handle_object, swift, exp_con_name, exp_obj_name,
                     extra_nodes=args.extra_nodes, now=now, delta=args.delta)
        # A limit of zero (the default) means "no limit".
        if 0 < limit <= inspected:
            logging.info("\nInspected %d objects; exiting as requested",
                         inspected)
            break
    pool.waitall()


if __name__ == "__main__":
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment