Created
March 5, 2015 22:05
-
-
Save gabrielfalcao/fc492b93a4bd1aa7dddb to your computer and use it in GitHub Desktop.
A file with useful wait functions for lettuce tests
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
import re | |
import time | |
import threading | |
from functools import wraps | |
from lxml import html | |
from lxml.etree import XMLSyntaxError | |
from lettuce import world, before, after | |
from selenium.common.exceptions import WebDriverException | |
from urlparse import urlsplit | |
def parse_dom(step):
    """refreshes ``world.dom`` with an lxml tree of the current page.

    Registered right below as both an after-step and a before-step
    hook, so the cached DOM is re-parsed around every lettuce step.

    arguments:
    step - the lettuce step being run (required by the hook signature,
           not used here)

    When the page source is empty or cannot be parsed, whatever was
    stored in ``world.dom`` before (if anything) is left untouched.
    """
    source = world.get_current_page_source()

    # lxml raises ParserError for an empty document, which the except
    # clause below does not cover. The wait helpers in this file apply
    # the same "nothing to parse" guard before calling html.fromstring.
    if not source or not source.strip():
        return

    try:
        world.dom = html.fromstring(source)
    except (ValueError, XMLSyntaxError):
        # best effort: keep the DOM that was parsed last
        pass


after.each_step(parse_dom)
before.each_step(parse_dom)
def non_blocking(func):
    """decorator that runs ``func`` in a new thread on every call.

    The decorated callable starts a ``threading.Thread`` targeting
    ``func`` with the arguments it received and returns that thread
    immediately, so the caller may ``join()`` it when needed. The value
    returned by ``func`` itself is not handed back.
    """
    @wraps(func)
    def run_in_background(*positional, **named):
        worker = threading.Thread(target=func, args=positional, kwargs=named)
        worker.start()
        return worker

    return run_in_background
@world.absorb
def wait_for_condition(finished, timeout=5, interval=0.5, dont_fail=False):
    """polls a callable until it returns a true value.

    arguments:
    finished - a callable that takes no parameters; its docstring, when
               present, is used to describe the condition in the
               timeout message
    timeout - the timeout in seconds, [defaults to five seconds]
    interval - the interval in seconds, [defaults to half second]
    dont_fail - when true, give up silently once the timeout elapses
                instead of raising

    raises AssertionError when the timeout elapses and ``dont_fail`` is
    false. Always returns None.
    """
    began = time.time()

    # the failure message is prepared before the first poll
    description = finished.__doc__
    if description:
        template = 'timed out waiting for the "{0}"'
    else:
        template = 'timed out waiting for the condition {0}'
    failure = template.format(description or finished)

    while True:
        if finished():
            return
        time.sleep(interval)
        # the deadline is only checked after sleeping
        if time.time() - began < timeout:
            continue
        if dont_fail:
            return
        raise AssertionError(failure)
@world.absorb
def wait_for_element(selector, timeout=5, interval=0.5, dont_fail=False):
    """waits for a selector to match and returns the first match.

    arguments:
    selector - an xpath (when it starts with //) or a css selector
    timeout - the timeout in seconds, [defaults to five seconds]
    interval - the polling interval in seconds, [defaults to half second]
    dont_fail - when true, return None instead of raising when nothing
                is found

    returns the first element given back by
    ``world.wait_for_many_elements``, or None when nothing was found
    and ``dont_fail`` is true.

    raises AssertionError when nothing was found and ``dont_fail`` is
    false.
    """
    found = world.wait_for_many_elements(
        selector, timeout, interval, dont_fail=dont_fail)

    # ``found`` is None when nothing matched with dont_fail set, and it
    # may be an empty list when the browser lookup comes back empty even
    # though the parsed DOM matched; indexing that would be an IndexError.
    if found:
        return found[0]
    if dont_fail:
        return None
    raise AssertionError(
        'could not find an element through the selector: {0}'.format(
            selector))
@world.absorb
def wait_until_visible(selector, timeout=5, interval=0.5):
    """waits until a selector is in the DOM and matches a visible element.

    arguments:
    selector - the selector to look for; it is handed to jQuery with a
               ``:visible`` suffix, so it is presumably expected to be
               a css selector
    timeout - the timeout in seconds, applied to each of the two waits
              (presence in the DOM, then visibility)
    interval - the polling interval in seconds for both waits

    raises AssertionError when either wait times out.
    """
    import json  # local import: only needed to quote the selector for JS

    world.wait_for_many_elements(selector, timeout, interval)

    # json.dumps yields a properly escaped, double-quoted JS string, so
    # selectors that contain quotes (e.g. input[name="q"]) stay valid.
    script = 'jQuery({0}).length'.format(json.dumps(selector + ':visible'))

    def element_become_visible():
        return int(world.browser.evaluate_script(script)) > 0

    # wait_for_condition uses the docstring in its timeout message
    element_become_visible.__doc__ = \
        "element %s to become visible" % selector

    # honour the caller's timeout/interval instead of the defaults
    world.wait_for_condition(
        element_become_visible,
        timeout=timeout,
        interval=interval,
    )
@world.absorb
def wait_until_hidden(selector, timeout=5, interval=0.5):
    """waits until a selector is in the DOM but matches no visible element.

    arguments:
    selector - the selector to look for; it is handed to jQuery with a
               ``:visible`` suffix, so it is presumably expected to be
               a css selector
    timeout - the timeout in seconds, applied to each of the two waits
              (presence in the DOM, then invisibility)
    interval - the polling interval in seconds for both waits

    raises AssertionError when either wait times out. Note that the
    element must first be present in the DOM.
    """
    import json  # local import: only needed to quote the selector for JS

    world.wait_for_many_elements(selector, timeout, interval)

    # json.dumps yields a properly escaped, double-quoted JS string, so
    # selectors that contain quotes (e.g. input[name="q"]) stay valid.
    script = 'jQuery({0}).length'.format(json.dumps(selector + ':visible'))

    def element_become_hidden():
        return int(world.browser.evaluate_script(script)) == 0

    # wait_for_condition uses the docstring in its timeout message
    element_become_hidden.__doc__ = \
        "element %s to become hidden" % selector

    # honour the caller's timeout/interval instead of the defaults
    world.wait_for_condition(
        element_become_hidden,
        timeout=timeout,
        interval=interval,
    )
@world.absorb
def wait_for_many_elements(selector, timeout=5, interval=0.5,
                           for_at_least=1, dont_fail=False):
    """waits until a selector matches enough elements in the page.

    On every poll the current page source is re-parsed with lxml into
    ``world.dom`` and the selector is evaluated against that tree.

    arguments:
    selector - an xpath when it starts with //, otherwise a css selector
    timeout - the timeout in seconds, [defaults to five seconds]
    interval - the polling interval in seconds, [defaults to half second]
    for_at_least - minimum number of matches to wait for, [defaults to 1]
    dont_fail - when true, never raise; return None if nothing matched

    returns a list with the elements found by the browser
    (``find_by_xpath`` / ``find_by_css``), or None when ``dont_fail`` is
    true and the parsed DOM has no match.

    raises AssertionError when ``dont_fail`` is false and fewer than
    ``for_at_least`` elements showed up within the timeout.
    """
    if selector.startswith("//"):
        selector_type = 'xpath'
    else:
        selector_type = 'css'

    def find_in_dom():
        # world.dom is looked up on each call: it is replaced per poll
        if selector_type == 'xpath':
            return world.dom.xpath(selector)
        return world.dom.cssselect(selector)

    def element_is_there():
        raw = world.get_current_page_source().strip()
        if not raw:
            return False

        if raw.startswith('<?xml'):
            # drop the xml declaration before handing the text to lxml
            raw = unicode(re.sub(r'[<][?]xml[^>]+[>]', '', raw))

        try:
            world.dom = html.fromstring(raw)
        except (WebDriverException, ValueError, XMLSyntaxError):
            # unparseable snapshot: treat as "not there yet", the same
            # errors parse_dom already tolerates
            return False

        # keep polling until enough elements are present, not just one
        return len(find_in_dom()) >= for_at_least

    element_is_there.__doc__ = \
        "element %s to be in the DOM (not necessarily visible)" % selector

    world.wait_for_condition(
        element_is_there,
        timeout=timeout,
        interval=interval,
        dont_fail=dont_fail,
    )

    found = find_in_dom()
    if not dont_fail:
        # explicit raise: a bare assert would vanish under ``python -O``
        if len(found) < for_at_least:
            raise AssertionError(
                'could not find {2} occurrence(s) of the {3} selector '
                '"{0}" within {1} seconds'.format(
                    selector, timeout, for_at_least, selector_type))
    elif not found:
        return None

    if selector_type == 'xpath':
        return list(world.browser.find_by_xpath(selector))

    return list(world.browser.find_by_css(selector))
@world.absorb
def wait_for_element_to_disappear(
        selector, timeout=5, interval=0.5, for_at_least=1, dont_fail=False):
    """waits until a selector no longer matches anything in the page.

    On every poll the current page source is re-parsed with lxml into
    ``world.dom`` and the selector is evaluated against that tree.

    arguments:
    selector - an xpath when it starts with //, otherwise a css selector
    timeout - the timeout in seconds, [defaults to five seconds]
    interval - the polling interval in seconds, [defaults to half second]
    for_at_least - accepted for signature compatibility; not used
    dont_fail - when true, give up silently once the timeout elapses

    returns whatever the selector still matches in the last parsed DOM:
    an empty list when the element vanished.

    raises AssertionError when the element is still there after the
    timeout and ``dont_fail`` is false.
    """
    use_xpath = selector.startswith("//")

    def find_in_dom():
        # world.dom is looked up on each call: it is replaced per poll
        if use_xpath:
            return world.dom.xpath(selector)
        return world.dom.cssselect(selector)

    def element_is_gone():
        raw = world.get_current_page_source().strip()
        if not raw:
            # an empty page source counts as "not settled yet"
            return False

        if raw.startswith('<?xml'):
            # drop the xml declaration before handing the text to lxml
            raw = unicode(re.sub(r'[<][?]xml[^>]+[>]', '', raw))

        try:
            world.dom = html.fromstring(raw)
        except (WebDriverException, ValueError, XMLSyntaxError):
            # unparseable snapshot: keep polling, the same errors
            # parse_dom already tolerates
            return False

        return not find_in_dom()

    element_is_gone.__doc__ = \
        "element %s to vanish from the DOM" % selector

    world.wait_for_condition(
        element_is_gone,
        timeout=timeout,
        interval=interval,
        dont_fail=dont_fail,
    )

    return find_in_dom()
@world.absorb
def wait_until_reach_url_containing(url, timeout=10, dont_fail=False):
    """waits until the browser url contains the given fragment.

    arguments:
    url - the text that must appear in ``world.browser.url``
    timeout - the timeout in seconds, [defaults to ten seconds]
    dont_fail - when true, do not raise if the url was never reached

    returns the browser url at the moment the wait ended.

    raises AssertionError, mentioning the url the browser is still at,
    when the timeout elapses and ``dont_fail`` is false.
    """
    def url_matches():
        # an empty/None browser url can never match
        return world.browser.url and url in world.browser.url

    try:
        world.wait_for_condition(
            url_matches, timeout=timeout, dont_fail=dont_fail)
    except AssertionError:
        # swap the generic timeout message for a url-specific one
        raise AssertionError(
            'did not reach the url containing "{0}" within {1} seconds. '
            'The browser still at "{2}"'.format(
                url, timeout, world.browser.url))

    return world.browser.url
@world.absorb
def wait_until_reach_url(url, timeout=10, dont_fail=False):
    """waits until the path of the browser url equals the given one.

    arguments:
    url - the expected path; a leading slash is added when missing
    timeout - the timeout in seconds, [defaults to ten seconds]
    dont_fail - when true, do not raise if the path was never reached

    returns the browser url at the moment the wait ended.

    raises AssertionError, mentioning the path the browser is still at,
    when the timeout elapses and ``dont_fail`` is false.
    """
    expected_path = url if url.startswith('/') else "/%s" % url

    def on_expected_path():
        return expected_path == urlsplit(world.browser.url).path

    try:
        # leave any frame before polling the url
        world.browser.driver.switch_to_default_content()
        world.wait_for_condition(
            on_expected_path,
            timeout=timeout,
            dont_fail=dont_fail,
        )
    except AssertionError:
        current_path = urlsplit(world.browser.url).path
        raise AssertionError(
            'did not reach the url "{0}" within {1} seconds. '
            'The browser still at "{2}"'.format(
                expected_path, timeout, current_path))

    return world.browser.url
@world.absorb
def wait_until_any_frame_url_contains(url, timeout=10, dont_fail=False):
    """waits until the url of any browser window contains a fragment.

    Despite the name, the lookup goes through the driver's window
    handles: each window is switched to in turn and its url is read.

    arguments:
    url - the text that must appear in one of the urls
    timeout - the timeout in seconds, [defaults to ten seconds]
    dont_fail - when true, do not raise if no url ever matched

    raises AssertionError listing the urls seen when the timeout
    elapses and ``dont_fail`` is false. Always returns None.
    """
    def get_urls():
        driver = world.browser.driver
        old_handle = driver.get_current_window_handle()
        urls = []
        try:
            for hwnd in driver.get_window_handles():
                driver.switch_to_window(hwnd)
                urls.append(world.browser.url.strip())
        finally:
            # always hand focus back to the original window, even when
            # reading one of the urls blows up half way through
            driver.switch_to_window(old_handle)
        return urls

    def check_url():
        return any(url in candidate for candidate in get_urls())

    try:
        world.wait_for_condition(
            check_url,
            timeout=timeout,
            interval=1,
            dont_fail=dont_fail,
        )
    except AssertionError:
        raise AssertionError('{0} not in {1}'.format(url, get_urls()))
@world.absorb
def wait_for_selector(selector, timeout=5, interval=0.5, dont_fail=False):
    """builds a decorator that waits for a selector and then calls the
    decorated function right away with the matching elements.

    The wait lasts up to {timeout} seconds, polling every {interval}
    seconds.

    Note that this decorator has an uncommon behaviour (compared to
    most decorators): it starts its action as soon as it is applied,
    and the decorated name ends up bound to the result of that call
    (None when nothing was found).

    So this decorator should be used ONLY within tests.

    If the given {selector} argument starts with // then it will be
    considered an xpath, otherwise it will be considered a css selector.

    Example with xpath:

        @step('Given I am done with the current page')
        def given_im_done_with_that_page(step):
            @world.wait_for_selector('//a[contains(text(), "Done")]')
            def then_click_on(links_containing_done):
                links_containing_done.first.click()

    Example with css:

        @world.wait_for_selector('button#logout')
        def then_click_on(buttons):
            buttons.first.click()
    """
    use_xpath = selector.startswith("//")

    def decorate(immediately_call):
        matched = world.wait_for_many_elements(
            selector, timeout, interval, dont_fail=dont_fail)
        if not matched:
            return None

        # query the browser again so the callback receives the
        # browser's own result object rather than a plain list
        if use_xpath:
            elements = world.browser.find_by_xpath(selector)
        else:
            elements = world.browser.find_by_css(selector)
        return immediately_call(elements)

    return decorate
@world.absorb
def wait_for_element_with_text(text="", dont_fail=False, prefix='//*'):
    """builds a ``world.wait_for_selector`` decorator for elements whose
    text contains the given string.

    arguments:
    text - the text the element must contain
    dont_fail - forwarded to ``world.wait_for_selector``
    prefix - the xpath the ``contains(text(), ...)`` predicate is
             attached to, [defaults to //* meaning any element]

    returns the decorator produced by ``world.wait_for_selector``.
    """
    # honour ``prefix`` rather than always matching any element
    selector = '%s[contains(text(), "%s")]' % (prefix, text)
    return world.wait_for_selector(selector, dont_fail=dont_fail)
@world.absorb
def wait_for_element_with(__prefix='//*', dont_fail=False, **attributes):
    """builds a ``world.wait_for_selector`` decorator for elements whose
    attributes contain the given values.

    arguments:
    __prefix - the xpath the attribute predicates are attached to,
               [defaults to //* meaning any element]
    dont_fail - forwarded to ``world.wait_for_selector``
    **attributes - attribute name / value pairs; each one becomes a
                   ``contains(@name, "value")`` test and all of them
                   must hold

    returns the decorator produced by ``world.wait_for_selector``.
    """
    # a comprehension instead of a tuple-unpacking lambda, which is
    # python 2 only syntax; sorted so the selector is deterministic
    conditions = [
        'contains(@%s, "%s")' % (name, value)
        for name, value in sorted(attributes.items())
    ]

    if conditions:
        selector = '%s[%s]' % (__prefix, " and ".join(conditions))
    else:
        # no attributes given: an empty predicate ("//*[]") is not
        # valid xpath, so fall back to the bare prefix
        selector = __prefix

    return world.wait_for_selector(selector, dont_fail=dont_fail)
@world.absorb
def wait_for_link_with_text(text="", dont_fail=False):
    """shortcut for ``world.wait_for_element_with_text`` that passes
    ``//a`` as the prefix.

    arguments:
    text - the text to look for
    dont_fail - forwarded as is

    returns whatever ``world.wait_for_element_with_text`` returns.
    """
    options = {'prefix': '//a', 'dont_fail': dont_fail}
    return world.wait_for_element_with_text(text, **options)
@world.absorb
def wait_for_input_with_value(value="OK", dont_fail=False):
    """shortcut for ``world.wait_for_element_with`` that looks for a
    ``value`` attribute using ``//input`` as the prefix.

    arguments:
    value - the text the ``value`` attribute must contain,
            [defaults to "OK"]
    dont_fail - forwarded as is

    returns whatever ``world.wait_for_element_with`` returns.
    """
    options = {
        'value': value,
        '__prefix': '//input',
        'dont_fail': dont_fail,
    }
    return world.wait_for_element_with(**options)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment