Skip to content

Instantly share code, notes, and snippets.

@JasonLG1979
Last active September 9, 2016 21:50
Show Gist options
  • Save JasonLG1979/70c493bfb351a7f98bd188158b87dab2 to your computer and use it in GitHub Desktop.
Save JasonLG1979/70c493bfb351a7f98bd188158b87dab2 to your computer and use it in GitHub Desktop.
import os
from enum import Enum
import gi
gi.require_version('Gtk', '3.0')
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gtk, GLib, Gst
class GstreamerTest(Gtk.Window):
# PlayState constants.
# We need a special 'BUFFERING' state
# in case the file is not local
# that Gstreamer does not provide.
class PlayerState(Enum):
PLAYING = Gst.State.PLAYING
PAUSED = Gst.State.PAUSED
BUFFERING = Gst.State.PAUSED
STOPPED = Gst.State.NULL
def __init__(self):
Gtk.Window.__init__(self, title='Gstreamer Test')
self.set_default_size(500, -1)
self.set_resizable(False)
self.set_border_width(10)
self.headerbar = Gtk.HeaderBar()
self.headerbar.set_show_close_button(True)
self.headerbar.set_title('Gstreamer Test')
self.set_titlebar(self.headerbar)
vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=10)
self.add(vbox)
self.label1 = Gtk.Label()
self.label1.set_label('Absolute file path or url of audio file:')
vbox.pack_start(self.label1, True, True, 0)
self.entry = Gtk.Entry()
vbox.pack_start(self.entry, False, True, 0)
self.label = Gtk.Label()
self.label.set_label('00:00 / 00:00')
vbox.pack_start(self.label, True, True, 0)
self.clear_entry_button = Gtk.Button.new_with_label('Clear Entry')
self.clear_entry_button.connect('clicked', lambda *ignore: self.entry.set_text(''))
self.start_over_button = Gtk.Button.new_with_label('Start Song')
self.start_over_button.connect('clicked', self.on_start_song)
self.play_pause_button = Gtk.Button.new_with_label('Play/Pause')
self.play_pause_button.connect('clicked', self.on_play_pause)
self.message = Gtk.Label()
self.message.set_label('Stopped')
vbox.pack_start(self.clear_entry_button, False, False, 0)
vbox.pack_start(self.start_over_button, False, False, 0)
vbox.pack_start(self.play_pause_button, False, False, 0)
vbox.pack_start(self.message, True, True, 0)
self.entry.connect('changed', self.on_entry_changed)
self.playstate = None
self.playstate_before_buffering = None
self.ui_loop_timer_id = 0
self._query_duration = Gst.Query.new_duration(Gst.Format.TIME)
self._query_position = Gst.Query.new_position(Gst.Format.TIME)
self.player = Gst.ElementFactory.make('playbin', 'player')
bus = self.player.get_bus()
bus.add_signal_watch()
bus.connect('message::eos', self.on_start_song)
bus.connect('message::buffering', self.on_gst_buffering)
def create_ui_loop(self):
if not self.ui_loop_timer_id:
self.update_time()
self.ui_loop_timer_id = GLib.timeout_add_seconds(1, self.update_time)
def destroy_ui_loop(self):
if self.ui_loop_timer_id:
GLib.source_remove(self.ui_loop_timer_id)
self.ui_loop_timer_id = 0
def update_time(self):
self.label.set_label('{} / {}'.format(self.position, self.duration))
return True
@property
def position(self):
position = 0
pos_stat = self.player.query(self._query_position)
if pos_stat:
position = self._query_position.parse_position()[1]
return self.format_time(position)
@property
def duration(self):
duration = 0
dur_stat = self.player.query(self._query_duration)
if dur_stat:
duration = self._query_duration.parse_duration()[1]
return self.format_time(duration)
def format_time(self, time_int):
if time_int is None:
return None
time_int //= 1000000000
s = time_int % 60
time_int //= 60
m = time_int % 60
time_int //= 60
h = time_int
if h:
return "%02i:%02i:%02i"%(h,m,s)
else:
return '%02i:%02i'%(m,s)
def _set_state(self, desired_state):
if desired_state is self.PlayerState.BUFFERING:
self.playstate_before_buffering = self.playstate
self.playstate = desired_state
self.player.set_state(desired_state.value)
if desired_state is self.PlayerState.PLAYING:
self.message.set_label('Playing')
self.create_ui_loop()
else:
self.destroy_ui_loop()
def on_play_pause(self, *ignore):
if self.playstate is self.PlayerState.PLAYING:
self._set_state(self.PlayerState.PAUSED)
self.message.set_label('Paused')
elif self.playstate is self.PlayerState.PAUSED:
self._set_state(self.PlayerState.PLAYING)
def on_entry_changed(self, *ignore):
text = self.entry.get_text().strip('\r\n\x00')
self.entry.set_text(text)
def on_start_song(self, *ignore):
self._set_state(self.PlayerState.STOPPED)
self.message.set_label('Stopped')
self.update_time()
uri = self.entry.get_text()
if uri:
if os.path.isfile(uri):
uri = 'file://' + uri
self.player.set_property('uri', uri)
if uri.startswith('file://'):
self._set_state(self.PlayerState.PLAYING)
else:
self._set_state(self.PlayerState.BUFFERING)
def on_gst_buffering(self, bus, message):
percent = message.parse_buffering()
if percent < 100:
if self.playstate is self.PlayerState.BUFFERING:
self.message.set_label('Buffering (%i%%)'%percent)
else:
self._set_state(self.PlayerState.BUFFERING)
else:
if self.playstate_before_buffering is self.PlayerState.PLAYING or self.PlayerState.STOPPED:
self._set_state(self.PlayerState.PLAYING)
else:
self._set_state(self.PlayerState.PAUSED)
if __name__ == '__main__':
Gst.init(None)
win = GstreamerTest()
win.connect('delete-event', Gtk.main_quit)
win.show_all()
Gtk.main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment