Skip to content

Instantly share code, notes, and snippets.

@anax32
Created October 8, 2020 08:13
Show Gist options
  • Save anax32/501419330e77f72680567c70629d1aa0 to your computer and use it in GitHub Desktop.
Save anax32/501419330e77f72680567c70629d1aa0 to your computer and use it in GitHub Desktop.
Attempt to use the Azure DevOps REST API to re-trigger a release.
"""
List projects and releases in Azure DevOps and attempt to re-trigger a release.
API:
https://docs.microsoft.com/en-us/rest/api/azure/devops/?view=azure-devops-rest-6.0
https://docs.microsoft.com/en-us/rest/api/azure/devops/release/releases/create?view=azure-devops-rest-6.0
pypi:
https://github.com/microsoft/azure-devops-python-api
serialization package:
https://azuresdkdocs.blob.core.windows.net/$web/python/azure-appconfiguration/1.0.0b5/_modules/msrest/serialization.html
"""
import os
import sys
import logging
import json
from azure.devops.connection import Connection
from msrest.authentication import BasicAuthentication
# Root logger: verbosity is taken from $LOGLEVEL (falling back to "WARN") and
# records go to stdout using logging's default record format (no custom
# Formatter is attached to the handler).
_log_level_name = os.getenv("LOGLEVEL", "WARN").upper()
logger = logging.getLogger()
logger.setLevel(_log_level_name)
handler = logging.StreamHandler(sys.stdout)
logger.addHandler(handler)

# Name of the Azure DevOps project the release helpers below operate on.
PROJECT_NAME = os.getenv("PROJECT_NAME", "default")
def list_projects(connection):
    """
    print the name of every project the connection can see

    one line is written per project; the running index starts at zero and
    keeps counting across result pages
    """
    core_client = connection.clients.get_core_client()

    def pages():
        # Start with the first page and keep asking for the next one for as
        # long as the service hands back a non-empty continuation token.
        page = core_client.get_projects()
        while page is not None:
            yield page
            token = page.continuation_token
            if token is None or token == "":
                # No token: every project has been retrieved.
                return
            page = core_client.get_projects(continuation_token=token)

    projects = (project for page in pages() for project in page.value)
    for position, project in enumerate(projects):
        print("project[%03i]: '%s'" % (position, project.name))
def list_releases(connection):
    """
    print the releases returned by the release client as indented JSON
    """
    # The release client is fetched from the connection's default client set.
    releases_page = connection.clients.get_release_client().get_releases()
    # Each release model is converted to a plain dict so json can encode it.
    payload = json.dumps(
        [release.as_dict() for release in releases_page.value],
        indent=2,
    )
    print(payload)
def _as_list(response):
    """Return the items of a list-style API response as a plain list."""
    # list_releases() in this file reads get_releases() through `.value`,
    # while this function originally indexed the response directly; accept
    # both shapes so the code does not depend on which one the client returns.
    if response is None:
        return []
    items = getattr(response, "value", response)
    return list(items or [])


def repeat_last_release(connection, service_name, environments):
    """
    find the first active release of a service and print what is needed to
    re-trigger it in the requested environments

    steps:
      1. look up the single release definition matching `service_name`
      2. fetch the active releases of that definition and take the first one
         (presumably the most recent -- TODO confirm the API's default order)
      3. pick the release environments whose name is in `environments`
      4. fetch and print each of those release environments, followed by the
         REST url that a PATCH (status -> 'in progress') would have to target

    NOTE: no PATCH request is sent; the function only prints. Actually
    re-triggering the deployment is still to be implemented.

    Args:
        connection: azure devops connection exposing `clients_v6_0`
        service_name: search text identifying the release definition
        environments: iterable of environment names (e.g. ["dev"])

    Raises:
        LookupError: if the definition search does not return exactly one
            result, no active release exists, a requested environment is
            missing from the release, or an environment cannot be fetched.
    """
    release_client = connection.clients_v6_0.get_release_client()

    definitions = _as_list(
        release_client.get_release_definitions(PROJECT_NAME, search_text=service_name)
    )
    if len(definitions) != 1:
        raise LookupError(
            "expected exactly one release definition matching '%s', found %i"
            % (service_name, len(definitions))
        )
    definition = definitions[0]
    print("'%s' definition id: %03i" % (service_name, definition.id))
    print("DEFINITIONS---")
    print(json.dumps(definition.as_dict(), indent=2))
    print("---")

    # Ask for the environments explicitly: they are read below, and the list
    # endpoint is expected to fill them in only when expanded (see `$expand`
    # in the REST reference -- TODO confirm).
    releases = _as_list(
        release_client.get_releases(
            project=PROJECT_NAME,
            definition_id=definition.id,
            status_filter="active",
            expand="environments",
        )
    )
    if not releases:
        raise LookupError(
            "no active release found for definition %i" % definition.id
        )
    release = releases[0]
    print("RELEASES---")
    print(json.dumps(release.as_dict(), indent=2))
    print("---")

    # Map environment name -> release environment for the requested names.
    wanted = set(environments)
    release_envs = {
        env.name: env
        for env in (release.environments or [])
        if env.name in wanted
    }
    # The model objects are not JSON serialisable; dump their dict form.
    print(
        "release environments:'%s'"
        % json.dumps(
            {name: env.as_dict() for name, env in release_envs.items()},
            indent=2,
        )
    )
    missing = sorted(wanted - set(release_envs))
    if missing:
        raise LookupError(
            "release %i has no environment named: %s"
            % (release.id, ", ".join(missing))
        )

    for env in release_envs.values():
        # get_release_environment returns one environment object, not a list.
        details = release_client.get_release_environment(
            project=PROJECT_NAME,
            release_id=release.id,
            environment_id=env.id,
        )
        if details is None:
            raise LookupError(
                "environment %i of release %i could not be fetched"
                % (env.id, release.id)
            )
        print("ENVIRONMENTS---")
        print(json.dumps(details.as_dict(), indent=2))
        print("---")

        # PATCH target for this environment (printed only, never requested).
        # NOTE(review): the organisation name in the url is hard-coded.
        print("patch")
        url = (
            "https://vsrm.dev.azure.com/blaise-gcp/%s/_apis/Release/releases/%i/environments/%i?api-version=6.0"
            % (PROJECT_NAME, release.id, env.id)
        )
        print(url)
def _main():
    """Connect using credentials from the environment and run the helpers."""
    # Both variables are required; a missing one raises KeyError.
    pat = os.environ["AZURE_PAT"]
    org_url = os.environ["AZURE_DEVOPS_ORG_URL"]

    # Basic auth with an empty user name and the personal access token.
    connection = Connection(
        base_url=org_url,
        creds=BasicAuthentication('', pat),
    )

    list_projects(connection)
    repeat_last_release(
        connection,
        service_name="BlaiseCaseHandler",
        environments=["dev"],
    )


if __name__ == "__main__":
    _main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment