Skip to content

Instantly share code, notes, and snippets.

@YannMjl
Created December 9, 2019 21:40
Show Gist options
  • Save YannMjl/e9fb9e1d5b7f5e8c09d8fc9d3b10b1e5 to your computer and use it in GitHub Desktop.
Save YannMjl/e9fb9e1d5b7f5e8c09d8fc9d3b10b1e5 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import sys
import json
import time
import codecs
import os.path
import datetime
import requests
import urllib.parse
from collections import OrderedDict
from requests.auth import HTTPBasicAuth
# Refuse to run on Python 2. An explicit check is used rather than `assert`:
# asserts are stripped under `python -O`, and an assert placed ahead of this
# test would raise before the readable message below could be shown.
if sys.version_info < (3, 0):
    print('Python version', sys.version)
    sys.stderr.write("You need python 3 or later to run this script\n")
    # sys.exit is always available; the bare `exit` helper is injected by the
    # `site` module and is absent when Python runs with -S.
    sys.exit(1)
#-----------------------------------------------------------------------------#
# Module-wide settings: Azure AD application identity and Nagios exit codes   #
#-----------------------------------------------------------------------------#
# Azure AD tenant and application identifiers (placeholders to be replaced)
tenantID, clientID = "yourTenantID", "yourClientID"
# Resource named in the OAuth2 token request
resource = "https://manage.office.com"
# Nagios status codes, in ascending numeric order
(NAGIOS_OK,
 NAGIOS_WARNING,
 NAGIOS_CRITICAL,
 NAGIOS_UNKNOWN) = range(4)
#-----------------------------------------------------------------------------#
# Request an access token #
#-----------------------------------------------------------------------------#
def getToken(clientSecret, timeout=30):
    """Request an OAuth2 access token from Azure AD (client-credentials grant).

    Args:
        clientSecret: Secret of the Azure AD application identified by the
            module-level ``clientID``.
        timeout: Seconds to wait for the token endpoint before giving up.

    Returns:
        The ``access_token`` value of the endpoint's JSON response.

    Raises:
        requests.RequestException: On connection errors, timeouts, or a
            non-success HTTP status.
        ValueError: If the response body is not valid JSON.
        KeyError: If the JSON response carries no ``access_token``.
    """
    azureAD_endpoint = "https://login.microsoftonline.com/" + tenantID + "/oauth2/token"
    post_request_body = {
        'grant_type': 'client_credentials',
        'client_id': clientID,
        'client_secret': clientSecret,
        'resource': resource
    }
    # requests waits indefinitely unless a timeout is given, which would leave
    # the monitoring check hanging on a stalled connection.
    post_request = requests.post(
        azureAD_endpoint, data=post_request_body, timeout=timeout
    )
    # Report rejected credentials as an HTTP error instead of a confusing
    # KeyError on the missing token field.
    post_request.raise_for_status()
    return post_request.json()['access_token']
#-----------------------------------------------------------------------------#
# Service Communications API: base URL and endpoint paths                     #
#-----------------------------------------------------------------------------#
baseuri = "https://manage.office.com/api/v1.0/{0}".format(tenantID)
# Endpoint paths
services = "/ServiceComms/Services"                        # get services
current_status = "/ServiceComms/CurrentStatus"             # get current status
historical_status = "/ServiceComms/HistoricalStatus"       # get historical status
messages = "/ServiceComms/Messages"                        # get messages
chceck_subscription = "/activity/feed/subscriptions/list"  # check subscription
# Local offset from UTC, using the DST variant while daylight saving time is
# in effect. time.altzone / time.timezone count seconds west of UTC, hence the
# sign flip when building the timedelta.
if time.localtime().tm_isdst:
    utc_offset_sec = time.altzone
else:
    utc_offset_sec = time.timezone
utc_offset = datetime.timedelta(seconds=-utc_offset_sec)
#-----------------------------------------------------------------------------#
# get services information : Names, Status, Messages #
#-----------------------------------------------------------------------------#
def _getServiceComms(clientSecret, path, timeout=30):
    """Authenticate, GET ``baseuri + path`` and return the parsed JSON body.

    Shared by the four public fetchers below, which differ only in the
    endpoint path they request.

    Args:
        clientSecret: Azure AD client secret, passed on to getToken().
        path: Endpoint path appended to the module-level ``baseuri``.
        timeout: Seconds to wait for the API before giving up.

    Raises:
        requests.RequestException: On connection errors, timeouts, or a
            non-success HTTP status.
        ValueError: If the response body is not valid JSON.
    """
    myheader = {
        'accept': 'application/json; odata.metadata=full',
        'Authorization': "Bearer " + getToken(clientSecret)
    }
    # A timeout keeps the check from blocking forever on a stalled connection.
    response = requests.get(baseuri + path, headers=myheader, timeout=timeout)
    # Fail loudly on HTTP errors rather than handing an error document to
    # callers that index into it.
    response.raise_for_status()
    return response.json()


def getServicesInformation(clientSecret):
    """Return the parsed JSON response of the ``services`` endpoint."""
    return _getServiceComms(clientSecret, services)


def getMessages(clientSecret):
    """Return the parsed JSON response of the ``messages`` endpoint."""
    return _getServiceComms(clientSecret, messages)


def getCurentStatus(clientSecret):
    """Return the parsed JSON response of the ``current_status`` endpoint."""
    return _getServiceComms(clientSecret, current_status)


def getHistoricalStatus(clientSecret):
    """Return the parsed JSON response of the ``historical_status`` endpoint."""
    return _getServiceComms(clientSecret, historical_status)
def getServiceNames(clientSecret):
    """Return the display name of every service listed by the API.

    Args:
        clientSecret: Azure AD client secret used to authenticate.

    Returns:
        A list with the ``DisplayName`` of each entry under ``value`` in the
        services response, in response order.

    Raises:
        KeyError: If the response has no ``value`` key or an entry has no
            ``DisplayName``.
    """
    serviceInfos = getServicesInformation(clientSecret)
    # Only DisplayName is read: other fields are not needed for the result,
    # so their absence must not make this function fail.
    return [service['DisplayName'] for service in serviceInfos['value']]
def getServiceMessage(clientSecret, service_name):
    """Return the last message text found for a service in the message feed.

    Args:
        clientSecret: Azure AD client secret used to authenticate.
        service_name: Value compared against each entry's
            ``WorkloadDisplayName``.

    Returns:
        * ``"No Message at this time"`` when no feed entry belongs to
          ``service_name``;
        * ``"No recent event message"`` when the last matching entry has an
          empty ``Messages`` list;
        * otherwise the ``MessageText`` of the final item in the last
          matching entry's ``Messages`` list.

    Raises:
        KeyError: If the feed, a matching entry, or one of its messages lacks
            a key read here.
    """
    service_messages = getMessages(clientSecret)
    latest_message = "No Message at this time"
    for entry in service_messages['value']:
        # Entries of other workloads are skipped before any further field is
        # read, so fields they may lack cannot break the lookup.
        if entry['WorkloadDisplayName'] != service_name:
            continue
        texts = [message['MessageText'] for message in entry['Messages']]
        # NOTE(review): "last" means last in feed order; presumably the feed
        # is chronological -- confirm against the API documentation.
        latest_message = texts[-1] if texts else "No recent event message"
    return latest_message
def getwarningMessage(clientSecret, service_name):
    """Return the first paragraph of the service's latest message.

    The value from getServiceMessage() is converted to str and cut at the
    first blank line (two consecutive newlines). When there is no blank line
    the whole text is returned.
    """
    full_text = str(getServiceMessage(clientSecret, service_name))
    first_paragraph, _, _ = full_text.partition('\n\n')
    return first_paragraph
def printServicedetails(serviceId, WorkloadDisplayName, status_date, StatusDisplayName):
    """Print a four-line service summary framed by two dashed rules.

    Each summary line has the form ``<label> : <value>``; the values are
    printed as given.
    """
    rule = "------------------------------------------------------"
    rows = (
        ('Service ID', serviceId),
        ('Service Name', WorkloadDisplayName),
        ('Last checked', status_date),
        ('Current Service State', StatusDisplayName),
    )
    print(rule)
    for label, value in rows:
        print(label, value, sep=' : ')
    print(rule)
# Explanation printed for each status that is reported as a Nagios WARNING.
# Any status absent from this table (and not handled as OK or CRITICAL) is
# still reported as WARNING, headed by the raw status value and without an
# explanation.
_INVESTIGATING_TEXT = (
    "We're aware of a potential issue and are gathering more information\n"
    " about what's going on and the scope of impact."
)
_WARNING_EXPLANATIONS = {
    'Investigating': _INVESTIGATING_TEXT,
    'VerifyingService': _INVESTIGATING_TEXT,
    'ServiceDegradation': (
        "We've confirmed that there is an issue that may affect\n"
        "use of a service or feature"
    ),
    'RestoringService': (
        "The cause of the issue has been identified, we know\n"
        "what corrective action to take and are in the process\n"
        "of bringing the service back to a healthy state."
    ),
    'ExtendedRecovery': (
        "A corrective action is in progress to restore service to most\n"
        "users but will take some time to reach all the affected systems"
    ),
    'PIRPublished': (
        "We’ve published a Post Incident Report for a specific\n"
        "issue that includes root cause information and next\n"
        "steps to ensure a similar issue doesn’t reoccur"
    ),
}


def _formatStatusTime(StatusTime):
    """Render an API timestamp as e.g. 'Monday, December 09, 2019 at 09:40 PM'.

    Falls back to the raw value as text when it cannot be parsed, so an
    unexpected timestamp format never prevents the status from being reported.
    """
    try:
        # The last four characters are dropped before parsing -- presumably
        # the tail of a 7-digit fraction plus a trailing 'Z', since %f accepts
        # at most six digits. TODO confirm against a real payload.
        parsed = datetime.datetime.strptime(StatusTime[:-4], "%Y-%m-%dT%H:%M:%S.%f")
    except (TypeError, ValueError):
        return str(StatusTime)
    return parsed.strftime("%A, %B %d, %Y at %I:%M %p")


def getServiceCurrentStatus(clientSecret, service_name):
    """Report the current status of one service and exit with a Nagios code.

    The first entry of the current-status feed whose ``WorkloadDisplayName``
    equals ``service_name`` is printed and the process exits:

    * ``NAGIOS_OK`` for ``ServiceRestored`` / ``ServiceOperational``;
    * ``NAGIOS_CRITICAL`` for ``ServiceInterruption``;
    * ``NAGIOS_WARNING`` for every other status, followed by the per-feature
      states and the latest service message.

    Args:
        clientSecret: Azure AD client secret used to authenticate.
        service_name: Display name of the service to report on.

    Returns:
        None, without printing anything, when no feed entry matches
        ``service_name``. On a match the function does not return.

    Raises:
        SystemExit: Always, once a matching entry has been reported.
        KeyError: If the feed or the matching entry lacks a key read here.
    """
    serviceState = getCurentStatus(clientSecret)
    for service in serviceState['value']:
        WorkloadDisplayName = service['WorkloadDisplayName']
        if WorkloadDisplayName != service_name:
            continue

        # Remaining fields are read only for the matching entry, so a
        # malformed entry of an unrelated service cannot abort the check.
        serviceId = service['Id']
        servicestatus = service['Status']
        StatusDisplayName = service['StatusDisplayName']
        status_date = _formatStatusTime(service['StatusTime'])

        if servicestatus in ('ServiceRestored', 'ServiceOperational'):
            print("OK - Healty", StatusDisplayName, sep=' : ')
            printServicedetails(serviceId, WorkloadDisplayName, status_date, StatusDisplayName)
            print('There is no issue; the service works as expected')
            sys.exit(NAGIOS_OK)

        # Exact comparison: `x in ('ServiceInterruption')` is a substring test
        # on a plain string (no tuple), which also matches '' or 'Service'.
        if servicestatus == 'ServiceInterruption':
            print(
                "CRITICAL!", StatusDisplayName +
                ' : An issue affects the ability for users to access the service', sep=' : '
            )
            printServicedetails(serviceId, WorkloadDisplayName, status_date, StatusDisplayName)
            print("an issue affects the ability for users to access the service" + '\n'
                  + "the issue is significant and can be reproduced consistently")
            # The fetched message must actually be printed; discarding the
            # return value costs an API round trip and shows nothing.
            print(getServiceMessage(clientSecret, WorkloadDisplayName))
            sys.exit(NAGIOS_CRITICAL)

        explanation = _WARNING_EXPLANATIONS.get(servicestatus)
        # Unrecognised statuses are headed by the raw status value.
        headline = "WARNING" if explanation is not None else servicestatus
        # NOTE(review): getwarningMessage() here and getServiceMessage() below
        # each fetch the message feed again; one fetch could serve both.
        print(
            headline, StatusDisplayName + ' -- ' +
            getwarningMessage(clientSecret, WorkloadDisplayName), sep=' : '
        )
        printServicedetails(serviceId, WorkloadDisplayName, status_date, StatusDisplayName)
        if explanation is not None:
            print(explanation)
        print("------------------------------------------------------")
        print('List of service Features and states:')
        for feature in service['FeatureStatus']:
            print(feature['FeatureDisplayName'],
                  feature['FeatureServiceStatusDisplayName'], sep=' --|-- ')
        print(getServiceMessage(clientSecret, WorkloadDisplayName))
        sys.exit(NAGIOS_WARNING)
def main(arg):
    """Command-line entry point of the Nagios check.

    Args:
        arg: Command-line arguments without the program name. Exactly two are
            expected: the Azure AD client secret and the service display name.

    Raises:
        SystemExit: On every path reached here. The status-specific exit
            codes are issued by getServiceCurrentStatus(); every path handled
            in this function exits with ``NAGIOS_UNKNOWN``. Without an
            explicit exit the process would end with code 0, which Nagios
            reads as OK.
    """
    if len(arg) != 2:
        print("UNKNOWN - Wrong Usage: please specify the client secret and service name")
        print("e.g: o365_check.py \"Client Secret\" \"Exchange Online\"")
        sys.exit(NAGIOS_UNKNOWN)

    clientSecret, serviceName = arg
    try:
        # get list of service names
        service_name_list = getServiceNames(clientSecret)
        # only query the status when the requested name is a known service
        if serviceName in service_name_list:
            getServiceCurrentStatus(clientSecret, serviceName)
    except Exception as error:
        # Top-level boundary: network, HTTP or payload problems mean the
        # state could not be determined. sys.exit() raises SystemExit, which
        # is not an Exception subclass, so regular exits pass through.
        print("UNKNOWN - Could not query the service status",
              type(error).__name__, error, sep=' : ')
        sys.exit(NAGIOS_UNKNOWN)

    if serviceName in service_name_list:
        # getServiceCurrentStatus() returned, i.e. it found no status entry.
        print("UNKNOWN - No current status reported for service", serviceName, sep=' : ')
    else:
        print('The service name you entered is incorrect')
        print('Here is the list of service Names:')
        for service_name in service_name_list:
            print(service_name)
    sys.exit(NAGIOS_UNKNOWN)
# Run the check only when the file is executed as a script, not on import.
# The program name (argv[0]) is dropped before the arguments reach main().
if __name__ == "__main__":
    main(sys.argv[1:])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment