Last active
August 7, 2023 06:00
-
-
Save airtonix/d20cc582a12345255f8711a2f53f50c9 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# #!/usr/bin/env python3 | |
# # code: language=python3 insertSpaces=true tabSize=4 | |
# Install: | |
# - put in ~/.local/bin/slack-musi-status | |
# - chmod +x ~/.local/bin/slack-musi-status | |
# - install python 3.10.10 or better | |
# - pip install dbus-python | |
# | |
# Linux: Create a Systemd Unit | |
# | |
# Macosx: ??? | |
# | |
# Windows: ??? | |
# # import the dbus module | |
import dbus | |
from pgi.repository import GLib | |
from dbus.mainloop.glib import DBusGMainLoop | |
import urllib.parse | |
import urllib.request | |
import json | |
import sys | |
import os | |
DBUS_PATH = "/org/mpris/MediaPlayer2" | |
CONFIG_FILE = "$HOME/.slack_status.conf" | |
config = {} | |
with open(os.path.expandvars(CONFIG_FILE)) as f: | |
for line in f: | |
if line.startswith("#"): | |
continue | |
if line.startswith("\n"): | |
continue | |
key, value = line.strip().split("=") | |
config[key] = value | |
# a function that safely gets an item from an array | |
def safe_get(array, index, default=None): | |
try: | |
return array[index] | |
except IndexError: | |
return default | |
class SlackStatusUpdater: | |
def __init__(self, token): | |
self.token = token | |
self.api_url = "https://slack.com/api/users.profile.set" | |
def send_status_update(self, emoji = "", status = "", *args, **kwargs): | |
profile = { | |
"status_text": status, | |
"status_emoji": emoji, | |
} | |
data = { | |
"token": self.token, | |
"profile": json.dumps(profile), | |
} | |
try: | |
data_encoded = urllib.parse.urlencode(data).encode("utf-8") | |
req = urllib.request.Request(self.api_url, data=data_encoded) | |
with urllib.request.urlopen(req) as response: | |
response_data = response.read().decode("utf-8") | |
response_json = json.loads(response_data) | |
if response_json.get("ok"): | |
print("Status update sent successfully!", data) | |
else: | |
print("Error sending status update:", response_json.get("error")) | |
except Exception as e: | |
print("Error sending status update:", e) | |
class CurrentSongStore: | |
def __init__(self): | |
self.artist = "" | |
self.title = "" | |
self.album = "" | |
self.state = "stopped" | |
# events | |
self.events = {} | |
def publish(self, event, *args, **kwargs): | |
print("store.publish: %s: %o %o", (event, args, kwargs)) | |
if event not in self.events: | |
return | |
callbacks = self.events[event] | |
for callback in callbacks: | |
callback(*args, **kwargs) | |
# subscribe to an event emitter | |
def subscribe(self, event, callback): | |
if event not in self.events: | |
self.events[event] = [] | |
self.events[event].append(callback) | |
# an event emitter unregister | |
def unsubscribe(self, event, callback): | |
if event not in self.events: | |
return | |
# find the callback and remove it | |
callbacks = self.events[event] | |
for i in range(len(callbacks)): | |
if callbacks[i] == callback: | |
del callbacks[i] | |
break | |
def __str__(self): | |
return """ | |
artist: %s | |
title: %s | |
album: %s | |
""" % ( | |
self.artist, | |
self.title, | |
self.album, | |
) | |
def update(self, artist, title, album): | |
self.artist = artist | |
self.title = title | |
self.album = album | |
def song_changed(self, metadata=None): | |
artist = str(safe_get(metadata, "xesam:artist", [""])[0]) | |
title = str(safe_get(metadata, "xesam:title", "")) | |
album = str(safe_get(metadata, "xesam:album", "")) | |
if artist == "" or title == "" or album == "": | |
return | |
print("router.song_changed: %s", (artist, title, album)) | |
self.update( | |
artist, | |
title, | |
album, | |
) | |
if self.state == "playing" and self.artist != "" and self.title != "" and self.album != "": | |
self.publish("song_changed", artist, title, album) | |
def song_paused(self): | |
print("router.song_paused") | |
self.state = "paused" | |
self.update("", "", "") | |
self.publish("song_paused") | |
def song_started(self): | |
print("router.song_started") | |
self.state = "playing" | |
self.publish("song_started") | |
def song_stopped(self): | |
print("router.song_stopped") | |
self.state = "stopped" | |
self.update("", "", "") | |
self.publish("song_stopped") | |
def song_seek(self): | |
print("router.song_seek") | |
self.state = "seeking" | |
self.publish("song_seek") | |
# | |
# Store Instance | |
current_song_store = CurrentSongStore() | |
slack_notifier = SlackStatusUpdater(config.get('TOKEN', None)) | |
def status_update_new_song(*args, **kwargs): | |
text = "Playing %s - %s" % (current_song_store.artist, current_song_store.title) | |
print(text) | |
slack_notifier.send_status_update( | |
":musical_note:", | |
text, | |
) | |
def status_update_clear(*args, **kwargs): | |
slack_notifier.send_status_update("", "") | |
current_song_store.subscribe("song_changed", status_update_new_song) | |
current_song_store.subscribe("song_paused", status_update_clear) | |
current_song_store.subscribe("song_stopped", status_update_clear) | |
# | |
# Event Router | |
def event_router(*args, **kwargs): | |
member = kwargs.get("member", None) | |
metadata = safe_get(args, 1, None) | |
playback_status = metadata and metadata.get("PlaybackStatus", None) | |
is_playing = playback_status and playback_status == "Playing" | |
is_paused = playback_status and playback_status == "Paused" | |
is_stopped = playback_status and playback_status == "Stopped" | |
track_metadata = metadata and metadata.get("Metadata", None) | |
has_track_metadata = track_metadata is not None | |
if member == "Seeked": | |
current_song_store.song_seek() | |
return | |
if member == "PropertiesChanged" and has_track_metadata: | |
current_song_store.song_changed(track_metadata) | |
return | |
if member == "PropertiesChanged" and is_paused: | |
current_song_store.song_paused() | |
return | |
if member == "PropertiesChanged" and is_paused: | |
current_song_store.song_paused() | |
return | |
if member == "PropertiesChanged" and is_stopped: | |
current_song_store.song_stopped() | |
return | |
if member == "PropertiesChanged" and is_playing: | |
current_song_store.song_started() | |
return | |
# # set the dbus loop | |
DBusGMainLoop(set_as_default=True) | |
# # connect to the session bus | |
bus = dbus.SessionBus() | |
bus.add_signal_receiver( | |
event_router, | |
path=DBUS_PATH, | |
signal_name="Seeked", | |
sender_keyword="sender", | |
interface_keyword="interface", | |
member_keyword="member", | |
path_keyword="path", | |
message_keyword="message", | |
) | |
bus.add_signal_receiver( | |
event_router, | |
path=DBUS_PATH, | |
signal_name="PropertiesChanged", | |
sender_keyword="sender", | |
interface_keyword="interface", | |
member_keyword="member", | |
path_keyword="path", | |
message_keyword="message", | |
) | |
loop = GLib.MainLoop() | |
loop.run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment