Skip to content

Instantly share code, notes, and snippets.

@clayg
Last active April 3, 2019 05:51
Show Gist options
  • Save clayg/1f5e15a8c31ca467b94d85d25ae6f0fa to your computer and use it in GitHub Desktop.
Save clayg/1f5e15a8c31ca467b94d85d25ae6f0fa to your computer and use it in GitHub Desktop.
Poke around at expired objects
#!/usr/bin/env python
#
# This looks at what's expired in the expiring objects queue, asks the object
# servers for the current real state of the objects, and then logs its
# findings. If the object has been deleted but the tombstone timestamp is
# within a small delta of the queue entry because of lp bug #1741371 - we'll
# clean up those queue entries.
import argparse
import logging
import sys
import itertools
import time
import eventlet
from swift.common.storage_policy import POLICIES
from swift.common.direct_client import direct_head_object, \
DirectClientException
from swift.common.internal_client import InternalClient, UnexpectedResponse
from swift.container.sync import ic_conf_body
from swift.container.reconciler import direct_delete_container_entry
from swift.common.wsgi import ConfigString
# Wall-clock time captured at import; used as the default "now" by
# handle_object and list_expiring_objects when the caller passes none.
START = time.time()
# Account whose containers hold the expirer queue entries.
EXP_ACCOUNT = ".expiring_objects"
# Directory handed to POLICIES.get_object_ring when loading object rings.
SWIFT_DIR = "/etc/swift"
# Maps (account, container) to that container's storage policy name; filled
# in by get_storage_policy so each container is only asked once.
STORAGE_POLICY_CACHE = {}
def get_storage_policy(swift, acc, con):
    """
    Return the storage policy used by container ``acc/con``.

    The policy name is read from the container's ``x-storage-policy``
    metadata (matched case-insensitively) and remembered in
    ``STORAGE_POLICY_CACHE`` so later calls for the same container skip the
    metadata request.

    :param swift: client object providing ``get_container_metadata``
    :param acc: account name
    :param con: container name
    :returns: whatever ``POLICIES.get_by_name`` returns for the policy name
    :raises KeyError: if the metadata has no ``x-storage-policy`` entry
    """
    cache_key = (acc, con)
    if cache_key in STORAGE_POLICY_CACHE:
        policy_name = STORAGE_POLICY_CACHE[cache_key]
    else:
        raw_metadata = swift.get_container_metadata(acc, con)
        lowered = dict(
            (name.lower(), value) for name, value in raw_metadata.items())
        policy_name = lowered["x-storage-policy"]
        STORAGE_POLICY_CACHE[cache_key] = policy_name
    return POLICIES.get_by_name(policy_name)
def pop_queue(swift, con, obj):
    """
    Remove the entry ``con/obj`` from the expirer queue.

    The removal is logged first and then sent with
    ``direct_delete_container_entry`` using the client's container ring.

    :param swift: client object providing ``container_ring``
    :param con: name of the queue container (inside ``EXP_ACCOUNT``)
    :param obj: name of the queue entry to remove
    """
    logging.info('Removing queue entry %s/%s', con, obj)
    container_ring = swift.container_ring
    direct_delete_container_entry(container_ring, EXP_ACCOUNT, con, obj)
def handle_object(swift, exp_con_name, exp_obj_name,
                  delta=5, extra_nodes=0, now=START):
    """
    Check one expirer queue entry against the object servers.

    The entry name is split into its timestamp and target path. Entries
    whose timestamp is later than ``now`` are ignored. Otherwise the target
    container's storage policy is looked up and the object's primary nodes,
    followed by up to ``extra_nodes`` further nodes, are asked one at a time
    through ``inspect_object`` until one of them gives a definite answer.
    The queue entry is removed when that answer is true.

    :param swift: client object passed on to ``get_storage_policy`` and
                  ``pop_queue``
    :param exp_con_name: name of the queue container holding the entry
    :param exp_obj_name: name of the queue entry, see ``parse_name``
    :param delta: tolerance handed to ``inspect_object``
    :param extra_nodes: how many nodes from ``ring.get_more_nodes`` to ask
                        after the primaries
    :param now: entries with a timestamp greater than this are skipped
    """
    ts, acc, con, obj = parse_name(exp_obj_name)
    if int(ts) > now:
        # not due yet
        return
    logging.info("looking at %s/%s/%s (%s)", acc, con, obj, ts)

    try:
        policy = get_storage_policy(swift, acc, con)
    except UnexpectedResponse as err:
        logging.info("Container %s/%s: got error %s; skipping",
                     acc, con, err)
        return

    ring = POLICIES.get_object_ring(policy.idx, SWIFT_DIR)
    partition = ring.get_part(acc, con, obj)
    primaries = ring.get_part_nodes(partition)
    handoffs = itertools.islice(ring.get_more_nodes(partition), extra_nodes)

    for node_idx, node in enumerate(itertools.chain(primaries, handoffs)):
        verdict = inspect_object(node_idx, node, partition,
                                 acc, con, obj, policy, ts, delta=delta)
        if verdict is None:
            # this node could not settle it; ask the next one
            continue
        if verdict:
            pop_queue(swift, exp_con_name, exp_obj_name)
        break
def _timestamp_to_float(value):
    """
    Convert a timestamp value to a float.

    Everything from the first "_" onwards is dropped before converting, so
    a value written as "<seconds>_<offset>" is read as "<seconds>".

    :raises ValueError: if the remaining text is not a number
    """
    return float(str(value).split("_", 1)[0])


def inspect_object(node_idx, node, partition, acc, con, obj, policy, ts,
                   delta=5):
    """
    Make a HEAD request directly to one object server and decide what it
    tells us about the expirer queue row for the object.

    If we have high confidence the queue row has already been handled, i.e.
    the server answers 404 with a backend timestamp less than ``delta``
    away from ``ts``, we return True to indicate the row should be deleted.

    :param node_idx: position of the node in the search order (logging only)
    :param node: node dict; its "ip", "port" and "device" keys are logged
    :param partition: partition passed to ``direct_head_object``
    :param acc: account of the object
    :param con: container of the object
    :param obj: name of the object
    :param policy: storage policy; its ``idx`` is sent to the object server
    :param ts: timestamp string taken from the queue entry name
    :param delta: largest difference between ``ts`` and the backend
                  timestamp of a 404 still treated as the same deletion
    :returns: a tri-state boolean, False means do not pop the queue and
              quit, True means pop the queue and quit, None means keep
              searching.
    """
    req_headers = {
        # tell the object server not to check X-Delete-At and just show me
        # what it's got
        "X-Backend-Replication": "true",
        # look in the right place for the data
        "X-Backend-Storage-Policy-Index": str(policy.idx),
        # be filterable out of the logs
        "User-Agent": "expirer-check.py",
    }
    logging.info("====================")
    logging.info("Asking about %s/%s/%s node %d (%s:%d/%s)",
                 acc, con, obj, node_idx,
                 node["ip"], node["port"], node["device"])
    try:
        resp_headers = direct_head_object(node, partition, acc, con, obj,
                                          headers=req_headers)
    except DirectClientException as err:
        backend_timestamp = err.http_headers.get('x-backend-timestamp')
        logging.info("%s/%s/%s gave status %d (%s)", acc, con, obj,
                     err.http_status, backend_timestamp)
        if err.http_status != 404 or not backend_timestamp:
            return None
        try:
            # The backend timestamp may carry an "_<offset>" suffix, which
            # a bare float() rejects; compare on the seconds part only.
            skew = abs(_timestamp_to_float(backend_timestamp) - float(ts))
        except ValueError:
            # An unparsable timestamp proves nothing about this row, so
            # keep searching instead of letting the worker die.
            logging.info("Could not compare timestamps %r and %r",
                         backend_timestamp, ts)
            return None
        if skew < delta:
            return True
    except eventlet.Timeout:
        logging.info("Timeout talking to node")
    except Exception as err:
        logging.info("Unexpected exception: %s", err)
    else:
        for key, value in sorted(resp_headers.items()):
            logging.info("%s: %s", key, value)
        object_delete_at = resp_headers.get('X-Delete-At')
        if ts == object_delete_at:
            # the object is still waiting on this very queue row
            logging.info("\nX-Delete-At matches")
            return False
        logging.info("\nX-Delete-At does not match (queue: %s object: %r)",
                     ts, object_delete_at)
    return None
def parse_name(obj_name):
    """
    Split an expirer queue entry name into its parts.

    Entry names look like this:
        1515008817-AUTH_test/test/slow_chunked_upload.py
    The text before the first "-" is the timestamp; the rest is the target
    path, cut at its first two "/" into account, container and object (the
    object name keeps any further "/").

    :param obj_name: queue entry name
    :returns: a 4-tuple of strings (timestamp, account, container, object)
    :raises ValueError: if the name has no "-" or the path has fewer than
                        three "/"-separated parts
    """
    timestamp, target_path = obj_name.split("-", 1)
    account, container, target_obj = target_path.split("/", 2)
    return (timestamp, account, container, target_obj)
def list_expiring_objects(swift, now=START):
    """
    Walk the expirer queue and yield its entries.

    The containers of ``EXP_ACCOUNT`` are visited in the order
    ``swift.iter_containers`` produces them. The walk stops at the first
    container whose name, read as an integer, is greater than ``now``.
    Every entry found is logged before it is yielded.

    :param swift: client object providing ``iter_containers`` and
                  ``iter_objects``
    :param now: cut-off compared against the integer container names
    :returns: a generator of (queue container name, queue entry name)
              pairs, taken from the "name" field of each listing item
    """
    for container_item in swift.iter_containers(EXP_ACCOUNT):
        queue_container = container_item["name"]
        if int(queue_container) > now:
            return
        for entry_item in swift.iter_objects(EXP_ACCOUNT, queue_container):
            queue_entry = entry_item["name"]
            logging.info("found %s/%s/%s",
                         EXP_ACCOUNT, queue_container, queue_entry)
            yield queue_container, queue_entry
def main():
    """
    Command line entry point.

    Parses the options, sets up logging and an internal client, then hands
    every queue entry from ``list_expiring_objects`` to ``handle_object``
    on a green thread pool. With ``-n`` greater than zero, spawning stops
    after that many entries. Waits for all spawned work before returning.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-n', type=int, default=0,
        help="Number of objects to inspect")
    parser.add_argument(
        '--verbose', action='store_true',
        help='make debug noise')
    parser.add_argument(
        '--concurrency', type=int, default=4,
        help="How much of this do you want to do at once?")
    parser.add_argument(
        '--extra-nodes', type=int, default=0,
        help="Number of handoffs (non-primary) nodes to inspect")
    parser.add_argument(
        '--delta', type=float, default=5.0,
        help="How much clock skew do we allow for expired objects")
    args = parser.parse_args()

    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO)

    swift = InternalClient(ConfigString(ic_conf_body), 'test', 1)
    limit = args.n
    # one cut-off for the whole run, shared by the listing and the workers
    now = time.time()
    pool = eventlet.GreenPool(args.concurrency)

    queue_entries = list_expiring_objects(swift, now=now)
    for inspected, (exp_con_name, exp_obj_name) in enumerate(
            queue_entries, 1):
        pool.spawn_n(handle_object, swift, exp_con_name, exp_obj_name,
                     extra_nodes=args.extra_nodes, now=now,
                     delta=args.delta)
        if 0 < limit <= inspected:
            logging.info("\nInspected %d objects; exiting as requested",
                         inspected)
            break
    pool.waitall()
# Only run when executed as a script; main()'s return value is passed to
# sys.exit as the exit status.
if __name__ == "__main__":
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment