Skip to content

Instantly share code, notes, and snippets.

@gabrielfalcao
Created May 10, 2019 10:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gabrielfalcao/3a39c4a85ef98e24184c9dff9f247cf4 to your computer and use it in GitHub Desktop.
Save gabrielfalcao/3a39c4a85ef98e24184c9dff9f247cf4 to your computer and use it in GitHub Desktop.
Generic wait functions for browser testing with Lettuce.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import re
import time
import threading
from functools import wraps
from lxml import html
from lxml.etree import XMLSyntaxError
from lettuce import world, before, after
from selenium.common.exceptions import WebDriverException
from urlparse import urlsplit
def parse_dom(step):
    """refreshes ``world.dom`` with an lxml tree of the current page source.

    Registered right below as both an ``after.each_step`` and a
    ``before.each_step`` hook.

    arguments:
    step - the argument handed over by the hook; it is not used here
    """
    source = world.get_current_page_source()
    if not source or not source.strip():
        # lxml refuses to parse an empty document (it raises ParserError,
        # which the handler below does not cover), so there is nothing
        # to refresh: world.dom keeps whatever it held before.
        return

    try:
        world.dom = html.fromstring(source)
    except (ValueError, XMLSyntaxError):
        # Deliberate best effort: an unparsable page must not break the
        # step, world.dom simply keeps its previous value.
        pass


after.each_step(parse_dom)
before.each_step(parse_dom)
def non_blocking(func):
    """decorator that runs the decorated callable in a new thread.

    Calling the decorated function starts a ``threading.Thread`` whose
    target is ``func`` with the given positional and keyword arguments,
    and returns that thread object right away (the return value of
    ``func`` itself is not available to the caller).
    """
    @wraps(func)
    def run_in_background(*positional, **named):
        worker = threading.Thread(target=func, args=positional, kwargs=named)
        worker.start()
        return worker

    return run_in_background
@world.absorb
def wait_for_condition(finished, timeout=5, interval=0.5, dont_fail=False):
    """polls a callable until it returns a truthy value.

    arguments:
    finished - a callable that takes no parameters
    timeout - the timeout in seconds, [defaults to five seconds]
    interval - the interval in seconds, [defaults to half second]
    dont_fail - when true, give up silently on timeout instead of raising

    raises AssertionError when the timeout elapses and ``dont_fail`` is
    false. The message uses the docstring of ``finished`` when it has one.
    """
    doc = finished.__doc__
    if doc:
        msg = 'timed out waiting for the "{0}"'.format(doc)
    else:
        msg = 'timed out waiting for the condition {0}'.format(finished)

    deadline = time.time() + timeout
    # The condition is always evaluated *before* the deadline check, so a
    # condition that became true during the last sleep is still honoured.
    while not finished():
        remaining = deadline - time.time()
        if remaining <= 0:
            if dont_fail:
                return
            raise AssertionError(msg)
        # Never sleep past the deadline.
        time.sleep(min(interval, remaining))
@world.absorb
def wait_for_element(selector, timeout=5, interval=0.5, dont_fail=False):
    """waits for a selector and returns the first matching element.

    arguments:
    selector - forwarded to world.wait_for_many_elements
    timeout - the timeout in seconds, [defaults to five seconds]
    interval - the interval in seconds, [defaults to half second]
    dont_fail - when true, return None instead of raising when nothing
                is found

    raises AssertionError when nothing was found and ``dont_fail`` is
    false.
    """
    found = world.wait_for_many_elements(
        selector, timeout, interval, dont_fail=dont_fail)

    # wait_for_many_elements may hand back None *or* an empty list (the
    # final browser lookup can come up empty), so test for emptiness
    # rather than "is not None" before indexing.
    if found:
        return found[0]
    if dont_fail:
        return None

    raise AssertionError(
        'could not find an element through the selector: {0}'.format(
            selector))
@world.absorb
def wait_until_visible(selector, timeout=5, interval=0.5):
    """waits until jQuery reports at least one ``:visible`` match.

    First waits for the selector to be in the DOM through
    world.wait_for_many_elements, then polls
    ``jQuery("<selector>:visible").length`` in the browser.

    arguments:
    selector - the selector to look for
    timeout - the timeout in seconds applied to each of the two waits
    interval - the polling interval in seconds

    NOTE(review): the visibility check goes through jQuery, so it
    presumably requires jQuery on the page and a CSS selector - confirm.
    """
    # Local import keeps this block self-contained.
    import json

    world.wait_for_many_elements(selector, timeout, interval)

    # json.dumps produces a properly escaped, double-quoted JS string
    # literal, so selectors containing quotes (e.g. input[name="q"]) do
    # not break the script.
    script = 'jQuery({0}).length'.format(json.dumps(selector + ':visible'))

    def element_become_visible():
        return int(world.browser.evaluate_script(script)) > 0

    # wait_for_condition uses the docstring in its timeout message.
    element_become_visible.__doc__ = \
        "element %s to become visible" % selector

    world.wait_for_condition(
        element_become_visible,
        timeout=timeout,
        interval=interval,
    )
