Skip to content

Instantly share code, notes, and snippets.

@viglesiasce
Last active November 19, 2018 03:55
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save viglesiasce/6012546 to your computer and use it in GitHub Desktop.
Save viglesiasce/6012546 to your computer and use it in GitHub Desktop.
Monitor Jenkins Free Executors and post to CloudWatch
#!/usr/bin/python
import subprocess
from eucaops import Eucaops
from eucaops import EC2ops
from eutester.eutestcase import EutesterTestCase
from eutester.sshconnection import CommandExitCodeException
import ast
import urllib
class JenkinsCloudwatch(EutesterTestCase):
    """Eutester test case that periodically reports Jenkins' free executors.

    The ``MonitorJenkins`` test polls the local Jenkins remote API, computes
    ``totalExecutors - busyExecutors`` and hands the result to
    ``self.tester.put_metric_data`` under the configured namespace and
    metric name.
    """

    # Jenkins remote API endpoint (Python-literal flavour) describing the
    # executors. Kept as a class attribute so a subclass can point the
    # monitor at a different Jenkins instance.
    JENKINS_COMPUTER_API_URL = "http://localhost:8080/computer/api/python?pretty=true"

    def __init__(self, extra_args= None):
        """Build the argument parser, parse the arguments and create the tester.

        Args:
            extra_args: optional iterable of additional argument names/flags;
                each one is registered on the parser with ``add_argument``
                before the arguments are parsed.
        """
        self.setuptestcase()
        self.setup_parser()
        self.parser.add_argument('-n', '--namespace', help='Namespace to put data under', default="Jenkins")
        self.parser.add_argument('-m', '--metric-name', help='Metric name to put data under', default="Free Executors")
        self.parser.add_argument('-u', '--unit', default=None, help="Unit that the value be returned with")
        self.parser.add_argument('-i', '--interval', default=10, type=int, help='Time between exectutions of the monitoring task')
        if extra_args:
            for arg in extra_args:
                self.parser.add_argument(arg)
        self.get_args()
        # Setup basic eutester object: a region selects EC2ops, otherwise
        # Eucaops is built from the config file and password.
        if self.args.region:
            self.tester = EC2ops( credpath=self.args.credpath, region=self.args.region)
        else:
            self.tester = Eucaops( credpath=self.args.credpath, config_file=self.args.config,password=self.args.password)

    def clean_method(self):
        """Nothing to clean up: this test case creates no resources itself."""
        pass

    def _read_free_executors(self):
        """Return the number of idle Jenkins executors, or None on failure.

        Fetches ``JENKINS_COMPUTER_API_URL`` with ``urllib.urlopen`` (the
        Python 2 urllib API this script is written against), parses the body
        with ``ast.literal_eval`` (no arbitrary code is evaluated) and
        subtracts the busy executors from the total.

        Connection problems (``IOError``), an unparsable body
        (``SyntaxError``) and a missing executor field (``KeyError``) are
        logged through ``self.tester.critical`` and reported as ``None`` so
        the monitoring loop keeps running. ``ValueError`` is deliberately
        left to propagate to the caller's existing handler.
        """
        try:
            response = urllib.urlopen(self.JENKINS_COMPUTER_API_URL)
            try:
                payload = response.read()
            finally:
                # Always release the connection, even if the read fails.
                response.close()
            computer_info = ast.literal_eval(payload)
            return computer_info["totalExecutors"] - computer_info["busyExecutors"]
        except (IOError, SyntaxError, KeyError) as error:
            self.tester.critical("Could not read executor counts from Jenkins: " + str(error))
            return None

    def MonitorJenkins(self):
        """Poll Jenkins forever, publishing the free-executor count each round.

        Every iteration reads the free-executor count, passes it to
        ``self.tester.put_metric_data`` and then sleeps for
        ``self.args.interval`` via ``self.tester.sleep``. A failed read skips
        the publish for that round instead of terminating the loop. This
        method never returns.
        """
        while True:
            try:
                available = self._read_free_executors()
                if available is not None:
                    self.tester.put_metric_data(self.args.namespace, self.args.metric_name, value=available, unit=self.args.unit)
            except subprocess.CalledProcessError:
                self.tester.critical("Command exited Non-zero not putting data")
            except ValueError:
                self.tester.critical("Command returned non-integer")
            self.tester.sleep(self.args.interval)
if __name__ == "__main__":
    # Script entry point: build the test case, pick the tests and run them.
    testcase = JenkinsCloudwatch()
    ### Use the list of tests passed from config/command line to determine what subset of tests to run
    ### or use a predefined list
    # Named ``test_names`` rather than ``list`` so the builtin is not shadowed.
    test_names = testcase.args.tests or ["MonitorJenkins"]
    ### Convert test suite methods to EutesterUnitTest objects
    unit_list = [testcase.create_testunit_by_name(test_name) for test_name in test_names]
    ### Run the EutesterUnitTest objects
    result = testcase.run_test_case_list(unit_list, clean_on_exit=True)
    # The result of the run becomes the process exit status.
    exit(result)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment