diff --git a/swift/common/utils.py b/swift/common/utils.py
index 03224eb..e0af836 100644
--- a/swift/common/utils.py
+++ b/swift/common/utils.py
@@ -738,6 +738,19 @@ class Timestamp(object):
"""
def __init__(self, timestamp, offset=0, delta=0):
+ """
+ Create a new Timestamp.
+
+ :param timestamp: time in seconds since the Epoch, may be any of:
+
+ * a float or integer
+ * normalized/internalized string
+ * another instance of this class (offset is preserved)
+
+ :param offset: the second internal offset vector, an int
+ :param delta: deca-microsecond difference from the base timestamp
+ param, an int
+ """
if isinstance(timestamp, basestring):
parts = timestamp.split('_', 1)
self.timestamp = float(parts.pop(0))
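
The constructor contract documented above is easiest to see by example. A minimal sketch against a stock Swift tree (the internal string format and delta units are recalled from Swift's Timestamp implementation, not stated in this patch; values are invented for illustration):

    from swift.common.utils import Timestamp

    t1 = Timestamp(1380144470.00000)                     # float seconds since the Epoch
    t2 = Timestamp('1380144470.00000_000000000000000a')  # internalized string, offset 10
    t3 = Timestamp(t2)                                   # another Timestamp; offset preserved
    assert t3.offset == t2.offset

    # delta shifts the base time in deca-microsecond (1e-5 s) units
    t4 = Timestamp(1380144470.00000, delta=5)
    assert t4.internal == '1380144470.00005'
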
diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py
index ca71f2d..7de7a2d 100644
--- a/swift/obj/diskfile.py
+++ b/swift/obj/diskfile.py
@@ -974,30 +974,36 @@ class BaseDiskFileManager(object):
suffixes = (
(os.path.join(partition_path, suffix), suffix)
for suffix in suffixes)
- key_map = {'.data': 'ts_data', '.ts': 'ts_data', '.meta': 'ts_meta'}
+ key_preference = (
+ ('ts_meta', '.meta'),
+ ('ts_data', '.data'),
+ ('ts_data', '.ts'),
+ )
for suffix_path, suffix in suffixes:
for object_hash in self._listdir(suffix_path):
object_path = os.path.join(suffix_path, object_hash)
- newest_valid_file = None
try:
results = self.cleanup_ondisk_files(
object_path, self.reclaim_age, **kwargs)
timestamps = {}
- for ext in key_map:
- f = results.get(ext)
- if f:
- key = key_map[ext]
- ts = self.parse_on_disk_filename(f)['timestamp']
- timestamps[key] = ts
- if timestamps:
- yield (object_path, object_hash, timestamps)
+ for ts_key, ext in key_preference:
+ if ext not in results:
+ continue
+ timestamps[ts_key] = self.parse_on_disk_filename(
+ results[ext])['timestamp']
+ if 'ts_data' not in timestamps:
+ # file sets that do not include a .data or .ts
+ # file cannot be opened and therefore cannot
+ # be ssync'd
+ continue
+ yield (object_path, object_hash, timestamps)
except AssertionError as err:
self.logger.debug('Invalid file set in %s (%s)' % (
object_path, err))
except DiskFileError as err:
self.logger.debug(
- 'Invalid diskfile filename %r in %r (%s)' % (
- newest_valid_file, object_path, err))
+ 'Invalid diskfile filename in %r (%s)' % (
+ object_path, err))
class BaseDiskFileWriter(object):
@@ -1779,6 +1785,7 @@ class BaseDiskFile(object):
"""
fp = open(data_file, 'rb')
self._datafile_metadata = self._failsafe_read_metadata(fp, data_file)
+ self._metadata = {}
if meta_file:
self._metafile_metadata = self._failsafe_read_metadata(
meta_file, meta_file)
@@ -1786,12 +1793,12 @@ class BaseDiskFile(object):
[(key, val) for key, val in self._datafile_metadata.items()
if key.lower() in DATAFILE_SYSTEM_META
or is_sys_meta('object', key)])
- self._metadata = self._metafile_metadata.copy()
+ self._metadata.update(self._metafile_metadata)
self._metadata.update(sys_metadata)
# diskfile writer added 'name' to metafile, so remove it here
self._metafile_metadata.pop('name', None)
else:
- self._metadata = self._datafile_metadata
+ self._metadata.update(self._datafile_metadata)
if self._name is None:
# If we don't know our name, we were just given a hash dir at
# instantiation, so we'd better validate that the name hashes back
@@ -2055,14 +2062,14 @@ class DiskFileManager(BaseDiskFileManager):
def _verify_on_disk_files(self, accepted_files, **kwargs):
"""
Verify that the final combination of on disk files complies with the
- diskfile contract.
+ replicated diskfile contract.
:param accepted_files: files that have been found and accepted
:returns: True if the file combination is compliant, False otherwise
"""
# mimic legacy behavior - .meta is ignored when .ts is found
if accepted_files.get('.ts'):
- accepted_files['.meta'] = None
+ accepted_files.pop('.meta', None)
data_file, meta_file, ts_file = tuple(
[accepted_files.get(ext)
@@ -2495,7 +2502,7 @@ class ECDiskFileManager(BaseDiskFileManager):
frag_prefs=None, **kwargs):
"""
Verify that the final combination of on disk files complies with the
- diskfile contract.
+ erasure-coded diskfile contract.
:param accepted_files: files that have been found and accepted
:param frag_index: specifies a specific fragment index .data file
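
The diskfile change above reworks how yield_hashes picks which timestamps to report for each object hash. A distilled, standalone sketch of just that selection logic (pick_timestamps is a hypothetical helper; the real loop also resolves paths and parses timestamps out of on-disk filenames):

    def pick_timestamps(results):
        # preference order mirrors the patch: a .meta supplies ts_meta,
        # and either a .data or a .ts supplies ts_data
        key_preference = (
            ('ts_meta', '.meta'),
            ('ts_data', '.data'),
            ('ts_data', '.ts'),
        )
        timestamps = {}
        for ts_key, ext in key_preference:
            if ext in results:
                timestamps[ts_key] = results[ext]
        # a file set with no .data or .ts cannot be opened, so it
        # cannot be ssync'd and is skipped entirely
        return timestamps if 'ts_data' in timestamps else None

    assert pick_timestamps({'.meta': 't2'}) is None
    assert pick_timestamps({'.data': 't1', '.meta': 't2'}) == {'ts_data': 't1', 'ts_meta': 't2'}
    assert pick_timestamps({'.ts': 't1'}) == {'ts_data': 't1'}
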
diff --git a/swift/obj/reconstructor.py b/swift/obj/reconstructor.py
index 412c8c0..5793b3e 100644
--- a/swift/obj/reconstructor.py
+++ b/swift/obj/reconstructor.py
@@ -71,28 +71,27 @@ class RebuildingECDiskFileStream(object):
metadata in the DiskFile interface for ssync.
"""
- def __init__(self, metadata, frag_index, rebuilt_fragment_iter):
+ def __init__(self, datafile_metadata, frag_index, rebuilt_fragment_iter):
# start with metadata from a participating FA
- self.metadata = metadata
+ self.datafile_metadata = datafile_metadata
# the new FA is going to have the same length as others in the set
- self._content_length = self.metadata['Content-Length']
+ self._content_length = self.datafile_metadata['Content-Length']
# update the FI and delete the ETag, the obj server will
# recalc on the other side...
- self.metadata['X-Object-Sysmeta-Ec-Frag-Index'] = frag_index
+ self.datafile_metadata['X-Object-Sysmeta-Ec-Frag-Index'] = frag_index
for etag_key in ('ETag', 'Etag'):
- self.metadata.pop(etag_key, None)
+ self.datafile_metadata.pop(etag_key, None)
self.frag_index = frag_index
self.rebuilt_fragment_iter = rebuilt_fragment_iter
def get_metadata(self):
- return self.metadata
+ return self.datafile_metadata
def get_datafile_metadata(self):
- # not a mistake, datafile_metadata == metadata
- return self.metadata
+ return self.datafile_metadata
@property
def content_length(self):
@@ -222,7 +221,7 @@ class ObjectReconstructor(Daemon):
'full_path': self._full_path(node, part, path, policy)})
return resp
- def reconstruct_fa(self, job, node, metadata):
+ def reconstruct_fa(self, job, node, datafile_metadata):
"""
Reconstructs a fragment archive - this method is called from ssync
after a remote node responds that it is missing this object - the local
@@ -231,7 +230,8 @@ class ObjectReconstructor(Daemon):
:param job: job from ssync_sender
:param node: node that we're rebuilding to
- :param metadata: the metadata to attach to the rebuilt archive
+ :param datafile_metadata: the datafile metadata to attach to
+ the rebuilt fragment archive
:returns: a DiskFile like class for use by ssync
:raises DiskFileError: if the fragment archive cannot be reconstructed
"""
@@ -248,7 +248,7 @@ class ObjectReconstructor(Daemon):
headers = self.headers.copy()
headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
pile = GreenAsyncPile(len(part_nodes))
- path = metadata['name']
+ path = datafile_metadata['name']
for node in part_nodes:
pile.spawn(self._get_response, node, job['partition'],
path, headers, job['policy'])
@@ -281,14 +281,14 @@ class ObjectReconstructor(Daemon):
'to reconstruct %s with ETag %s' % (
len(responses), job['policy'].ec_ndata,
self._full_path(node, job['partition'],
- metadata['name'], job['policy']),
+ datafile_metadata['name'], job['policy']),
etag))
raise DiskFileError('Unable to reconstruct EC archive')
rebuilt_fragment_iter = self.make_rebuilt_fragment_iter(
responses[:job['policy'].ec_ndata], path, job['policy'],
fi_to_rebuild)
- return RebuildingECDiskFileStream(metadata, fi_to_rebuild,
+ return RebuildingECDiskFileStream(datafile_metadata, fi_to_rebuild,
rebuilt_fragment_iter)
def _reconstruct(self, policy, fragment_payload, frag_index):
@@ -541,18 +541,17 @@ class ObjectReconstructor(Daemon):
"""
df_mgr = self._df_router[job['policy']]
for object_hash, timestamps in objects.items():
- if 'ts_data' in timestamps:
- try:
- df = df_mgr.get_diskfile_from_hash(
- job['local_dev']['device'], job['partition'],
- object_hash, job['policy'],
- frag_index=frag_index)
- df.purge(timestamps['ts_data'], frag_index)
- except DiskFileError:
- self.logger.exception(
- 'Unable to purge DiskFile (%r %r %r)',
- object_hash, timestamps['ts_data'], frag_index)
- continue
+ try:
+ df = df_mgr.get_diskfile_from_hash(
+ job['local_dev']['device'], job['partition'],
+ object_hash, job['policy'],
+ frag_index=frag_index)
+ df.purge(timestamps['ts_data'], frag_index)
+ except DiskFileError:
+ self.logger.exception(
+ 'Unable to purge DiskFile (%r %r %r)',
+ object_hash, timestamps['ts_data'], frag_index)
+ continue
def process_job(self, job):
"""
diff --git a/swift/obj/ssync_receiver.py b/swift/obj/ssync_receiver.py
index 4b7762b..7e00e84 100644
--- a/swift/obj/ssync_receiver.py
+++ b/swift/obj/ssync_receiver.py
@@ -27,15 +27,18 @@ from swift.common import request_helpers
from swift.common.utils import Timestamp
-def _decode_missing(line):
+def decode_missing(line):
"""
Parse a string of the form generated by
- :py:func:`.ssync_sender._encode_missing` and return a dict with keys
- 'hash', 'ts_data', 'ts_meta'.
+ :py:func:`~swift.obj.ssync_sender.encode_missing` and return a dict
+ with keys ``object_hash``, ``ts_data``, ``ts_meta``.
+
+ The encoder for this line is
+ :py:func:`~swift.obj.ssync_sender.encode_missing`
"""
result = {}
parts = line.split()
- result['hash'], t_data = (urllib.unquote(v) for v in parts[:2])
+ result['object_hash'], t_data = (urllib.unquote(v) for v in parts[:2])
result['ts_data'] = result['ts_meta'] = Timestamp(t_data)
if len(parts) > 2:
# allow for a comma separated list of k:v pairs to future-proof
@@ -47,6 +50,38 @@ def _decode_missing(line):
return result
+def encode_wanted(remote, local):
+ """
+ Compare remote and local results and generate a wanted line.
+
+ :param remote: a dict, with ts_data and ts_meta keys in the form
+ returned by :py:func:`decode_missing`
+ :param local: a dict, possibly empty, with ts_data and ts_meta keys
+ in the form returned by :py:meth:`Receiver._check_local`
+
+ The decoder for this line is
+ :py:func:`~swift.obj.ssync_sender.decode_wanted`
+ """
+
+ want = {}
+ if 'ts_data' in local:
+ # we have something, let's get just the right stuff
+ if remote['ts_data'] > local['ts_data']:
+ want['data'] = True
+ if 'ts_meta' in local and remote['ts_meta'] > local['ts_meta']:
+ want['meta'] = True
+ else:
+ # we got nothing, so we'll take whatever the remote has
+ want['data'] = True
+ want['meta'] = True
+ if want:
+ # this is the inverse of decode_wanted's key_map
+ key_map = dict(data='d', meta='m')
+ parts = ''.join(v for k, v in sorted(key_map.items()) if want.get(k))
+ return '%s %s' % (urllib.quote(remote['object_hash']), parts)
+ return None
+
+
class Receiver(object):
"""
Handles incoming SSYNC requests to the object server.
@@ -206,41 +241,41 @@ class Receiver(object):
raise swob.HTTPInsufficientStorage(drive=self.device)
self.fp = self.request.environ['wsgi.input']
- def _encode_wanted(self, line):
+ def _check_local(self, object_hash):
"""
- Parse a line received during missing_check to determine remote
- timestamps, compare with local diskfile and return representation
- of wanted parts (or None) for inclusion in response to sender.
+ Parse the local diskfile and return the timestamps of its current
+ representation, for comparison with the remote offering.
+
+ :param object_hash: the hash of the remote object being offered
"""
- remote = _decode_missing(line)
- want = []
try:
df = self.diskfile_mgr.get_diskfile_from_hash(
- self.device, self.partition, remote['hash'], self.policy,
- frag_index=self.frag_index)
+ self.device, self.partition, object_hash,
+ self.policy, frag_index=self.frag_index)
except exceptions.DiskFileNotExist:
- want = ('data', 'meta')
- else:
- try:
- df.open()
- except exceptions.DiskFileDeleted as err:
- if err.timestamp < remote['ts_data']:
- want = ('data', 'meta')
- except exceptions.DiskFileError as err:
- want = ('data', 'meta')
- else:
- # we have a local diskfile so be specific about which
- # elements need to be sync'd
- if df.data_timestamp < remote['ts_data']:
- want = ['data']
- if (remote['ts_data'] < remote['ts_meta']
- and df.timestamp < remote['ts_meta']):
- want.append('meta')
- if want:
- # compress wanted parts to just first letters for response
- want = ''.join(word[0] for word in want)
- return '%s %s' % (urllib.quote(remote['hash']), want)
- return None
+ return {}
+ try:
+ df.open()
+ except exceptions.DiskFileDeleted as err:
+ return {'ts_data': err.timestamp}
+ except exceptions.DiskFileError as err:
+ return {}
+ return {
+ 'ts_data': df.data_timestamp,
+ 'ts_meta': df.timestamp,
+ }
+
+ def _check_missing(self, line):
+ """
+ Parse an offered object from the sender and compare it to the local
+ diskfile, responding with the proper protocol line to represent the
+ needed data, or None if in sync.
+
+ Anchor point for tests to mock legacy protocol changes.
+ """
+ remote = decode_missing(line)
+ local = self._check_local(remote['object_hash'])
+ return encode_wanted(remote, local)
def missing_check(self):
"""
@@ -295,8 +330,8 @@ class Receiver(object):
line = self.fp.readline(self.app.network_chunk_size)
if not line or line.strip() == ':MISSING_CHECK: END':
break
- want = self._encode_wanted(line)
- if want is not None:
+ want = self._check_missing(line)
+ if want:
object_hashes.append(want)
yield ':MISSING_CHECK: START\r\n'
if object_hashes:
@@ -384,7 +419,7 @@ class Receiver(object):
if header == 'content-length':
content_length = int(value)
# Establish subrequest body, if needed.
- if method == 'DELETE' or method == 'POST':
+ if method in ('DELETE', 'POST'):
if content_length not in (None, 0):
raise Exception(
'%s subrequest with content-length %s'
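
Taken together, the receiver side of missing_check now splits cleanly into decode, local lookup, and encode. A sketch of the round trip (values invented for illustration; the m: delta is hex deca-microseconds per encode_missing below):

    # the sender offers a hash, a data timestamp, and an optional meta delta
    line = '9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000 m:2a30'
    remote = decode_missing(line)
    # remote['ts_meta'] is remote['ts_data'] plus 0x2a30 deca-microseconds

    # nothing on disk: the receiver wants both data and meta
    assert encode_wanted(remote, {}) == '9d41d8cd98f00b204e9800998ecf0abc dm'

    # data is current but meta is stale: only the POST-able part is wanted
    local = {'ts_data': remote['ts_data'], 'ts_meta': remote['ts_data']}
    assert encode_wanted(remote, local) == '9d41d8cd98f00b204e9800998ecf0abc m'
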
diff --git a/swift/obj/ssync_sender.py b/swift/obj/ssync_sender.py
index fb893aa..82a1ae4 100644
--- a/swift/obj/ssync_sender.py
+++ b/swift/obj/ssync_sender.py
@@ -20,19 +20,48 @@ from swift.common import exceptions
from swift.common import http
-def _encode_missing(hash, ts_data, ts_meta=None):
+def encode_missing(object_hash, ts_data, ts_meta=None):
"""
Returns a string representing the object hash, its data file timestamp
and the delta forwards to its metafile timestamp, if non-zero, in the form:
- <hash> <timestamp> m:<hex delta>
+ ``<hash> <timestamp> m:<hex delta>``
+
+ The decoder for this line is
+ :py:func:`~swift.obj.ssync_receiver.decode_missing`
"""
- msg = '%s %s' % (urllib.quote(hash), urllib.quote(ts_data.internal))
+ msg = '%s %s' % (urllib.quote(object_hash), urllib.quote(ts_data.internal))
if ts_meta and ts_meta != ts_data:
delta = ts_meta.raw - ts_data.raw
msg = '%s m:%x' % (msg, delta)
return msg
+def decode_wanted(parts):
+ """
+ Parse missing_check line parts to determine which parts of local
+ diskfile were wanted by the receiver.
+
+ The encoder for parts is
+ :py:func:`~swift.obj.ssync_receiver.encode_wanted`
+ """
+ wanted = {}
+ key_map = dict(d='data', m='meta')
+ if parts:
+ # receiver specified data and/or meta wanted, so use those as
+ # conditions for sending PUT and/or POST subrequests
+ for k in key_map:
+ if k in parts[0]:
+ wanted[key_map[k]] = True
+ if not wanted:
+ # assume legacy receiver which will only accept PUTs. There is no
+ # way to send any meta file content without morphing the timestamp
+ # of either the data or the metadata, so we just send data file
+ # content to a legacy receiver. Once the receiver gets updated we
+ # will be able to send it the meta file content.
+ wanted['data'] = True
+ return wanted
+
+
class Sender(object):
"""
Sends SSYNC requests to the object server.
@@ -235,15 +264,15 @@ class Sender(object):
frag_index=self.job.get('frag_index'))
if self.remote_check_objs is not None:
hash_gen = ifilter(
- lambda path_objhash_timestamp:
- path_objhash_timestamp[1] in
+ lambda path_objhash_timestamps:
+ path_objhash_timestamps[1] in
self.remote_check_objs, hash_gen)
for path, object_hash, timestamps in hash_gen:
self.available_map[object_hash] = timestamps
with exceptions.MessageTimeout(
self.daemon.node_timeout,
'missing_check send line'):
- msg = '%s\r\n' % _encode_missing(object_hash, **timestamps)
+ msg = '%s\r\n' % encode_missing(object_hash, **timestamps)
self.connection.send('%x\r\n%s\r\n' % (len(msg), msg))
with exceptions.MessageTimeout(
self.daemon.node_timeout, 'missing_check end'):
@@ -273,29 +302,7 @@ class Sender(object):
break
parts = line.split()
if parts:
- self.send_map[parts[0]] = self._decode_wanted(parts[1:])
-
- def _decode_wanted(self, parts):
- """
- Parse missing_check line parts to determine which parts of local
- diskfile were wanted by the receiver.
- """
- wanted = {}
- key_map = dict(d='data', m='meta')
- if parts:
- # receiver specified data and/or meta wanted, so use those as
- # conditions for sending PUT and/or POST subrequests
- for k in key_map:
- if k in parts[0]:
- wanted[key_map[k]] = True
- if not wanted:
- # assume legacy receiver which will only accept PUTs. There is no
- # way to send any meta file content without morphing the timestamp
- # of either the data or the metadata, so we just send data file
- # content to a legacy receiver. Once the receiver gets updated we
- # will be able to send it the meta file content.
- wanted['data'] = True
- return wanted
+ self.send_map[parts[0]] = decode_wanted(parts[1:])
def updates(self):
"""
@@ -310,8 +317,8 @@ class Sender(object):
self.daemon.node_timeout, 'updates start'):
msg = ':UPDATES: START\r\n'
self.connection.send('%x\r\n%s\r\n' % (len(msg), msg))
- for hash, want in self.send_map.items():
- object_hash = urllib.unquote(hash)
+ for object_hash, want in self.send_map.items():
+ object_hash = urllib.unquote(object_hash)
try:
df = self.df_mgr.get_diskfile_from_hash(
self.job['device'], self.job['partition'], object_hash,
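
On the sender side, decode_wanted is the mirror of the receiver's encode_wanted, with one wrinkle: a legacy receiver echoes back just the hash with no flags, and the sender then falls back to sending data only. A quick usage sketch (parts is whatever follows the hash on the response line):

    assert decode_wanted(['dm']) == {'data': True, 'meta': True}
    assert decode_wanted(['m']) == {'meta': True}
    # legacy receiver: no flags at all, so fall back to a data-only PUT
    assert decode_wanted([]) == {'data': True}
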
diff --git a/test/probe/test_object_metadata_replication.py b/test/probe/test_object_metadata_replication.py
index 35a15b9..e7479d5 100644
--- a/test/probe/test_object_metadata_replication.py
+++ b/test/probe/test_object_metadata_replication.py
@@ -257,6 +257,7 @@ class Test(ReplProbeTest):
self.brain.start_handoff_half()
self._assert_consistent_object_metadata()
+ self._assert_consistent_container_dbs()
def test_sysmeta_after_replication_with_subsequent_put(self):
sysmeta = {'x-object-sysmeta-foo': 'older'}
@@ -314,7 +315,9 @@ class Test(ReplProbeTest):
for key in sysmeta2.keys():
self.assertTrue(key in metadata, key)
self.assertEqual(metadata[key], sysmeta2[key])
+
self._assert_consistent_object_metadata()
+ self._assert_consistent_container_dbs()
def test_sysmeta_after_replication_with_subsequent_post(self):
sysmeta = {'x-object-sysmeta-foo': 'sysmeta-foo'}
diff --git a/test/unit/obj/test_diskfile.py b/test/unit/obj/test_diskfile.py
index 31adc9a..1d46f37 100644
--- a/test/unit/obj/test_diskfile.py
+++ b/test/unit/obj/test_diskfile.py
@@ -33,7 +33,7 @@ from shutil import rmtree
from time import time
from tempfile import mkdtemp
from hashlib import md5
-from contextlib import closing, nested
+from contextlib import closing, nested, contextmanager
from gzip import GzipFile
from eventlet import hubs, timeout, tpool
@@ -936,8 +936,8 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
device, part, policy, **kwargs))
expected = sorted(expected_items)
actual = sorted(hash_items)
- self.assertEqual(actual, expected,
- 'Expected %s but got %s' % (expected, actual))
+ # the default list diff is easiest to debug
+ self.assertEqual(actual, expected)
def test_yield_hashes_tombstones(self):
ts_iter = (Timestamp(t) for t in itertools.count(int(time())))
@@ -1098,27 +1098,30 @@ class TestDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
ts3 = next(ts_iter)
suffix_map = {
'abc': {
- '9373a92d072897b136b3fc06595b4abc': [
+ # only the tombstone is yield/sync-able
+ '9333a92d072897b136b3fc06595b4abc': [
ts1.internal + '.ts',
ts2.internal + '.meta'],
},
'456': {
- '9373a92d072897b136b3fc06595b0456': [
+ # only latest metadata timestamp
+ '9444a92d072897b136b3fc06595b0456': [
ts1.internal + '.data',
ts2.internal + '.meta',
ts3.internal + '.meta'],
- '9373a92d072897b136b3fc06595b7456': [
+ # exemplary datadir with .meta
+ '9555a92d072897b136b3fc06595b7456': [
ts1.internal + '.data',
ts2.internal + '.meta'],
},
}
expected = {
# do not expect meta file timestamp with tombstone timestamp
- '9373a92d072897b136b3fc06595b4abc':
+ '9333a92d072897b136b3fc06595b4abc':
{'ts_data': ts1},
- '9373a92d072897b136b3fc06595b0456':
+ '9444a92d072897b136b3fc06595b0456':
{'ts_data': ts1, 'ts_meta': ts3},
- '9373a92d072897b136b3fc06595b7456':
+ '9555a92d072897b136b3fc06595b7456':
{'ts_data': ts1, 'ts_meta': ts2},
}
self._check_yield_hashes(POLICIES.default, suffix_map, expected)
@@ -1427,6 +1430,27 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
def test_get_ondisk_files_with_stray_meta(self):
# get_ondisk_files does not tolerate a stray .meta file
+ class_under_test = self._get_diskfile(POLICIES.default)
+
+ @contextmanager
+ def create_files(df, files):
+ os.makedirs(df._datadir)
+ for fname in files:
+ fpath = os.path.join(df._datadir, fname)
+ with open(fpath, 'w') as f:
+ diskfile.write_metadata(f, {'name': df._name,
+ 'Content-Length': 0})
+ yield
+ rmtree(df._datadir, ignore_errors=True)
+
+ # sanity
+ files = [
+ '0000000006.00000#1.data',
+ '0000000006.00000.durable',
+ ]
+ with create_files(class_under_test, files):
+ class_under_test.open()
+
scenarios = [['0000000007.00000.meta'],
['0000000007.00000.meta',
@@ -1440,10 +1464,13 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
'0000000005.00000#1.data']
]
for files in scenarios:
- class_under_test = self._get_diskfile(POLICIES.default)
- with mock.patch('swift.obj.diskfile.os.listdir',
- lambda *args: files):
- self.assertRaises(DiskFileNotExist, class_under_test.open)
+ with create_files(class_under_test, files):
+ try:
+ class_under_test.open()
+ except DiskFileNotExist:
+ continue
+ self.fail('expected DiskFileNotExist opening %s with %r' % (
+ class_under_test.__class__.__name__, files))
def test_parse_on_disk_filename(self):
mgr = self.df_router[POLICIES.default]
@@ -1649,7 +1676,7 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
}
self._check_yield_hashes(POLICIES.default, suffix_map, expected)
- # but meta timestamp is not returned if specified frag index
+ # but meta timestamp is *not* returned if specified frag index
# is not found
expected = {
'9373a92d072897b136b3fc06595b4abc': {'ts_data': ts1},
@@ -1754,31 +1781,42 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
ts2 = next(ts_iter)
suffix_map = {
'456': {
- # ok
- '9373a92d072897b136b3fc06595b0456': [
+ # this one is fine
+ '9333a92d072897b136b3fc06595b0456': [
ts1.internal + '#2.data',
ts1.internal + '.durable'],
- # no frag index in data file name
- '9373a92d072897b136b3fc06595b7456': [
+ # missing frag index
+ '9444a92d072897b136b3fc06595b7456': [
ts1.internal + '.data'],
# junk
- '9373a92d072897b136b3fc06595b8456': [
+ '9555a92d072897b136b3fc06595b8456': [
'junk_file'],
- # meta file and bad data file name
- '9373a92d072897b136b3fc06595b9456': [
- ts1.internal + '.data',
+ # missing .durable
+ '9666a92d072897b136b3fc06595b9456': [
+ ts1.internal + '#2.data',
ts2.internal + '.meta'],
- # only meta file
- '9373a92d072897b136b3fc06595ba456': [
+ # .meta files w/o .data files can't be opened, and are ignored
+ '9777a92d072897b136b3fc06595ba456': [
ts1.internal + '.meta'],
# multiple meta files with no data
- '9373a92d072897b136b3fc06595bb456': [
+ '9888a92d072897b136b3fc06595bb456': [
ts1.internal + '.meta',
ts2.internal + '.meta'],
+ # this is good with meta
+ '9999a92d072897b136b3fc06595bb456': [
+ ts1.internal + '#2.data',
+ ts1.internal + '.durable',
+ ts2.internal + '.meta'],
+ # this one has the wrong frag index
+ '9aaaa92d072897b136b3fc06595b0456': [
+ ts1.internal + '#7.data',
+ ts1.internal + '.durable'],
},
}
expected = {
- '9373a92d072897b136b3fc06595b0456': {'ts_data': ts1}
+ '9333a92d072897b136b3fc06595b0456': {'ts_data': ts1},
+ '9999a92d072897b136b3fc06595bb456': {'ts_data': ts1,
+ 'ts_meta': ts2},
}
self._check_yield_hashes(POLICIES.default, suffix_map, expected,
frag_index=2)
@@ -2036,7 +2074,8 @@ class DiskFileMixin(BaseDiskFileTestMixin):
def test_get_metadata_not_opened(self):
df = self._simple_get_diskfile()
- self.assertRaises(DiskFileNotOpen, df.get_metadata)
+ with self.assertRaises(DiskFileNotOpen):
+ df.get_metadata()
def test_get_datafile_metadata(self):
ts_iter = make_timestamp_iter()
@@ -2059,7 +2098,8 @@ class DiskFileMixin(BaseDiskFileTestMixin):
def test_get_datafile_metadata_not_opened(self):
df = self._simple_get_diskfile()
- self.assertRaises(DiskFileNotOpen, df.get_datafile_metadata)
+ with self.assertRaises(DiskFileNotOpen):
+ df.get_datafile_metadata()
def test_get_metafile_metadata(self):
ts_iter = make_timestamp_iter()
@@ -2082,17 +2122,14 @@ class DiskFileMixin(BaseDiskFileTestMixin):
def test_get_metafile_metadata_not_opened(self):
df = self._simple_get_diskfile()
- self.assertRaises(DiskFileNotOpen, df.get_metafile_metadata)
+ with self.assertRaises(DiskFileNotOpen):
+ df.get_metafile_metadata()
def test_not_opened(self):
df = self._simple_get_diskfile()
- try:
+ with self.assertRaises(DiskFileNotOpen):
with df:
pass
- except DiskFileNotOpen:
- pass
- else:
- self.fail("Expected DiskFileNotOpen exception")
def test_disk_file_default_disallowed_metadata(self):
# build an object with some meta (at t0+1s)
@@ -3207,7 +3244,8 @@ class DiskFileMixin(BaseDiskFileTestMixin):
def test_data_timestamp_not_open(self):
df = self._simple_get_diskfile()
- self.assertRaises(DiskFileNotOpen, lambda: df.data_timestamp)
+ with self.assertRaises(DiskFileNotOpen):
+ df.data_timestamp
def test_durable_timestamp(self):
ts_1 = self.ts()
diff --git a/test/unit/obj/test_ssync_receiver.py b/test/unit/obj/test_ssync_receiver.py
index 70e60cf..9a79c5a 100644
--- a/test/unit/obj/test_ssync_receiver.py
+++ b/test/unit/obj/test_ssync_receiver.py
@@ -2044,42 +2044,122 @@ class TestSsyncRxServer(unittest.TestCase):
class TestModuleMethods(unittest.TestCase):
def test_decode_missing(self):
- hash = '9d41d8cd98f00b204e9800998ecf0abc'
+ object_hash = '9d41d8cd98f00b204e9800998ecf0abc'
ts_iter = make_timestamp_iter()
t_data = ts_iter.next()
t_meta = ts_iter.next()
d_meta_data = t_meta.raw - t_data.raw
# legacy single timestamp string
- msg = '%s %s' % (hash, t_data.internal)
- expected = dict(hash=hash, ts_meta=t_data, ts_data=t_data)
- self.assertEqual(
- expected, ssync_receiver._decode_missing(msg))
+ msg = '%s %s' % (object_hash, t_data.internal)
+ expected = dict(object_hash=object_hash,
+ ts_meta=t_data,
+ ts_data=t_data)
+ self.assertEqual(expected, ssync_receiver.decode_missing(msg))
# hex meta delta encoded as extra message part
- msg = '%s %s m:%x' % (hash, t_data.internal, d_meta_data)
- expected = dict(hash=hash, ts_data=t_data, ts_meta=t_meta)
- self.assertEqual(
- expected, ssync_receiver._decode_missing(msg))
+ msg = '%s %s m:%x' % (object_hash, t_data.internal, d_meta_data)
+ expected = dict(object_hash=object_hash,
+ ts_data=t_data,
+ ts_meta=t_meta)
+ self.assertEqual(expected, ssync_receiver.decode_missing(msg))
# unexpected zero delta is tolerated
- msg = '%s %s m:0' % (hash, t_data.internal)
- expected = dict(hash=hash, ts_meta=t_data, ts_data=t_data)
- self.assertEqual(
- expected, ssync_receiver._decode_missing(msg))
+ msg = '%s %s m:0' % (object_hash, t_data.internal)
+ expected = dict(object_hash=object_hash,
+ ts_meta=t_data,
+ ts_data=t_data)
+ self.assertEqual(expected, ssync_receiver.decode_missing(msg))
# unexpected subparts in timestamp delta part are tolerated
- msg = '%s %s c:12345,m:%x,junk' % (hash, t_data.internal, d_meta_data)
- expected = dict(
- hash=hash, ts_meta=t_meta, ts_data=t_data)
+ msg = '%s %s c:12345,m:%x,junk' % (object_hash,
+ t_data.internal,
+ d_meta_data)
+ expected = dict(object_hash=object_hash,
+ ts_meta=t_meta,
+ ts_data=t_data)
self.assertEqual(
- expected, ssync_receiver._decode_missing(msg))
+ expected, ssync_receiver.decode_missing(msg))
# extra message parts tolerated
- msg = '%s %s m:%x future parts' % (hash, t_data.internal, d_meta_data)
- expected = dict(hash=hash, ts_meta=t_meta, ts_data=t_data)
- self.assertEqual(
- expected, ssync_receiver._decode_missing(msg))
+ msg = '%s %s m:%x future parts' % (object_hash,
+ t_data.internal,
+ d_meta_data)
+ expected = dict(object_hash=object_hash,
+ ts_meta=t_meta,
+ ts_data=t_data)
+ self.assertEqual(expected, ssync_receiver.decode_missing(msg))
+
+ def test_encode_wanted(self):
+ ts_iter = make_timestamp_iter()
+ old_t_data = ts_iter.next()
+ t_data = ts_iter.next()
+ old_t_meta = ts_iter.next()
+ t_meta = ts_iter.next()
+
+ remote = {
+ 'object_hash': 'theremotehash',
+ 'ts_data': t_data,
+ 'ts_meta': t_meta,
+ }
+
+ # missing
+ local = {}
+ expected = 'theremotehash dm'
+ self.assertEqual(ssync_receiver.encode_wanted(remote, local),
+ expected)
+
+ # in-sync
+ local = {
+ 'ts_data': t_data,
+ 'ts_meta': t_meta,
+ }
+ expected = None
+ self.assertEqual(ssync_receiver.encode_wanted(remote, local),
+ expected)
+
+ # out-of-sync
+ local = {
+ 'ts_data': old_t_data,
+ 'ts_meta': old_t_meta,
+ }
+ expected = 'theremotehash dm'
+ self.assertEqual(ssync_receiver.encode_wanted(remote, local),
+ expected)
+
+ # old data
+ local = {
+ 'ts_data': old_t_data,
+ 'ts_meta': t_meta,
+ }
+ expected = 'theremotehash d'
+ self.assertEqual(ssync_receiver.encode_wanted(remote, local),
+ expected)
+
+ # old metadata
+ local = {
+ 'ts_data': t_data,
+ 'ts_meta': old_t_meta,
+ }
+ expected = 'theremotehash m'
+ self.assertEqual(ssync_receiver.encode_wanted(remote, local),
+ expected)
+
+ # in-sync tombstone
+ local = {
+ 'ts_data': t_data,
+ }
+ expected = None
+ self.assertEqual(ssync_receiver.encode_wanted(remote, local),
+ expected)
+
+ # old tombstone
+ local = {
+ 'ts_data': old_t_data,
+ }
+ expected = 'theremotehash d'
+ self.assertEqual(ssync_receiver.encode_wanted(remote, local),
+ expected)
if __name__ == '__main__':
diff --git a/test/unit/obj/test_ssync_sender.py b/test/unit/obj/test_ssync_sender.py
index a072c8d..4af25c1 100644
--- a/test/unit/obj/test_ssync_sender.py
+++ b/test/unit/obj/test_ssync_sender.py
@@ -487,6 +487,53 @@ class TestSender(BaseTestSender):
'1380144474.44444')])
self.assertEqual(expected_map, candidates)
+ def test_call_and_missing_check_metadata_legacy_response(self):
+ def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
+ if device == 'dev' and partition == '9' and suffixes == ['abc'] \
+ and policy == POLICIES.legacy:
+ yield (
+ '/srv/node/dev/objects/9/abc/'
+ '9d41d8cd98f00b204e9800998ecf0abc',
+ '9d41d8cd98f00b204e9800998ecf0abc',
+ {'ts_data': Timestamp(1380144470.00000),
+ 'ts_meta': Timestamp(1380155570.00005)})
+ else:
+ raise Exception(
+ 'No match for %r %r %r' % (device, partition, suffixes))
+
+ self.sender.connection = FakeConnection()
+ self.sender.node = {}
+ self.sender.job = {
+ 'device': 'dev',
+ 'partition': '9',
+ 'policy': POLICIES.legacy,
+ 'frag_index': 0,
+ }
+ self.sender.suffixes = ['abc']
+ self.sender.response = FakeResponse(
+ chunk_body=(
+ ':MISSING_CHECK: START\r\n'
+ '9d41d8cd98f00b204e9800998ecf0abc\r\n'
+ ':MISSING_CHECK: END\r\n'
+ ':UPDATES: START\r\n'
+ ':UPDATES: END\r\n'
+ ))
+ self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
+ self.sender.connect = mock.MagicMock()
+ self.sender.df_mgr.get_diskfile_from_hash = mock.MagicMock()
+ self.sender.disconnect = mock.MagicMock()
+ success, candidates = self.sender()
+ self.assertTrue(success)
+ found_post = found_put = False
+ for chunk in self.sender.connection.sent:
+ if 'POST' in chunk:
+ found_post = True
+ if 'PUT' in chunk:
+ found_put = True
+ self.assertFalse(found_post)
+ self.assertTrue(found_put)
+ self.assertEqual(self.sender.failures, 0)
+
def test_call_and_missing_check(self):
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
if device == 'dev' and partition == '9' and suffixes == ['abc'] \
@@ -501,6 +548,7 @@ class TestSender(BaseTestSender):
'No match for %r %r %r' % (device, partition, suffixes))
self.sender.connection = FakeConnection()
+ self.sender.node = {}
self.sender.job = {
'device': 'dev',
'partition': '9',
@@ -511,7 +559,7 @@ class TestSender(BaseTestSender):
self.sender.response = FakeResponse(
chunk_body=(
':MISSING_CHECK: START\r\n'
- '9d41d8cd98f00b204e9800998ecf0abc\r\n'
+ '9d41d8cd98f00b204e9800998ecf0abc d\r\n'
':MISSING_CHECK: END\r\n'))
self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
self.sender.connect = mock.MagicMock()
@@ -578,13 +626,13 @@ class TestSender(BaseTestSender):
'policy': POLICIES.legacy,
'frag_index': 0,
}
- self.sender = ssync_sender.Sender(self.daemon, None, job, ['abc'],
+ self.sender = ssync_sender.Sender(self.daemon, {}, job, ['abc'],
['9d41d8cd98f00b204e9800998ecf0abc'])
self.sender.connection = FakeConnection()
self.sender.response = FakeResponse(
chunk_body=(
':MISSING_CHECK: START\r\n'
- '9d41d8cd98f00b204e9800998ecf0abc\r\n'
+ '9d41d8cd98f00b204e9800998ecf0abc d\r\n'
':MISSING_CHECK: END\r\n'))
self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
self.sender.connect = mock.MagicMock()
@@ -2317,7 +2365,7 @@ class TestSsyncReplication(TestBaseSsync):
# wrap connection from tx to rx to capture ssync messages...
sender.connect, trace = self.make_connect_wrapper(sender)
- def _legacy_encode_wanted(self, line):
+ def _legacy_check_missing(self, line):
# reproduces behavior of 'legacy' ssync receiver missing_checks()
parts = line.split()
object_hash, timestamp = [urllib.unquote(v) for v in parts[:2]]
@@ -2341,8 +2389,8 @@ class TestSsyncReplication(TestBaseSsync):
return None
# run the sync protocol...
- func = 'swift.obj.ssync_receiver.Receiver._encode_wanted'
- with mock.patch(func, _legacy_encode_wanted):
+ func = 'swift.obj.ssync_receiver.Receiver._check_missing'
+ with mock.patch(func, _legacy_check_missing):
success, in_sync_objs = sender()
self.assertEqual(1, len(in_sync_objs))
@@ -2370,30 +2418,53 @@ class TestSsyncReplication(TestBaseSsync):
class TestModuleMethods(unittest.TestCase):
def test_encode_missing(self):
- hash = '9d41d8cd98f00b204e9800998ecf0abc'
+ object_hash = '9d41d8cd98f00b204e9800998ecf0abc'
ts_iter = make_timestamp_iter()
t_data = ts_iter.next()
t_meta = ts_iter.next()
d_meta_data = t_meta.raw - t_data.raw
# equal data and meta timestamps -> legacy single timestamp string
- expected = '%s %s' % (hash, t_data.internal)
+ expected = '%s %s' % (object_hash, t_data.internal)
self.assertEqual(
expected,
- ssync_sender._encode_missing(hash, t_data, ts_meta=t_data))
+ ssync_sender.encode_missing(object_hash, t_data, ts_meta=t_data))
# newer meta timestamp -> hex data delta encoded as extra message part
- expected = '%s %s m:%x' % (hash, t_data.internal, d_meta_data)
+ expected = '%s %s m:%x' % (object_hash, t_data.internal, d_meta_data)
self.assertEqual(
expected,
- ssync_sender._encode_missing(hash, t_data, ts_meta=t_meta))
+ ssync_sender.encode_missing(object_hash, t_data, ts_meta=t_meta))
# test encode and decode functions invert
- expected = {'hash': hash, 'ts_meta': t_meta, 'ts_data': t_data}
- msg = ssync_sender._encode_missing(**expected)
- actual = ssync_receiver._decode_missing(msg)
+ expected = {'object_hash': object_hash, 'ts_meta': t_meta,
+ 'ts_data': t_data}
+ msg = ssync_sender.encode_missing(**expected)
+ actual = ssync_receiver.decode_missing(msg)
self.assertEqual(expected, actual)
+ def test_decode_wanted(self):
+ parts = ['d']
+ expected = {'data': True}
+ self.assertEqual(ssync_sender.decode_wanted(parts), expected)
+
+ parts = ['m']
+ expected = {'meta': True}
+ self.assertEqual(ssync_sender.decode_wanted(parts), expected)
+
+ parts = ['dm']
+ expected = {'data': True, 'meta': True}
+ self.assertEqual(ssync_sender.decode_wanted(parts), expected)
+
+ # you don't really need these next few...
+ parts = ['md']
+ expected = {'data': True, 'meta': True}
+ self.assertEqual(ssync_sender.decode_wanted(parts), expected)
+
+ parts = ['xcy', 'funny', {'business': True}]
+ expected = {'data': True}
+ self.assertEqual(ssync_sender.decode_wanted(parts), expected)
+
if __name__ == '__main__':
unittest.main()