Skip to content

Instantly share code, notes, and snippets.

@bonelifer
Created May 4, 2024 05:26
Show Gist options
  • Save bonelifer/5040ad823bf5f2c59467938ca41227f0 to your computer and use it in GitHub Desktop.
Save bonelifer/5040ad823bf5f2c59467938ca41227f0 to your computer and use it in GitHub Desktop.
This script monitors MPD for database update events and sends notifications accordingly to a local NTFY.sh instance.
#!/usr/bin/env python3
"""
MPD Database Update Notification Script
This script monitors MPD for database update events and sends notifications accordingly to a local NTFY.sh instance.
Usage:
Run the script without any arguments to monitor MPD updates in real-time.
Use the -t or --test flag to send a test notification regardless of MPD state.
Author:
[Your Name]
"""
import argparse
import subprocess
import requests
import random
# Command-line interface: one optional flag that triggers a one-off test alert.
parser = argparse.ArgumentParser(description="MPD update notification script")
parser.add_argument(
    "-t",
    "--test",
    action="store_true",  # store_true already defaults to False
    help="Send test notification (sends to topic regardless of MPD state)",
)
args = parser.parse_args()

# NTFY.sh endpoint that receives the notifications.
notification_ip = "192.168.1.80"
notification_port = "9191"
notification_topic = "mpd-notifications"
notification_url = f"http://{notification_ip}:{notification_port}/{notification_topic}"

# myMPD web UI address, used as the link opened when a notification is clicked.
myMPD_URL = "http://192.168.1.80"
myMPD_PORT = "8081"
click_url = f"{myMPD_URL}:{myMPD_PORT}"

# Tag sets attached to regular and test notifications respectively.
regular_tags = "mpd,mpd-notification,mpd-database-update"
test_tags = "warning,skull"

# Module-wide flag: True while a database update is considered in progress.
update_started = False
def send_alert(text, title="MPD Action", priority="urgent", tags=regular_tags, timeout=10):
    """Post a notification to the configured NTFY topic.

    Args:
        text (str | bytes): The message body describing the action being taken.
        title (str, optional): The title of the alert. Defaults to "MPD Action".
        priority (str, optional): The priority level of the alert.
            Defaults to "urgent".
        tags (str, optional): Comma-separated tags associated with the alert.
            Defaults to regular_tags.
        timeout (float | tuple, optional): Seconds to wait for the NTFY server,
            passed straight to ``requests.post``. Defaults to 10.

    Returns:
        requests.Response: Response object from the HTTP request. The status
        code is not checked here; callers may inspect it.

    Raises:
        requests.exceptions.RequestException: If the server cannot be reached
            or does not answer within ``timeout``.
    """
    headers = {
        "Title": title,
        "Priority": priority,
        "Tags": tags,
        "Click": click_url,  # link opened when the notification is clicked
    }
    # Encode explicitly so non-ASCII text is always sent as UTF-8 instead of
    # depending on the HTTP stack's default handling of str bodies.
    body = text.encode("utf-8") if isinstance(text, str) else text
    # Without a timeout requests waits indefinitely, which would stall the
    # caller whenever the NTFY server is unreachable.
    return requests.post(notification_url, data=body, headers=headers, timeout=timeout)
def _mpd_is_updating():
    """Return True if ``mpc status`` reports a database update in progress."""
    result = subprocess.run(
        ["mpc", "status"],
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
        text=True,
        check=False,
    )
    # mpc prints an "Updating DB (#<job>) ..." line while an update job runs.
    return "Updating DB" in result.stdout


def monitor_mpd_updates():
    """Watch MPD for database update jobs and send start/finish alerts.

    Runs ``mpc idleloop update`` and reacts to every ``update`` event. MPD
    emits that event both when an update starts and when it finishes, so the
    current state is read from ``mpc status`` after each event to tell the
    two apart. The module-level ``update_started`` flag records the last
    state that was announced, so each transition is reported once.

    The function blocks until the ``mpc`` process ends or an exception
    propagates (for example from ``send_alert``).
    """
    global update_started
    alert_tags = test_tags if args.test else regular_tags

    # Seed the flag from the real state so an update already running at
    # startup still produces a "finished" alert.
    update_started = _mpd_is_updating()

    # "mpc idle" prints the bare event name and exits after one event;
    # "idleloop" keeps waiting, which is what a long-running monitor needs.
    process = subprocess.Popen(
        ["mpc", "idleloop", "update"],
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
        text=True,
    )
    try:
        for line in process.stdout:
            if line.strip() != "update":
                continue
            updating = _mpd_is_updating()
            if updating and not update_started:
                send_alert("MPD database update started", tags=alert_tags)
                update_started = True
            elif not updating and update_started:
                send_alert("MPD database update finished", tags=alert_tags)
                update_started = False
    finally:
        # Never leave the child running (or unreaped) if the loop is left
        # through an exception.
        if process.poll() is None:
            process.terminate()
        process.stdout.close()
        process.wait()
if __name__ == "__main__":
    if not args.test:
        # Normal operation: block and watch MPD for database updates.
        monitor_mpd_updates()
    else:
        # Test mode: send one notification, picking the wording at random
        # between the "started" and "finished" variants.
        test_candidates = [
            "MPD database update started (Test Notification)",
            "MPD database update finished (Test Notification)",
        ]
        send_alert(random.choice(test_candidates), priority="high", tags=test_tags)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment