Created
April 1, 2018 15:59
-
-
Save devxpy/10e2ccaf9391cd6c9a0c5e91fa3c9af1 to your computer and use it in GitHub Desktop.
A music player in python that uses aplay as backend, written using zproc ( https://github.com/devxpy/zproc )
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# pip install delegator.py zproc | |
# (python3.6 only!) | |
# | |
# python aplay_player.py | |
# (remember to set path to an audio file of your choice at bottom) | |
import atexit | |
import os | |
import signal | |
from pathlib import Path | |
from time import sleep | |
import delegator | |
from zproc import ZeroProcess | |
def resolve_path(filepath): | |
if isinstance(filepath, Path): | |
path = filepath | |
else: | |
path = Path(filepath) | |
if not path.exists(): | |
path = Path.cwd().joinpath(filepath) | |
if not path.exists(): | |
raise FileNotFoundError | |
return path | |
def mainloop(zstate, del_after_play): | |
while True: | |
state = zstate.get_state() # get state | |
playlist = state.get('playlist') # get playlist | |
# if playing and playlist isn't empty | |
if state.get('is_playing') and playlist: | |
# start aplay | |
aplay = delegator.run(['aplay', playlist[0]], block=False) | |
zstate.set_state(aplay_pid=aplay.pid) | |
# wait for aplay to finish | |
aplay.block() | |
zstate.set_state(aplay_pid=None) | |
# check if aplay was stopped manually by user or ended gracefully | |
if zstate.get_state('manually_stopped'): | |
zstate.set_state(manually_stopped=None) # remove manually_stopped flag | |
else: | |
# delete the file if required | |
if del_after_play: | |
try: | |
Path(playlist[0]).unlink() | |
except FileNotFoundError: | |
pass | |
# remove song from playlist | |
del playlist[0] | |
# update playlist | |
zstate.set_state(playlist=playlist) | |
else: | |
sleep(0.1) | |
class Speaker: | |
def __init__(self, del_after_play=False): | |
self.zproc, self.zstate = ZeroProcess(mainloop, props=del_after_play).run() | |
self.zstate.set_state(playlist=[], is_playing=False) | |
def add_to_playlist(self, audio_file): | |
playlist = self.zstate.get_state('playlist') | |
playlist.append(str(resolve_path(audio_file))) | |
self.zstate.set_state(playlist=playlist) | |
def add_to_playlist_and_play(self, audio_file): | |
self.add_to_playlist(audio_file) | |
self.play() | |
def get_playlist(self): | |
return self.zstate.get_state('playlist') | |
def already_in_playlist(self, audio_file): | |
return str(resolve_path(audio_file)) in self.zstate.get_state('playlist') | |
@property | |
def is_playing(self): | |
return self.zstate.get_state('is_playing') | |
def get_aplay_pid(self): | |
return self.zstate.get_state('aplay_pid') | |
def stop(self): | |
self.zstate.set_state(is_playing=False) | |
aplay_pid = self.zstate.get_state('aplay_pid') | |
if aplay_pid: | |
os.kill(aplay_pid, signal.SIGTERM) | |
self.zstate.set_state(manually_stopped=True) | |
def play(self): | |
self.zstate.set_state(is_playing=True) | |
def kill(self): | |
self.zproc.kill() | |
# example usage | |
if __name__ == '__main__': | |
a = Speaker() | |
a.add_to_playlist('test.wav') | |
print(a.get_playlist()) | |
print(a.is_playing) | |
a.play() | |
print(a.is_playing) | |
input() | |
a.stop() | |
print(a.is_playing) | |
a.play() | |
print(a.is_playing) | |
print(a.get_playlist()) | |
input() | |
atexit.register(a.stop) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment