Skip to content

Instantly share code, notes, and snippets.

@mkrizek
Created August 1, 2022 13:33
Show Gist options
  • Save mkrizek/68aba4e74f74ff5fa34dba41f042af11 to your computer and use it in GitHub Desktop.
Save mkrizek/68aba4e74f74ff5fa34dba41f042af11 to your computer and use it in GitHub Desktop.
diff --git a/lib/ansible/executor/play_iterator.py b/lib/ansible/executor/play_iterator.py
index 51799cc7a4e..c9712500249 100644
--- a/lib/ansible/executor/play_iterator.py
+++ b/lib/ansible/executor/play_iterator.py
@@ -21,7 +21,6 @@ __metaclass__ = type
import fnmatch
-from collections import deque
from enum import IntEnum, IntFlag
from ansible import constants as C
@@ -57,14 +56,17 @@ class FailedStates(IntFlag):
class HostState:
- def __init__(self, blocks):
+ def __init__(self, blocks, handlers=None):
self._blocks = blocks[:]
- self._notified_handlers = deque()
+ if handlers is None:
+ handlers = []
+ self._handlers = handlers[:]
self.cur_block = 0
self.cur_regular_task = 0
self.cur_rescue_task = 0
self.cur_always_task = 0
+ self.cur_handler_task = 0
self.run_state = IteratingStates.SETUP
self.fail_state = FailedStates.NONE
self.pre_flushing_run_state = None
@@ -76,14 +78,13 @@ class HostState:
self.did_rescue = False
self.did_start_at_task = False
- self._handlers_sorted = False
+ self.update_handlers = True
def __repr__(self):
- return "HostState(%r, %r)" % (self._blocks, self._notified_handlers)
+ return "HostState(%r, %r)" % (self._blocks)
def __str__(self):
return ("HOST STATE: block=%d, task=%d, rescue=%d, always=%d, run_state=%s, fail_state=%s, pre_flushing_run_state=%s, pending_setup=%s, "
- "handlers_sorted=%s, "
"tasks child state? (%s), rescue child state? (%s), always child state? (%s), handlers child state? (%s), "
"did rescue? %s, did start at task? %s" % (
self.cur_block,
@@ -94,7 +95,6 @@ class HostState:
self.fail_state,
self.pre_flushing_run_state,
self.pending_setup,
- self._handlers_sorted,
self.tasks_child_state,
self.rescue_child_state,
self.always_child_state,
@@ -107,43 +107,25 @@ class HostState:
if not isinstance(other, HostState):
return False
- for attr in ('_blocks', '_notified_handlers',
+ for attr in ('_blocks',
'cur_block', 'cur_regular_task', 'cur_rescue_task', 'cur_always_task',
'run_state', 'fail_state', 'pre_flushing_run_state', 'pending_setup',
- 'tasks_child_state', 'rescue_child_state', 'always_child_state', 'handlers_child_state',
- '_handlers_sorted'):
+ 'tasks_child_state', 'rescue_child_state', 'always_child_state', 'handlers_child_state'):
if getattr(self, attr) != getattr(other, attr):
return False
return True
- @property
- def handlers(self):
- return self._notified_handlers
-
def get_current_block(self):
return self._blocks[self.cur_block]
- def pop_current_handler(self):
- return self._notified_handlers.popleft()
-
- def notify_handler(self, handler):
- if handler not in self._notified_handlers:
- self._notified_handlers.append(handler)
- return True
- return False
-
- def notify_include_handler(self, handler_blocks):
- # deque.extendleft reverses order
- self._notified_handlers.extendleft(reversed(handler_blocks))
-
def copy(self):
- new_state = HostState(self._blocks)
- new_state._notified_handlers = self._notified_handlers.copy()
+ new_state = HostState(self._blocks, self._handlers)
new_state.cur_block = self.cur_block
new_state.cur_regular_task = self.cur_regular_task
new_state.cur_rescue_task = self.cur_rescue_task
new_state.cur_always_task = self.cur_always_task
+ new_state.cur_handler_task = self.cur_handler_task
new_state.run_state = self.run_state
new_state.fail_state = self.fail_state
new_state.pre_flushing_run_state = self.pre_flushing_run_state
@@ -158,7 +140,7 @@ class HostState:
new_state.always_child_state = self.always_child_state.copy()
if self.handlers_child_state is not None:
new_state.handlers_child_state = self.handlers_child_state.copy()
- new_state._handlers_sorted = self._handlers_sorted
+ new_state.update_handlers = self.update_handlers
return new_state
@@ -205,6 +187,7 @@ class PlayIterator:
if new_block.has_tasks():
self._blocks.append(new_block)
+
self._host_states = {}
start_at_matched = False
batch = inventory.get_hosts(self._play.hosts, order=self._play.order)
@@ -440,12 +423,9 @@ class PlayIterator:
state.cur_always_task += 1
elif state.run_state == IteratingStates.HANDLERS:
- if not state._handlers_sorted:
- # handlers are executed in the order they are defined, not in the order notified
- if state.handlers:
- all_handlers = [h for b in self._play.handlers for h in b.block]
- state._notified_handlers = deque((h for h in all_handlers if h in state.handlers))
- state._handlers_sorted = True
+ if state.update_handlers:
+ state._handlers = [h for b in self._play.handlers for h in b.block]
+ state.update_handlers = False
if state.handlers_child_state:
state.handlers_child_state, task = self._get_next_task_from_state(state.handlers_child_state, host=host)
@@ -460,13 +440,22 @@ class PlayIterator:
if (state.fail_state != FailedStates.NONE and
not self._play.force_handlers and
state.pre_flushing_run_state not in (IteratingStates.RESCUE, IteratingStates.ALWAYS)):
- state._handlers_sorted = False
state.run_state = IteratingStates.COMPLETE
- elif len(state.handlers) == 0:
- state._handlers_sorted = False
- state.run_state = state.pre_flushing_run_state
else:
- task = state.pop_current_handler()
+ while True:
+ try:
+ task = state._handlers[state.cur_handler_task]
+ except IndexError:
+ task = None
+ state.run_state = state.pre_flushing_run_state
+ state.cur_handler_task = 0
+ state.update_handlers = True
+ break
+ else:
+ if task.is_host_notified(host):
+ break
+ state.cur_handler_task += 1
+
if isinstance(task, Block):
# TODO allow full blocks with rescue/always for handlers
restricted_block = task.copy(exclude_parent=True, exclude_tasks=True)
@@ -498,7 +487,7 @@ class PlayIterator:
state.run_state = IteratingStates.RESCUE
elif state._blocks[state.cur_block].always:
state.run_state = IteratingStates.ALWAYS
- elif state.handlers and self._play.force_handlers:
+ elif self._play.force_handlers:
state.run_state = IteratingStates.HANDLERS
else:
state.run_state = IteratingStates.COMPLETE
@@ -509,7 +498,7 @@ class PlayIterator:
state.fail_state |= FailedStates.RESCUE
if state._blocks[state.cur_block].always:
state.run_state = IteratingStates.ALWAYS
- elif state.handlers and self._play.force_handlers:
+ elif self._play.force_handlers:
state.run_state = IteratingStates.HANDLERS
else:
state.run_state = IteratingStates.COMPLETE
@@ -518,7 +507,7 @@ class PlayIterator:
state.always_child_state = self._set_failed_state(state.always_child_state)
else:
state.fail_state |= FailedStates.ALWAYS
- if state.handlers and self._play.force_handlers:
+ if self._play.force_handlers:
state.run_state = IteratingStates.HANDLERS
else:
state.run_state = IteratingStates.COMPLETE
@@ -632,7 +621,7 @@ class PlayIterator:
if state.handlers_child_state:
state.handlers_child_state = self._insert_tasks_into_state(state.handlers_child_state, task_list)
else:
- state.notify_include_handler(task_list)
+ state._handlers[state.cur_handler_task+1:state.cur_handler_task+1] = [h for b in task_list for h in b.block]
return state
@@ -660,6 +649,3 @@ class PlayIterator:
if not isinstance(fail_state, FailedStates):
raise AnsibleAssertionError('Expected fail_state to be a FailedStates but was %s' % (type(fail_state)))
self._host_states[hostname].fail_state = fail_state
-
- def notify_handler(self, host, handler):
- return self._host_states[host.name].notify_handler(handler)
diff --git a/lib/ansible/playbook/handler.py b/lib/ansible/playbook/handler.py
index a68964215cc..9ad8c8a88c9 100644
--- a/lib/ansible/playbook/handler.py
+++ b/lib/ansible/playbook/handler.py
@@ -28,11 +28,8 @@ class Handler(Task):
listen = FieldAttribute(isa='list', default=list, listof=string_types, static=True)
- obj_cnt = 0
-
def __init__(self, block=None, role=None, task_include=None):
- self.lockstep_id = Handler.obj_cnt
- Handler.obj_cnt += 1
+ self.notified_hosts = []
self.cached_name = False
@@ -47,6 +44,15 @@ class Handler(Task):
t = Handler(block=block, role=role, task_include=task_include)
return t.load_data(data, variable_manager=variable_manager, loader=loader)
+ def notify_host(self, host):
+ if not self.is_host_notified(host):
+ self.notified_hosts.append(host)
+ return True
+ return False
+
+ def is_host_notified(self, host):
+ return host in self.notified_hosts
+
def serialize(self):
result = super(Handler, self).serialize()
result['is_handler'] = True
diff --git a/lib/ansible/plugins/strategy/__init__.py b/lib/ansible/plugins/strategy/__init__.py
index 903987ec008..a141774252b 100644
--- a/lib/ansible/plugins/strategy/__init__.py
+++ b/lib/ansible/plugins/strategy/__init__.py
@@ -628,7 +628,7 @@ class StrategyBase:
target_handler = search_handler_blocks_by_name(handler_name, iterator._play.handlers)
if target_handler is not None:
found = True
- if iterator.notify_handler(original_host, target_handler):
+ if target_handler.notify_host(original_host):
self._tqm.send_callback('v2_playbook_on_notify', target_handler, original_host)
for listening_handler_block in iterator._play.handlers:
@@ -645,7 +645,7 @@ class StrategyBase:
else:
found = True
- if iterator.notify_handler(original_host, listening_handler):
+ if listening_handler.notify_host(original_host):
self._tqm.send_callback('v2_playbook_on_notify', listening_handler, original_host)
# and if none were found, then we raise an error
diff --git a/lib/ansible/plugins/strategy/free.py b/lib/ansible/plugins/strategy/free.py
index a2515f8e3d7..89e80370252 100644
--- a/lib/ansible/plugins/strategy/free.py
+++ b/lib/ansible/plugins/strategy/free.py
@@ -147,6 +147,8 @@ class StrategyModule(StrategyBase):
# advance the host, mark the host blocked, and queue it
self._blocked_hosts[host_name] = True
iterator.set_state_for_host(host.name, state)
+ if isinstance(task, Handler):
+ del task.notified_hosts[task.notified_hosts.index(host)]
try:
action = action_loader.get(task.action, class_only=True, collection_list=task.collections)
@@ -267,6 +269,8 @@ class StrategyModule(StrategyBase):
for new_block in new_blocks:
if is_handler:
+ for task in new_block.block:
+ task.notified_hosts = included_file._hosts[:]
# TODO filter tags to allow tags on handlers from include_tasks: merge with the else block
# also where handlers are inserted from roles/include_role/import_role and regular handlers
final_block = new_block
diff --git a/lib/ansible/plugins/strategy/linear.py b/lib/ansible/plugins/strategy/linear.py
index 6245a886bd1..309f5103dd5 100644
--- a/lib/ansible/plugins/strategy/linear.py
+++ b/lib/ansible/plugins/strategy/linear.py
@@ -196,9 +196,9 @@ class StrategyModule(StrategyBase):
if num_handlers:
display.debug("advancing hosts in HANDLERS")
- lowest_lockstep_id = min((
- t.lockstep_id for h, (s, t) in host_tasks_to_run
- if s.run_state != IteratingStates.COMPLETE and isinstance(t, Handler)
+ lowest_handler = min((
+ s.cur_handler_task for h, (s, t) in host_tasks_to_run
+ if s.run_state == IteratingStates.HANDLERS
))
rvals = []
for host in hosts:
@@ -208,8 +208,9 @@ class StrategyModule(StrategyBase):
(state, task) = host_state_task
if task is None:
continue
- if getattr(task, 'lockstep_id', -1) == lowest_lockstep_id and state.run_state == IteratingStates.HANDLERS:
+ if state.cur_handler_task == lowest_handler and state.run_state == IteratingStates.HANDLERS:
iterator.set_state_for_host(host.name, state)
+ del task.notified_hosts[task.notified_hosts.index(host)]
rvals.append((host, task))
else:
rvals.append((host, noop_task))
@@ -394,6 +395,8 @@ class StrategyModule(StrategyBase):
display.debug("iterating over new_blocks loaded from include file")
for new_block in new_blocks:
if is_handler:
+ for task in new_block.block:
+ task.notified_hosts = included_file._hosts[:]
# TODO filter tags to allow tags on handlers from include_tasks: merge with the else block
# also where handlers are inserted from roles/include_role/import_role and regular handlers
final_block = new_block
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment