Skip to content

Instantly share code, notes, and snippets.

@zhouyuan
Created January 15, 2014 08:42
Show Gist options
  • Save zhouyuan/8432820 to your computer and use it in GitHub Desktop.
for nheaders in outgoing_headers:
# RFC2616:8.2.3 disallows 100-continue without a body
if (req.content_length > 0) or chunked:
nheaders['Expect'] = '100-continue'
#####
nheaders['Content-Length'] = \
int(req.content_length / 4)
# need to sync up with kevin, how long is the header?
# then we could calculate the chunk_length and put it here
pile.spawn(self._connect_put_node, node_iter, partition,
req.swift_entity_path, nheaders,
self.app.logger.thread_locals)
conns = [conn for conn in pile if conn]
min_conns = policy.quorum_size(len(nodes))
if len(conns) < min_conns:
self.app.logger.error(
_('Object PUT returning 503, %(conns)s/%(nodes)s '
'required connections'),
{'conns': len(conns), 'nodes': min_conns})
return HTTPServiceUnavailable(request=req)
bytes_transferred = 0
# EC-specific declarations
segetag = md5()
ec_driver = ec_fragments = ec_fragment = None
try:
with ContextPool(len(nodes)) as pool:
for conn in conns:
conn.failed = False
conn.queue = Queue(self.app.put_queue_depth)
pool.spawn(self._send_file, conn, req.path)
_contents = ""
while True:
with ChunkReadTimeout(self.app.client_timeout):
try:
chunk = next(data_source)
# If policy is erasure_coding, encode obj data into
# ec_k + ec_m fragments. EC fragments are streamed
# to the object server similar to non-EC objects.
# We define a *minimum* object segment size (not
# user-configurable at the moment) to apply Erasure
# Coding transformation on, reasonably large so we
# don't end up with too many small files, or end up
# calling into expensive EC encode operations too
# many times. We buffer HTTP chunks until we have
# enough to meet the EC_MIN_OBJ_SEGMENT_SIZE
# constraint.
if policy.policy_type == 'erasure_coding':
# Buffer up to EC_MIN_OBJ_SEGMENT_SIZE
_contents += chunk
while len(_contents) < EC_MIN_OBJ_SEGMENT_SIZE:
chunk = next(data_source)
_contents += chunk
except StopIteration:
if chunked:
for conn in conns:
conn.queue.put('0\r\n\r\n')
break
if policy.policy_type == 'erasure_coding':
# OK to Encode and stream now
segetag.update(_contents)
ec_driver = policy.policy_config.pyeclib_driver
ec_fragments = ec_driver.encode(_contents)
print len(ec_fragments[0])
ec_fragment = iter(ec_fragments)
bytes_transferred += len(_contents)
print "bytes_transferred = %d" % (bytes_transferred)
print "segetag = %s" % (segetag.hexdigest())
else:
bytes_transferred += len(chunk)
# Should this matter in the EC case?
if bytes_transferred > MAX_FILE_SIZE:
return HTTPRequestEntityTooLarge(request=req)
for conn in list(conns):
if not conn.failed:
if policy.policy_type == 'erasure_coding':
print req.headers
conn.queue.put(next(ec_fragment))
else:
conn.queue.put(
'%x\r\n%s\r\n' % (len(chunk), chunk)
if chunked else chunk)
else:
conns.remove(conn)
if len(conns) < min_conns:
self.app.logger.error(_(
'Object PUT exceptions during'
' send, %(conns)s/%(nodes)s required connections'),
{'conns': len(conns), 'nodes': min_conns})
return HTTPServiceUnavailable(request=req)
for conn in conns:
if conn.queue.unfinished_tasks:
conn.queue.join()
conns = [conn for conn in conns if not conn.failed]
except ChunkReadTimeout as err:
self.app.logger.warn(
_('ERROR Client read timeout (%ss)'), err.seconds)
self.app.logger.increment('client_timeouts')
return HTTPRequestTimeout(request=req)
except (Exception, Timeout):
self.app.logger.exception(
_('ERROR Exception causing client disconnect'))
return HTTPClientDisconnect(request=req)
if req.content_length and bytes_transferred < req.content_length:
req.client_disconnect = True
self.app.logger.warn(
_('Client disconnected without sending enough data'))
self.app.logger.increment('client_disconnects')
return HTTPClientDisconnect(request=req)
statuses, reasons, bodies, etags = self._get_put_responses(req, conns,
nodes)
if policy.policy_type == 'erasure_coding':
# validate etags received in response
# TBD(check_tags)
# whole object etag for EC
etag = segetag.hexdigest()
print "setting etag = %s" % (etag)
else:
if len(etags) > 1:
self.app.logger.error(
_('Object servers returned %s mismatched etags'),
len(etags))
return HTTPServerError(request=req)
etag = etags.pop() if len(etags) else None
resp = self.best_response(req, statuses, reasons, bodies,
_('Object PUT'), etag=etag)
if source_header:
resp.headers['X-Copied-From'] = quote(
source_header.split('/', 3)[3])
if 'last-modified' in source_resp.headers:
resp.headers['X-Copied-From-Last-Modified'] = \
source_resp.headers['last-modified']
copy_headers_into(req, resp)
resp.last_modified = math.ceil(float(req.headers['X-Timestamp']))
return resp
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment