Skip to content

Instantly share code, notes, and snippets.

@clayg
Created August 19, 2015 21:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save clayg/e4c54ff9ab1331cc67d3 to your computer and use it in GitHub Desktop.
Save clayg/e4c54ff9ab1331cc67d3 to your computer and use it in GitHub Desktop.
commit 363f242ce0860224ee945daa187e9207c679af76
Author: Clay Gerrard <clay.gerrard@gmail.com>
Date: Wed Aug 19 13:59:53 2015 -0700
wip: fixups for paul
Change-Id: I93d80055a8dc7d700743dffae48303c83fbdc3d5
diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py
index 554469c..1df83f6 100644
--- a/swift/proxy/controllers/base.py
+++ b/swift/proxy/controllers/base.py
@@ -28,6 +28,7 @@ import os
import time
import functools
import inspect
+import itertools
import operator
from sys import exc_info
from swift import gettext_ as _
@@ -1120,6 +1121,99 @@ class GetOrHeadHandler(ResumingGetter):
return res
+class NodeIter(object):
+ """
+ Yields nodes for a ring partition, skipping over error
+ limited nodes and stopping at the configurable number of nodes. If a
+ node yielded subsequently gets error limited, an extra node will be
+ yielded to take its place.
+
+ Note that if you're going to iterate over this concurrently from
+ multiple greenthreads, you'll want to use a
+ swift.common.utils.GreenthreadSafeIterator to serialize access.
+ Otherwise, you may get ValueErrors from concurrent access. (You also
+ may not, depending on how logging is configured, the vagaries of
+ socket IO and eventlet, and the phase of the moon.)
+
+ :param app: a proxy app
+ :param ring: ring to yield nodes from
+ :param partition: ring partition to yield nodes for
+ :param node_iter: optional iterable of nodes to try. Useful if you
+ want to filter or reorder the nodes.
+ """
+
+ def __init__(self, app, ring, partition, node_iter=None):
+ self.app = app
+ self.ring = ring
+ self.partition = partition
+
+ part_nodes = ring.get_part_nodes(partition)
+ if node_iter is None:
+ node_iter = itertools.chain(
+ part_nodes, ring.get_more_nodes(partition))
+ num_primary_nodes = len(part_nodes)
+ self.nodes_left = self.app.request_node_count(num_primary_nodes)
+ self.expected_handoffs = self.nodes_left - num_primary_nodes
+
+ # Use of list() here forcibly yanks the first N nodes (the primary
+ # nodes) from node_iter, so the rest of its values are handoffs.
+ self.primary_nodes = self.app.sort_nodes(
+ list(itertools.islice(node_iter, num_primary_nodes)))
+ self.handoff_iter = node_iter
+
+ def __iter__(self):
+ self._node_iter = self._node_gen()
+ return self
+
+ def log_handoffs(self, handoffs):
+ """
+ Log handoff requests if handoff logging is enabled and the
+ handoff was not expected.
+
+ We only log handoffs when we've pushed the handoff count further
+ than we would have expected under normal circumstances,
+ that is (request_node_count - num_primaries), when handoffs goes
+ higher than that it means one of the primaries must have been
+ skipped because of error limiting before we consumed all of our
+ nodes_left.
+ """
+ if not self.app.log_handoffs:
+ return
+ extra_handoffs = handoffs - self.expected_handoffs
+ if extra_handoffs > 0:
+ self.logger.increment('handoff_count')
+ self.logger.warning(
+ 'Handoff requested (%d)' % handoffs)
+ if (extra_handoffs == len(self.primary_nodes)):
+ # all the primaries were skipped, and handoffs didn't help
+ self.logger.increment('handoff_all_count')
+
+ def _node_gen(self):
+ for node in self.primary_nodes:
+ if not self.app.error_limited(node):
+ yield node
+ if not self.app.error_limited(node):
+ self.nodes_left -= 1
+ if self.nodes_left <= 0:
+ return
+ handoffs = 0
+ for node in self.handoff_iter:
+ if not self.app.error_limited(node):
+ handoffs += 1
+ self.log_handoffs(handoffs)
+ yield node
+ if not self.app.error_limited(node):
+ self.nodes_left -= 1
+ if self.nodes_left <= 0:
+ return
+
+ def next(self):
+ return next(self._node_iter)
+
+ def __next__(self):
+ return self.next()
+
+
class Controller(object):
"""Base WSGI controller class for the proxy"""
server_type = 'Base'
diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py
index 928d6a0..921898d 100644
--- a/swift/proxy/controllers/obj.py
+++ b/swift/proxy/controllers/obj.py
@@ -1951,50 +1951,54 @@ class ECObjectController(BaseObjectController):
orig_range = req.range
range_specs = self._convert_range(req, policy)
- node_iter = GreenthreadSafeIterator(node_iter)
+ safe_iter = GreenthreadSafeIterator(node_iter)
min_gets = policy.ec_ndata
with ContextPool(min_gets) as pool:
pile = GreenAsyncPile(pool)
for _junk in range(min_gets):
pile.spawn(self._fragment_GET_request,
- req, node_iter, partition,
+ req, safe_iter, partition,
policy)
- gets = list(pile)
- good_gets = []
bad_gets = []
- etag_buckets = {}
- max_bucket = {}
- for get, parts_iter in gets:
- if is_success(get.last_status):
- good_gets.append((get, parts_iter))
+ etag_buckets = collections.defaultdict(list)
+ best_etag = None
+ while True:
+ for get, parts_iter in pile:
+ if not is_success(get.last_status):
+ # these are almost *always* some kind of client
+ # error, 404's, 503's and Timeouts are all hidden
+ # down in the ResumingGetter spawned by
+ # _fragment_GET_request.
+ bad_gets.append((get, parts_iter))
+ continue
etag = HeaderKeyDict(
get.last_headers)['X-Object-Sysmeta-Ec-Etag']
- etag_buckets.setdefault(etag, []). \
- append((get, parts_iter))
- if len(etag_buckets[etag]) > len(max_bucket):
- max_bucket = etag_buckets[etag]
- if len(good_gets) >= min_gets:
- # we've got enough good responses but we need to
- # make sure they are from the same etag set and
- # if not spawn more getters to the parity nodes
- # to get some more
- if len(max_bucket) < min_gets:
- pile.spawn(self._fragment_GET_request, req,
- node_iter, partition, policy)
- gets += list(pile)
- # NOTE: it may not be obvious but we eventually
- # exit this loop (if not big enough buckets) when
- # we stop spawning above because node_iter runs out,
- # and then 'gets' runs out...
- else:
- bad_gets.append((get, parts_iter))
+ etag_buckets[etag].append((get, parts_iter))
+ if (etag != best_etag and len(etag_buckets[etag]) >
+ len(etag_buckets[best_etag])):
+ best_etag = etag
+
+ num_resp = max(len(bad_gets),
+ len(etag_buckets[best_etag]))
+ if num_resp >= min_gets:
+ break
+ # we've categorized all of the responses we started,
+ # but there doesn't seem to be any consensus on
+ # error or etag; as long as the node_iter has
+ # nodes_left we'll spawn some more.
+ need_more = min_gets - num_resp
+ if not node_iter.nodes_left >= need_more:
+ break
+ for i in range(need_more):
+ pile.spawn(self._fragment_GET_request, req,
+ safe_iter, partition, policy)
req.range = orig_range
- if len(max_bucket) == min_gets:
+ if len(etag_buckets[best_etag]) >= min_gets:
# headers can come from any of the getters
resp_headers = HeaderKeyDict(
- max_bucket[0][0].source_headers[-1])
+ etag_buckets[best_etag][0][0].source_headers[-1])
resp_headers.pop('Content-Range', None)
eccl = resp_headers.get('X-Object-Sysmeta-Ec-Content-Length')
obj_length = int(eccl) if eccl is not None else None
@@ -2005,7 +2009,8 @@ class ECObjectController(BaseObjectController):
app_iter = ECAppIter(
req.swift_entity_path,
policy,
- [iterator for getter, iterator in max_bucket],
+ [iterator for getter, iterator in
+ etag_buckets[best_etag]],
range_specs, fa_length, obj_length,
self.app.logger)
resp = Response(
diff --git a/swift/proxy/server.py b/swift/proxy/server.py
index d55dcda..7db40ef 100644
--- a/swift/proxy/server.py
+++ b/swift/proxy/server.py
@@ -19,7 +19,6 @@ import socket
from swift import gettext_ as _
from random import shuffle
from time import time
-import itertools
import functools
import sys
@@ -36,7 +35,7 @@ from swift.common.utils import cache_from_env, get_logger, \
from swift.common.constraints import check_utf8, valid_api_version
from swift.proxy.controllers import AccountController, ContainerController, \
ObjectControllerRouter, InfoController
-from swift.proxy.controllers.base import get_container_info
+from swift.proxy.controllers.base import get_container_info, NodeIter
from swift.common.swob import HTTPBadRequest, HTTPForbidden, \
HTTPMethodNotAllowed, HTTPNotFound, HTTPPreconditionFailed, \
HTTPServerError, HTTPException, Request, HTTPServiceUnavailable
@@ -500,60 +499,7 @@ class Application(object):
'port': node['port'], 'device': node['device']})
def iter_nodes(self, ring, partition, node_iter=None):
- """
- Yields nodes for a ring partition, skipping over error
- limited nodes and stopping at the configurable number of nodes. If a
- node yielded subsequently gets error limited, an extra node will be
- yielded to take its place.
-
- Note that if you're going to iterate over this concurrently from
- multiple greenthreads, you'll want to use a
- swift.common.utils.GreenthreadSafeIterator to serialize access.
- Otherwise, you may get ValueErrors from concurrent access. (You also
- may not, depending on how logging is configured, the vagaries of
- socket IO and eventlet, and the phase of the moon.)
-
- :param ring: ring to get yield nodes from
- :param partition: ring partition to yield nodes for
- :param node_iter: optional iterable of nodes to try. Useful if you
- want to filter or reorder the nodes.
- """
- part_nodes = ring.get_part_nodes(partition)
- if node_iter is None:
- node_iter = itertools.chain(part_nodes,
- ring.get_more_nodes(partition))
- num_primary_nodes = len(part_nodes)
-
- # Use of list() here forcibly yanks the first N nodes (the primary
- # nodes) from node_iter, so the rest of its values are handoffs.
- primary_nodes = self.sort_nodes(
- list(itertools.islice(node_iter, num_primary_nodes)))
- handoff_nodes = node_iter
- nodes_left = self.request_node_count(len(primary_nodes))
-
- log_handoffs_threshold = nodes_left - len(primary_nodes)
- for node in primary_nodes:
- if not self.error_limited(node):
- yield node
- if not self.error_limited(node):
- nodes_left -= 1
- if nodes_left <= 0:
- return
- handoffs = 0
- for node in handoff_nodes:
- if not self.error_limited(node):
- handoffs += 1
- if self.log_handoffs and handoffs > log_handoffs_threshold:
- self.logger.increment('handoff_count')
- self.logger.warning(
- 'Handoff requested (%d)' % handoffs)
- if handoffs - log_handoffs_threshold == len(primary_nodes):
- self.logger.increment('handoff_all_count')
- yield node
- if not self.error_limited(node):
- nodes_left -= 1
- if nodes_left <= 0:
- return
+ return NodeIter(self, ring, partition, node_iter=node_iter)
def exception_occurred(self, node, typ, additional_info,
**kwargs):
diff --git a/test/probe/common.py b/test/probe/common.py
index d7bbe0e..8b7623b 100644
--- a/test/probe/common.py
+++ b/test/probe/common.py
@@ -283,7 +283,7 @@ class Body(object):
def read(self, amount):
if len(self.buff) < amount:
try:
- self.buff = next(self)
+ self.buff += next(self)
except StopIteration:
pass
rv, self.buff = self.buff[:amount], self.buff[amount:]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment