Skip to content

Instantly share code, notes, and snippets.

@adeel
Created August 24, 2009 04:09
Show Gist options
  • Save adeel/173655 to your computer and use it in GitHub Desktop.
Save adeel/173655 to your computer and use it in GitHub Desktop.
mplayer wrapper
import os
import time
import subprocess
import threading
import Queue
# c.f.: http://is.gd/2T1p
path = '/usr/bin/mplayer'
class MPlayerIO(object):
def __init__(self, filename):
if not self.is_path_valid(path):
raise os.error, "%s isn't the mplayer binary." % path
cmd = [path, '-noconsolecontrols', '-slave', filename]
self.pipe = subprocess.Popen(cmd, bufsize=1,
stdout=subprocess.PIPE, stdin=subprocess.PIPE,
stderr=subprocess.STDOUT)
self.queue = Queue.Queue()
self.thread = threading.Thread(target=self.read)
self.thread.start()
def is_path_valid(self, path):
return os.path.exists(path) and os.access(path, os.X_OK)
def write(self, cmd):
"""Send a command to mplayer."""
if not self.is_alive():
raise ClosedPlayer(), "mplayer is not running."
return self.pipe.stdin.write(cmd)
def read(self):
"""Read lines from the pipe and add them to the queue."""
while True:
if not self.is_alive():
return
line = self.pipe.stdout.readline()
if not line:
return
self.queue.put(line)
def readline(self):
"""Read one line from the queue."""
while True:
if self.queue.empty():
time.sleep(0.1)
continue
line = self.queue.get()
return line
def close(self):
"""Close the pipe and wait for the thread to terminate."""
if self.is_alive():
self.pipe.communicate('quit\n')
self.pipe.wait()
self.thread.join()
def is_alive(self):
return self.pipe.poll() is None
class Player(object):
def __init__(self, filename):
"""Open connection to mplayer and start playback."""
if not os.path.exists(filename):
raise os.error, 'File not found: %s' % filename
self.io = MPlayerIO(filename)
while True:
line = self.io.readline()
if line.startswith('Failed to open '):
raise InvalidFile(), 'Failed to open %s.' % self.filename
if line.startswith('Starting playback...'):
break
if line.startswith('A: '):
break
self.is_started = True
self.is_playing = True
def play(self):
"""An alias for unpause()."""
self.unpause()
def pause(self):
"""Attempt to pause playback."""
if not self.is_playing:
return
self.io.write('pause\n')
self.is_playing = False
def unpause(self):
"""Attempt to unpause playback."""
if self.is_playing:
return
self.io.write('pause\n')
self.is_playing = True
def stop(self):
"""Attempt to stop playback."""
self.pause()
self.io.write('seek 0 2')
self.is_started = False
def close(self):
"""Attempt to close the player."""
self.is_started = False
self.is_playing = False
self.pause()
self.io.close()
class InvalidFile(Exception): pass
class ClosedPlayer(Exception): pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment