Skip to content

Instantly share code, notes, and snippets.

@cofob
Last active January 6, 2024 11:24
Show Gist options
  • Save cofob/5a7f30d321d9f7a3f224a7ba145b7488 to your computer and use it in GitHub Desktop.
Save cofob/5a7f30d321d9f7a3f224a7ba145b7488 to your computer and use it in GitHub Desktop.
__version__ = (2, 0, 0)
# meta developer: @cofob
# requires: git+https://github.com/MarshalX/yandex-music-api@dev eyed3
# meta desc: Module for downloading music from Yandex Music
import logging
import os
import eyed3
from eyed3.id3.frames import ImageFrame
from asyncio import sleep
from yandex_music import ClientAsync, TracksList, Track
from telethon import TelegramClient
from telethon.errors.rpcerrorlist import FloodWaitError
import traceback
from .. import loader
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Stop records emitted on the "yandex_music" logger from propagating to the
# handlers of its ancestor loggers.
logging.getLogger("yandex_music").propagate = False
@loader.tds
class YaDownMod(loader.Module):
    """Module for downloading favorite music from Yandex Music"""

    # Log/config strings (English defaults).
    strings = {
        "name": "YaDown",
        "no_token": "<b>🚫 Specify a token in config!</b>",
        "no_channel": "<b>🚫 Specify a channel in config!</b>",
        "_cfg_yandexmusictoken": "Yandex Music token",
        "_cfg_postchannel": "Channel for posting music",
    }

    # Russian translations of the messages above.
    strings_ru = {
        "no_token": "<b>🚫 Укажи токен в конфиге!</b>",
        "no_channel": "<b>🚫 Укажи канал в конфиге!</b>",
        "_cfg_yandexmusictoken": "Токен Yandex Music",
        "_cfg_postchannel": "Канал для постинга музыки",
    }

    # Translation table turning every character that is replaced in artist
    # names into "_" in a single pass when building hashtags.
    _HASHTAG_TABLE = str.maketrans(dict.fromkeys(" .!?-,()", "_"))

    def __init__(self):
        """Declare the module config: the Yandex Music token and the target channel."""
        self.config = loader.ModuleConfig(
            loader.ConfigValue(
                "YandexMusicToken",
                None,
                lambda: self.strings["_cfg_yandexmusictoken"],
                validator=loader.validators.Hidden(),
            ),
            loader.ConfigValue(
                "PostChannel",
                None,
                lambda: self.strings["_cfg_postchannel"],
            ),
        )

    async def client_ready(self, client: TelegramClient, db):
        """Keep the client and db handles and start the loop when fully configured.

        The downloader loop is only started here if both the token and the
        channel are already set in the config.
        """
        self.client = client
        self.db = db
        if self.config["YandexMusicToken"] and self.config["PostChannel"]:
            logger.info("Starting downloader")
            self.downloader.start()

    @loader.loop(interval=300)
    async def downloader(self):
        """Post every liked track that has not been posted yet to the channel.

        Scheduled through ``loader.loop(interval=300)``. Logs an error and
        returns early when the token or the channel is missing. A failure on
        one track never aborts the run: Telegram flood waits are slept out and
        any other error is logged, then the next track is processed.
        """
        if not self.config["YandexMusicToken"]:
            logger.error(self.strings["no_token"])
            return
        if not self.config["PostChannel"]:
            logger.error(self.strings["no_channel"])
            return

        client = ClientAsync(self.config["YandexMusicToken"])
        await client.init()
        # Walk the likes list back to front (presumably oldest like first;
        # verify against the ordering returned by the API).
        tracks = (await client.users_likes_tracks())[::-1]

        for short_track in tracks:
            track_id = short_track.track_id
            logger.debug(f"Processing track {track_id}")
            try:
                await self._process_track(short_track, track_id)
            except FloodWaitError as e:
                # Telegram asked us to back off; the skipped track is not
                # recorded as downloaded, so a later run picks it up again.
                logger.warning(f"Sleeping for {e.seconds} seconds")
                await sleep(e.seconds)
            except Exception:
                # Top-level boundary for one track: log with traceback and go on.
                logger.exception(f"Failed to process track {track_id}")

    async def _process_track(self, short_track, track_id):
        """Download, tag, upload and record one liked track.

        Returns without doing anything when the track is already recorded as
        downloaded, is unavailable, or lacks artists or a title. The temporary
        ``.mp3`` file is always removed, even when tagging or uploading fails.
        """
        downloaded = self.get("downloaded", {})
        if isinstance(downloaded, list):
            # A list is not the expected mapping shape; start from scratch.
            downloaded = {}
        if track_id in downloaded:
            return

        full_track: Track = await short_track.fetch_track_async()
        if not (full_track.available and full_track.artists and full_track.title):
            return

        name = f"{full_track.artists[0].name} - {full_track.title}"
        # "/" would be treated as a directory separator in the file name.
        path = name.replace("/", "-") + ".mp3"
        caption = self._build_caption(full_track, name)

        logger.info(f"Downloading {name}")
        try:
            await full_track.download_async(path)
            await self._write_tags(path, full_track, name)
            logger.info(f"Uploading {name}")
            message = await self.client.send_file(
                self.config["PostChannel"],
                path,
                caption=caption,
            )
        finally:
            # Clean up on success and on failure so that failed attempts do
            # not accumulate files on disk.
            if os.path.exists(path):
                logger.info(f"Removing {name}")
                os.remove(path)

        downloaded[track_id] = {
            "message_id": message.id,
            "version": 2,
        }
        self.set("downloaded", downloaded)

    def _build_caption(self, full_track, name):
        """Return the caption: title line, artist hashtags line, links line."""
        text = f"<emoji document_id=5188621441926438751>🎵</emoji> <b>{name}</b>"
        text += "\n<emoji document_id=5373012449597335010>👤</emoji> "
        text += " ".join(
            f"#{artist.name.translate(self._HASHTAG_TABLE)}"
            for artist in full_track.artists
        )
        text = text.strip()
        text += f'\n<emoji document_id=5445284980978621387>🚀</emoji> <a href="https://music.yandex.ru/track/{full_track.id}">ya.music</a> | <a href="https://song.link/ya/{full_track.id}">Other</a>'
        return text

    async def _write_tags(self, path, full_track, name):
        """Write cover, artist, title and album tags into the file at ``path``.

        The caller guarantees that ``full_track`` has at least one artist and
        a title.

        Raises:
            RuntimeError: if eyed3 cannot load the downloaded file.
        """
        audiofile = eyed3.load(path)
        if audiofile is None:
            # Fail with a clear message instead of an AttributeError below.
            raise RuntimeError(f"eyed3 could not read {path}")
        if audiofile.tag is None:
            audiofile.initTag()

        logger.info(f"Adding cover for {name}")
        try:
            cover = await full_track.download_cover_bytes_async("1000x1000")
        except AttributeError:
            # Presumably raised when the track has no cover to fetch; the
            # track is still posted, just without artwork.
            logger.error(f"Failed to add cover for {name}, cover cant be fetched")
        else:
            audiofile.tag.images.set(ImageFrame.FRONT_COVER, cover, 'image/jpeg')

        audiofile.tag.artist = full_track.artists[0].name
        audiofile.tag.title = full_track.title
        if full_track.albums:
            audiofile.tag.album = full_track.albums[0].title

        logger.info(f"Saving tags for {name}")
        audiofile.tag.save()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment