Created
December 9, 2019 21:40
-
-
Save YannMjl/e9fb9e1d5b7f5e8c09d8fc9d3b10b1e5 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import sys | |
import json | |
import time | |
import codecs | |
import os.path | |
import datetime | |
import requests | |
import urllib.parse | |
from collections import OrderedDict | |
from requests.auth import HTTPBasicAuth | |
# Refuse to run on anything older than Python 3.
# The previous `assert` in front of this check raised a bare AssertionError
# before the readable message below could ever be printed (and is stripped
# entirely under `python -O`), so the explicit check is the only guard now.
if sys.version_info < (3, 0, 0):
    print('Python version', sys.version)
    sys.stderr.write("You need python 3 or later to run this script\n")
    # 3 is the Nagios UNKNOWN status (NAGIOS_UNKNOWN is only defined further
    # down in this file); the previous exit code 1 would have been reported
    # by Nagios as WARNING. sys.exit is used because the bare `exit` helper
    # is injected by the `site` module and is not guaranteed to exist.
    sys.exit(3)
#-----------------------------------------------------------------------------#
#         Global variables: Azure AD credentials, Nagios status codes         #
#-----------------------------------------------------------------------------#
# Placeholder values: the tenant ID is embedded in the token endpoint and the
# API base URL, the client ID is sent as `client_id` in the token request.
tenantID, clientID = "yourTenantID", "yourClientID"
# Value sent as the `resource` field of the token request.
resource = "https://manage.office.com"
# Nagios status codes; the script passes these to sys.exit().
NAGIOS_OK, NAGIOS_WARNING, NAGIOS_CRITICAL, NAGIOS_UNKNOWN = 0, 1, 2, 3
#-----------------------------------------------------------------------------# | |
# Request an access token # | |
#-----------------------------------------------------------------------------# | |
def getToken(clientSecret):
    """Request an access token from Azure AD using the client-credentials grant.

    Posts the module-level ``clientID`` and ``resource`` together with
    *clientSecret* to the tenant's ``/oauth2/token`` endpoint.

    Args:
        clientSecret: secret of the Azure AD application identified by
            ``clientID``.

    Returns:
        The value of the ``access_token`` field of the JSON response.

    Raises:
        requests.exceptions.RequestException: on connection problems, on
            timeout, or (via ``raise_for_status``) on a 4xx/5xx answer.
        KeyError: if the JSON response carries no ``access_token`` field.
    """
    azureAD_endpoint = "https://login.microsoftonline.com/" + tenantID + "/oauth2/token"
    post_request_body = {
        'grant_type': 'client_credentials',
        'client_id': clientID,
        'client_secret': clientSecret,
        'resource': resource
    }
    # Without a timeout requests waits indefinitely, which would hang the
    # monitoring check instead of letting it fail.
    post_request = requests.post(azureAD_endpoint, data=post_request_body, timeout=30)
    # Surface HTTP errors (e.g. rejected credentials) as such instead of as a
    # KeyError on the missing 'access_token' field below.
    post_request.raise_for_status()
    # The body is parsed once; the earlier extra json.loads() result was unused.
    return post_request.json()['access_token']
#-----------------------------------------------------------------------------# | |
# Base Service Communications URl # | |
#-----------------------------------------------------------------------------# | |
# Root of the Office 365 management API; the tenant ID is appended to it.
_API_ROOT = "https://manage.office.com/api/v1.0/"
baseuri = _API_ROOT + tenantID

# URL paths, each appended to `baseuri` when a request is made.
_SERVICE_COMMS = '/ServiceComms'
# list of services
services = _SERVICE_COMMS + '/Services'
# current status of the services
current_status = _SERVICE_COMMS + '/CurrentStatus'
# historical status of the services
historical_status = _SERVICE_COMMS + '/HistoricalStatus'
# service messages
messages = _SERVICE_COMMS + '/Messages'
# subscription check (the misspelled name is kept: it is part of the module's interface)
chceck_subscription = '/activity/feed/subscriptions/list'
# Local offset from UTC, using the DST-adjusted value while daylight saving
# time is reported as active for the current local time.
if time.localtime().tm_isdst:
    utc_offset_sec = time.altzone
else:
    utc_offset_sec = time.timezone
# time.altzone / time.timezone count seconds WEST of UTC, hence the negation.
utc_offset = datetime.timedelta(seconds=-utc_offset_sec)
#-----------------------------------------------------------------------------# | |
# get services information : Names, Status, Messages # | |
#-----------------------------------------------------------------------------# | |
def _fetchServiceComms(clientSecret, path, timeout=30):
    """GET ``baseuri + path`` with a fresh bearer token and return the parsed JSON.

    Shared implementation of the four public getters below, which previously
    repeated this body verbatim.

    Args:
        clientSecret: secret handed to ``getToken`` to obtain the bearer token.
        path: URL path appended to the module-level ``baseuri``.
        timeout: seconds passed to ``requests.get`` so that an unresponsive
            endpoint cannot hang the check indefinitely.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.exceptions.RequestException: on connection problems, on
            timeout, or (via ``raise_for_status``) on a 4xx/5xx answer.
    """
    token = getToken(clientSecret)
    myheader = {
        'accept': 'application/json; odata.metadata=full',
        'Authorization': "Bearer " + token
    }
    response = requests.get(baseuri + path, headers=myheader, timeout=timeout)
    # Fail on HTTP errors here rather than later on a missing key in the payload.
    response.raise_for_status()
    # The body is parsed once; the earlier extra json.loads() result was unused.
    return response.json()


def getServicesInformation(clientSecret):
    """Return the parsed JSON answer of the ``/ServiceComms/Services`` endpoint."""
    return _fetchServiceComms(clientSecret, services)


def getMessages(clientSecret):
    """Return the parsed JSON answer of the ``/ServiceComms/Messages`` endpoint."""
    return _fetchServiceComms(clientSecret, messages)


def getCurentStatus(clientSecret):
    """Return the parsed JSON answer of the ``/ServiceComms/CurrentStatus`` endpoint."""
    return _fetchServiceComms(clientSecret, current_status)


def getHistoricalStatus(clientSecret):
    """Return the parsed JSON answer of the ``/ServiceComms/HistoricalStatus`` endpoint."""
    return _fetchServiceComms(clientSecret, historical_status)
def getServiceNames(clientSecret):
    """Return the display names of all services reported by the API.

    Args:
        clientSecret: secret forwarded to ``getServicesInformation``.

    Returns:
        A list with the ``DisplayName`` of every entry under the ``value``
        key of the services answer, in the order the API returned them.

    Raises:
        KeyError: if the answer has no ``value`` key or an entry has no
            ``DisplayName``.
    """
    serviceInfos = getServicesInformation(clientSecret)
    # Only DisplayName is needed; the former unused lookups of 'Id' and
    # 'Features' could raise KeyError without contributing to the result.
    return [service['DisplayName'] for service in serviceInfos['value']]
def getServiceMessage(clientSecret, service_name):
    """Return the most recent message text published for *service_name*.

    "Most recent" is determined purely by position: the last ``MessageText``
    of the last matching entry in the API answer wins. No timestamp is
    compared. NOTE(review): this relies on the API listing entries and their
    messages oldest-first -- confirm against real responses.

    Args:
        clientSecret: secret forwarded to ``getMessages``.
        service_name: value compared against each entry's
            ``WorkloadDisplayName``.

    Returns:
        The selected message text; ``"No recent event message"`` when the
        last matching entry has an empty ``Messages`` list; or
        ``"No Message at this time"`` when no entry matches at all.

    Raises:
        KeyError: if the answer has no ``value`` key, or an entry lacks
            ``WorkloadDisplayName``, or a matching entry lacks ``Messages``
            or one of its messages lacks ``MessageText``.
    """
    service_messages = getMessages(clientSecret)
    latest_message = "No Message at this time"
    for service in service_messages['value']:
        # Entries for other workloads are skipped before any further key is
        # read, so a malformed unrelated entry cannot break the lookup.
        if service['WorkloadDisplayName'] != service_name:
            continue
        message_texts = [message['MessageText'] for message in service['Messages']]
        if message_texts:
            latest_message = message_texts[-1]
        else:
            latest_message = "No recent event message"
    return latest_message
def getwarningMessage(clientSecret, service_name):
    """Return the first paragraph of the service's latest message.

    The text obtained from ``getServiceMessage`` is converted to ``str`` and
    cut at the first blank line (two consecutive newlines); the whole text is
    returned when it contains no blank line.
    """
    full_text = str(getServiceMessage(clientSecret, service_name))
    first_paragraph, _separator, _remainder = full_text.partition('\n\n')
    return first_paragraph
def printServicedetails(serviceId, WorkloadDisplayName, status_date, StatusDisplayName):
    """Print a four-line summary of a service, framed by two dashed rules.

    Each line is written to stdout as ``<label> : <value>``.
    """
    rule = "------------------------------------------------------"
    rows = (
        ('Service ID', serviceId),
        ('Service Name', WorkloadDisplayName),
        ('Last checked', status_date),
        ('Current Service State', StatusDisplayName),
    )
    print(rule)
    for label, value in rows:
        print(label, value, sep=' : ')
    print(rule)
# Status values that mean the service is healthy.
_OK_STATES = ('ServiceRestored', 'ServiceOperational')
# Status values reported as CRITICAL. The trailing comma matters: the former
# `in ('ServiceInterruption')` was a substring test against a plain string.
_CRITICAL_STATES = ('ServiceInterruption',)

_INVESTIGATING_TEXT = ("We're aware of a potential issue and are gathering more information" + '\n'
                       + " about what's going on and the scope of impact.")
# Explanation printed for each status that is reported under a WARNING headline.
_WARNING_EXPLANATIONS = {
    'Investigating': _INVESTIGATING_TEXT,
    'VerifyingService': _INVESTIGATING_TEXT,
    'ServiceDegradation': ("We've confirmed that there is an issue that may affect" + '\n'
                           + "use of a service or feature"),
    'RestoringService': ("The cause of the issue has been identified, we know" + '\n'
                         + "what corrective action to take and are in the process" + '\n'
                         + "of bringing the service back to a healthy state."),
    'ExtendedRecovery': ("A corrective action is in progress to restore service to most" + '\n'
                         + "users but will take some time to reach all the affected systems"),
    'PIRPublished': ("We’ve published a Post Incident Report for a specific" + '\n'
                     + "issue that includes root cause information and next" + '\n'
                     + "steps to ensure a similar issue doesn’t reoccur"),
}


def _formatStatusTime(StatusTime):
    """Render the API timestamp in long human-readable form.

    Returns *StatusTime* unchanged when it cannot be parsed, so that a
    cosmetic date problem never aborts the health check.
    """
    try:
        # The last four characters are dropped before parsing.
        # NOTE(review): presumably the value ends in a 7-digit fraction plus
        # 'Z' (strptime's %f accepts at most 6 digits) -- confirm against
        # real API output.
        parsed = datetime.datetime.strptime(StatusTime[:-4], "%Y-%m-%dT%H:%M:%S.%f")
    except (TypeError, ValueError):
        return StatusTime
    return parsed.strftime("%A, %B %d, %Y at %I:%M %p")


def getServiceCurrentStatus(clientSecret, service_name):
    """Report the current state of *service_name* and exit with a Nagios code.

    Looks up the entry whose ``WorkloadDisplayName`` equals *service_name* in
    the current-status answer, prints a report and terminates the process:

    * ``NAGIOS_OK`` for 'ServiceRestored' / 'ServiceOperational';
    * ``NAGIOS_CRITICAL`` for 'ServiceInterruption' (the latest service
      message is printed as well);
    * ``NAGIOS_WARNING`` for every other status, followed by the per-feature
      states and the latest service message;
    * ``NAGIOS_UNKNOWN`` when the answer has no entry for *service_name*
      (previously the function just returned and the script ended with
      status 0, which Nagios reads as OK).

    Args:
        clientSecret: secret forwarded to the API helper functions.
        service_name: workload display name to look for.

    Raises:
        SystemExit: always, carrying one of the Nagios codes above.
        KeyError: if the answer or the matching entry lacks an expected key.
    """
    serviceState = getCurentStatus(clientSecret)
    for service in serviceState['value']:
        WorkloadDisplayName = service['WorkloadDisplayName']
        if WorkloadDisplayName != service_name:
            continue

        # Only the matching entry is inspected further, so bad data on an
        # unrelated service (e.g. an odd timestamp) cannot crash the check.
        serviceId = service['Id']
        servicestatus = service['Status']
        StatusDisplayName = service['StatusDisplayName']
        status_date = _formatStatusTime(service['StatusTime'])

        if servicestatus in _OK_STATES:
            print("OK - Healty", StatusDisplayName, sep = ' : ')
            printServicedetails(serviceId, WorkloadDisplayName, status_date, StatusDisplayName)
            print('There is no issue; the service works as expected')
            sys.exit(NAGIOS_OK)

        if servicestatus in _CRITICAL_STATES:
            print(
                "CRITICAL!", StatusDisplayName +
                ' : An issue affects the ability for users to access the service', sep=' : '
            )
            printServicedetails(serviceId, WorkloadDisplayName, status_date, StatusDisplayName)
            print("an issue affects the ability for users to access the service" + '\n'
                  + "the issue is significant and can be reproduced consistently")
            # The message used to be fetched here but never printed.
            print(getServiceMessage(clientSecret, WorkloadDisplayName))
            sys.exit(NAGIOS_CRITICAL)

        # Everything else is a warning. Known statuses get a "WARNING"
        # headline plus an explanation; unknown ones are shown under their
        # raw status value without explanation.
        explanation = _WARNING_EXPLANATIONS.get(servicestatus)
        headline = "WARNING" if explanation is not None else servicestatus
        print(
            headline, StatusDisplayName + ' -- ' +
            getwarningMessage(clientSecret, WorkloadDisplayName), sep=' : '
        )
        printServicedetails(serviceId, WorkloadDisplayName, status_date, StatusDisplayName)
        if explanation is not None:
            print(explanation)

        print("------------------------------------------------------")
        print('List of service Features and states:')
        for feature in service['FeatureStatus']:
            print(feature['FeatureDisplayName'],
                  feature['FeatureServiceStatusDisplayName'], sep=' --|-- ')
        print(getServiceMessage(clientSecret, WorkloadDisplayName))
        sys.exit(NAGIOS_WARNING)

    # No entry matched: report that explicitly instead of ending with status 0.
    print("UNKNOWN - no current status returned for service", service_name, sep=' : ')
    sys.exit(NAGIOS_UNKNOWN)
def main(arg):
    """Entry point: validate the arguments and run the status check.

    Args:
        arg: command-line arguments without the program name; exactly two
            items are expected, the client secret and the service name.

    Raises:
        SystemExit: with ``NAGIOS_UNKNOWN`` when the argument count is wrong
            or the service name is not among the names returned by
            ``getServiceNames``. Previously the latter case printed the help
            text but ended with status 0, which Nagios reads as OK.
    """
    if len(arg) != 2:
        print("UNKNOWN - Wrong Usage: please specify the client secret and service name")
        print("e.g: o365_check.py \"Client Secret\" \"Exchange Online\"")
        sys.exit(NAGIOS_UNKNOWN)

    clientSecret, serviceName = arg
    # Names offered by the API; used both for validation and as help output.
    service_name_list = getServiceNames(clientSecret)
    if serviceName in service_name_list:
        getServiceCurrentStatus(clientSecret, serviceName)
        return

    print('The service name you entered is incorrect')
    print('Here is the list of service Names:')
    for service_name in service_name_list:
        print(service_name)
    sys.exit(NAGIOS_UNKNOWN)


if __name__ == '__main__':
    main(sys.argv[1:])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment