@notmyname
Last active October 2, 2015 06:04
diff --git a/swift/common/utils.py b/swift/common/utils.py
index 03224eb..e0af836 100644
--- a/swift/common/utils.py
+++ b/swift/common/utils.py
@@ -738,6 +738,19 @@ class Timestamp(object):
"""
def __init__(self, timestamp, offset=0, delta=0):
+ """
+ Create a new Timestamp.
+
+ :param timestamp: time in seconds since the Epoch, may be any of:
+
+ * a float or integer
+ * normalized/internalized string
+ * another instance of this class (offset is preserved)
+
+ :param offset: the second internal offset vector, an int
+ :param delta: deca-microsecond difference from the base timestamp
+ param, an int
+ """
if isinstance(timestamp, basestring):
parts = timestamp.split('_', 1)
self.timestamp = float(parts.pop(0))
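
The new docstring enumerates three accepted input forms. Below is a minimal, hypothetical stand-in (not Swift's real class, which also handles the offset/delta params and validation) showing just that contract:

    class MiniTimestamp(object):
        # Illustrative sketch only; the real Timestamp is in
        # swift/common/utils.py.
        def __init__(self, timestamp, offset=0):
            if isinstance(timestamp, str):
                # internalized form: '<seconds>[_<16-digit hex offset>]'
                parts = timestamp.split('_', 1)
                self.timestamp = float(parts.pop(0))
                self.offset = int(parts[0], 16) if parts else 0
            elif isinstance(timestamp, MiniTimestamp):
                # another instance: offset is preserved
                self.timestamp = timestamp.timestamp
                self.offset = timestamp.offset
            else:
                # a float or integer
                self.timestamp = float(timestamp)
                self.offset = 0
            self.offset += offset

        @property
        def internal(self):
            base = '%016.05f' % self.timestamp
            if self.offset:
                return '%s_%016x' % (base, self.offset)
            return base
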
diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py
index ca71f2d..7de7a2d 100644
--- a/swift/obj/diskfile.py
+++ b/swift/obj/diskfile.py
@@ -974,30 +974,36 @@ class BaseDiskFileManager(object):
suffixes = (
(os.path.join(partition_path, suffix), suffix)
for suffix in suffixes)
- key_map = {'.data': 'ts_data', '.ts': 'ts_data', '.meta': 'ts_meta'}
+ key_preference = (
+ ('ts_meta', '.meta'),
+ ('ts_data', '.data'),
+ ('ts_data', '.ts'),
+ )
for suffix_path, suffix in suffixes:
for object_hash in self._listdir(suffix_path):
object_path = os.path.join(suffix_path, object_hash)
- newest_valid_file = None
try:
results = self.cleanup_ondisk_files(
object_path, self.reclaim_age, **kwargs)
timestamps = {}
- for ext in key_map:
- f = results.get(ext)
- if f:
- key = key_map[ext]
- ts = self.parse_on_disk_filename(f)['timestamp']
- timestamps[key] = ts
- if timestamps:
- yield (object_path, object_hash, timestamps)
+ for ts_key, ext in key_preference:
+ if ext not in results:
+ continue
+ timestamps[ts_key] = self.parse_on_disk_filename(
+ results[ext])['timestamp']
+ if 'ts_data' not in timestamps:
+ # file sets that do not include a .data or .ts
+ # file cannot be opened and therefore cannot
+ # be ssync'd
+ continue
+ yield (object_path, object_hash, timestamps)
except AssertionError as err:
self.logger.debug('Invalid file set in %s (%s)' % (
object_path, err))
except DiskFileError as err:
self.logger.debug(
- 'Invalid diskfile filename %r in %r (%s)' % (
- newest_valid_file, object_path, err))
+ 'Invalid diskfile filename in %r (%s)' % (
+ object_path, err))
class BaseDiskFileWriter(object):
@@ -1779,6 +1785,7 @@ class BaseDiskFile(object):
"""
fp = open(data_file, 'rb')
self._datafile_metadata = self._failsafe_read_metadata(fp, data_file)
+ self._metadata = {}
if meta_file:
self._metafile_metadata = self._failsafe_read_metadata(
meta_file, meta_file)
@@ -1786,12 +1793,12 @@ class BaseDiskFile(object):
[(key, val) for key, val in self._datafile_metadata.items()
if key.lower() in DATAFILE_SYSTEM_META
or is_sys_meta('object', key)])
- self._metadata = self._metafile_metadata.copy()
+ self._metadata.update(self._metafile_metadata)
self._metadata.update(sys_metadata)
# diskfile writer added 'name' to metafile, so remove it here
self._metafile_metadata.pop('name', None)
else:
- self._metadata = self._datafile_metadata
+ self._metadata.update(self._datafile_metadata)
if self._name is None:
# If we don't know our name, we were just given a hash dir at
# instantiation, so we'd better validate that the name hashes back
@@ -2055,14 +2062,14 @@ class DiskFileManager(BaseDiskFileManager):
def _verify_on_disk_files(self, accepted_files, **kwargs):
"""
Verify that the final combination of on disk files complies with the
- diskfile contract.
+ replicated diskfile contract.
:param accepted_files: files that have been found and accepted
:returns: True if the file combination is compliant, False otherwise
"""
# mimic legacy behavior - .meta is ignored when .ts is found
if accepted_files.get('.ts'):
- accepted_files['.meta'] = None
+ accepted_files.pop('.meta', None)
data_file, meta_file, ts_file = tuple(
[accepted_files.get(ext)
@@ -2495,7 +2502,7 @@ class ECDiskFileManager(BaseDiskFileManager):
frag_prefs=None, **kwargs):
"""
Verify that the final combination of on disk files complies with the
- diskfile contract.
+ erasure-coded diskfile contract.
:param accepted_files: files that have been found and accepted
:param frag_index: specifies a specific fragment index .data file
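
The `key_preference` rewrite above changes the contract of `yield_hashes`: a file set must contribute a `ts_data` (from a `.data` or `.ts`) to be yielded at all, and a `.meta`, when present, adds a `ts_meta`. A pure-function sketch of that collection step (the helper names here are hypothetical):

    KEY_PREFERENCE = (
        ('ts_meta', '.meta'),
        ('ts_data', '.data'),
        ('ts_data', '.ts'),
    )

    def collect_timestamps(results, parse_timestamp):
        # `results` maps extension -> filename, as returned by
        # cleanup_ondisk_files(); parse_timestamp extracts the
        # timestamp from a filename.
        timestamps = {}
        for ts_key, ext in KEY_PREFERENCE:
            if ext not in results:
                continue
            timestamps[ts_key] = parse_timestamp(results[ext])
        if 'ts_data' not in timestamps:
            # no .data/.ts: the set cannot be opened or ssync'd
            return None
        return timestamps

    # a stray .meta on its own is skipped entirely
    assert collect_timestamps({'.meta': 'x.meta'}, lambda f: f) is None

Note that a `.meta` alongside a `.ts` never reaches this step: `_verify_on_disk_files` (above) now pops the `.meta` when a `.ts` is accepted. This stronger contract is also why the reconstructor hunk later in the patch can drop its `if 'ts_data' in timestamps` guard.
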
diff --git a/swift/obj/reconstructor.py b/swift/obj/reconstructor.py
index 412c8c0..5793b3e 100644
--- a/swift/obj/reconstructor.py
+++ b/swift/obj/reconstructor.py
@@ -71,28 +71,27 @@ class RebuildingECDiskFileStream(object):
metadata in the DiskFile interface for ssync.
"""
- def __init__(self, metadata, frag_index, rebuilt_fragment_iter):
+ def __init__(self, datafile_metadata, frag_index, rebuilt_fragment_iter):
# start with metadata from a participating FA
- self.metadata = metadata
+ self.datafile_metadata = datafile_metadata
# the new FA is going to have the same length as others in the set
- self._content_length = self.metadata['Content-Length']
+ self._content_length = self.datafile_metadata['Content-Length']
# update the FI and delete the ETag, the obj server will
# recalc on the other side...
- self.metadata['X-Object-Sysmeta-Ec-Frag-Index'] = frag_index
+ self.datafile_metadata['X-Object-Sysmeta-Ec-Frag-Index'] = frag_index
for etag_key in ('ETag', 'Etag'):
- self.metadata.pop(etag_key, None)
+ self.datafile_metadata.pop(etag_key, None)
self.frag_index = frag_index
self.rebuilt_fragment_iter = rebuilt_fragment_iter
def get_metadata(self):
- return self.metadata
+ return self.datafile_metadata
def get_datafile_metadata(self):
- # not a mistake, datafile_metadata == metadata
- return self.metadata
+ return self.datafile_metadata
@property
def content_length(self):
@@ -222,7 +221,7 @@ class ObjectReconstructor(Daemon):
'full_path': self._full_path(node, part, path, policy)})
return resp
- def reconstruct_fa(self, job, node, metadata):
+ def reconstruct_fa(self, job, node, datafile_metadata):
"""
Reconstructs a fragment archive - this method is called from ssync
after a remote node responds that it is missing this object - the local
@@ -231,7 +230,8 @@ class ObjectReconstructor(Daemon):
:param job: job from ssync_sender
:param node: node that we're rebuilding to
- :param metadata: the metadata to attach to the rebuilt archive
+ :param datafile_metadata: the datafile metadata to attach to
+ the rebuilt fragment archive
:returns: a DiskFile like class for use by ssync
:raises DiskFileError: if the fragment archive cannot be reconstructed
"""
@@ -248,7 +248,7 @@ class ObjectReconstructor(Daemon):
headers = self.headers.copy()
headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
pile = GreenAsyncPile(len(part_nodes))
- path = metadata['name']
+ path = datafile_metadata['name']
for node in part_nodes:
pile.spawn(self._get_response, node, job['partition'],
path, headers, job['policy'])
@@ -281,14 +281,14 @@ class ObjectReconstructor(Daemon):
'to reconstruct %s with ETag %s' % (
len(responses), job['policy'].ec_ndata,
self._full_path(node, job['partition'],
- metadata['name'], job['policy']),
+ datafile_metadata['name'], job['policy']),
etag))
raise DiskFileError('Unable to reconstruct EC archive')
rebuilt_fragment_iter = self.make_rebuilt_fragment_iter(
responses[:job['policy'].ec_ndata], path, job['policy'],
fi_to_rebuild)
- return RebuildingECDiskFileStream(metadata, fi_to_rebuild,
+ return RebuildingECDiskFileStream(datafile_metadata, fi_to_rebuild,
rebuilt_fragment_iter)
def _reconstruct(self, policy, fragment_payload, frag_index):
@@ -541,18 +541,17 @@ class ObjectReconstructor(Daemon):
"""
df_mgr = self._df_router[job['policy']]
for object_hash, timestamps in objects.items():
- if 'ts_data' in timestamps:
- try:
- df = df_mgr.get_diskfile_from_hash(
- job['local_dev']['device'], job['partition'],
- object_hash, job['policy'],
- frag_index=frag_index)
- df.purge(timestamps['ts_data'], frag_index)
- except DiskFileError:
- self.logger.exception(
- 'Unable to purge DiskFile (%r %r %r)',
- object_hash, timestamps['ts_data'], frag_index)
- continue
+ try:
+ df = df_mgr.get_diskfile_from_hash(
+ job['local_dev']['device'], job['partition'],
+ object_hash, job['policy'],
+ frag_index=frag_index)
+ df.purge(timestamps['ts_data'], frag_index)
+ except DiskFileError:
+ self.logger.exception(
+ 'Unable to purge DiskFile (%r %r %r)',
+ object_hash, timestamps['ts_data'], frag_index)
+ continue
def process_job(self, job):
"""
diff --git a/swift/obj/ssync_receiver.py b/swift/obj/ssync_receiver.py
index 4b7762b..7e00e84 100644
--- a/swift/obj/ssync_receiver.py
+++ b/swift/obj/ssync_receiver.py
@@ -27,15 +27,18 @@ from swift.common import request_helpers
from swift.common.utils import Timestamp
-def _decode_missing(line):
+def decode_missing(line):
"""
Parse a string of the form generated by
- :py:func:`.ssync_sender._encode_missing` and return a dict with keys
- 'hash', 'ts_data', 'ts_meta'.
+ :py:func:`~swift.obj.ssync_sender.encode_missing` and return a dict
+ with keys ``object_hash``, ``ts_data``, ``ts_meta``.
+
+ The encoder for this line is
+ :py:func:`~swift.obj.ssync_sender.encode_missing`
"""
result = {}
parts = line.split()
- result['hash'], t_data = (urllib.unquote(v) for v in parts[:2])
+ result['object_hash'], t_data = (urllib.unquote(v) for v in parts[:2])
result['ts_data'] = result['ts_meta'] = Timestamp(t_data)
if len(parts) > 2:
# allow for a comma separated list of k:v pairs to future-proof
@@ -47,6 +50,38 @@ def _decode_missing(line):
return result
+def encode_wanted(remote, local):
+ """
+ Compare remote and local results and generate a wanted line.
+
+ :param remote: a dict, with ts_data and ts_meta keys in the form
+ returned by :py:func:`decode_missing`
+ :param local: a dict, possibly empty, with ts_data and ts_meta keys
+ in the form returned by :py:meth:`Receiver._check_local`
+
+ The decoder for this line is
+ :py:func:`~swift.obj.ssync_sender.decode_wanted`
+ """
+
+ want = {}
+ if 'ts_data' in local:
+ # we have something, let's get just the right stuff
+ if remote['ts_data'] > local['ts_data']:
+ want['data'] = True
+ if 'ts_meta' in local and remote['ts_meta'] > local['ts_meta']:
+ want['meta'] = True
+ else:
+ # we got nothing, so we'll take whatever the remote has
+ want['data'] = True
+ want['meta'] = True
+ if want:
+ # this is the inverse of decode_wanted's key_map
+ key_map = dict(data='d', meta='m')
+ parts = ''.join(v for k, v in sorted(key_map.items()) if want.get(k))
+ return '%s %s' % (urllib.quote(remote['object_hash']), parts)
+ return None
+
+
class Receiver(object):
"""
Handles incoming SSYNC requests to the object server.
@@ -206,41 +241,41 @@ class Receiver(object):
raise swob.HTTPInsufficientStorage(drive=self.device)
self.fp = self.request.environ['wsgi.input']
- def _encode_wanted(self, line):
+ def _check_local(self, object_hash):
"""
- Parse a line received during missing_check to determine remote
- timestamps, compare with local diskfile and return representation
- of wanted parts (or None) for inclusion in response to sender.
+ Parse the local diskfile and return its current timestamps for
+ comparison with the remote offer.
+
+ :param object_hash: the hash of the remote object being offered
"""
- remote = _decode_missing(line)
- want = []
try:
df = self.diskfile_mgr.get_diskfile_from_hash(
- self.device, self.partition, remote['hash'], self.policy,
- frag_index=self.frag_index)
+ self.device, self.partition, object_hash,
+ self.policy, frag_index=self.frag_index)
except exceptions.DiskFileNotExist:
- want = ('data', 'meta')
- else:
- try:
- df.open()
- except exceptions.DiskFileDeleted as err:
- if err.timestamp < remote['ts_data']:
- want = ('data', 'meta')
- except exceptions.DiskFileError as err:
- want = ('data', 'meta')
- else:
- # we have a local diskfile so be specific about which
- # elements need to be sync'd
- if df.data_timestamp < remote['ts_data']:
- want = ['data']
- if (remote['ts_data'] < remote['ts_meta']
- and df.timestamp < remote['ts_meta']):
- want.append('meta')
- if want:
- # compress wanted parts to just first letters for response
- want = ''.join(word[0] for word in want)
- return '%s %s' % (urllib.quote(remote['hash']), want)
- return None
+ return {}
+ try:
+ df.open()
+ except exceptions.DiskFileDeleted as err:
+ return {'ts_data': err.timestamp}
+ except exceptions.DiskFileError:
+ return {}
+ return {
+ 'ts_data': df.data_timestamp,
+ 'ts_meta': df.timestamp,
+ }
+
+ def _check_missing(self, line):
+ """
+ Parse an object offered by the sender, compare it to the local
+ diskfile, and return the protocol line representing the needed
+ data, or None if in sync.
+
+ Anchor point for tests to mock legacy protocol changes.
+ """
+ remote = decode_missing(line)
+ local = self._check_local(remote['object_hash'])
+ return encode_wanted(remote, local)
def missing_check(self):
"""
@@ -295,8 +330,8 @@ class Receiver(object):
line = self.fp.readline(self.app.network_chunk_size)
if not line or line.strip() == ':MISSING_CHECK: END':
break
- want = self._encode_wanted(line)
- if want is not None:
+ want = self._check_missing(line)
+ if want:
object_hashes.append(want)
yield ':MISSING_CHECK: START\r\n'
if object_hashes:
@@ -384,7 +419,7 @@ class Receiver(object):
if header == 'content-length':
content_length = int(value)
# Establish subrequest body, if needed.
- if method == 'DELETE' or method == 'POST':
+ if method in ('DELETE', 'POST'):
if content_length not in (None, 0):
raise Exception(
'%s subrequest with content-length %s'
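
Taken together, `decode_missing`, `_check_local`, and `_check_missing` implement the receiver's half of the missing_check exchange. A standalone sketch of the `encode_wanted` decision (quoting elided; plain integers stand in for Timestamps, which compare the same way):

    def encode_wanted_sketch(remote, local):
        want = {}
        if 'ts_data' in local:
            # we have something locally; ask only for the newer parts
            if remote['ts_data'] > local['ts_data']:
                want['data'] = True
            if 'ts_meta' in local and remote['ts_meta'] > local['ts_meta']:
                want['meta'] = True
        else:
            # nothing local; take everything the remote has
            want['data'] = True
            want['meta'] = True
        if want:
            key_map = dict(data='d', meta='m')
            parts = ''.join(v for k, v in sorted(key_map.items())
                            if want.get(k))
            return '%s %s' % (remote['object_hash'], parts)
        return None

    remote = {'object_hash': 'abc', 'ts_data': 2, 'ts_meta': 4}
    assert encode_wanted_sketch(remote, {}) == 'abc dm'
    assert encode_wanted_sketch(remote, {'ts_data': 2, 'ts_meta': 4}) is None
    assert encode_wanted_sketch(remote, {'ts_data': 2, 'ts_meta': 3}) == 'abc m'

Note the tombstone case: when the local result has `ts_data` but no `ts_meta` (a deleted object), only newer data is ever requested, never meta alone.
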
diff --git a/swift/obj/ssync_sender.py b/swift/obj/ssync_sender.py
index fb893aa..82a1ae4 100644
--- a/swift/obj/ssync_sender.py
+++ b/swift/obj/ssync_sender.py
@@ -20,19 +20,48 @@ from swift.common import exceptions
from swift.common import http
-def _encode_missing(hash, ts_data, ts_meta=None):
+def encode_missing(object_hash, ts_data, ts_meta=None):
"""
Returns a string representing the object hash, its data file timestamp
and the delta forwards to its metafile timestamp, if non-zero, in the form:
- <hash> <timestamp> m:<hex delta>
+ ``<hash> <timestamp> m:<hex delta>``
+
+ The decoder for this line is
+ :py:func:`~swift.obj.ssync_receiver.decode_missing`
"""
- msg = '%s %s' % (urllib.quote(hash), urllib.quote(ts_data.internal))
+ msg = '%s %s' % (urllib.quote(object_hash), urllib.quote(ts_data.internal))
if ts_meta and ts_meta != ts_data:
delta = ts_meta.raw - ts_data.raw
msg = '%s m:%x' % (msg, delta)
return msg
+def decode_wanted(parts):
+ """
+ Parse missing_check line parts to determine which parts of local
+ diskfile were wanted by the receiver.
+
+ The encoder for parts is
+ :py:func:`~swift.obj.ssync_receiver.encode_wanted`
+ """
+ wanted = {}
+ key_map = dict(d='data', m='meta')
+ if parts:
+ # receiver specified data and/or meta wanted, so use those as
+ # conditions for sending PUT and/or POST subrequests
+ for k in key_map:
+ if k in parts[0]:
+ wanted[key_map[k]] = True
+ if not wanted:
+ # assume legacy receiver which will only accept PUTs. There is no
+ # way to send any meta file content without morphing the timestamp
+ # of either the data or the metadata, so we just send data file
+ # content to a legacy receiver. Once the receiver gets updated we
+ # will be able to send it the meta file content.
+ wanted['data'] = True
+ return wanted
+
+
class Sender(object):
"""
Sends SSYNC requests to the object server.
@@ -235,15 +264,15 @@ class Sender(object):
frag_index=self.job.get('frag_index'))
if self.remote_check_objs is not None:
hash_gen = ifilter(
- lambda path_objhash_timestamp:
- path_objhash_timestamp[1] in
+ lambda path_objhash_timestamps:
+ path_objhash_timestamps[1] in
self.remote_check_objs, hash_gen)
for path, object_hash, timestamps in hash_gen:
self.available_map[object_hash] = timestamps
with exceptions.MessageTimeout(
self.daemon.node_timeout,
'missing_check send line'):
- msg = '%s\r\n' % _encode_missing(object_hash, **timestamps)
+ msg = '%s\r\n' % encode_missing(object_hash, **timestamps)
self.connection.send('%x\r\n%s\r\n' % (len(msg), msg))
with exceptions.MessageTimeout(
self.daemon.node_timeout, 'missing_check end'):
@@ -273,29 +302,7 @@ class Sender(object):
break
parts = line.split()
if parts:
- self.send_map[parts[0]] = self._decode_wanted(parts[1:])
-
- def _decode_wanted(self, parts):
- """
- Parse missing_check line parts to determine which parts of local
- diskfile were wanted by the receiver.
- """
- wanted = {}
- key_map = dict(d='data', m='meta')
- if parts:
- # receiver specified data and/or meta wanted, so use those as
- # conditions for sending PUT and/or POST subrequests
- for k in key_map:
- if k in parts[0]:
- wanted[key_map[k]] = True
- if not wanted:
- # assume legacy receiver which will only accept PUTs. There is no
- # way to send any meta file content without morphing the timestamp
- # of either the data or the metadata, so we just send data file
- # content to a legacy receiver. Once the receiver gets updated we
- # will be able to send it the meta file content.
- wanted['data'] = True
- return wanted
+ self.send_map[parts[0]] = decode_wanted(parts[1:])
def updates(self):
"""
@@ -310,8 +317,8 @@ class Sender(object):
self.daemon.node_timeout, 'updates start'):
msg = ':UPDATES: START\r\n'
self.connection.send('%x\r\n%s\r\n' % (len(msg), msg))
- for hash, want in self.send_map.items():
- object_hash = urllib.unquote(hash)
+ for object_hash, want in self.send_map.items():
+ object_hash = urllib.unquote(object_hash)
try:
df = self.df_mgr.get_diskfile_from_hash(
self.job['device'], self.job['partition'], object_hash,
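
On the sender side, `encode_missing` and `decode_wanted` are the mirror halves of the protocol above. A sketch of the round trip, assuming (as with Swift's `Timestamp`) that `internal` is the string form and `raw` the integer deca-microsecond count:

    def encode_missing_sketch(object_hash, ts_data, ts_meta=None):
        # quoting elided for brevity
        msg = '%s %s' % (object_hash, ts_data.internal)
        if ts_meta and ts_meta != ts_data:
            # the meta timestamp travels as a hex delta forward
            # from ts_data, not as a second absolute timestamp
            msg = '%s m:%x' % (msg, ts_meta.raw - ts_data.raw)
        return msg

    def decode_wanted_sketch(parts):
        wanted = {}
        key_map = dict(d='data', m='meta')
        if parts:
            for k in key_map:
                if k in parts[0]:
                    wanted[key_map[k]] = True
        if not wanted:
            # a legacy receiver echoes only the hash, so it can only
            # accept a PUT of the data file content
            wanted['data'] = True
        return wanted

    assert decode_wanted_sketch(['dm']) == {'data': True, 'meta': True}
    assert decode_wanted_sketch(['m']) == {'meta': True}
    assert decode_wanted_sketch([]) == {'data': True}
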
diff --git a/test/probe/test_object_metadata_replication.py b/test/probe/test_object_metadata_replication.py
index 35a15b9..e7479d5 100644
--- a/test/probe/test_object_metadata_replication.py
+++ b/test/probe/test_object_metadata_replication.py
@@ -257,6 +257,7 @@ class Test(ReplProbeTest):
self.brain.start_handoff_half()
self._assert_consistent_object_metadata()
+ self._assert_consistent_container_dbs()
def test_sysmeta_after_replication_with_subsequent_put(self):
sysmeta = {'x-object-sysmeta-foo': 'older'}
@@ -314,7 +315,9 @@ class Test(ReplProbeTest):
for key in sysmeta2.keys():
self.assertTrue(key in metadata, key)
self.assertEqual(metadata[key], sysmeta2[key])
+
self._assert_consistent_object_metadata()
+ self._assert_consistent_container_dbs()
def test_sysmeta_after_replication_with_subsequent_post(self):
sysmeta = {'x-object-sysmeta-foo': 'sysmeta-foo'}
diff --git a/test/unit/obj/test_diskfile.py b/test/unit/obj/test_diskfile.py
index 31adc9a..1d46f37 100644
--- a/test/unit/obj/test_diskfile.py
+++ b/test/unit/obj/test_diskfile.py
@@ -33,7 +33,7 @@ from shutil import rmtree
from time import time
from tempfile import mkdtemp
from hashlib import md5
-from contextlib import closing, nested
+from contextlib import closing, nested, contextmanager
from gzip import GzipFile
from eventlet import hubs, timeout, tpool
@@ -936,8 +936,8 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
device, part, policy, **kwargs))
expected = sorted(expected_items)
actual = sorted(hash_items)
- self.assertEqual(actual, expected,
- 'Expected %s but got %s' % (expected, actual))
+ # the default list diff is easiest to debug
+ self.assertEqual(actual, expected)
def test_yield_hashes_tombstones(self):
ts_iter = (Timestamp(t) for t in itertools.count(int(time())))
@@ -1098,27 +1098,30 @@ class TestDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
ts3 = next(ts_iter)
suffix_map = {
'abc': {
- '9373a92d072897b136b3fc06595b4abc': [
+ # only the tombstone is yield/sync-able
+ '9333a92d072897b136b3fc06595b4abc': [
ts1.internal + '.ts',
ts2.internal + '.meta'],
},
'456': {
- '9373a92d072897b136b3fc06595b0456': [
+ # only the latest .meta timestamp is reported
+ '9444a92d072897b136b3fc06595b0456': [
ts1.internal + '.data',
ts2.internal + '.meta',
ts3.internal + '.meta'],
- '9373a92d072897b136b3fc06595b7456': [
+ # well-formed datadir with a .meta
+ '9555a92d072897b136b3fc06595b7456': [
ts1.internal + '.data',
ts2.internal + '.meta'],
},
}
expected = {
# do not expect meta file timestamp with tombstone timestamp
- '9373a92d072897b136b3fc06595b4abc':
+ '9333a92d072897b136b3fc06595b4abc':
{'ts_data': ts1},
- '9373a92d072897b136b3fc06595b0456':
+ '9444a92d072897b136b3fc06595b0456':
{'ts_data': ts1, 'ts_meta': ts3},
- '9373a92d072897b136b3fc06595b7456':
+ '9555a92d072897b136b3fc06595b7456':
{'ts_data': ts1, 'ts_meta': ts2},
}
self._check_yield_hashes(POLICIES.default, suffix_map, expected)
@@ -1427,6 +1430,27 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
def test_get_ondisk_files_with_stray_meta(self):
# get_ondisk_files does not tolerate a stray .meta file
+ class_under_test = self._get_diskfile(POLICIES.default)
+
+ @contextmanager
+ def create_files(df, files):
+ os.makedirs(df._datadir)
+ for fname in files:
+ fpath = os.path.join(df._datadir, fname)
+ with open(fpath, 'w') as f:
+ diskfile.write_metadata(f, {'name': df._name,
+ 'Content-Length': 0})
+ yield
+ rmtree(df._datadir, ignore_errors=True)
+
+ # sanity
+ files = [
+ '0000000006.00000#1.data',
+ '0000000006.00000.durable',
+ ]
+ with create_files(class_under_test, files):
+ class_under_test.open()
+
scenarios = [['0000000007.00000.meta'],
['0000000007.00000.meta',
@@ -1440,10 +1464,13 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
'0000000005.00000#1.data']
]
for files in scenarios:
- class_under_test = self._get_diskfile(POLICIES.default)
- with mock.patch('swift.obj.diskfile.os.listdir',
- lambda *args: files):
- self.assertRaises(DiskFileNotExist, class_under_test.open)
+ with create_files(class_under_test, files):
+ try:
+ class_under_test.open()
+ except DiskFileNotExist:
+ continue
+ self.fail('expected DiskFileNotExist opening %s with %r' % (
+ class_under_test.__class__.__name__, files))
def test_parse_on_disk_filename(self):
mgr = self.df_router[POLICIES.default]
@@ -1649,7 +1676,7 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
}
self._check_yield_hashes(POLICIES.default, suffix_map, expected)
- # but meta timestamp is not returned if specified frag index
+ # but meta timestamp is *not* returned if specified frag index
# is not found
expected = {
'9373a92d072897b136b3fc06595b4abc': {'ts_data': ts1},
@@ -1754,31 +1781,42 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
ts2 = next(ts_iter)
suffix_map = {
'456': {
- # ok
- '9373a92d072897b136b3fc06595b0456': [
+ # this one is fine
+ '9333a92d072897b136b3fc06595b0456': [
ts1.internal + '#2.data',
ts1.internal + '.durable'],
- # no frag index in data file name
- '9373a92d072897b136b3fc06595b7456': [
+ # missing frag index
+ '9444a92d072897b136b3fc06595b7456': [
ts1.internal + '.data'],
# junk
- '9373a92d072897b136b3fc06595b8456': [
+ '9555a92d072897b136b3fc06595b8456': [
'junk_file'],
- # meta file and bad data file name
- '9373a92d072897b136b3fc06595b9456': [
- ts1.internal + '.data',
+ # missing .durable
+ '9666a92d072897b136b3fc06595b9456': [
+ ts1.internal + '#2.data',
ts2.internal + '.meta'],
- # only meta file
- '9373a92d072897b136b3fc06595ba456': [
+ # .meta files w/o .data files can't be opened, and are ignored
+ '9777a92d072897b136b3fc06595ba456': [
ts1.internal + '.meta'],
# multiple meta files with no data
- '9373a92d072897b136b3fc06595bb456': [
+ '9888a92d072897b136b3fc06595bb456': [
ts1.internal + '.meta',
ts2.internal + '.meta'],
+ # this is good with meta
+ '9999a92d072897b136b3fc06595bb456': [
+ ts1.internal + '#2.data',
+ ts1.internal + '.durable',
+ ts2.internal + '.meta'],
+ # this one is wrong frag index
+ '9aaaa92d072897b136b3fc06595b0456': [
+ ts1.internal + '#7.data',
+ ts1.internal + '.durable'],
},
}
expected = {
- '9373a92d072897b136b3fc06595b0456': {'ts_data': ts1}
+ '9333a92d072897b136b3fc06595b0456': {'ts_data': ts1},
+ '9999a92d072897b136b3fc06595bb456': {'ts_data': ts1,
+ 'ts_meta': ts2},
}
self._check_yield_hashes(POLICIES.default, suffix_map, expected,
frag_index=2)
@@ -2036,7 +2074,8 @@ class DiskFileMixin(BaseDiskFileTestMixin):
def test_get_metadata_not_opened(self):
df = self._simple_get_diskfile()
- self.assertRaises(DiskFileNotOpen, df.get_metadata)
+ with self.assertRaises(DiskFileNotOpen):
+ df.get_metadata()
def test_get_datafile_metadata(self):
ts_iter = make_timestamp_iter()
@@ -2059,7 +2098,8 @@ class DiskFileMixin(BaseDiskFileTestMixin):
def test_get_datafile_metadata_not_opened(self):
df = self._simple_get_diskfile()
- self.assertRaises(DiskFileNotOpen, df.get_datafile_metadata)
+ with self.assertRaises(DiskFileNotOpen):
+ df.get_datafile_metadata()
def test_get_metafile_metadata(self):
ts_iter = make_timestamp_iter()
@@ -2082,17 +2122,14 @@ class DiskFileMixin(BaseDiskFileTestMixin):
def test_get_metafile_metadata_not_opened(self):
df = self._simple_get_diskfile()
- self.assertRaises(DiskFileNotOpen, df.get_metafile_metadata)
+ with self.assertRaises(DiskFileNotOpen):
+ df.get_metafile_metadata()
def test_not_opened(self):
df = self._simple_get_diskfile()
- try:
+ with self.assertRaises(DiskFileNotOpen):
with df:
pass
- except DiskFileNotOpen:
- pass
- else:
- self.fail("Expected DiskFileNotOpen exception")
def test_disk_file_default_disallowed_metadata(self):
# build an object with some meta (at t0+1s)
@@ -3207,7 +3244,8 @@ class DiskFileMixin(BaseDiskFileTestMixin):
def test_data_timestamp_not_open(self):
df = self._simple_get_diskfile()
- self.assertRaises(DiskFileNotOpen, lambda: df.data_timestamp)
+ with self.assertRaises(DiskFileNotOpen):
+ df.data_timestamp
def test_durable_timestamp(self):
ts_1 = self.ts()
diff --git a/test/unit/obj/test_ssync_receiver.py b/test/unit/obj/test_ssync_receiver.py
index 70e60cf..9a79c5a 100644
--- a/test/unit/obj/test_ssync_receiver.py
+++ b/test/unit/obj/test_ssync_receiver.py
@@ -2044,42 +2044,122 @@ class TestSsyncRxServer(unittest.TestCase):
class TestModuleMethods(unittest.TestCase):
def test_decode_missing(self):
- hash = '9d41d8cd98f00b204e9800998ecf0abc'
+ object_hash = '9d41d8cd98f00b204e9800998ecf0abc'
ts_iter = make_timestamp_iter()
t_data = ts_iter.next()
t_meta = ts_iter.next()
d_meta_data = t_meta.raw - t_data.raw
# legacy single timestamp string
- msg = '%s %s' % (hash, t_data.internal)
- expected = dict(hash=hash, ts_meta=t_data, ts_data=t_data)
- self.assertEqual(
- expected, ssync_receiver._decode_missing(msg))
+ msg = '%s %s' % (object_hash, t_data.internal)
+ expected = dict(object_hash=object_hash,
+ ts_meta=t_data,
+ ts_data=t_data)
+ self.assertEqual(expected, ssync_receiver.decode_missing(msg))
# hex meta delta encoded as extra message part
- msg = '%s %s m:%x' % (hash, t_data.internal, d_meta_data)
- expected = dict(hash=hash, ts_data=t_data, ts_meta=t_meta)
- self.assertEqual(
- expected, ssync_receiver._decode_missing(msg))
+ msg = '%s %s m:%x' % (object_hash, t_data.internal, d_meta_data)
+ expected = dict(object_hash=object_hash,
+ ts_data=t_data,
+ ts_meta=t_meta)
+ self.assertEqual(expected, ssync_receiver.decode_missing(msg))
# unexpected zero delta is tolerated
- msg = '%s %s m:0' % (hash, t_data.internal)
- expected = dict(hash=hash, ts_meta=t_data, ts_data=t_data)
- self.assertEqual(
- expected, ssync_receiver._decode_missing(msg))
+ msg = '%s %s m:0' % (object_hash, t_data.internal)
+ expected = dict(object_hash=object_hash,
+ ts_meta=t_data,
+ ts_data=t_data)
+ self.assertEqual(expected, ssync_receiver.decode_missing(msg))
# unexpected subparts in timestamp delta part are tolerated
- msg = '%s %s c:12345,m:%x,junk' % (hash, t_data.internal, d_meta_data)
- expected = dict(
- hash=hash, ts_meta=t_meta, ts_data=t_data)
+ msg = '%s %s c:12345,m:%x,junk' % (object_hash,
+ t_data.internal,
+ d_meta_data)
+ expected = dict(object_hash=object_hash,
+ ts_meta=t_meta,
+ ts_data=t_data)
self.assertEqual(
- expected, ssync_receiver._decode_missing(msg))
+ expected, ssync_receiver.decode_missing(msg))
# extra message parts tolerated
- msg = '%s %s m:%x future parts' % (hash, t_data.internal, d_meta_data)
- expected = dict(hash=hash, ts_meta=t_meta, ts_data=t_data)
- self.assertEqual(
- expected, ssync_receiver._decode_missing(msg))
+ msg = '%s %s m:%x future parts' % (object_hash,
+ t_data.internal,
+ d_meta_data)
+ expected = dict(object_hash=object_hash,
+ ts_meta=t_meta,
+ ts_data=t_data)
+ self.assertEqual(expected, ssync_receiver.decode_missing(msg))
+
+ def test_encode_wanted(self):
+ ts_iter = make_timestamp_iter()
+ old_t_data = ts_iter.next()
+ t_data = ts_iter.next()
+ old_t_meta = ts_iter.next()
+ t_meta = ts_iter.next()
+
+ remote = {
+ 'object_hash': 'theremotehash',
+ 'ts_data': t_data,
+ 'ts_meta': t_meta,
+ }
+
+ # missing
+ local = {}
+ expected = 'theremotehash dm'
+ self.assertEqual(ssync_receiver.encode_wanted(remote, local),
+ expected)
+
+ # in-sync
+ local = {
+ 'ts_data': t_data,
+ 'ts_meta': t_meta,
+ }
+ expected = None
+ self.assertEqual(ssync_receiver.encode_wanted(remote, local),
+ expected)
+
+ # out-of-sync
+ local = {
+ 'ts_data': old_t_data,
+ 'ts_meta': old_t_meta,
+ }
+ expected = 'theremotehash dm'
+ self.assertEqual(ssync_receiver.encode_wanted(remote, local),
+ expected)
+
+ # old data
+ local = {
+ 'ts_data': old_t_data,
+ 'ts_meta': t_meta,
+ }
+ expected = 'theremotehash d'
+ self.assertEqual(ssync_receiver.encode_wanted(remote, local),
+ expected)
+
+ # old metadata
+ local = {
+ 'ts_data': t_data,
+ 'ts_meta': old_t_meta,
+ }
+ expected = 'theremotehash m'
+ self.assertEqual(ssync_receiver.encode_wanted(remote, local),
+ expected)
+
+ # in-sync tombstone
+ local = {
+ 'ts_data': t_data,
+ }
+ expected = None
+ self.assertEqual(ssync_receiver.encode_wanted(remote, local),
+ expected)
+
+ # old tombstone
+ local = {
+ 'ts_data': old_t_data,
+ }
+ expected = 'theremotehash d'
+ self.assertEqual(ssync_receiver.encode_wanted(remote, local),
+ expected)
if __name__ == '__main__':
diff --git a/test/unit/obj/test_ssync_sender.py b/test/unit/obj/test_ssync_sender.py
index a072c8d..4af25c1 100644
--- a/test/unit/obj/test_ssync_sender.py
+++ b/test/unit/obj/test_ssync_sender.py
@@ -487,6 +487,53 @@ class TestSender(BaseTestSender):
'1380144474.44444')])
self.assertEqual(expected_map, candidates)
+ def test_call_and_missing_check_metadata_legacy_response(self):
+ def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
+ if device == 'dev' and partition == '9' and suffixes == ['abc'] \
+ and policy == POLICIES.legacy:
+ yield (
+ '/srv/node/dev/objects/9/abc/'
+ '9d41d8cd98f00b204e9800998ecf0abc',
+ '9d41d8cd98f00b204e9800998ecf0abc',
+ {'ts_data': Timestamp(1380144470.00000),
+ 'ts_meta': Timestamp(1380155570.00005)})
+ else:
+ raise Exception(
+ 'No match for %r %r %r' % (device, partition, suffixes))
+
+ self.sender.connection = FakeConnection()
+ self.sender.node = {}
+ self.sender.job = {
+ 'device': 'dev',
+ 'partition': '9',
+ 'policy': POLICIES.legacy,
+ 'frag_index': 0,
+ }
+ self.sender.suffixes = ['abc']
+ self.sender.response = FakeResponse(
+ chunk_body=(
+ ':MISSING_CHECK: START\r\n'
+ '9d41d8cd98f00b204e9800998ecf0abc\r\n'
+ ':MISSING_CHECK: END\r\n'
+ ':UPDATES: START\r\n'
+ ':UPDATES: END\r\n'
+ ))
+ self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
+ self.sender.connect = mock.MagicMock()
+ self.sender.df_mgr.get_diskfile_from_hash = mock.MagicMock()
+ self.sender.disconnect = mock.MagicMock()
+ success, candidates = self.sender()
+ self.assertTrue(success)
+ found_post = found_put = False
+ for chunk in self.sender.connection.sent:
+ if 'POST' in chunk:
+ found_post = True
+ if 'PUT' in chunk:
+ found_put = True
+ self.assertFalse(found_post)
+ self.assertTrue(found_put)
+ self.assertEqual(self.sender.failures, 0)
+
def test_call_and_missing_check(self):
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
if device == 'dev' and partition == '9' and suffixes == ['abc'] \
@@ -501,6 +548,7 @@ class TestSender(BaseTestSender):
'No match for %r %r %r' % (device, partition, suffixes))
self.sender.connection = FakeConnection()
+ self.sender.node = {}
self.sender.job = {
'device': 'dev',
'partition': '9',
@@ -511,7 +559,7 @@ class TestSender(BaseTestSender):
self.sender.response = FakeResponse(
chunk_body=(
':MISSING_CHECK: START\r\n'
- '9d41d8cd98f00b204e9800998ecf0abc\r\n'
+ '9d41d8cd98f00b204e9800998ecf0abc d\r\n'
':MISSING_CHECK: END\r\n'))
self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
self.sender.connect = mock.MagicMock()
@@ -578,13 +626,13 @@ class TestSender(BaseTestSender):
'policy': POLICIES.legacy,
'frag_index': 0,
}
- self.sender = ssync_sender.Sender(self.daemon, None, job, ['abc'],
+ self.sender = ssync_sender.Sender(self.daemon, {}, job, ['abc'],
['9d41d8cd98f00b204e9800998ecf0abc'])
self.sender.connection = FakeConnection()
self.sender.response = FakeResponse(
chunk_body=(
':MISSING_CHECK: START\r\n'
- '9d41d8cd98f00b204e9800998ecf0abc\r\n'
+ '9d41d8cd98f00b204e9800998ecf0abc d\r\n'
':MISSING_CHECK: END\r\n'))
self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
self.sender.connect = mock.MagicMock()
@@ -2317,7 +2365,7 @@ class TestSsyncReplication(TestBaseSsync):
# wrap connection from tx to rx to capture ssync messages...
sender.connect, trace = self.make_connect_wrapper(sender)
- def _legacy_encode_wanted(self, line):
+ def _legacy_check_missing(self, line):
# reproduces behavior of 'legacy' ssync receiver missing_checks()
parts = line.split()
object_hash, timestamp = [urllib.unquote(v) for v in parts[:2]]
@@ -2341,8 +2389,8 @@ class TestSsyncReplication(TestBaseSsync):
return None
# run the sync protocol...
- func = 'swift.obj.ssync_receiver.Receiver._encode_wanted'
- with mock.patch(func, _legacy_encode_wanted):
+ func = 'swift.obj.ssync_receiver.Receiver._check_missing'
+ with mock.patch(func, _legacy_check_missing):
success, in_sync_objs = sender()
self.assertEqual(1, len(in_sync_objs))
@@ -2370,30 +2418,53 @@ class TestSsyncReplication(TestBaseSsync):
class TestModuleMethods(unittest.TestCase):
def test_encode_missing(self):
- hash = '9d41d8cd98f00b204e9800998ecf0abc'
+ object_hash = '9d41d8cd98f00b204e9800998ecf0abc'
ts_iter = make_timestamp_iter()
t_data = ts_iter.next()
t_meta = ts_iter.next()
d_meta_data = t_meta.raw - t_data.raw
# equal data and meta timestamps -> legacy single timestamp string
- expected = '%s %s' % (hash, t_data.internal)
+ expected = '%s %s' % (object_hash, t_data.internal)
self.assertEqual(
expected,
- ssync_sender._encode_missing(hash, t_data, ts_meta=t_data))
+ ssync_sender.encode_missing(object_hash, t_data, ts_meta=t_data))
# newer meta timestamp -> hex data delta encoded as extra message part
- expected = '%s %s m:%x' % (hash, t_data.internal, d_meta_data)
+ expected = '%s %s m:%x' % (object_hash, t_data.internal, d_meta_data)
self.assertEqual(
expected,
- ssync_sender._encode_missing(hash, t_data, ts_meta=t_meta))
+ ssync_sender.encode_missing(object_hash, t_data, ts_meta=t_meta))
# test encode and decode functions invert
- expected = {'hash': hash, 'ts_meta': t_meta, 'ts_data': t_data}
- msg = ssync_sender._encode_missing(**expected)
- actual = ssync_receiver._decode_missing(msg)
+ expected = {'object_hash': object_hash, 'ts_meta': t_meta,
+ 'ts_data': t_data}
+ msg = ssync_sender.encode_missing(**expected)
+ actual = ssync_receiver.decode_missing(msg)
self.assertEqual(expected, actual)
+ def test_decode_wanted(self):
+ parts = ['d']
+ expected = {'data': True}
+ self.assertEqual(ssync_sender.decode_wanted(parts), expected)
+
+ parts = ['m']
+ expected = {'meta': True}
+ self.assertEqual(ssync_sender.decode_wanted(parts), expected)
+
+ parts = ['dm']
+ expected = {'data': True, 'meta': True}
+ self.assertEqual(ssync_sender.decode_wanted(parts), expected)
+
+ # you don't really need these next few...
+ parts = ['md']
+ expected = {'data': True, 'meta': True}
+ self.assertEqual(ssync_sender.decode_wanted(parts), expected)
+
+ parts = ['xcy', 'funny', {'business': True}]
+ expected = {'data': True}
+ self.assertEqual(ssync_sender.decode_wanted(parts), expected)
+
if __name__ == '__main__':
unittest.main()