commit 363f242ce0860224ee945daa187e9207c679af76
Author: Clay Gerrard <clay.gerrard@gmail.com>
Date:   Wed Aug 19 13:59:53 2015 -0700

    wip: fixups for paul

    Change-Id: I93d80055a8dc7d700743dffae48303c83fbdc3d5
diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py
index 554469c..1df83f6 100644
--- a/swift/proxy/controllers/base.py
+++ b/swift/proxy/controllers/base.py
@@ -28,6 +28,7 @@ import os
 import time
 import functools
 import inspect
+import itertools
 import operator
 from sys import exc_info
 from swift import gettext_ as _
@@ -1120,6 +1121,99 @@ class GetOrHeadHandler(ResumingGetter):
         return res
+
+
+class NodeIter(object):
+    """
+    Yields nodes for a ring partition, skipping over error
+    limited nodes and stopping at the configurable number of nodes. If a
+    node yielded subsequently gets error limited, an extra node will be
+    yielded to take its place.
+
+    Note that if you're going to iterate over this concurrently from
+    multiple greenthreads, you'll want to use a
+    swift.common.utils.GreenthreadSafeIterator to serialize access.
+    Otherwise, you may get ValueErrors from concurrent access. (You also
+    may not, depending on how logging is configured, the vagaries of
+    socket IO and eventlet, and the phase of the moon.)
+
+    :param app: a proxy app
+    :param ring: ring to get nodes from
+    :param partition: ring partition to yield nodes for
+    :param node_iter: optional iterable of nodes to try. Useful if you
+        want to filter or reorder the nodes.
+    """
+
+    def __init__(self, app, ring, partition, node_iter=None):
+        self.app = app
+        self.ring = ring
+        self.partition = partition
+
+        part_nodes = ring.get_part_nodes(partition)
+        if node_iter is None:
+            node_iter = itertools.chain(
+                part_nodes, ring.get_more_nodes(partition))
+        num_primary_nodes = len(part_nodes)
+        self.nodes_left = self.app.request_node_count(num_primary_nodes)
+        self.expected_handoffs = self.nodes_left - num_primary_nodes
+
+        # Use of list() here forcibly yanks the first N nodes (the primary
+        # nodes) from node_iter, so the rest of its values are handoffs.
+        self.primary_nodes = self.app.sort_nodes(
+            list(itertools.islice(node_iter, num_primary_nodes)))
+        self.handoff_iter = node_iter
+
+    def __iter__(self):
+        self._node_iter = self._node_gen()
+        return self
+
+    def log_handoffs(self, handoffs):
+        """
+        Log handoff requests if handoff logging is enabled and the
+        handoff was not expected.
+
+        We only log handoffs once the handoff count has been pushed
+        past what we'd expect under normal circumstances, i.e.
+        (request_node_count - num_primaries); when handoffs goes higher
+        than that it means a primary must have been skipped because of
+        error limiting before we consumed all of our nodes_left.
+        """
+        if not self.app.log_handoffs:
+            return
+        extra_handoffs = handoffs - self.expected_handoffs
+        if extra_handoffs > 0:
+            self.app.logger.increment('handoff_count')
+            self.app.logger.warning(
+                'Handoff requested (%d)' % handoffs)
+            if extra_handoffs == len(self.primary_nodes):
+                # all the primaries were skipped, and handoffs didn't help
+                self.app.logger.increment('handoff_all_count')
+
+    def _node_gen(self):
+        for node in self.primary_nodes:
+            if not self.app.error_limited(node):
+                yield node
+                if not self.app.error_limited(node):
+                    self.nodes_left -= 1
+                    if self.nodes_left <= 0:
+                        return
+        handoffs = 0
+        for node in self.handoff_iter:
+            if not self.app.error_limited(node):
+                handoffs += 1
+                self.log_handoffs(handoffs)
+                yield node
+                if not self.app.error_limited(node):
+                    self.nodes_left -= 1
+                    if self.nodes_left <= 0:
+                        return
+
+    def next(self):
+        return next(self._node_iter)
+
+    def __next__(self):
+        return self.next()
+
+
 class Controller(object):
     """Base WSGI controller class for the proxy"""
     server_type = 'Base'
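
To make the extracted iterator's contract concrete, here is a minimal,
self-contained sketch of the consumption pattern (not part of the patch;
FakeApp and its hooks are hypothetical stand-ins for the real proxy app,
and node_gen below is a condensed version of NodeIter._node_gen):

    import itertools

    class FakeApp(object):
        # hypothetical stand-in exposing only the hooks NodeIter uses
        def __init__(self, limited=()):
            self.limited = set(limited)

        def request_node_count(self, num_primaries):
            return 2 * num_primaries  # e.g. "2 * replicas" in a real config

        def sort_nodes(self, nodes):
            return sorted(nodes)

        def error_limited(self, node):
            return node in self.limited

    def node_gen(app, primaries, handoffs):
        # skip error-limited primaries, then pull handoffs until
        # nodes_left is exhausted
        nodes_left = app.request_node_count(len(primaries))
        for node in itertools.chain(app.sort_nodes(primaries), handoffs):
            if app.error_limited(node):
                continue
            yield node
            nodes_left -= 1
            if nodes_left <= 0:
                return

    app = FakeApp(limited=['p2'])
    print(list(node_gen(app, ['p1', 'p2', 'p3'], ['h1', 'h2', 'h3', 'h4'])))
    # -> ['p1', 'p3', 'h1', 'h2', 'h3', 'h4']: the error-limited primary
    #    is skipped and an extra handoff is yielded to take its place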
diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py
index 928d6a0..921898d 100644
--- a/swift/proxy/controllers/obj.py
+++ b/swift/proxy/controllers/obj.py
@@ -1951,50 +1951,54 @@ class ECObjectController(BaseObjectController):
             orig_range = req.range
             range_specs = self._convert_range(req, policy)
 
-            node_iter = GreenthreadSafeIterator(node_iter)
+            safe_iter = GreenthreadSafeIterator(node_iter)
             min_gets = policy.ec_ndata
             with ContextPool(min_gets) as pool:
                 pile = GreenAsyncPile(pool)
                 for _junk in range(min_gets):
                     pile.spawn(self._fragment_GET_request,
-                               req, node_iter, partition,
+                               req, safe_iter, partition,
                                policy)
-                gets = list(pile)
-                good_gets = []
                 bad_gets = []
-                etag_buckets = {}
-                max_bucket = {}
-                for get, parts_iter in gets:
-                    if is_success(get.last_status):
-                        good_gets.append((get, parts_iter))
+                etag_buckets = collections.defaultdict(list)
+                best_etag = None
+                while True:
+                    for get, parts_iter in pile:
+                        if not is_success(get.last_status):
+                            # these are almost *always* some kind of client
+                            # error; 404s, 503s and Timeouts are all hidden
+                            # down in the ResumingGetter spawned by
+                            # _fragment_GET_request.
+                            bad_gets.append((get, parts_iter))
+                            continue
                         etag = HeaderKeyDict(
                             get.last_headers)['X-Object-Sysmeta-Ec-Etag']
-                        etag_buckets.setdefault(etag, []). \
-                            append((get, parts_iter))
-                        if len(etag_buckets[etag]) > len(max_bucket):
-                            max_bucket = etag_buckets[etag]
-                        if len(good_gets) >= min_gets:
-                            # we've got enough good responses but we need to
-                            # make sure they are from the same etag set and
-                            # if not spawn more getters to the parity nodes
-                            # to get some more
-                            if len(max_bucket) < min_gets:
-                                pile.spawn(self._fragment_GET_request, req,
-                                           node_iter, partition, policy)
-                            gets += list(pile)
-                            # NOTE: it may not be obvious but we eventually
-                            # exit this loop (if not big enough buckets) when
-                            # we stop spawning above because node_iter runs out,
-                            # and then 'gets' runs out...
-                    else:
-                        bad_gets.append((get, parts_iter))
+                        etag_buckets[etag].append((get, parts_iter))
+                        if (etag != best_etag and len(etag_buckets[etag]) >
+                                len(etag_buckets[best_etag])):
+                            best_etag = etag
+
+                    num_resp = max(len(bad_gets),
+                                   len(etag_buckets[best_etag]))
+                    if num_resp >= min_gets:
+                        break
+                    # we've categorized all of the responses we started,
+                    # but there doesn't seem to be any consensus on
+                    # error or etag; as long as the node_iter has
+                    # nodes_left we'll spawn some more.
+                    need_more = min_gets - num_resp
+                    if node_iter.nodes_left < need_more:
+                        break
+                    for i in range(need_more):
+                        pile.spawn(self._fragment_GET_request, req,
+                                   safe_iter, partition, policy)
 
             req.range = orig_range
-            if len(max_bucket) == min_gets:
+            if len(etag_buckets[best_etag]) >= min_gets:
                 # headers can come from any of the getters
                 resp_headers = HeaderKeyDict(
-                    max_bucket[0][0].source_headers[-1])
+                    etag_buckets[best_etag][0][0].source_headers[-1])
                 resp_headers.pop('Content-Range', None)
                 eccl = resp_headers.get('X-Object-Sysmeta-Ec-Content-Length')
                 obj_length = int(eccl) if eccl is not None else None
@@ -2005,7 +2009,8 @@ class ECObjectController(BaseObjectController):
                 app_iter = ECAppIter(
                     req.swift_entity_path,
                     policy,
-                    [iterator for getter, iterator in max_bucket],
+                    [iterator for getter, iterator in
+                     etag_buckets[best_etag]],
                     range_specs, fa_length, obj_length,
                     self.app.logger)
                 resp = Response(
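
The reworked GET loop above keeps a running best_etag instead of rescanning
every bucket, and treats "enough matching etags" and "enough errors" as the
two ways to reach a verdict. A condensed, self-contained sketch of that
bookkeeping (the (status, etag) tuples are made-up stand-ins for the real
(getter, parts_iter) pairs):

    import collections

    min_gets = 3  # stand-in for policy.ec_ndata
    responses = [(200, 'abc'), (404, None), (200, 'abc'),
                 (200, 'xyz'), (200, 'abc')]

    bad_gets = []
    etag_buckets = collections.defaultdict(list)
    best_etag = None
    for status, etag in responses:
        if status != 200:
            bad_gets.append(status)
            continue
        etag_buckets[etag].append(status)
        # a different etag becomes the winner only when its bucket
        # outgrows the current winner's bucket
        if (etag != best_etag and
                len(etag_buckets[etag]) > len(etag_buckets[best_etag])):
            best_etag = etag

    num_resp = max(len(bad_gets), len(etag_buckets[best_etag]))
    print(best_etag, num_resp >= min_gets)  # -> abc True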
diff --git a/swift/proxy/server.py b/swift/proxy/server.py
index d55dcda..7db40ef 100644
--- a/swift/proxy/server.py
+++ b/swift/proxy/server.py
@@ -19,7 +19,6 @@ import socket
 from swift import gettext_ as _
 from random import shuffle
 from time import time
-import itertools
 import functools
 import sys
 
@@ -36,7 +35,7 @@ from swift.common.utils import cache_from_env, get_logger, \
 from swift.common.constraints import check_utf8, valid_api_version
 from swift.proxy.controllers import AccountController, ContainerController, \
     ObjectControllerRouter, InfoController
-from swift.proxy.controllers.base import get_container_info
+from swift.proxy.controllers.base import get_container_info, NodeIter
 from swift.common.swob import HTTPBadRequest, HTTPForbidden, \
     HTTPMethodNotAllowed, HTTPNotFound, HTTPPreconditionFailed, \
     HTTPServerError, HTTPException, Request, HTTPServiceUnavailable
@@ -500,60 +499,7 @@ class Application(object):
                            'port': node['port'], 'device': node['device']})
 
     def iter_nodes(self, ring, partition, node_iter=None):
- """ | |
- Yields nodes for a ring partition, skipping over error | |
- limited nodes and stopping at the configurable number of nodes. If a | |
- node yielded subsequently gets error limited, an extra node will be | |
- yielded to take its place. | |
- | |
- Note that if you're going to iterate over this concurrently from | |
- multiple greenthreads, you'll want to use a | |
- swift.common.utils.GreenthreadSafeIterator to serialize access. | |
- Otherwise, you may get ValueErrors from concurrent access. (You also | |
- may not, depending on how logging is configured, the vagaries of | |
- socket IO and eventlet, and the phase of the moon.) | |
- | |
- :param ring: ring to get yield nodes from | |
- :param partition: ring partition to yield nodes for | |
- :param node_iter: optional iterable of nodes to try. Useful if you | |
- want to filter or reorder the nodes. | |
- """ | |
- part_nodes = ring.get_part_nodes(partition) | |
- if node_iter is None: | |
- node_iter = itertools.chain(part_nodes, | |
- ring.get_more_nodes(partition)) | |
- num_primary_nodes = len(part_nodes) | |
- | |
- # Use of list() here forcibly yanks the first N nodes (the primary | |
- # nodes) from node_iter, so the rest of its values are handoffs. | |
- primary_nodes = self.sort_nodes( | |
- list(itertools.islice(node_iter, num_primary_nodes))) | |
- handoff_nodes = node_iter | |
- nodes_left = self.request_node_count(len(primary_nodes)) | |
- | |
- log_handoffs_threshold = nodes_left - len(primary_nodes) | |
- for node in primary_nodes: | |
- if not self.error_limited(node): | |
- yield node | |
- if not self.error_limited(node): | |
- nodes_left -= 1 | |
- if nodes_left <= 0: | |
- return | |
- handoffs = 0 | |
- for node in handoff_nodes: | |
- if not self.error_limited(node): | |
- handoffs += 1 | |
- if self.log_handoffs and handoffs > log_handoffs_threshold: | |
- self.logger.increment('handoff_count') | |
- self.logger.warning( | |
- 'Handoff requested (%d)' % handoffs) | |
- if handoffs - log_handoffs_threshold == len(primary_nodes): | |
- self.logger.increment('handoff_all_count') | |
- yield node | |
- if not self.error_limited(node): | |
- nodes_left -= 1 | |
- if nodes_left <= 0: | |
- return | |
+ return NodeIter(self, ring, partition, node_iter=node_iter) | |
def exception_occurred(self, node, typ, additional_info, | |
**kwargs): | |
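
One consequence of turning iter_nodes() from a generator function into a
class is that callers can now read state off the iterator between next()
calls; the EC GET path above does exactly that with node_iter.nodes_left
before deciding to spawn more fragment requests. A minimal illustration of
the difference (MiniNodeIter is a hypothetical toy, not the real class):

    # a plain generator exposes no attributes for its internal state
    def gen_nodes():
        yield 'p1'
        yield 'p2'

    g = gen_nodes()
    # g.nodes_left would raise AttributeError

    # a class-based iterator can publish bookkeeping such as nodes_left
    class MiniNodeIter(object):
        def __init__(self, nodes):
            self._iter = iter(nodes)
            self.nodes_left = len(nodes)

        def __iter__(self):
            return self

        def next(self):
            node = next(self._iter)
            self.nodes_left -= 1
            return node

        __next__ = next  # py3 spelling of the same protocol method

    it = MiniNodeIter(['p1', 'p2', 'p3'])
    next(it)
    print(it.nodes_left)  # -> 2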
diff --git a/test/probe/common.py b/test/probe/common.py
index d7bbe0e..8b7623b 100644
--- a/test/probe/common.py
+++ b/test/probe/common.py
@@ -283,7 +283,7 @@ class Body(object):
     def read(self, amount):
         if len(self.buff) < amount:
             try:
-                self.buff = next(self)
+                self.buff += next(self)
             except StopIteration:
                 pass
         rv, self.buff = self.buff[:amount], self.buff[amount:]
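
The probe-test fix is easy to miss but real: when read() needed more bytes
than were buffered, the old code replaced the buffer with the next chunk,
silently dropping whatever was already buffered. A tiny demonstration of
the difference (standalone, not the actual Body class):

    # old behavior: assignment discards the bytes already buffered
    buff = b'ab'
    buff = next(iter([b'cdef']))
    assert buff == b'cdef'      # the buffered b'ab' is lost

    # fixed behavior: the new chunk is appended to the buffer
    buff = b'ab'
    buff += next(iter([b'cdef']))
    assert buff == b'abcdef'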