Created
January 15, 2014 08:42
-
-
Save zhouyuan/8432820 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
for nheaders in outgoing_headers: | |
# RFC2616:8.2.3 disallows 100-continue without a body | |
if (req.content_length > 0) or chunked: | |
nheaders['Expect'] = '100-continue' | |
##### | |
nheaders['Content-Length'] = \ | |
int(req.content_length / 4) | |
# need to sync up with kevin, how long is the header? | |
# then we could calculate the chunk_length and put it here | |
pile.spawn(self._connect_put_node, node_iter, partition, | |
req.swift_entity_path, nheaders, | |
self.app.logger.thread_locals) | |
conns = [conn for conn in pile if conn] | |
min_conns = policy.quorum_size(len(nodes)) | |
if len(conns) < min_conns: | |
self.app.logger.error( | |
_('Object PUT returning 503, %(conns)s/%(nodes)s ' | |
'required connections'), | |
{'conns': len(conns), 'nodes': min_conns}) | |
return HTTPServiceUnavailable(request=req) | |
bytes_transferred = 0 | |
# EC-specific declarations | |
segetag = md5() | |
ec_driver = ec_fragments = ec_fragment = None | |
try: | |
with ContextPool(len(nodes)) as pool: | |
for conn in conns: | |
conn.failed = False | |
conn.queue = Queue(self.app.put_queue_depth) | |
pool.spawn(self._send_file, conn, req.path) | |
_contents = "" | |
while True: | |
with ChunkReadTimeout(self.app.client_timeout): | |
try: | |
chunk = next(data_source) | |
# If policy is erasure_coding, encode obj data into | |
# ec_k + ec_m fragments. EC fragments are streamed | |
# to the object server similar to non-EC objects. | |
# We define a *minimum* object segment size (not | |
# user-configurable at the moment) to apply Erasure | |
# Coding transformation on, reasonably large so we | |
# don't end up with too many small files, or end up | |
# calling into expensive EC encode operations too | |
# many times. We buffer HTTP chunks until we have | |
# enough to meet the EC_MIN_OBJ_SEGMENT_SIZE | |
# constraint. | |
if policy.policy_type == 'erasure_coding': | |
# Buffer up to EC_MIN_OBJ_SEGMENT_SIZE | |
_contents += chunk | |
while len(_contents) < EC_MIN_OBJ_SEGMENT_SIZE: | |
chunk = next(data_source) | |
_contents += chunk | |
except StopIteration: | |
if chunked: | |
for conn in conns: | |
conn.queue.put('0\r\n\r\n') | |
break | |
if policy.policy_type == 'erasure_coding': | |
# OK to Encode and stream now | |
segetag.update(_contents) | |
ec_driver = policy.policy_config.pyeclib_driver | |
ec_fragments = ec_driver.encode(_contents) | |
print len(ec_fragments[0]) | |
ec_fragment = iter(ec_fragments) | |
bytes_transferred += len(_contents) | |
print "bytes_transferred = %d" % (bytes_transferred) | |
print "segetag = %s" % (segetag.hexdigest()) | |
else: | |
bytes_transferred += len(chunk) | |
# Should this matter in the EC case? | |
if bytes_transferred > MAX_FILE_SIZE: | |
return HTTPRequestEntityTooLarge(request=req) | |
for conn in list(conns): | |
if not conn.failed: | |
if policy.policy_type == 'erasure_coding': | |
print req.headers | |
conn.queue.put(next(ec_fragment)) | |
else: | |
conn.queue.put( | |
'%x\r\n%s\r\n' % (len(chunk), chunk) | |
if chunked else chunk) | |
else: | |
conns.remove(conn) | |
if len(conns) < min_conns: | |
self.app.logger.error(_( | |
'Object PUT exceptions during' | |
' send, %(conns)s/%(nodes)s required connections'), | |
{'conns': len(conns), 'nodes': min_conns}) | |
return HTTPServiceUnavailable(request=req) | |
for conn in conns: | |
if conn.queue.unfinished_tasks: | |
conn.queue.join() | |
conns = [conn for conn in conns if not conn.failed] | |
except ChunkReadTimeout as err: | |
self.app.logger.warn( | |
_('ERROR Client read timeout (%ss)'), err.seconds) | |
self.app.logger.increment('client_timeouts') | |
return HTTPRequestTimeout(request=req) | |
except (Exception, Timeout): | |
self.app.logger.exception( | |
_('ERROR Exception causing client disconnect')) | |
return HTTPClientDisconnect(request=req) | |
if req.content_length and bytes_transferred < req.content_length: | |
req.client_disconnect = True | |
self.app.logger.warn( | |
_('Client disconnected without sending enough data')) | |
self.app.logger.increment('client_disconnects') | |
return HTTPClientDisconnect(request=req) | |
statuses, reasons, bodies, etags = self._get_put_responses(req, conns, | |
nodes) | |
if policy.policy_type == 'erasure_coding': | |
# validate etags received in response | |
# TBD(check_tags) | |
# whole object etag for EC | |
etag = segetag.hexdigest() | |
print "setting etag = %s" % (etag) | |
else: | |
if len(etags) > 1: | |
self.app.logger.error( | |
_('Object servers returned %s mismatched etags'), | |
len(etags)) | |
return HTTPServerError(request=req) | |
etag = etags.pop() if len(etags) else None | |
resp = self.best_response(req, statuses, reasons, bodies, | |
_('Object PUT'), etag=etag) | |
if source_header: | |
resp.headers['X-Copied-From'] = quote( | |
source_header.split('/', 3)[3]) | |
if 'last-modified' in source_resp.headers: | |
resp.headers['X-Copied-From-Last-Modified'] = \ | |
source_resp.headers['last-modified'] | |
copy_headers_into(req, resp) | |
resp.last_modified = math.ceil(float(req.headers['X-Timestamp'])) | |
return resp |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment