|
from pulsectl import Pulse |
|
from spotipy import Spotify |
|
from threading import Thread |
|
from pydbus import SessionBus |
|
from urllib.request import urlretrieve |
|
from time import time, gmtime, strftime |
|
from gi.repository.GObject import MainLoop |
|
from subprocess import Popen, PIPE, DEVNULL |
|
from spotipy.oauth2 import SpotifyClientCredentials |
|
|
|
|
|
APP_ID = 'yourspotifyappid' |
|
APP_SECRET = 'yourspotifyappsecret' |
|
|
|
|
|
formats = { |
|
0: {'name': 'PA_SAMPLE_U8', 'bitwidth': 8}, |
|
1: {'name': 'PA_SAMPLE_ALAW', 'bitwidth': 8}, |
|
2: {'name': 'PA_SAMPLE_ULAW', 'bitwidth': 8}, |
|
3: {'name': 'PA_SAMPLE_S16LE', 'bitwidth': 16}, |
|
4: {'name': 'PA_SAMPLE_S16BE', 'bitwidth': 16}, |
|
5: {'name': 'PA_SAMPLE_FLOAT32LE', 'bitwidth': 32}, |
|
6: {'name': 'PA_SAMPLE_FLOAT32BE', 'bitwidth': 32}, |
|
7: {'name': 'PA_SAMPLE_S32LE', 'bitwidth': 32}, |
|
8: {'name': 'PA_SAMPLE_S32BE', 'bitwidth': 32}, |
|
9: {'name': 'PA_SAMPLE_S24LE', 'bitwidth': 24}, |
|
10: {'name': 'PA_SAMPLE_S24BE', 'bitwidth': 24}, |
|
11: {'name': 'PA_SAMPLE_S24_32LE', 'bitwidth': 32}, |
|
12: {'name': 'PA_SAMPLE_S24_32BE', 'bitwidth': 32}, |
|
} |
|
|
|
spotipy = Spotify(client_credentials_manager=SpotifyClientCredentials(APP_ID, APP_SECRET)) |
|
|
|
|
|
def add_info(song): |
|
a = spotipy.track(song['id'])['album'] |
|
# Add song year by album |
|
song['year'] = a['release_date'][:4] |
|
# Tell if it is a compilation |
|
song['compilation'] = a['album_type'] == 'compilation' |
|
# Get all album tracks |
|
results = spotipy.album_tracks(a['uri']) |
|
tracks = results['items'] |
|
while results['next']: |
|
results = spotipy.next(results) |
|
tracks.extend(results['items']) |
|
# Fetch song cover by album |
|
song['cover'], _ = urlretrieve(a['images'][0]['url']) |
|
# Count discs and tracks |
|
discs = {} |
|
for tmp in tracks: |
|
disc = tmp['disc_number'] |
|
track = tmp['track_number'] |
|
if not disc in discs: |
|
discs[disc] = 0 |
|
discs[disc] += 1 |
|
song['track'] = '%d/%d' % (song['track'], discs[song['disc']]) |
|
song['disc'] = '%d/%d' % (song['disc'], len(discs)) |
|
return song |
|
|
|
def report_duration(secs): |
|
print() |
|
from time import sleep |
|
for sec in range(secs): |
|
perc = int((sec + 1) * 100 / secs) |
|
print('--- RECORDING: %3d%% ---' % perc, end='\r', flush=True) |
|
sleep(1) |
|
print('--- RECORDING FINISHED ---') |
|
|
|
def main(): |
|
pulse = Pulse() |
|
spotify = SessionBus().get('org.mpris.MediaPlayer2.spotify', '/org/mpris/MediaPlayer2') |
|
|
|
# STEP 00: Helpers |
|
def sink(key=None): |
|
for sink in pulse.sink_input_list(): |
|
if sink.name == 'Spotify': |
|
if not key: |
|
return sink |
|
return getattr(sink, key) |
|
|
|
# STEP 01: Get info |
|
meta = spotify.Metadata |
|
secs = meta['mpris:length'] / 1e6 |
|
song = add_info({ |
|
# Base info |
|
'title': meta['xesam:title'], |
|
'artist': '; '.join(meta['xesam:artist']), |
|
'album': meta['xesam:album'], |
|
'duration': strftime('%H:%M:%S', gmtime(secs)), |
|
# Additional info |
|
'id': meta['mpris:trackid'], |
|
'disc': meta['xesam:discNumber'], |
|
'track': meta['xesam:trackNumber'], |
|
}) |
|
|
|
# STEP 02: Print info |
|
print('--- SONG INFO ---') |
|
print('Title: %s' % song['title']) |
|
print('Artist: %s' % song['artist']) |
|
print('Album: %s' % song['album']) |
|
print('Duration: %s' % song['duration']) |
|
print('Year: %s' % song['year']) |
|
|
|
# STEP 03: 100% volume |
|
volume = sink('volume') |
|
volume.value_flat = 1 |
|
pulse.sink_input_volume_set(sink('index'), volume) |
|
|
|
# STEP 04: Define record |
|
spec = sink('sample_spec') |
|
spec_format = spec.format |
|
meta_format = formats[spec_format] |
|
args = { |
|
'rate': spec.rate, |
|
'channels': spec.channels, |
|
'format': meta_format['name'], |
|
'bitwidth': meta_format['bitwidth'], |
|
'signed': 'SAMPLE_S' in meta_format['name'], |
|
'endianness': 'little' if meta_format['name'].endswith('LE') else 'big', |
|
} |
|
|
|
# STEP 05: Print info |
|
print() |
|
print('--- AUDIO INFO ---') |
|
print('Channels: %d' % args['channels']) |
|
print('Format: %s' % args['format'].lower()[10:]) |
|
print('Endianness: %s' % args['endianness']) |
|
print('Sample bits: %d bits' % args['bitwidth']) |
|
print('Sample rate: %d Hz' % args['rate']) |
|
|
|
# STEP 06: Prepare record |
|
null_name = 'rec_' + str(int(time())) |
|
null_id = pulse.module_load('module-null-sink', 'sink_name=' + null_name) |
|
null_sink = pulse.get_sink_by_name(null_name) |
|
pulse.sink_input_move(sink('index'), null_sink.index) |
|
|
|
# STEP 07: Reset |
|
spotify.Stop() |
|
spotify.Previous() |
|
|
|
# STEP 08: Prepare commands |
|
parec_cmd = ['parec', '-d', null_name + '.monitor'] |
|
conv_cmd = [ |
|
# Basic |
|
'faac', '-P', |
|
'--overwrite', '-', |
|
'-R', str(args['rate']), |
|
'-B', str(args['bitwidth']), |
|
'-C', str(args['channels']), |
|
# Metadata |
|
'--artist', song['artist'], |
|
'--title', song['title'], |
|
'--album', song['album'], |
|
'--track', song['track'], |
|
'--disc', song['disc'], |
|
'--year', song['year'], |
|
'--cover-art', song['cover'], |
|
'--comment', song['id'], |
|
# Output |
|
'-o', 'Music/%s - %s.m4a' % (meta['xesam:artist'][0], song['title']), |
|
] |
|
if args['endianness'] == 'little': |
|
conv_cmd.insert(4, '-X') |
|
if song['compilation']: |
|
conv_cmd.insert(4, '--compilation') |
|
|
|
# STEP 09: Start record |
|
parec = Popen(parec_cmd, stdout=PIPE) |
|
conv = Popen(conv_cmd, stdin=parec.stdout, stderr=DEVNULL) |
|
spotify.PlayPause() |
|
counter = Thread(target=report_duration, args=(int(secs),)) |
|
counter.start() |
|
|
|
# STEP 10: Wait record |
|
loop = None |
|
curr_id = song['id'] |
|
def changed(_, props, _2): |
|
play_id = props['Metadata']['mpris:trackid'] |
|
|
|
# STEP 11: Save record |
|
if play_id != curr_id: |
|
parec.terminate() |
|
spotify.Stop() |
|
conv.wait() |
|
pulse.module_unload(null_id) |
|
counter.join() |
|
loop.quit() |
|
spotify.onPropertiesChanged = changed |
|
loop = MainLoop() |
|
loop.run() |
|
|
|
|
|
if __name__ == '__main__': |
|
main() |