@world.absorb
def wait_until_hidden(selector, timeout=5, interval=0.5):
    """waits until jQuery reports no ``:visible`` match for the selector.

    First waits for the selector to be in the DOM through
    world.wait_for_many_elements (so an element that is absent from the
    DOM makes that first wait fail), then polls
    ``jQuery("<selector>:visible").length`` until it is zero.

    arguments:
    selector - the selector to look for
    timeout - the timeout in seconds applied to each of the two waits
    interval - the polling interval in seconds

    NOTE(review): the visibility check goes through jQuery, so it
    presumably requires jQuery on the page and a CSS selector - confirm.
    """
    # Local import keeps this block self-contained.
    import json

    world.wait_for_many_elements(selector, timeout, interval)

    # json.dumps produces a properly escaped, double-quoted JS string
    # literal, so selectors containing quotes do not break the script.
    script = 'jQuery({0}).length'.format(json.dumps(selector + ':visible'))

    def element_become_hidden():
        return int(world.browser.evaluate_script(script)) == 0

    # wait_for_condition uses the docstring in its timeout message.
    element_become_hidden.__doc__ = \
        "element %s to become hidden" % selector

    world.wait_for_condition(
        element_become_hidden,
        timeout=timeout,
        interval=interval,
    )
@world.absorb
def wait_for_many_elements(selector, timeout=5, interval=0.5,
                           for_at_least=1, dont_fail=False):
    """waits for elements to be in the DOM and returns them.

    The page source is re-parsed with lxml on every poll (updating
    ``world.dom``); once enough matches are present the elements are
    looked up again through ``world.browser`` and returned as a list.

    arguments:
    selector - a xpath when it starts with //, otherwise a css selector
    timeout - the timeout in seconds, [defaults to five seconds]
    interval - the interval in seconds, [defaults to half second]
    for_at_least - how many matches are required, [defaults to one]
    dont_fail - when true, return None instead of raising when nothing
                is found

    raises AssertionError when the matches do not show up in time and
    ``dont_fail`` is false.
    """
    if selector.startswith("//"):
        selector_type = 'xpath'

        def find_function(expression):
            return world.dom.xpath(expression)
    else:
        selector_type = 'css'

        def find_function(expression):
            return world.dom.cssselect(expression)

    # Keep polling until the requested amount is there; at least one
    # match is always required before the wait is considered done.
    required = max(for_at_least, 1)

    def element_is_there():
        try:
            # Fetching the source is the call that talks to the browser,
            # so it has to sit inside the WebDriverException guard.
            raw = world.get_current_page_source().strip()
            if not raw:
                return False
            if raw.startswith('<?xml'):
                # Drop the XML declaration before handing the text to
                # lxml (presumably because lxml rejects unicode input
                # that carries an encoding declaration).
                raw = unicode(re.sub(r'[<][?]xml[^>]+[>]', '', raw))
            world.dom = html.fromstring(raw)
        except WebDriverException:
            return False
        return len(find_function(selector)) >= required

    # wait_for_condition uses the docstring in its timeout message.
    element_is_there.__doc__ = \
        "element %s to be in the DOM (not necessarily visible)" % selector

    world.wait_for_condition(
        element_is_there,
        timeout=timeout,
        interval=interval,
        dont_fail=dont_fail,
    )

    found = find_function(selector)
    if not dont_fail:
        # Explicit raise instead of ``assert`` so the check survives -O.
        if len(found) < for_at_least:
            raise AssertionError(
                'could not find {2} occurrence(s) of the {3} selector '
                '"{0}" within {1} seconds'.format(
                    selector, timeout, for_at_least, selector_type))
    elif not found:
        return None

    if selector_type == 'xpath':
        return list(world.browser.find_by_xpath(selector))
    return list(world.browser.find_by_css(selector))
@world.absorb
def wait_for_element_to_disappear(
        selector, timeout=5, interval=0.5, for_at_least=1, dont_fail=False):
    """waits until the selector no longer matches anything in the DOM.

    The page source is re-parsed with lxml on every poll (updating
    ``world.dom``). An empty page source counts as "not vanished yet",
    so polling continues.

    arguments:
    selector - a xpath when it starts with //, otherwise a css selector
    timeout - the timeout in seconds, [defaults to five seconds]
    interval - the interval in seconds, [defaults to half second]
    for_at_least - accepted for signature compatibility; it is not used
    dont_fail - when true, do not raise when the timeout elapses

    returns whatever the selector still matches in ``world.dom`` after
    the wait (an empty result when the element vanished).
    """
    if selector.startswith("//"):
        def find_function(expression):
            return world.dom.xpath(expression)
    else:
        def find_function(expression):
            return world.dom.cssselect(expression)

    def element_is_gone():
        try:
            # Fetching the source is the call that talks to the browser,
            # so it has to sit inside the WebDriverException guard.
            raw = world.get_current_page_source().strip()
            if not raw:
                return False
            if raw.startswith('<?xml'):
                # Drop the XML declaration before handing the text to
                # lxml (presumably because lxml rejects unicode input
                # that carries an encoding declaration).
                raw = unicode(re.sub(r'[<][?]xml[^>]+[>]', '', raw))
            world.dom = html.fromstring(raw)
        except WebDriverException:
            return False
        return not find_function(selector)

    # wait_for_condition uses the docstring in its timeout message.
    element_is_gone.__doc__ = \
        "element %s to vanish from the DOM" % selector

    world.wait_for_condition(
        element_is_gone,
        timeout=timeout,
        interval=interval,
        dont_fail=dont_fail,
    )

    not_found = find_function(selector)
    return not_found
@world.absorb
def wait_until_reach_url_containing(url, timeout=10, dont_fail=False):
    """waits until the browser url contains the given fragment.

    arguments:
    url - the text that must be part of ``world.browser.url``
    timeout - the timeout in seconds, [defaults to ten seconds]
    dont_fail - forwarded to world.wait_for_condition

    returns the browser url after the wait; an AssertionError coming out
    of the wait is replaced by one that names the url the browser is
    still at.
    """
    def reach_url():
        return world.browser.url and url in world.browser.url

    try:
        world.wait_for_condition(
            reach_url,
            timeout=timeout,
            dont_fail=dont_fail,
        )
    except AssertionError:
        template = (
            'did not reach the url containing "{0}" within {1} seconds. '
            'The browser still at "{2}"'
        )
        raise AssertionError(
            template.format(url, timeout, world.browser.url))

    return world.browser.url
@world.absorb
def wait_until_reach_url(url, timeout=10, dont_fail=False):
    """waits until the path of the browser url equals the given path.

    arguments:
    url - the expected path; a leading slash is added when missing
    timeout - the timeout in seconds, [defaults to ten seconds]
    dont_fail - forwarded to world.wait_for_condition

    returns the browser url after the wait; an AssertionError coming out
    of the wait is replaced by one that names the path the browser is
    still at.
    """
    expected_path = url if url.startswith('/') else "/%s" % url

    try:
        # Switch to the default content before reading the url.
        world.browser.driver.switch_to_default_content()
        world.wait_for_condition(
            lambda: expected_path == urlsplit(world.browser.url).path,
            timeout=timeout,
            dont_fail=dont_fail,
        )
    except AssertionError:
        still_at = urlsplit(world.browser.url).path
        template = (
            'did not reach the url "{0}" within {1} seconds. '
            'The browser still at "{2}"'
        )
        raise AssertionError(
            template.format(expected_path, timeout, still_at))

    return world.browser.url
@world.absorb
def wait_until_any_frame_url_contains(url, timeout=10, dont_fail=False):
    """waits until the url of any open window contains the given text.

    Every poll switches through all the window handles reported by the
    driver, collects ``world.browser.url`` for each one and then switches
    back to the window that was current beforehand.

    arguments:
    url - the text that must be part of one of the urls
    timeout - the timeout in seconds, [defaults to ten seconds]
    dont_fail - forwarded to world.wait_for_condition

    raises AssertionError listing the urls seen when the wait times out
    and ``dont_fail`` is false.
    """
    def get_urls():
        driver = world.browser.driver
        old_handle = driver.get_current_window_handle()
        all_handles = driver.get_window_handles()
        urls = []
        try:
            for hwnd in all_handles:
                driver.switch_to_window(hwnd)
                urls.append(world.browser.url.strip())
        finally:
            # Always give the focus back to the original window, even
            # when switching to or reading one of the windows failed.
            driver.switch_to_window(old_handle)
        return urls

    def check_url():
        return any(url in current for current in get_urls())

    try:
        world.wait_for_condition(
            check_url,
            timeout=timeout,
            interval=1,
            dont_fail=dont_fail,
        )
    except AssertionError:
        raise AssertionError('{0} not in {1}'.format(url, get_urls()))
@world.absorb
def wait_for_selector(selector, timeout=5, interval=0.5, dont_fail=False):
    """builds a decorator that waits for a selector and then calls the
    decorated function right away with the matching elements.

    The wait polls every {interval} seconds for up to {timeout} seconds.

    Unlike most decorators, this one acts as soon as it is applied: the
    decorated function is called immediately and the decorated name ends
    up bound to its return value (or to None when nothing was found).
    For that reason it should be used ONLY inside tests.

    If the given {selector} starts with // it is treated as a xpath,
    otherwise as a css selector.

    Example with xpath:

        @step('Given I am done with the current page')
        def given_im_done_with_that_page(step):
            @world.wait_for_selector('//a[contains(text(), "Done")]')
            def then_click_on(links_containing_done):
                links_containing_done.first.click()

    Example with css:

        @step('Then I am logout')
        def then_i_am_logout(step):
            @world.wait_for_selector('button#logout')
            def then_click_on(buttons):
                buttons.first.click()
    """
    is_xpath = selector.startswith("//")

    def decorate(immediately_call):
        matched = world.wait_for_many_elements(
            selector, timeout, interval, dont_fail=dont_fail)
        if not matched:
            return None

        # Look the elements up again through the browser before calling.
        if is_xpath:
            elements = world.browser.find_by_xpath(selector)
        else:
            elements = world.browser.find_by_css(selector)
        return immediately_call(elements)

    return decorate
@world.absorb
def wait_for_element_with_text(text="", dont_fail=False, prefix='//*'):
    """returns the world.wait_for_selector decorator for elements whose
    text contains the given string.

    arguments:
    text - the text to look for with xpath ``contains(text(), ...)``
    dont_fail - forwarded to world.wait_for_selector
    prefix - the xpath the predicate is attached to, [defaults to //*]

    NOTE: the text is placed between double quotes in the xpath, so a
    text that itself contains a double quote produces an invalid xpath.
    """
    # Honour the caller's prefix instead of always searching under //*.
    selector = '%s[contains(text(), "%s")]' % (prefix, text)
    return world.wait_for_selector(selector, dont_fail=dont_fail)
@world.absorb
def wait_for_element_with(__prefix='//*', dont_fail=False, **attributes):
    """returns the world.wait_for_selector decorator for elements whose
    attributes contain the given values.

    arguments:
    __prefix - the xpath the predicate is attached to, [defaults to //*]
    dont_fail - forwarded to world.wait_for_selector
    attributes - attribute name/value pairs; each becomes a
                 ``contains(@name, "value")`` test, joined with "and"

    When no attributes are given the prefix alone is used as selector.
    """
    # A comprehension instead of a tuple-unpacking lambda, which is
    # Python 2 only syntax; sorting keeps the generated xpath stable.
    conditions = [
        'contains(@%s, "%s")' % (name, value)
        for name, value in sorted(attributes.items())
    ]

    selector = __prefix
    if conditions:
        # An empty predicate ("prefix[]") would not be a valid xpath, so
        # the brackets are only added when there is something to test.
        selector = '%s[%s]' % (__prefix, " and ".join(conditions))

    return world.wait_for_selector(selector, dont_fail=dont_fail)
@world.absorb
def wait_for_link_with_text(text="", dont_fail=False):
    """shortcut for world.wait_for_element_with_text using ``//a`` as the
    prefix argument.

    arguments:
    text - the text to look for
    dont_fail - forwarded as is
    """
    options = dict(prefix='//a', dont_fail=dont_fail)
    return world.wait_for_element_with_text(text, **options)
@world.absorb
def wait_for_input_with_value(value="OK", dont_fail=False):
    """shortcut for world.wait_for_element_with that looks under
    ``//input`` for a ``value`` attribute containing the given string.

    arguments:
    value - the text the value attribute must contain, [defaults to OK]
    dont_fail - forwarded as is
    """
    criteria = {
        '__prefix': '//input',
        'dont_fail': dont_fail,
        'value': value,
    }
    return world.wait_for_element_with(**criteria)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment