A music player in python that uses aplay as backend, written using zproc ( https://github.com/devxpy/zproc )
# pip install delegator.py zproc | |
# (python3.6 only!) | |
# | |
# python aplay_player.py | |
# (remember to set path to an audio file of your choice at bottom) | |
import atexit | |
import os | |
import signal | |
from pathlib import Path | |
from time import sleep | |
import delegator | |
from zproc import ZeroProcess | |
def resolve_path(filepath): | |
if isinstance(filepath, Path): | |
path = filepath | |
else: | |
path = Path(filepath) | |
if not path.exists(): | |
path = Path.cwd().joinpath(filepath) | |
if not path.exists(): | |
raise FileNotFoundError | |
return path | |
def mainloop(zstate, del_after_play): | |
while True: | |
state = zstate.get_state() # get state | |
playlist = state.get('playlist') # get playlist | |
# if playing and playlist isn't empty | |
if state.get('is_playing') and playlist: | |
# start aplay | |
aplay = delegator.run(['aplay', playlist[0]], block=False) | |
zstate.set_state(aplay_pid=aplay.pid) | |
# wait for aplay to finish | |
aplay.block() | |
zstate.set_state(aplay_pid=None) | |
# check if aplay was stopped manually by user or ended gracefully | |
if zstate.get_state('manually_stopped'): | |
zstate.set_state(manually_stopped=None) # remove manually_stopped flag | |
else: | |
# delete the file if required | |
if del_after_play: | |
try: | |
Path(playlist[0]).unlink() | |
except FileNotFoundError: | |
pass | |
# remove song from playlist | |
del playlist[0] | |
# update playlist | |
zstate.set_state(playlist=playlist) | |
else: | |
sleep(0.1) | |
class Speaker: | |
def __init__(self, del_after_play=False): | |
self.zproc, self.zstate = ZeroProcess(mainloop, props=del_after_play).run() | |
self.zstate.set_state(playlist=[], is_playing=False) | |
def add_to_playlist(self, audio_file): | |
playlist = self.zstate.get_state('playlist') | |
playlist.append(str(resolve_path(audio_file))) | |
self.zstate.set_state(playlist=playlist) | |
def add_to_playlist_and_play(self, audio_file): | |
self.add_to_playlist(audio_file) | |
self.play() | |
def get_playlist(self): | |
return self.zstate.get_state('playlist') | |
def already_in_playlist(self, audio_file): | |
return str(resolve_path(audio_file)) in self.zstate.get_state('playlist') | |
@property | |
def is_playing(self): | |
return self.zstate.get_state('is_playing') | |
def get_aplay_pid(self): | |
return self.zstate.get_state('aplay_pid') | |
def stop(self): | |
self.zstate.set_state(is_playing=False) | |
aplay_pid = self.zstate.get_state('aplay_pid') | |
if aplay_pid: | |
os.kill(aplay_pid, signal.SIGTERM) | |
self.zstate.set_state(manually_stopped=True) | |
def play(self): | |
self.zstate.set_state(is_playing=True) | |
def kill(self): | |
self.zproc.kill() | |
# example usage | |
if __name__ == '__main__': | |
a = Speaker() | |
a.add_to_playlist('test.wav') | |
print(a.get_playlist()) | |
print(a.is_playing) | |
a.play() | |
print(a.is_playing) | |
input() | |
a.stop() | |
print(a.is_playing) | |
a.play() | |
print(a.is_playing) | |
print(a.get_playlist()) | |
input() | |
atexit.register(a.stop) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment