-
-
Save JasonLG1979/70c493bfb351a7f98bd188158b87dab2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
from enum import Enum | |
import gi | |
gi.require_version('Gtk', '3.0') | |
gi.require_version('Gst', '1.0') | |
from gi.repository import GObject, Gtk, GLib, Gst | |
class GstreamerTest(Gtk.Window):
    """Small GTK window that plays one audio file or URL through a playbin.

    The window holds an entry for a file path or URI, a position/duration
    label, three buttons (clear entry, start song, play/pause) and a status
    label.  Playback is driven by a GStreamer ``playbin`` element whose bus
    messages (end-of-stream, buffering, error) are handled on the GTK main
    loop.
    """

    class PlayerState(Enum):
        """Player states as tracked by the UI.

        GStreamer has no dedicated buffering state, so BUFFERING is realised
        with the same pipeline state as PAUSED.  Each member's value is a
        ``(key, Gst.State)`` tuple: the distinct key keeps ``Enum`` from
        folding BUFFERING into an alias of PAUSED (members with equal values
        are aliases), which would make the two states indistinguishable.
        """

        PLAYING = ('playing', Gst.State.PLAYING)
        PAUSED = ('paused', Gst.State.PAUSED)
        BUFFERING = ('buffering', Gst.State.PAUSED)
        STOPPED = ('stopped', Gst.State.NULL)

        def __init__(self, key, gst_state):
            # Enum unpacks the tuple value into these arguments.
            self.key = key
            self.gst_state = gst_state

    def __init__(self):
        """Build the widgets and the playbin, and connect all signals."""
        super().__init__(title='Gstreamer Test')
        self.set_default_size(500, -1)
        self.set_resizable(False)
        self.set_border_width(10)

        self.headerbar = Gtk.HeaderBar()
        self.headerbar.set_show_close_button(True)
        self.headerbar.set_title('Gstreamer Test')
        self.set_titlebar(self.headerbar)

        vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=10)
        self.add(vbox)

        self.label1 = Gtk.Label()
        self.label1.set_label('Absolute file path or url of audio file:')
        vbox.pack_start(self.label1, True, True, 0)

        self.entry = Gtk.Entry()
        vbox.pack_start(self.entry, False, True, 0)

        self.label = Gtk.Label()
        self.label.set_label('00:00 / 00:00')
        vbox.pack_start(self.label, True, True, 0)

        self.clear_entry_button = Gtk.Button.new_with_label('Clear Entry')
        self.clear_entry_button.connect(
            'clicked', lambda *ignore: self.entry.set_text(''))

        self.start_over_button = Gtk.Button.new_with_label('Start Song')
        self.start_over_button.connect('clicked', self.on_start_song)

        self.play_pause_button = Gtk.Button.new_with_label('Play/Pause')
        self.play_pause_button.connect('clicked', self.on_play_pause)

        self.message = Gtk.Label()
        self.message.set_label('Stopped')

        vbox.pack_start(self.clear_entry_button, False, False, 0)
        vbox.pack_start(self.start_over_button, False, False, 0)
        vbox.pack_start(self.play_pause_button, False, False, 0)
        vbox.pack_start(self.message, True, True, 0)

        self.entry.connect('changed', self.on_entry_changed)

        # No state until the first song is started.
        self.playstate = None
        self.playstate_before_buffering = None
        # 0 means "no periodic label update is scheduled".
        self.ui_loop_timer_id = 0

        # Query objects are created once and reused for every poll.
        self._query_duration = Gst.Query.new_duration(Gst.Format.TIME)
        self._query_position = Gst.Query.new_position(Gst.Format.TIME)

        self.player = Gst.ElementFactory.make('playbin', 'player')
        bus = self.player.get_bus()
        bus.add_signal_watch()
        # End of stream restarts the song from the current entry text.
        bus.connect('message::eos', self.on_start_song)
        bus.connect('message::buffering', self.on_gst_buffering)
        bus.connect('message::error', self.on_gst_error)

    def create_ui_loop(self):
        """Start the once-per-second time label refresh if not running."""
        if not self.ui_loop_timer_id:
            self.update_time()
            self.ui_loop_timer_id = GLib.timeout_add_seconds(
                1, self.update_time)

    def destroy_ui_loop(self):
        """Cancel the periodic time label refresh if it is running."""
        if self.ui_loop_timer_id:
            GLib.source_remove(self.ui_loop_timer_id)
            self.ui_loop_timer_id = 0

    def update_time(self):
        """Refresh the 'position / duration' label.

        Returns True so that, when used as a GLib timeout callback, the
        timeout keeps firing.
        """
        self.label.set_label('{} / {}'.format(self.position, self.duration))
        return True

    @property
    def position(self):
        """Current playback position as a formatted string ('00:00' if the
        query fails)."""
        position = 0
        if self.player.query(self._query_position):
            position = self._query_position.parse_position()[1]
        return self.format_time(position)

    @property
    def duration(self):
        """Stream duration as a formatted string ('00:00' if the query
        fails)."""
        duration = 0
        if self.player.query(self._query_duration):
            duration = self._query_duration.parse_duration()[1]
        return self.format_time(duration)

    def format_time(self, time_int):
        """Format a time in nanoseconds as 'MM:SS', or 'HH:MM:SS' when the
        hour part is non-zero.

        None is passed through as None.  Negative inputs (for example an
        unknown duration reported as -1) are treated as 0 instead of being
        rendered as a negative clock value.
        """
        if time_int is None:
            return None
        total_seconds = max(time_int, 0) // 1000000000
        minutes, s = divmod(total_seconds, 60)
        h, m = divmod(minutes, 60)
        if h:
            return '%02i:%02i:%02i' % (h, m, s)
        return '%02i:%02i' % (m, s)

    def _set_state(self, desired_state):
        """Switch both the tracked player state and the pipeline state.

        Entering BUFFERING remembers the state that was active before, so it
        can be restored when buffering completes.  The time label refresh
        runs only while PLAYING.
        """
        entering_buffering = (
            desired_state is self.PlayerState.BUFFERING
            and self.playstate is not self.PlayerState.BUFFERING)
        if entering_buffering:
            self.playstate_before_buffering = self.playstate
        self.playstate = desired_state
        self.player.set_state(desired_state.gst_state)
        if desired_state is self.PlayerState.PLAYING:
            self.message.set_label('Playing')
            self.create_ui_loop()
        else:
            self.destroy_ui_loop()

    def on_play_pause(self, *ignore):
        """Toggle between playing and paused; do nothing when stopped.

        Pressing the button while buffering forces playback.  If the stream
        is still short of data, the next buffering message pauses it again.
        """
        states = self.PlayerState
        if self.playstate is states.PLAYING:
            self._set_state(states.PAUSED)
            self.message.set_label('Paused')
        elif self.playstate in (states.PAUSED, states.BUFFERING):
            self._set_state(states.PLAYING)

    def on_entry_changed(self, *ignore):
        """Strip leading/trailing CR, LF and NUL characters from the entry."""
        current = self.entry.get_text()
        cleaned = current.strip('\r\n\x00')
        if cleaned != current:
            self.entry.set_text(cleaned)

    def on_start_song(self, *ignore):
        """Stop playback, then (re)start the song named in the entry.

        An existing file path is converted to a properly escaped, absolute
        file URI and played immediately.  Any other text is handed to the
        playbin as a URI and starts in the BUFFERING state until the
        pipeline reports that buffering is complete.
        """
        self._set_state(self.PlayerState.STOPPED)
        self.message.set_label('Stopped')
        self.update_time()
        uri = self.entry.get_text()
        if not uri:
            return
        if os.path.isfile(uri):
            # Handles relative paths and characters that need escaping.
            uri = Gst.filename_to_uri(uri)
        self.player.set_property('uri', uri)
        if uri.startswith('file://'):
            self._set_state(self.PlayerState.PLAYING)
        else:
            self._set_state(self.PlayerState.BUFFERING)

    def on_gst_buffering(self, bus, message):
        """Pause while the pipeline buffers and restore the state after.

        Below 100% the player is put into (or kept in) BUFFERING and the
        status label shows the progress.  At 100% the state that was active
        before buffering is restored: PAUSED if the user had paused,
        otherwise PLAYING.  A 100% message received while not buffering is
        ignored so it cannot override the user's current choice.
        """
        states = self.PlayerState
        percent = message.parse_buffering()
        if percent < 100:
            if self.playstate is states.BUFFERING:
                self.message.set_label('Buffering (%i%%)' % percent)
            else:
                self._set_state(states.BUFFERING)
            return
        if self.playstate is not states.BUFFERING:
            return
        if self.playstate_before_buffering is states.PAUSED:
            self._set_state(states.PAUSED)
            self.message.set_label('Paused')
        else:
            self._set_state(states.PLAYING)

    def on_gst_error(self, bus, message):
        """Stop playback and show the pipeline's error in the status label."""
        error, _debug = message.parse_error()
        self._set_state(self.PlayerState.STOPPED)
        self.message.set_label('Error: %s' % error.message)
if __name__ == '__main__':
    # Script entry point: initialise GStreamer, show the window and run the
    # GTK main loop until the window is closed.
    Gst.init(None)
    win = GstreamerTest()
    win.connect('delete-event', Gtk.main_quit)
    win.show_all()
    try:
        Gtk.main()
    finally:
        # Bring the pipeline down to NULL so it releases its resources
        # before the element is disposed at interpreter exit.
        win.player.set_state(Gst.State.NULL)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment