Skip to content

Instantly share code, notes, and snippets.

@ldotlopez
Created August 16, 2020 19:15
Show Gist options
  • Save ldotlopez/e45702d7c3c7d4f1ff6418996821b873 to your computer and use it in GitHub Desktop.
Save ldotlopez/e45702d7c3c7d4f1ff6418996821b873 to your computer and use it in GitHub Desktop.
Micro prototype of Eina in Python - github.com/ldotlopez/eina
import gi
gi.require_version('Gst', '1.0')
gi.require_version('Gtk', '3.0')
gi.require_version('GLib', '2.0')
import os
from urllib import parse
from gi.repository import (
Gst,
GObject,
Gtk,
GLib
)
class LomoPlayer(GObject.GObject):
    """Minimal queue-based player built on a GStreamer ``playbin``.

    URIs (or local file paths) are appended to an internal queue with
    :meth:`add`.  The entry at :attr:`current_index` is the one loaded into
    the pipeline.

    Signals:
        state-changed (Gst.State): emitted when the pipeline itself has
            reached a new state and no further transition is pending.
    """

    __gsignals__ = {
        'state-changed': (GObject.SIGNAL_RUN_FIRST, None, (Gst.State,)),
    }

    def __init__(self):
        super().__init__()
        self._pipeline = Gst.parse_launch('playbin')
        bus = self._pipeline.get_bus()
        bus.add_signal_watch()
        bus.connect('message', self.on_bus_message)
        self._queue = []
        # Position in ``_queue`` of the loaded entry; -1 while nothing is
        # loaded (empty queue).
        self._current_index = -1
        self.volume = 1.0

    def add(self, uri):
        """Append *uri* to the queue.

        A value without a URI scheme is treated as a local path: it is
        resolved against the current working directory (symlinks included)
        and converted to a percent-quoted ``file://`` URI.  The first entry
        added to an empty queue is loaded into the pipeline right away.
        """
        if not parse.urlparse(uri).scheme:
            # realpath() already anchors relative paths at the current
            # working directory, so no manual prefixing is required.
            uri = 'file://' + parse.quote(os.path.realpath(uri))

        self._queue.append(uri)
        if len(self._queue) == 1:
            self._current_index = 0
            self._pipeline.set_property('uri', uri)

    def remove(self, uri_or_index):
        """Remove one entry from the queue.

        Args:
            uri_or_index: either the URI string exactly as stored in the
                queue (i.e. after the normalisation done by :meth:`add`), or
                the integer position of the entry.

        Raises:
            ValueError: if the argument is neither ``str`` nor ``int``, or
                the URI is not in the queue.
            IndexError: if an integer position is outside the queue.
        """
        if isinstance(uri_or_index, str):
            try:
                index = self._queue.index(uri_or_index)
            except ValueError:
                raise ValueError(uri_or_index) from None
        elif isinstance(uri_or_index, int):
            index = uri_or_index
            if not 0 <= index < len(self._queue):
                raise IndexError(index)
        else:
            raise ValueError(uri_or_index)

        if len(self._queue) == 1:
            # Removing the only entry leaves nothing to play.
            self.stop()
            self._current_index = -1
        elif index == self._current_index:
            # Move off the entry before it disappears from the queue.
            self.next()

        self._queue.pop(index)
        if index < self._current_index:
            # Entries after the removed one shifted down by one slot.
            self._current_index -= 1

    def play(self):
        """Ask the pipeline to go to ``PLAYING``."""
        self._pipeline.set_state(Gst.State.PLAYING)

    def stop(self):
        """Halt playback by dropping the pipeline to ``READY``."""
        self._pipeline.set_state(Gst.State.READY)

    def next(self):
        """Load the entry after the current one, wrapping to the first.

        Does nothing when the queue is empty.
        """
        if not self._queue:
            return
        self._switch_to((self._current_index + 1) % len(self._queue))

    def _switch_to(self, index):
        """Load queue entry *index*, resuming playback if it was playing."""
        # Timeout 0: report the state as it is now, without blocking.
        _, state, pending = self._pipeline.get_state(0)
        was_playing = Gst.State.PLAYING in (state, pending)
        # NOTE(review): playbin is expected to pick up a new 'uri' only when
        # restarted from READY or below -- confirm against the playbin docs.
        self.stop()
        self._current_index = index
        self._pipeline.set_property('uri', self._queue[index])
        if was_playing:
            self.play()

    @GObject.Property(type=int, minimum=-1)
    def current_index(self):
        """Queue position of the loaded entry, or -1 if nothing is loaded."""
        return self._current_index

    @current_index.setter
    def current_index(self, value):
        """Load the entry at position *value*.

        Raises:
            IndexError: if *value* is not a valid queue position.
        """
        if not 0 <= value < len(self._queue):
            raise IndexError(value)
        self._switch_to(value)

    @GObject.Property(type=float, minimum=0.0, maximum=1.0)
    def volume(self):
        """Playback volume, forwarded to the pipeline's ``volume`` property."""
        return self._pipeline.get_property('volume')

    @volume.setter
    def volume(self, value):
        self._pipeline.set_property('volume', value)

    def on_bus_message(self, bus, message):
        """Bus ``message`` handler.

        Emits ``state-changed`` for completed state changes of the pipeline
        and prints the type of every other kind of message.
        """
        if message.type == Gst.MessageType.STATE_CHANGED:
            # Elements inside the pipeline post their own state changes on
            # the same bus; only the top-level pipeline's are reported.
            if message.src != self._pipeline:
                return
            _old, new, pending = message.parse_state_changed()
            if pending == Gst.State.VOID_PENDING:
                self.emit("state-changed", new)
        else:
            print(repr(message.type))
def main():
    """Command-line entry point.

    Initialises GStreamer, queues every command-line argument on a
    LomoPlayer, starts playback and then runs a GLib main loop.
    """
    import sys

    def report_state(player, new_state):
        # 'state-changed' handler: print each state the player announces.
        print("State changed to", repr(new_state))

    Gst.init([])

    player = LomoPlayer()
    player.connect('state-changed', report_state)

    for location in sys.argv[1:]:
        player.add(location)
    player.play()

    loop = GLib.MainLoop()
    loop.run()
# Start the player only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment