Skip to content

Instantly share code, notes, and snippets.

@zaitcev
Created January 24, 2017 20:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zaitcev/b5378c18c522bd695f3ac75c3b5b66f6 to your computer and use it in GitHub Desktop.
Save zaitcev/b5378c18c522bd695f3ac75c3b5b66f6 to your computer and use it in GitHub Desktop.
commit 065a697f5537454123479a8929e0c6d8492d5c8d
Author: Pete Zaitcev <zaitcev@kotori.zaitcev.us>
Date: Wed Jan 18 23:27:40 2017 -0700
Embryo of new POST /a/c/object
diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py
index d6aec48..bf25d90 100644
--- a/swift/obj/diskfile.py
+++ b/swift/obj/diskfile.py
@@ -1542,6 +1542,10 @@ class BaseDiskFileWriter(object):
:param timestamp: object put timestamp, an instance of
:class:`~swift.common.utils.Timestamp`
"""
+ # P3
+ fp = open("/tmp/dump", 'a')
+ fp.write("BaseDiskFileWriter.commit %s\n" % self._name)
+ fp.close()
pass
@@ -2523,6 +2527,10 @@ class DiskFileWriter(BaseDiskFileWriter):
:param metadata: dictionary of metadata to be associated with the
object
"""
+ # P3
+ fp = open("/tmp/dump", 'a')
+ fp.write("DiskFileWriter.put %s\n" % self._name)
+ fp.close()
super(DiskFileWriter, self)._put(metadata, True)
@@ -2717,6 +2725,10 @@ class ECDiskFileWriter(BaseDiskFileWriter):
:raises DiskFileError: if the diskfile frag_index has not been set
(either during initialisation or a call to put())
"""
+ # P3
+ fp = open("/tmp/dump", 'a')
+ fp.write("ECDiskFileWriter.commit %s\n" % self._name)
+ fp.close()
data_file_path = join(
self._datadir, self.manager.make_on_disk_filename(
timestamp, '.data', self._diskfile._frag_index))
@@ -2733,6 +2745,10 @@ class ECDiskFileWriter(BaseDiskFileWriter):
:param metadata: dictionary of metadata to be associated with object
"""
+ # P3
+ fp = open("/tmp/dump", 'a')
+ fp.write("ECDiskFileWriter.put %s\n" % self._name)
+ fp.close()
fi = None
cleanup = True
if self._extension == '.data':
diff --git a/swift/obj/server.py b/swift/obj/server.py
index 157cca4..632af0e 100644
--- a/swift/obj/server.py
+++ b/swift/obj/server.py
@@ -45,7 +45,7 @@ from swift.obj import ssync_receiver
from swift.common.http import is_success
from swift.common.base_storage_server import BaseStorageServer
from swift.common.header_key_dict import HeaderKeyDict
-from swift.common.request_helpers import get_name_and_placement, \
+from swift.common.request_helpers import get_name_and_placement, get_param, \
is_user_meta, is_sys_or_user_meta, is_object_transient_sysmeta, \
resolve_etag_is_at_header, is_sys_meta
from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPCreated, \
@@ -586,6 +586,17 @@ class ObjectController(BaseStorageServer):
except (DiskFileXattrNotSupported, DiskFileNoSpace):
return HTTPInsufficientStorage(drive=device, request=request)
+ if not get_param(request, 'durable') is None:
+ # P3
+ fp = open("/tmp/dump", 'a')
+ fp.write("POST to durable %s/%s/%s\n" % (account, container, obj))
+ fp.close()
+ else:
+ # P3
+ fp = open("/tmp/dump", 'a')
+ fp.write("POST normal %s/%s/%s\n" % (account, container, obj))
+ fp.close()
+
if (content_type_headers['Content-Type-Timestamp']
!= disk_file.data_timestamp):
# Current content-type is not from the datafile, but the datafile
diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py
index b79fa14..c813c00 100644
--- a/swift/proxy/controllers/obj.py
+++ b/swift/proxy/controllers/obj.py
@@ -1747,6 +1747,157 @@ class MIMEPutter(Putter):
mime_boundary, multiphase=need_multiphase)
+class DoublePutter(Putter):
+ """
+ Putter for backend PUT requests that use two-stage PUT/POST scheme
+
+ Note: make sure to call DoublePutter.connect() instead of instantiating
+ """
+ def __init__(self, conn, node, resp, req, connect_duration,
+ logger, mime_boundary, multiphase=False):
+ super(DoublePutter, self).__init__(conn, node, resp, req,
+ connect_duration, logger)
+ # XXX leave chunked and multiphase in for now
+ self.chunked = True # MIME requests always send chunked body
+ self.mime_boundary = mime_boundary
+ # We ignore multiphase on purpose and always send a POST.
+ # XXX maybe get rid of these lies then, one day
+ self.multiphase = multiphase
+ self.footer_meta = None
+
+ def _start_object_data(self):
+ # We're sending the object plus other stuff in the same request
+ # body, all wrapped up in multipart MIME, so we'd better start
+ # off the MIME document before sending any object data.
+ self.queue.put("--%s\r\nX-Document: object body\r\n\r\n" %
+ (self.mime_boundary,))
+
+ def end_of_object_data(self, footer_metadata=None):
+ """
+ Call when there is no more data to send.
+
+ Overrides superclass implementation to send any footer metadata
+ after object data.
+
+ :param footer_metadata: dictionary of metadata items
+ to be sent as footers.
+ """
+ if self.state == DATA_SENT:
+ raise ValueError("called end_of_object_data twice")
+ elif self.state == NO_DATA_SENT and self.mime_boundary:
+ self._start_object_data()
+
+ ## P3
+ #fp = open("/tmp/dump",'a')
+ #fp.write("end_of_object_data: %r\n" % (footer_metadata,))
+ #fp.close()
+ # We rely on the caller to be good and just keep a reference for now.
+ self.footer_meta = footer_metadata
+
+ # XXX Suppose we converted footers into normal headers. What about MD5?
+ # footer_body = json.dumps(footer_metadata)
+ # footer_md5 = md5(footer_body).hexdigest()
+
+ # this will be the last part sent
+ message_parts = [
+ "\r\n--%s--\r\n" % self.mime_boundary,
+ ]
+ self.queue.put("".join(message_parts))
+
+ self.queue.put('')
+ self.state = DATA_SENT
+
+ def send_commit_confirmation(self):
+ """
+ Call when there are > quorum 2XX responses received. Send commit
+ confirmations to all object nodes to finalize the PUT.
+ """
+ if self.state == COMMIT_SENT:
+ raise ValueError("called send_commit_confirmation twice")
+
+ # XXX is waiting to encode and drain really necessary?
+ self.queue.put('')
+ self.wait()
+
+ # XXX patch node_timeout through from connect() somehow
+ node_timeout = 60
+ put_resp = self.await_response(node_timeout, False)
+ # P3
+ fp = open("/tmp/dump",'a')
+ fp.write("put_resp: %s\n" % (put_resp.status,))
+ fp.close()
+ # XXX raise something if put_resp.status != 201 or other 2xx.
+
+ self.state = DATA_ACKED
+
+ headers = self.footer_meta
+ headers['Content-Length'] = '0'
+ self.conn.putrequest('POST', self.path, skip_host=('Host' in headers))
+ for header, value in headers.items():
+ self.conn.putheader(header, str(value))
+ self.conn.endheaders()
+
+ self.state = COMMIT_SENT
+
+ @classmethod
+ def connect(cls, node, part, path, headers, conn_timeout, node_timeout,
+ logger=None, need_multiphase=True, **kwargs):
+ """
+ Connect to a backend node and send the headers.
+
+ Override superclass method to notify object of need for support for
+ multipart body with footers and optionally multiphase commit, and
+ verify object server's capabilities.
+
+ :param need_multiphase: if True then multiphase support is required of
+ the object server
+ :raises: FooterNotSupported if the backend node can't process
+ metadata footers (this putter always requests footer support)
+ :raises: MultiphasePUTNotSupported if need_multiphase is set but
+ backend node can't handle multiphase PUT
+ """
+ mime_boundary = "%.64x" % random.randint(0, 16 ** 64)
+ headers = HeaderKeyDict(headers)
+ # when using a multipart mime request to backend the actual
+ # content-length is not equal to the object content size, so move the
+ # object content size to X-Backend-Obj-Content-Length if that has not
+ # already been set by the EC PUT path.
+ headers.setdefault('X-Backend-Obj-Content-Length',
+ headers.pop('Content-Length', None))
+ # We're going to be adding some unknown amount of data to the
+ # request, so we can't use an explicit content length, and thus
+ # we must use chunked encoding.
+ # XXX The POST removes this need, drop this code later.
+ headers['Transfer-Encoding'] = 'chunked'
+ headers['Expect'] = '100-continue'
+
+ headers['X-Backend-Obj-Multipart-Mime-Boundary'] = mime_boundary
+
+ headers['X-Backend-Obj-Metadata-Footer'] = 'yes'
+
+ if need_multiphase:
+ headers['X-Backend-Obj-Multiphase-Commit'] = 'yes'
+
+ conn, expect_resp, final_resp, connect_duration = cls._make_connection(
+ node, part, path, headers, conn_timeout, node_timeout)
+
+ if is_informational(expect_resp.status):
+ continue_headers = HeaderKeyDict(expect_resp.getheaders())
+ can_send_metadata_footer = config_true_value(
+ continue_headers.get('X-Obj-Metadata-Footer', 'no'))
+ can_handle_multiphase_put = config_true_value(
+ continue_headers.get('X-Obj-Multiphase-Commit', 'no'))
+
+ if not can_send_metadata_footer:
+ raise FooterNotSupported()
+
+ if need_multiphase and not can_handle_multiphase_put:
+ raise MultiphasePUTNotSupported()
+
+ return cls(conn, node, final_resp, path, connect_duration, logger,
+ mime_boundary, multiphase=need_multiphase)
+
+
def chunk_transformer(policy, nstreams):
segment_size = policy.ec_segment_size
diff --git a/test-ecput.py b/test-ecput.py
index 4e1e1fd..93d26a8 100644
--- a/test-ecput.py
+++ b/test-ecput.py
@@ -18,7 +18,7 @@ import time
from hashlib import md5
from swift.common.utils import ContextPool
-from swift.proxy.controllers.obj import MIMEPutter
+from swift.proxy.controllers.obj import DoublePutter
from swift.common.header_key_dict import HeaderKeyDict
def exception_occurred(node, typ, additional_info, **kwargs):
@@ -75,11 +75,11 @@ def poke(netloc, path, policy_index):
headers['Content-Type'] = "application/octet-stream"
headers['X-Backend-Storage-Policy-Index'] = policy_index
headers['X-Timestamp'] = "%016.05f" % (time.time(),)
- headers['Referer'] = "Test-MIME-PUT"
+ headers['Referer'] = "Test-EC-PUT"
part = int(path_parts[1])
path_aco = '/' + path_parts[2]
- putter = MIMEPutter.connect(node, part, path_aco, headers,
+ putter = DoublePutter.connect(node, part, path_aco, headers,
conn_timeout, node_timeout,
logger=logger, need_multiphase=True)
if putter.failed:
diff --git a/test/unit/obj/test_server.py b/test/unit/obj/test_server.py
index b68d296..6521a0c 100644
--- a/test/unit/obj/test_server.py
+++ b/test/unit/obj/test_server.py
@@ -391,6 +391,41 @@ class TestObjectController(unittest.TestCase):
gmtime(math.ceil(float(post_timestamp)))),
})
+ def test_POST_with_query(self):
+ # Test swift.obj.server.ObjectController.POST with query parameter
+ put_timestamp = normalize_timestamp(time())
+ headers = {'X-Timestamp': put_timestamp,
+ 'Content-Type': 'application/x-test',
+ 'X-Object-Sysmeta-Color': 'blue',
+ 'X-Object-Transient-Sysmeta-Shape': 'circle',
+ 'X-Object-Meta-1': 'One'}
+ req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
+ headers=headers)
+ req.body = 'VERIFY'
+ etag = '"%s"' % md5('VERIFY').hexdigest()
+ resp = req.get_response(self.object_controller)
+ self.assertEqual(resp.status_int, 201)
+ self.assertEqual(dict(resp.headers), {
+ 'Content-Type': 'text/html; charset=UTF-8',
+ 'Content-Length': str(len(resp.body)),
+ 'Etag': etag,
+ })
+
+ post_timestamp = normalize_timestamp(time())
+ headers = {'X-Timestamp': post_timestamp,
+ 'Content-Type': 'application/x-test'}
+ req = Request.blank('/sda1/p/a/c/o',
+ environ={'REQUEST_METHOD': 'POST',
+ 'QUERY_STRING': 'durable=yes'},
+ headers=headers)
+ resp = req.get_response(self.object_controller)
+ self.assertEqual(resp.status_int, 202)
+ self.assertEqual(dict(resp.headers), {
+ 'Content-Type': 'text/html; charset=UTF-8',
+ 'Content-Length': str(len(resp.body)),
+ 'X-Object-Sysmeta-Color': 'blue',
+ })
+
def test_POST_old_timestamp(self):
ts = time()
orig_timestamp = utils.Timestamp(ts).internal
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment