Created
January 24, 2017 20:41
-
-
Save zaitcev/b5378c18c522bd695f3ac75c3b5b66f6 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
commit 065a697f5537454123479a8929e0c6d8492d5c8d | |
Author: Pete Zaitcev <zaitcev@kotori.zaitcev.us> | |
Date: Wed Jan 18 23:27:40 2017 -0700 | |
Embryo of new POST /a/c/object | |
diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py | |
index d6aec48..bf25d90 100644 | |
--- a/swift/obj/diskfile.py | |
+++ b/swift/obj/diskfile.py | |
@@ -1542,6 +1542,10 @@ class BaseDiskFileWriter(object): | |
:param timestamp: object put timestamp, an instance of | |
:class:`~swift.common.utils.Timestamp` | |
""" | |
+ # P3 | |
+ fp = open("/tmp/dump", 'a') | |
+ fp.write("BaseDiskFileWriter.commit %s\n" % self._name) | |
+ fp.close() | |
pass | |
@@ -2523,6 +2527,10 @@ class DiskFileWriter(BaseDiskFileWriter): | |
:param metadata: dictionary of metadata to be associated with the | |
object | |
""" | |
+ # P3 | |
+ fp = open("/tmp/dump", 'a') | |
+ fp.write("DiskFileWriter.put %s\n" % self._name) | |
+ fp.close() | |
super(DiskFileWriter, self)._put(metadata, True) | |
@@ -2717,6 +2725,10 @@ class ECDiskFileWriter(BaseDiskFileWriter): | |
:raises DiskFileError: if the diskfile frag_index has not been set | |
(either during initialisation or a call to put()) | |
""" | |
+ # P3 | |
+ fp = open("/tmp/dump", 'a') | |
+ fp.write("ECDiskFileWriter.commit %s\n" % self._name) | |
+ fp.close() | |
data_file_path = join( | |
self._datadir, self.manager.make_on_disk_filename( | |
timestamp, '.data', self._diskfile._frag_index)) | |
@@ -2733,6 +2745,10 @@ class ECDiskFileWriter(BaseDiskFileWriter): | |
:param metadata: dictionary of metadata to be associated with object | |
""" | |
+ # P3 | |
+ fp = open("/tmp/dump", 'a') | |
+ fp.write("ECDiskFileWriter.put %s\n" % self._name) | |
+ fp.close() | |
fi = None | |
cleanup = True | |
if self._extension == '.data': | |
diff --git a/swift/obj/server.py b/swift/obj/server.py | |
index 157cca4..632af0e 100644 | |
--- a/swift/obj/server.py | |
+++ b/swift/obj/server.py | |
@@ -45,7 +45,7 @@ from swift.obj import ssync_receiver | |
from swift.common.http import is_success | |
from swift.common.base_storage_server import BaseStorageServer | |
from swift.common.header_key_dict import HeaderKeyDict | |
-from swift.common.request_helpers import get_name_and_placement, \ | |
+from swift.common.request_helpers import get_name_and_placement, get_param, \ | |
is_user_meta, is_sys_or_user_meta, is_object_transient_sysmeta, \ | |
resolve_etag_is_at_header, is_sys_meta | |
from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPCreated, \ | |
@@ -586,6 +586,17 @@ class ObjectController(BaseStorageServer): | |
except (DiskFileXattrNotSupported, DiskFileNoSpace): | |
return HTTPInsufficientStorage(drive=device, request=request) | |
+ if not get_param(request, 'durable') is None: | |
+ # P3 | |
+ fp = open("/tmp/dump", 'a') | |
+ fp.write("POST to durable %s/%s/%s\n" % (account, container, obj)) | |
+ fp.close() | |
+ else: | |
+ # P3 | |
+ fp = open("/tmp/dump", 'a') | |
+ fp.write("POST normal %s/%s/%s\n" % (account, container, obj)) | |
+ fp.close() | |
+ | |
if (content_type_headers['Content-Type-Timestamp'] | |
!= disk_file.data_timestamp): | |
# Current content-type is not from the datafile, but the datafile | |
diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py | |
index b79fa14..c813c00 100644 | |
--- a/swift/proxy/controllers/obj.py | |
+++ b/swift/proxy/controllers/obj.py | |
@@ -1747,6 +1747,157 @@ class MIMEPutter(Putter): | |
mime_boundary, multiphase=need_multiphase) | |
+class DoublePutter(Putter): | |
+ """ | |
+ Putter for backend PUT requests that use a two-stage PUT/POST scheme | |
+ | |
+ Note: make sure to call DoublePutter.connect() instead of instantiating directly | |
+ """ | |
+ def __init__(self, conn, node, resp, req, connect_duration, | |
+ logger, mime_boundary, multiphase=False): | |
+ super(DoublePutter, self).__init__(conn, node, resp, req, | |
+ connect_duration, logger) | |
+ # XXX leave chunked and multiphase in for now | |
+ self.chunked = True # MIME requests always send chunked body | |
+ self.mime_boundary = mime_boundary | |
+ # We ignore multiphase on purpose and always send a POST. | |
+ # XXX maybe get rid of these lies then, one day | |
+ self.multiphase = multiphase | |
+ self.footer_meta = None | |
+ | |
+ def _start_object_data(self): | |
+ # We're sending the object plus other stuff in the same request | |
+ # body, all wrapped up in multipart MIME, so we'd better start | |
+ # off the MIME document before sending any object data. | |
+ self.queue.put("--%s\r\nX-Document: object body\r\n\r\n" % | |
+ (self.mime_boundary,)) | |
+ | |
+ def end_of_object_data(self, footer_metadata=None): | |
+ """ | |
+ Call when there is no more data to send. | |
+ | |
+ Overrides superclass implementation to send any footer metadata | |
+ after object data. | |
+ | |
+ :param footer_metadata: dictionary of metadata items | |
+ to be sent as footers. | |
+ """ | |
+ if self.state == DATA_SENT: | |
+ raise ValueError("called end_of_object_data twice") | |
+ elif self.state == NO_DATA_SENT and self.mime_boundary: | |
+ self._start_object_data() | |
+ | |
+ ## P3 | |
+ #fp = open("/tmp/dump",'a') | |
+ #fp.write("end_of_object_data: %r\n" % (footer_metadata,)) | |
+ #fp.close() | |
+ # We rely on the caller to be good and just keep a reference for now. | |
+ self.footer_meta = footer_metadata | |
+ | |
+ # XXX Suppose we convert footers into normal headers. What about MD5? | |
+ # footer_body = json.dumps(footer_metadata) | |
+ # footer_md5 = md5(footer_body).hexdigest() | |
+ | |
+ # this will be the last part sent | |
+ message_parts = [ | |
+ "\r\n--%s--\r\n" % self.mime_boundary, | |
+ ] | |
+ self.queue.put("".join(message_parts)) | |
+ | |
+ self.queue.put('') | |
+ self.state = DATA_SENT | |
+ | |
+ def send_commit_confirmation(self): | |
+ """ | |
+ Call when there are > quorum 2XX responses received. Send commit | |
+ confirmations to all object nodes to finalize the PUT. | |
+ """ | |
+ if self.state == COMMIT_SENT: | |
+ raise ValueError("called send_commit_confirmation twice") | |
+ | |
+ # XXX is waiting to encode and drain really necessary? | |
+ self.queue.put('') | |
+ self.wait() | |
+ | |
+ # XXX patch node_timeout through from connect() somehow | |
+ node_timeout = 60 | |
+ put_resp = self.await_response(node_timeout, False) | |
+ # P3 | |
+ fp = open("/tmp/dump",'a') | |
+ fp.write("put_resp: %s\n" % (put_resp.status,)) | |
+ fp.close() | |
+ # XXX raise something if put_resp.status is not 201 (or another 2xx). | |
+ | |
+ self.state = DATA_ACKED | |
+ | |
+ headers = self.footer_meta | |
+ headers['Content-Length'] = '0' | |
+ self.conn.putrequest('POST', self.path, skip_host=('Host' in headers)) | |
+ for header, value in headers.items(): | |
+ self.conn.putheader(header, str(value)) | |
+ self.conn.endheaders() | |
+ | |
+ self.state = COMMIT_SENT | |
+ | |
+ @classmethod | |
+ def connect(cls, node, part, path, headers, conn_timeout, node_timeout, | |
+ logger=None, need_multiphase=True, **kwargs): | |
+ """ | |
+ Connect to a backend node and send the headers. | |
+ | |
+ Override superclass method to tell the object server that support is | |
+ needed for a multipart body with footers and, optionally, multiphase | |
+ commit, and to verify the object server's capabilities. | |
+ | |
+ :param need_multiphase: if True then multiphase support is required of | |
+ the object server | |
+ :raises: FooterNotSupported if the backend node does not advertise | |
+ support for metadata footers | |
+ :raises: MultiphasePUTNotSupported if need_multiphase is set but | |
+ backend node can't handle multiphase PUT | |
+ """ | |
+ mime_boundary = "%.64x" % random.randint(0, 16 ** 64) | |
+ headers = HeaderKeyDict(headers) | |
+ # when using a multipart mime request to backend the actual | |
+ # content-length is not equal to the object content size, so move the | |
+ # object content size to X-Backend-Obj-Content-Length if that has not | |
+ # already been set by the EC PUT path. | |
+ headers.setdefault('X-Backend-Obj-Content-Length', | |
+ headers.pop('Content-Length', None)) | |
+ # We're going to be adding some unknown amount of data to the | |
+ # request, so we can't use an explicit content length, and thus | |
+ # we must use chunked encoding. | |
+ # XXX The POST removes this need, drop this code later. | |
+ headers['Transfer-Encoding'] = 'chunked' | |
+ headers['Expect'] = '100-continue' | |
+ | |
+ headers['X-Backend-Obj-Multipart-Mime-Boundary'] = mime_boundary | |
+ | |
+ headers['X-Backend-Obj-Metadata-Footer'] = 'yes' | |
+ | |
+ if need_multiphase: | |
+ headers['X-Backend-Obj-Multiphase-Commit'] = 'yes' | |
+ | |
+ conn, expect_resp, final_resp, connect_duration = cls._make_connection( | |
+ node, part, path, headers, conn_timeout, node_timeout) | |
+ | |
+ if is_informational(expect_resp.status): | |
+ continue_headers = HeaderKeyDict(expect_resp.getheaders()) | |
+ can_send_metadata_footer = config_true_value( | |
+ continue_headers.get('X-Obj-Metadata-Footer', 'no')) | |
+ can_handle_multiphase_put = config_true_value( | |
+ continue_headers.get('X-Obj-Multiphase-Commit', 'no')) | |
+ | |
+ if not can_send_metadata_footer: | |
+ raise FooterNotSupported() | |
+ | |
+ if need_multiphase and not can_handle_multiphase_put: | |
+ raise MultiphasePUTNotSupported() | |
+ | |
+ return cls(conn, node, final_resp, path, connect_duration, logger, | |
+ mime_boundary, multiphase=need_multiphase) | |
+ | |
+ | |
def chunk_transformer(policy, nstreams): | |
segment_size = policy.ec_segment_size | |
diff --git a/test-ecput.py b/test-ecput.py | |
index 4e1e1fd..93d26a8 100644 | |
--- a/test-ecput.py | |
+++ b/test-ecput.py | |
@@ -18,7 +18,7 @@ import time | |
from hashlib import md5 | |
from swift.common.utils import ContextPool | |
-from swift.proxy.controllers.obj import MIMEPutter | |
+from swift.proxy.controllers.obj import DoublePutter | |
from swift.common.header_key_dict import HeaderKeyDict | |
def exception_occurred(node, typ, additional_info, **kwargs): | |
@@ -75,11 +75,11 @@ def poke(netloc, path, policy_index): | |
headers['Content-Type'] = "application/octet-stream" | |
headers['X-Backend-Storage-Policy-Index'] = policy_index | |
headers['X-Timestamp'] = "%016.05f" % (time.time(),) | |
- headers['Referer'] = "Test-MIME-PUT" | |
+ headers['Referer'] = "Test-EC-PUT" | |
part = int(path_parts[1]) | |
path_aco = '/' + path_parts[2] | |
- putter = MIMEPutter.connect(node, part, path_aco, headers, | |
+ putter = DoublePutter.connect(node, part, path_aco, headers, | |
conn_timeout, node_timeout, | |
logger=logger, need_multiphase=True) | |
if putter.failed: | |
diff --git a/test/unit/obj/test_server.py b/test/unit/obj/test_server.py | |
index b68d296..6521a0c 100644 | |
--- a/test/unit/obj/test_server.py | |
+++ b/test/unit/obj/test_server.py | |
@@ -391,6 +391,41 @@ class TestObjectController(unittest.TestCase): | |
gmtime(math.ceil(float(post_timestamp)))), | |
}) | |
+ def test_POST_with_query(self): | |
+ # Test swift.obj.server.ObjectController.POST with query parameter | |
+ put_timestamp = normalize_timestamp(time()) | |
+ headers = {'X-Timestamp': put_timestamp, | |
+ 'Content-Type': 'application/x-test', | |
+ 'X-Object-Sysmeta-Color': 'blue', | |
+ 'X-Object-Transient-Sysmeta-Shape': 'circle', | |
+ 'X-Object-Meta-1': 'One'} | |
+ req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, | |
+ headers=headers) | |
+ req.body = 'VERIFY' | |
+ etag = '"%s"' % md5('VERIFY').hexdigest() | |
+ resp = req.get_response(self.object_controller) | |
+ self.assertEqual(resp.status_int, 201) | |
+ self.assertEqual(dict(resp.headers), { | |
+ 'Content-Type': 'text/html; charset=UTF-8', | |
+ 'Content-Length': str(len(resp.body)), | |
+ 'Etag': etag, | |
+ }) | |
+ | |
+ post_timestamp = normalize_timestamp(time()) | |
+ headers = {'X-Timestamp': post_timestamp, | |
+ 'Content-Type': 'application/x-test'} | |
+ req = Request.blank('/sda1/p/a/c/o', | |
+ environ={'REQUEST_METHOD': 'POST', | |
+ 'QUERY_STRING': 'durable=yes'}, | |
+ headers=headers) | |
+ resp = req.get_response(self.object_controller) | |
+ self.assertEqual(resp.status_int, 202) | |
+ self.assertEqual(dict(resp.headers), { | |
+ 'Content-Type': 'text/html; charset=UTF-8', | |
+ 'Content-Length': str(len(resp.body)), | |
+ 'X-Object-Sysmeta-Color': 'blue', | |
+ }) | |
+ | |
def test_POST_old_timestamp(self): | |
ts = time() | |
orig_timestamp = utils.Timestamp(ts).internal |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment