Last active
March 19, 2016 19:18
-
-
Save xlcommunity/a289a5a766fde13c1614 to your computer and use it in GitHub Desktop.
Code for "Automatically handle failures in XL Release tasks" (https://support.xebialabs.com/hc/en-us/community/posts/204769703)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# | |
# THIS CODE AND INFORMATION ARE PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR | |
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE IMPLIED WARRANTIES OF MERCHANTABILITY AND/OR FITNESS | |
# FOR A PARTICULAR PURPOSE. THIS CODE AND INFORMATION ARE NOT SUPPORTED BY XEBIALABS. | |
# | |
import sys, string | |
from java.io import IOException | |
from com.xebialabs.xlrelease.plugin.webhook import JsonPathResult | |
from com.xebialabs.xlrelease.plugin.webhook import XmlPathResult | |
from urlparse import urlparse | |
def isJson(): | |
return 'jsonPathExpression' in globals() or 'jsonPathExpression2' in globals() or 'jsonPathExpression3' in globals() | |
def isXml(): | |
return 'xPathExpression' in globals() or 'xPathExpression2' in globals() or 'xPathExpression3' in globals() | |
def process(response, expression, ExpressionProcessor): | |
result_expected = expression is not None and len(expression) > 0 | |
if result_expected: | |
resultVariable = ExpressionProcessor(response, expression).get() | |
if resultVariable is None: | |
print "Expression %s did not match anything in the response" % expression | |
sys.exit(1) | |
print "Result: %s" % resultVariable | |
return resultVariable | |
def process_json(response, jsonPathExpression): | |
return process(response, jsonPathExpression, JsonPathResult) | |
def process_xml(response, xPathExpression): | |
return process(response, xPathExpression, XmlPathResult) | |
if isJson(): | |
content_type = 'application/json' | |
elif isXml(): | |
content_type = 'application/xml' | |
else: | |
print 'Could not determine Webhook format, neither JsonPath nor XPath expression was provided' | |
sys.exit(1) | |
uri = urlparse(URL) | |
host = '%s://%s' % (uri.scheme, uri.netloc) | |
context = uri.path | |
if uri.query: | |
context = '%s?%s' % (context, uri.query) | |
server = { 'url': host, 'username': username, 'password': password, 'proxyHost': proxyHost, 'proxyPort': proxyPort} | |
request = HttpRequest(server, username, password) | |
if body is None: | |
body = "" | |
# don't throw an exception if there is a connection error | |
try: | |
response = request.doRequest(method = method, context = context, body = body, contentType = content_type) | |
except IOException, e: | |
if alwaysSucceed: | |
statusCode = 418 # I'm a teapot | |
sys.exit(0) | |
else: | |
raise e | |
if response.isSuccessful(): | |
print "Http Status: %s" % response.status | |
statusCode = response.status | |
if isJson(): | |
result = process_json(response.response, jsonPathExpression) | |
result2 = process_json(response.response, jsonPathExpression2) | |
result3 = process_json(response.response, jsonPathExpression3) | |
if isXml(): | |
result = process_xml(response.response, xPathExpression) | |
result2 = process_xml(response.response, xPathExpression2) | |
result3 = process_xml(response.response, xPathExpression3) | |
else: | |
print "Failed to connect at %s." % URL | |
response.errorDump() | |
if alwaysSucceed: | |
if response.status: | |
statusCode = response.status | |
else: | |
statusCode = 418 # I'm a teapot | |
else: | |
sys.exit(1) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!-- | |
THIS CODE AND INFORMATION ARE PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR | |
IMPLIED, INCLUDING BUT NOT LIMITED TO THE IMPLIED WARRANTIES OF MERCHANTABILITY AND/OR FITNESS | |
FOR A PARTICULAR PURPOSE. THIS CODE AND INFORMATION ARE NOT SUPPORTED BY XEBIALABS. | |
--> | |
<type type="acme.JsonWebhook" extends="webhook.JsonWebhook"> | |
<!-- override the default webhook implementation --> | |
<property name="scriptLocation" hidden="true" required="true" default="acme/JsonWebhook.py" /> | |
<property name="statusCode" kind="integer" category="output" required="false" | |
description="The HTTP status code of the response" /> | |
<property name="alwaysSucceed" kind="boolean" category="input" | |
required="false" default="false" description="If checked, this task will succeed irrespective of the HTTP response code. The 'statusCode' output property can be checked by subsequent tasks to determine whether the call was actually successful." /> | |
</type> |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment