Created
December 15, 2014 14:28
-
-
Save zhouyuan/3f6706479384dde0be9b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/swift/common/ring/ring.py b/swift/common/ring/ring.py | |
index daad23f..11618a3 100644 | |
--- a/swift/common/ring/ring.py | |
+++ b/swift/common/ring/ring.py | |
@@ -243,7 +243,7 @@ class Ring(object): | |
if dev_id not in seen_ids: | |
part_nodes.append(self.devs[dev_id]) | |
seen_ids.add(dev_id) | |
- return part_nodes | |
+ return [dict(node, index=i) for i, node in enumerate(part_nodes)] | |
def get_part(self, account, container=None, obj=None): | |
""" | |
diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py | |
index 7d42857..553dff6 100644 | |
--- a/swift/proxy/controllers/obj.py | |
+++ b/swift/proxy/controllers/obj.py | |
@@ -26,6 +26,7 @@ | |
import itertools | |
import mimetypes | |
+import operator | |
import time | |
import math | |
import random | |
@@ -108,6 +109,7 @@ class Putter(object): | |
self.path = path | |
self.connect_duration = connect_duration | |
self.chunked = chunked | |
+ self.node_index = node['index'] | |
self.mime_boundary = mime_boundary | |
self.failed = False | |
@@ -235,8 +237,8 @@ class Putter(object): | |
self.queue.task_done() | |
@classmethod | |
- def connect(cls, node, part, path, headers, conn_timeout, node_timeout, | |
- chunked=False, need_metadata_footer=False): | |
+ def connect(cls, node, part, path, headers, conn_timeout, | |
+ node_timeout, chunked=False, need_metadata_footer=False): | |
""" | |
Connect to a backend node and send the headers. | |
@@ -357,6 +359,18 @@ class ObjectController(Controller): | |
that the request will be serviced by local object servers first, but | |
nonlocal ones will be employed if not enough local ones are available. | |
+ Note that the original indices are preserved; that is, if this moves a | |
+ handoff forward in the chain, the result might look like | |
+ | |
+ [(3, <handoff node (local)>), | |
+ (0, <primary node (local)>), | |
+ (2, <primary node (local)>), | |
+ (1, <primary node (nonlocal)>), | |
+ ... rest of the handoffs ...] | |
+ | |
+ This lets erasure-code fragment archives wind up on the right local | |
+ primary nodes when possible, or end up on local handoffs. | |
+ | |
:param ring: ring to get nodes from | |
:param partition: ring partition to yield nodes for | |
""" | |
@@ -368,14 +382,18 @@ class ObjectController(Controller): | |
primary_nodes = ring.get_part_nodes(partition) | |
num_locals = self.app.write_affinity_node_count(len(primary_nodes)) | |
- all_nodes = itertools.chain(primary_nodes, | |
- ring.get_more_nodes(partition)) | |
+ all_nodes = itertools.chain( | |
+ primary_nodes, | |
+ [dict(node, index=i) for i, node in enumerate( | |
+ ring.get_more_nodes(partition), start=len(primary_nodes))]) | |
first_n_local_nodes = list(itertools.islice( | |
itertools.ifilter(is_local, all_nodes), num_locals)) | |
# refresh it; it moved when we computed first_n_local_nodes | |
- all_nodes = itertools.chain(primary_nodes, | |
- ring.get_more_nodes(partition)) | |
+ all_nodes = itertools.chain( | |
+ primary_nodes, | |
+ [dict(node, index=i) for i, node in enumerate( | |
+ ring.get_more_nodes(partition), start=len(primary_nodes))]) | |
local_first_node_iter = itertools.chain( | |
first_n_local_nodes, | |
itertools.ifilter(lambda node: node not in first_n_local_nodes, | |
@@ -881,9 +899,9 @@ class ObjectController(Controller): | |
chunk_hashers = [None] * len(nodes) | |
for i, p in enumerate(pile): | |
if p: | |
- p.index = i | |
putters.append(p) | |
- chunk_hashers[i] = ( | |
+ p.hshr_index = i | |
+ chunk_hashers[p.hshr_index] = ( | |
None if policy.stores_objects_verbatim else md5()) | |
if req.if_none_match is not None and '*' in req.if_none_match: | |
@@ -916,10 +934,10 @@ class ObjectController(Controller): | |
return | |
for putter in list(putters): | |
- backend_chunk = backend_chunks[putter.index] | |
+ backend_chunk = backend_chunks[chunk_index[putter]] | |
if not putter.failed: | |
- if chunk_hashers[putter.index]: | |
- chunk_hashers[putter.index].update(backend_chunk) | |
+ if chunk_hashers[putter.hshr_index]: | |
+ chunk_hashers[putter.hshr_index].update(backend_chunk) | |
putter.send_chunk(backend_chunk) | |
else: | |
putters.remove(putter) | |
@@ -932,6 +950,31 @@ class ObjectController(Controller): | |
try: | |
with ContextPool(len(putters)) as pool: | |
+ # Give each putter a "chunk index": the index of the | |
+ # transformed chunk that we'll send to it. | |
+ # | |
+ # For primary nodes, that's just its index (primary 0 gets | |
+ # chunk 0, primary 1 gets chunk 1, and so on). For handoffs, | |
+ # we assign the chunk index of a missing primary. | |
+ handoff_conns = [] | |
+ chunk_index = {} | |
+ for p in putters: | |
+ if p.node_index < len(putters): | |
+ chunk_index[p] = p.node_index | |
+ else: | |
+ handoff_conns.append(p) | |
+ | |
+ # Note: we may have more holes than handoffs. This is okay; it | |
+ # just means that we failed to connect to one or more storage | |
+ # nodes. Holes occur when a storage node is down, in which | |
+ # case the connection is not replaced, and when a storage node | |
+ # returns 507, in which case a handoff is used to replace it. | |
+ holes = [x for x in range(len(putters)) | |
+ if x not in chunk_index.values()] | |
+ handoff_conns.sort(key=operator.attrgetter('node_index')) | |
+ for hole, p in zip(holes, handoff_conns): | |
+ chunk_index[p] = hole | |
+ | |
for putter in putters: | |
putter.spawn_sender_greenthread( | |
pool, self.app.put_queue_depth, self.app.node_timeout, | |
@@ -954,10 +997,10 @@ class ObjectController(Controller): | |
for putter in putters: | |
trail_md = policy.trailing_metadata( | |
etag_hasher, bytes_transferred, | |
- putter.index) | |
+ chunk_index[putter]) | |
if not policy.stores_objects_verbatim: | |
trail_md['Etag'] = chunk_hashers[ | |
- putter.index].hexdigest() | |
+ putter.hshr_index].hexdigest() | |
putter.end_of_object_data(trail_md) | |
break | |
bytes_transferred += len(chunk) | |
diff --git a/swift/proxy/server.py b/swift/proxy/server.py | |
index b36a531..9674383 100644 | |
--- a/swift/proxy/server.py | |
+++ b/swift/proxy/server.py | |
@@ -398,7 +398,7 @@ class Application(object): | |
return timing if expires > now else -1.0 | |
nodes.sort(key=key_func) | |
elif self.sorting_method == 'affinity': | |
- nodes.sort(key=self.read_affinity_sort_key) | |
+ nodes.sort(self.read_affinity_sort_key) | |
return nodes | |
def set_node_timing(self, node, timing): | |
@@ -470,9 +470,9 @@ class Application(object): | |
def iter_nodes(self, ring, partition, node_iter=None): | |
""" | |
Yields nodes for a ring partition, skipping over error | |
- limited nodes and stopping at the configurable number of | |
- nodes. If a node yielded subsequently gets error limited, an | |
- extra node will be yielded to take its place. | |
+ limited nodes and stopping at the configurable number of nodes. If a | |
+ node yielded subsequently gets error limited, an extra node will be | |
+ yielded to take its place. | |
Note that if you're going to iterate over this concurrently from | |
multiple greenthreads, you'll want to use a | |
@@ -488,8 +488,9 @@ class Application(object): | |
""" | |
part_nodes = ring.get_part_nodes(partition) | |
if node_iter is None: | |
- node_iter = itertools.chain(part_nodes, | |
- ring.get_more_nodes(partition)) | |
+ node_iter = itertools.chain( | |
+ part_nodes, [dict(node, index=i) for i, node in enumerate( | |
+ ring.get_more_nodes(partition), start=len(part_nodes))]) | |
num_primary_nodes = len(part_nodes) | |
# Use of list() here forcibly yanks the first N nodes (the primary |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment