Created
October 21, 2020 13:00
-
-
Save evnsio/0a29a3ff263a3d2fccdbfb7c1de9ef66 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#--------------------------------------------------------------------- | |
# Incident Escalations with PagerDuty | |
#--------------------------------------------------------------------- | |
ESCALATE_BUTTON = "escalate-button-id" | |
PAGE_SPECIALIST_DIALOG = "dialog-page-specialist" | |
class PagerDutySpecialist(models.Model): | |
name = models.CharField(max_length=100, unique=True) | |
summary = models.TextField(max_length=1000) | |
escalation_policy = models.CharField(max_length=10) | |
def __str__(self): | |
return f"{self.name} - {self.escalation_policy}" | |
@admin.register(PagerDutySpecialist) | |
class PagerDutySpecialistAdmin(admin.ModelAdmin): | |
pass | |
@incident_command(["escalate", "esc"], helptext="Escalate to a specialist") | |
def handle_escalation(incident: Incident, user_id: str, message: str): | |
msg = block_kit.Message() | |
specialists = PagerDutySpecialist.objects.all().order_by("name") | |
if not specialists: | |
msg.add_block( | |
block_kit.Section( | |
text=block_kit.Text("No specialists have been configured 😢") | |
) | |
) | |
else: | |
msg.add_block( | |
block_kit.Section( | |
text=block_kit.Text("Let's find the right people to help out 🔍") | |
) | |
) | |
msg.add_block(block_kit.Divider()) | |
msg.add_block( | |
block_kit.Section( | |
text=block_kit.Text( | |
"These are the teams available as specialist escalations:" | |
) | |
) | |
) | |
for team in specialists: | |
team_section = block_kit.Section( | |
text=block_kit.Text(f"*{team.name}*\n{team.summary}"), | |
accessory=block_kit.Button( | |
f"📟 Page {team.name}", ESCALATE_BUTTON, value=f"{team.name}" | |
), | |
) | |
msg.add_block(team_section) | |
msg.add_block(block_kit.Divider()) | |
msg.add_block( | |
block_kit.Section( | |
text=block_kit.Text( | |
"Not sure who to pick? The Primary On-callers can help!" | |
) | |
) | |
) | |
comms_channel = CommsChannel.objects.get(incident=incident) | |
msg.send(comms_channel.channel_id) | |
return True, None | |
@action_handler(ESCALATE_BUTTON) | |
def handle_page_oncall_engineer(context: ActionContext): | |
dialog = dialog_builder.Dialog( | |
title="Escalate to a Specialist", | |
submit_label="Escalate", | |
elements=[ | |
dialog_builder.Text( | |
label="Message", | |
name="message", | |
placeholder="Why do you need them?", | |
hint="You might be waking this person up. Please make this friendly and clear.", | |
) | |
], | |
state=context.value, | |
) | |
dialog.send_open_dialog(PAGE_SPECIALIST_DIALOG, context.trigger_id) | |
@dialog_handler(PAGE_SPECIALIST_DIALOG) | |
def page_specialist_dialog( | |
user_id: str, channel_id: str, submission: json, response_url: str, state: json | |
): | |
logger.debug(f"Handling dialog for `page_specialist_dialog`") | |
comms_channel = CommsChannel.objects.get(channel_id=channel_id) | |
specialist = PagerDutySpecialist.objects.get(name=state) | |
page_specialist(comms_channel.incident, specialist, submission["message"]) | |
def page_specialist(incident: Incident, specialist: PagerDutySpecialist, message: str): | |
logger.debug(f"Handling `page_specialist` on Incident {incident.id}") | |
key = incident_key(incident) | |
logger.debug( | |
f"About to call PagerDuty's API: 'pypd.Incident.find(incident_key={key})`" | |
) | |
pd_incident = next(iter(pypd.Incident.find(incident_key=key)), None) | |
logger.debug( | |
f"Completed call to PagerDuty's API: 'pypd.Incident.find(incident_key={key})`" | |
) | |
comms_channel = CommsChannel.objects.get(incident=incident) | |
message = f"{message}. Please join us in #{comms_channel.channel_name}" | |
try: | |
if pd_incident: | |
logger.debug( | |
f"Existing pagerduty incident found so reassigning to the specialists" | |
) | |
from_user = pypd.User.find_one(email=settings.PAGERDUTY_EMAIL) | |
pd_incident.add_responders( | |
settings.PAGERDUTY_EMAIL, | |
from_user.id, | |
message, | |
escalation_policy_ids=[specialist.escalation_policy], | |
) | |
else: | |
logger.debug( | |
f"No existing pagerduty incident so triggering one directly for the specialists" | |
) | |
trigger_incident( | |
message, | |
incident.report or "", | |
settings.PAGERDUTY_SERVICE, | |
key, | |
escalation_policy=specialist.escalation_policy, | |
) | |
comms_channel.post_in_channel( | |
f"We've sent a page to {specialist.name} with the message: \n>{message}" | |
) | |
except Exception as e: | |
logger.error(f"PagerDuty Error: {e}") | |
comms_channel.post_in_channel( | |
f"It looks like that didn't work. You can page them directly at https://monzo.pagerduty.com" | |
) | |
#--------------------------------------------------------------------- | |
# Status Page Updates | |
#--------------------------------------------------------------------- | |
__statuspage_client = None | |
if not ( | |
getattr(settings, "STATUSPAGEIO_API_KEY", None) | |
and getattr(settings, "STATUSPAGEIO_PAGE_ID", None) | |
): | |
raise ImproperlyConfigured( | |
"Statuspage integration is active but STATUSPAGEIO_API_KEY/STATUSPAGEIO_PAGE_ID are not configured" | |
) | |
class StatusPageError(Exception): | |
pass | |
def statuspage_client(): | |
global __statuspage_client | |
if __statuspage_client == None: | |
if getattr(settings, "STATUSPAGEIO_API_KEY", None) and getattr( | |
settings, "STATUSPAGEIO_PAGE_ID", None | |
): | |
__statuspage_client = statuspageio.Client( | |
api_key=settings.STATUSPAGEIO_API_KEY, | |
page_id=settings.STATUSPAGEIO_PAGE_ID, | |
) | |
else: | |
raise ValueError( | |
"Statuspage client called but not configured. Check that STATUSPAGEIO_API_KEY and STATUSPAGEIO_PAGE_ID are configured in Django settings." | |
) | |
return __statuspage_client | |
logger = logging.getLogger(__name__) | |
OPEN_STATUS_PAGE_DIALOG = "dialog-open-status-page" | |
STATUS_PAGE_UPDATE = "status-page-update" | |
class StatusPage(models.Model): | |
incident = models.ForeignKey(Incident, on_delete=models.PROTECT) | |
statuspage_incident_id = models.CharField(max_length=100, unique=True, null=True) | |
def update_statuspage(self, **kwargs): | |
if self.statuspage_incident_id: | |
statuspage_client().incidents.update( | |
incident_id=self.statuspage_incident_id, **kwargs | |
) | |
else: | |
response = statuspage_client().incidents.create(**kwargs) | |
self.statuspage_incident_id = response["id"] | |
self.save() | |
def get_from_statuspage(self): | |
if self.statuspage_incident_id: | |
for incident in statuspage_client().incidents.list(): | |
if incident["id"] == self.statuspage_incident_id: | |
return { | |
"name": incident["name"], | |
"status": incident["status"], | |
"message": incident["incident_updates"][0]["body"], | |
"impact_override": incident["impact_override"], | |
} | |
raise StatusPageError( | |
f"Statuspage incident with id {self.statuspage_incident_id} not found" | |
) | |
return {} | |
@admin.register(StatusPage) | |
class StatusPageAdmin(admin.ModelAdmin): | |
list_display = ("incident_summary", "statuspage_incident_id") | |
def incident_summary(self, obj): | |
return obj.incident.summary | |
@incident_command( | |
["statuspage", "sp"], helptext="Update the statuspage for this incident" | |
) | |
def handle_statuspage(incident: Incident, user_id: str, message: str): | |
logger.info("Handling statuspage command") | |
comms_channel = CommsChannel.objects.get(incident=incident) | |
try: | |
status_page = StatusPage.objects.get(incident=incident) | |
values = status_page.get_from_statuspage() | |
if values.get("status") == "resolved": | |
comms_channel.post_in_channel( | |
"The status page can't be updated after it has been resolved." | |
) | |
return True, None | |
except models.ObjectDoesNotExist: | |
logger.info( | |
"Existing status page not found. Posting button to create a new one" | |
) | |
msg = block_kit.Message() | |
msg.add_block( | |
block_kit.Section( | |
block_id="title", | |
text=block_kit.Text(f"To update the Statuspage, click below!"), | |
) | |
) | |
msg.add_block( | |
block_kit.Actions( | |
block_id="actions", | |
elements=[ | |
block_kit.Button( | |
"Update Statuspage", OPEN_STATUS_PAGE_DIALOG, value=incident.pk | |
) | |
], | |
) | |
) | |
msg.send(comms_channel.channel_id) | |
return True, None | |
@action_handler(OPEN_STATUS_PAGE_DIALOG) | |
def handle_open_status_page_dialog(action_context: ActionContext): | |
try: | |
status_page = StatusPage.objects.get(incident=action_context.incident) | |
values = status_page.get_from_statuspage() | |
if values.get("status") == "resolved": | |
logger.info( | |
f"Status Page incident '{values.get('name')}' has been resolved" | |
) | |
status_page.incident.comms_channel().post_in_channel( | |
"The status page can't be updated after it has been resolved." | |
) | |
return | |
except models.ObjectDoesNotExist: | |
values = { | |
"name": "We're experiencing some issues at the moment", | |
"status": "investigating", | |
"message": "We're getting all the information we need to fix this and will update the status page as soon as we can.", | |
"impact_override": "major", | |
} | |
dialog = dialog_builder.Dialog( | |
title="Statuspage Update", | |
submit_label="Update", | |
state=action_context.incident.pk, | |
elements=[ | |
dialog_builder.Text( | |
label="Name", | |
name="name", | |
value=values.get("name"), | |
hint="Make this concise and clear - it's what will show in the apps", | |
), | |
dialog_builder.SelectWithOptions( | |
[ | |
("Investigating", "investigating"), | |
("Identified", "identified"), | |
("Monitoring", "monitoring"), | |
("Resolved", "resolved"), | |
], | |
label="Status", | |
name="incident_status", | |
value=values.get("status"), | |
), | |
dialog_builder.TextArea( | |
label="Description", | |
name="message", | |
optional=True, | |
value=values.get("message"), | |
), | |
dialog_builder.SelectWithOptions( | |
[ | |
("No - don't share on Twitter", "False"), | |
("Yes - post to @monzo-status on Twitter", "True"), | |
], | |
label="Send to Twitter?", | |
name="wants_twitter_update", | |
optional=True, | |
), | |
dialog_builder.SelectWithOptions( | |
[ | |
("Minor", "minor"), | |
("Major", "major"), | |
("Critical", "critical"), | |
], | |
label="Severity", | |
name="impact_override", | |
optional=True, | |
value=values.get("impact_override"), | |
), | |
], | |
) | |
dialog.send_open_dialog(STATUS_PAGE_UPDATE, action_context.trigger_id) | |
@dialog_handler(STATUS_PAGE_UPDATE) | |
def update_status_page( | |
user_id: str, channel_id: str, submission: json, response_url: str, state: json | |
): | |
incident_id = state | |
incident = Incident.objects.get(pk=incident_id) | |
try: | |
status_page = StatusPage.objects.get(incident=incident_id) | |
except models.ObjectDoesNotExist: | |
status_page = StatusPage(incident=incident) | |
status_page.save() | |
statuspage_incident = { | |
"name": submission["name"], | |
"status": submission["incident_status"], | |
"message": submission["message"] or "", | |
"wants_twitter_update": bool( | |
submission.get("wants_twitter_update", "False") == "True" | |
), | |
} | |
if submission["impact_override"]: | |
statuspage_incident["impact_override"] = submission["impact_override"] | |
status_page.update_statuspage(**statuspage_incident) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment