Created
August 16, 2020 19:15
-
-
Save ldotlopez/e45702d7c3c7d4f1ff6418996821b873 to your computer and use it in GitHub Desktop.
Micro prototype of Eina in python - github.com/ldotlopez/eina
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import gi | |
gi.require_version('Gst', '1.0') | |
gi.require_version('Gtk', '3.0') | |
gi.require_version('GLib', '2.0') | |
import os | |
from urllib import parse | |
from gi.repository import ( | |
Gst, | |
GObject, | |
Gtk, | |
GLib | |
) | |
class LomoPlayer(GObject.GObject): | |
# __gsignals__ = { | |
# 'my_signal': (GObject.SIGNAL_RUN_FIRST, None, | |
# (int,)) | |
# } | |
__gsignals__ = { | |
'state-changed': (GObject.SIGNAL_RUN_FIRST, None, | |
(Gst.State,)) | |
} | |
# foo = GObject.Property(type=str, default='bar') | |
# property_float = GObject.Property(type=float) | |
def __init__(self): | |
super().__init__() | |
self._pipeline = Gst.parse_launch('playbin') | |
bus = self._pipeline.get_bus() | |
bus.add_signal_watch() | |
bus.connect('message', self.on_bus_message) | |
self._queue = [] | |
self.volume = 1.0 | |
def add(self, uri): | |
parsed = parse.urlparse(uri) | |
if not parsed.scheme: | |
if not uri.startswith('/'): | |
uri = os.path.abspath(os.path.curdir) + '/' + uri | |
uri = os.path.realpath(uri) | |
uri = 'file://' + parse.quote(uri) | |
self._queue.append(uri) | |
if len(self._queue) == 1: | |
self._pipeline.set_property('uri', uri) | |
def remove(self, uri_or_index): | |
def _remove_idx(index): | |
if len(self._queue) == 1: | |
self.stop() | |
elif idx == self.current_index: | |
self.next() | |
self._queue.pop(index) | |
if isinstance(uri_or_index, str): | |
idx = self._queue.find(uri_or_index) | |
if isinstance(uri_or_index, int): | |
return _remove_idx(idx) | |
raise ValueError(uri_or_index) | |
def play(self): | |
self._pipeline.set_state(Gst.State.PLAYING) | |
def stop(self): | |
pass | |
@GObject.Property(type=int, minimum=-1) | |
def current_index(self): | |
raise NotImplementedError() | |
@current_index.setter | |
def current_index(self, value): | |
raise NotImplementedError() | |
@GObject.Property(type=float, minimum=0.0, maximum=1.0) | |
def volume(self): | |
self._pipeline.get_property('volume') | |
@volume.setter | |
def volume(self, value): | |
self._pipeline.set_property('volume', value) | |
def on_bus_message(self, bus, message): | |
if message.type is Gst.MessageType.STATE_CHANGED: | |
old, new, pending = message.parse_state_changed() | |
if pending is Gst.State.VOID_PENDING: | |
self.emit("state-changed", new) | |
else: | |
print(repr(message.type)) | |
def main(): | |
import sys | |
def on_state_changed(lomo, state): | |
print("State changed to", repr(state)) | |
Gst.init([]) | |
lomo = LomoPlayer() | |
lomo.connect('state-changed', on_state_changed) | |
for x in sys.argv[1:]: | |
lomo.add(x) | |
lomo.play() | |
GLib.MainLoop().run() | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment