Skip to content

Instantly share code, notes, and snippets.

@zhouyuan
Created December 15, 2014 14:28
Show Gist options
  • Save zhouyuan/3f6706479384dde0be9b to your computer and use it in GitHub Desktop.
Save zhouyuan/3f6706479384dde0be9b to your computer and use it in GitHub Desktop.
diff --git a/swift/common/ring/ring.py b/swift/common/ring/ring.py
index daad23f..11618a3 100644
--- a/swift/common/ring/ring.py
+++ b/swift/common/ring/ring.py
@@ -243,7 +243,7 @@ class Ring(object):
if dev_id not in seen_ids:
part_nodes.append(self.devs[dev_id])
seen_ids.add(dev_id)
- return part_nodes
+ return [dict(node, index=i) for i, node in enumerate(part_nodes)]
def get_part(self, account, container=None, obj=None):
"""
diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py
index 7d42857..553dff6 100644
--- a/swift/proxy/controllers/obj.py
+++ b/swift/proxy/controllers/obj.py
@@ -26,6 +26,7 @@
import itertools
import mimetypes
+import operator
import time
import math
import random
@@ -108,6 +109,7 @@ class Putter(object):
self.path = path
self.connect_duration = connect_duration
self.chunked = chunked
+ self.node_index = node['index']
self.mime_boundary = mime_boundary
self.failed = False
@@ -235,8 +237,8 @@ class Putter(object):
self.queue.task_done()
@classmethod
- def connect(cls, node, part, path, headers, conn_timeout, node_timeout,
- chunked=False, need_metadata_footer=False):
+ def connect(cls, node, part, path, headers, conn_timeout,
+ node_timeout, chunked=False, need_metadata_footer=False):
"""
Connect to a backend node and send the headers.
@@ -357,6 +359,18 @@ class ObjectController(Controller):
that the request will be serviced by local object servers first, but
nonlocal ones will be employed if not enough local ones are available.
+ Note that the original indices are preserved; that is, if this moves a
+ handoff forward in the chain, the result might look like
+
+ [(3, <handoff node (local)>),
+ (0, <primary node (local)>),
+ (2, <primary node (local)>),
+ (1, <primary node (nonlocal)>),
+ ... rest of the handoffs ...]
+
+ This lets erasure-code fragment archives wind up on the right local
+ primary nodes when possible, or end up on local handoffs.
+
:param ring: ring to get nodes from
:param partition: ring partition to yield nodes for
"""
@@ -368,14 +382,18 @@ class ObjectController(Controller):
primary_nodes = ring.get_part_nodes(partition)
num_locals = self.app.write_affinity_node_count(len(primary_nodes))
- all_nodes = itertools.chain(primary_nodes,
- ring.get_more_nodes(partition))
+ all_nodes = itertools.chain(
+ primary_nodes,
+ (dict(node, index=i) for i, node in enumerate(
+ ring.get_more_nodes(partition), start=len(primary_nodes))))
first_n_local_nodes = list(itertools.islice(
itertools.ifilter(is_local, all_nodes), num_locals))
# refresh it; it moved when we computed first_n_local_nodes
- all_nodes = itertools.chain(primary_nodes,
- ring.get_more_nodes(partition))
+ all_nodes = itertools.chain(
+ primary_nodes,
+ (dict(node, index=i) for i, node in enumerate(
+ ring.get_more_nodes(partition), start=len(primary_nodes))))
local_first_node_iter = itertools.chain(
first_n_local_nodes,
itertools.ifilter(lambda node: node not in first_n_local_nodes,
@@ -881,9 +899,9 @@ class ObjectController(Controller):
chunk_hashers = [None] * len(nodes)
for i, p in enumerate(pile):
if p:
- p.index = i
putters.append(p)
- chunk_hashers[i] = (
+ p.hshr_index = i
+ chunk_hashers[p.hshr_index] = (
None if policy.stores_objects_verbatim else md5())
if req.if_none_match is not None and '*' in req.if_none_match:
@@ -916,10 +934,10 @@ class ObjectController(Controller):
return
for putter in list(putters):
- backend_chunk = backend_chunks[putter.index]
+ backend_chunk = backend_chunks[chunk_index[putter]]
if not putter.failed:
- if chunk_hashers[putter.index]:
- chunk_hashers[putter.index].update(backend_chunk)
+ if chunk_hashers[putter.hshr_index]:
+ chunk_hashers[putter.hshr_index].update(backend_chunk)
putter.send_chunk(backend_chunk)
else:
putters.remove(putter)
@@ -932,6 +950,31 @@ class ObjectController(Controller):
try:
with ContextPool(len(putters)) as pool:
+ # Give each putter a "chunk index": the index of the
+ # transformed chunk that we'll send to it.
+ #
+ # For primary nodes, that's just its index (primary 0 gets
+ # chunk 0, primary 1 gets chunk 1, and so on). For handoffs,
+ # we assign the chunk index of a missing primary.
+ handoff_conns = []
+ chunk_index = {}
+ for p in putters:
+ if p.node_index < len(putters):
+ chunk_index[p] = p.node_index
+ else:
+ handoff_conns.append(p)
+
+ # Note: we may have more holes than handoffs. This is okay; it
+ # just means that we failed to connect to one or more storage
+ # nodes. Holes occur when a storage node is down, in which
+ # case the connection is not replaced, and when a storage node
+ # returns 507, in which case a handoff is used to replace it.
+ holes = [x for x in range(len(putters))
+ if x not in chunk_index.values()]
+ handoff_conns.sort(key=operator.attrgetter('node_index'))
+ for hole, p in zip(holes, handoff_conns):
+ chunk_index[p] = hole
+
for putter in putters:
putter.spawn_sender_greenthread(
pool, self.app.put_queue_depth, self.app.node_timeout,
@@ -954,10 +997,10 @@ class ObjectController(Controller):
for putter in putters:
trail_md = policy.trailing_metadata(
etag_hasher, bytes_transferred,
- putter.index)
+ chunk_index[putter])
if not policy.stores_objects_verbatim:
trail_md['Etag'] = chunk_hashers[
- putter.index].hexdigest()
+ putter.hshr_index].hexdigest()
putter.end_of_object_data(trail_md)
break
bytes_transferred += len(chunk)
diff --git a/swift/proxy/server.py b/swift/proxy/server.py
index b36a531..9674383 100644
--- a/swift/proxy/server.py
+++ b/swift/proxy/server.py
@@ -398,7 +398,7 @@ class Application(object):
return timing if expires > now else -1.0
nodes.sort(key=key_func)
elif self.sorting_method == 'affinity':
- nodes.sort(key=self.read_affinity_sort_key)
+ nodes.sort(key=self.read_affinity_sort_key)
return nodes
def set_node_timing(self, node, timing):
@@ -470,9 +470,9 @@ class Application(object):
def iter_nodes(self, ring, partition, node_iter=None):
"""
Yields nodes for a ring partition, skipping over error
- limited nodes and stopping at the configurable number of
- nodes. If a node yielded subsequently gets error limited, an
- extra node will be yielded to take its place.
+ limited nodes and stopping at the configurable number of nodes. If a
+ node yielded subsequently gets error limited, an extra node will be
+ yielded to take its place.
Note that if you're going to iterate over this concurrently from
multiple greenthreads, you'll want to use a
@@ -488,8 +488,9 @@ class Application(object):
"""
part_nodes = ring.get_part_nodes(partition)
if node_iter is None:
- node_iter = itertools.chain(part_nodes,
- ring.get_more_nodes(partition))
+ node_iter = itertools.chain(
+ part_nodes, (dict(node, index=i) for i, node in enumerate(
+ ring.get_more_nodes(partition), start=len(part_nodes))))
num_primary_nodes = len(part_nodes)
# Use of list() here forcibly yanks the first N nodes (the primary
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment