Skip to content

Instantly share code, notes, and snippets.

@kingosticks
Last active September 19, 2019 22:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kingosticks/db66ecd2a82ac0b36c5f707378a2912c to your computer and use it in GitHub Desktop.
Save kingosticks/db66ecd2a82ac0b36c5f707378a2912c to your computer and use it in GitHub Desktop.
File scanner using Gst discoverer
from __future__ import (
absolute_import, division, print_function, unicode_literals)
import collections
import logging
import time
from mopidy import exceptions
from mopidy.audio import tags as tags_lib, utils
from mopidy.internal import encoding, log
from mopidy.internal.gi import Gst, GstPbutils
# GST_ELEMENT_FACTORY_LIST:
_DECODER = 1 << 0
_AUDIO = 1 << 50
_DEMUXER = 1 << 5
_DEPAYLOADER = 1 << 8
_PARSER = 1 << 6
# GST_TYPE_AUTOPLUG_SELECT_RESULT:
_SELECT_TRY = 0
_SELECT_EXPOSE = 1
_Result = collections.namedtuple(
'Result', ('uri', 'tags', 'duration', 'seekable', 'mime', 'playable'))
logger = logging.getLogger(__name__)
def _trace(*args, **kwargs):
logger.log(log.TRACE_LOG_LEVEL, *args, **kwargs)
class Scanner2(object):
def __init__(self, timeout=1000, proxy_config=None):
self.discoverer = GstPbutils.Discoverer.new(timeout/1000 * Gst.SECOND)
def scan(self, uri, timeout=None):
tags, duration, seekable, mime, have_audio = {}, None, None, None, False
try:
result = self.discoverer.discover_uri(uri)
if result.get_result() == GstPbutils.DiscovererResult.MISSING_PLUGINS:
missing = result.get_missing_elements_installer_details()
logger.error('Missing plugins for %s: %s', uri, missing)
return _Result(uri, tags, duration, seekable, mime, have_audio)
if result.get_result() != GstPbutils.DiscovererResult.OK:
logger.error('Failed to scan %s', uri)
return _Result(uri, tags, duration, seekable, mime, have_audio)
tags = tags_lib.convert_taglist(result.get_tags())
duration = int(result.get_duration() // Gst.MSECOND)
seekable = result.get_seekable()
streams = result.get_audio_streams()
if len(streams) > 0:
mime = streams[0].get_caps().get_structure(0).get_name()
have_audio = True
except Exception as e:
logger.error('Failed to scan %s: %s', uri, e)
return _Result(uri, tags, duration, seekable, mime, have_audio)
# TODO: replace with a scan(uri, timeout=1000, proxy_config=None)?
class Scanner(object):
"""
Helper to get tags and other relevant info from URIs.
:param timeout: timeout for scanning a URI in ms
:param proxy_config: dictionary containing proxy config strings.
:type event: int
"""
def __init__(self, timeout=1000, proxy_config=None):
self._timeout_ms = int(timeout)
self._proxy_config = proxy_config or {}
def scan(self, uri, timeout=None):
"""
Scan the given uri collecting relevant metadata.
:param uri: URI of the resource to scan.
:type uri: string
:param timeout: timeout for scanning a URI in ms. Defaults to the
``timeout`` value used when creating the scanner.
:type timeout: int
:return: A named tuple containing
``(uri, tags, duration, seekable, mime)``.
``tags`` is a dictionary of lists for all the tags we found.
``duration`` is the length of the URI in milliseconds, or
:class:`None` if the URI has no duration. ``seekable`` is boolean.
indicating if a seek would succeed.
"""
timeout = int(timeout or self._timeout_ms)
tags, duration, seekable, mime = None, None, None, None
pipeline, signals = _setup_pipeline(uri, self._proxy_config)
try:
_start_pipeline(pipeline)
tags, mime, have_audio, duration = _process(pipeline, timeout)
seekable = _query_seekable(pipeline)
finally:
signals.clear()
pipeline.set_state(Gst.State.NULL)
del pipeline
return _Result(uri, tags, duration, seekable, mime, have_audio)
# Turns out it's _much_ faster to just create a new pipeline for every as
# decodebins and other elements don't seem to take well to being reused.
def _setup_pipeline(uri, proxy_config=None):
src = Gst.Element.make_from_uri(Gst.URIType.SRC, uri)
if not src:
raise exceptions.ScannerError('GStreamer can not open: %s' % uri)
if proxy_config:
utils.setup_proxy(src, proxy_config)
signals = utils.Signals()
pipeline = Gst.ElementFactory.make('pipeline')
pipeline.add(src)
if _has_src_pads(src):
_setup_decodebin(src, src.get_static_pad('src'), pipeline, signals)
elif _has_dynamic_src_pad(src):
signals.connect(src, 'pad-added', _setup_decodebin, pipeline, signals)
else:
raise exceptions.ScannerError('No pads found in source element.')
return pipeline, signals
def _has_src_pads(element):
pads = []
element.iterate_src_pads().foreach(pads.append)
return bool(pads)
def _has_dynamic_src_pad(element):
for template in element.get_pad_template_list():
if template.direction == Gst.PadDirection.SRC:
if template.presence == Gst.PadPresence.SOMETIMES:
return True
return False
def _setup_decodebin(element, pad, pipeline, signals):
typefind = Gst.ElementFactory.make('typefind')
decodebin = Gst.ElementFactory.make('decodebin')
for element in (typefind, decodebin):
pipeline.add(element)
element.sync_state_with_parent()
pad.link(typefind.get_static_pad('sink'))
typefind.link(decodebin)
signals.connect(typefind, 'have-type', _have_type, decodebin)
signals.connect(decodebin, 'pad-added', _pad_added, pipeline)
signals.connect(decodebin, 'autoplug-select', _autoplug_select)
def _have_type(element, probability, caps, decodebin):
decodebin.set_property('sink-caps', caps)
struct = Gst.Structure.new_empty('have-type')
struct.set_value('caps', caps.get_structure(0))
element.get_bus().post(Gst.Message.new_application(element, struct))
def _pad_added(element, pad, pipeline):
sink = Gst.ElementFactory.make('fakesink')
sink.set_property('sync', False)
pipeline.add(sink)
sink.sync_state_with_parent()
pad.link(sink.get_static_pad('sink'))
if pad.query_caps().is_subset(Gst.Caps.from_string('audio/x-raw')):
# Probably won't happen due to autoplug-select fix, but lets play it
# safe until we've tested more.
struct = Gst.Structure.new_empty('have-audio')
element.get_bus().post(Gst.Message.new_application(element, struct))
def _autoplug_select(element, pad, caps, factory):
if factory.list_is_type(_DECODER | _AUDIO):
struct = Gst.Structure.new_empty('have-audio')
element.get_bus().post(Gst.Message.new_application(element, struct))
if not factory.list_is_type(_DEMUXER | _DEPAYLOADER | _PARSER):
return _SELECT_EXPOSE
return _SELECT_TRY
def _start_pipeline(pipeline):
result = pipeline.set_state(Gst.State.PAUSED)
if result == Gst.StateChangeReturn.NO_PREROLL:
pipeline.set_state(Gst.State.PLAYING)
def _query_duration(pipeline):
success, duration = pipeline.query_duration(Gst.Format.TIME)
if not success:
duration = None # Make sure error case preserves None.
elif duration < 0:
duration = None # Stream without duration.
else:
duration = int(duration // Gst.MSECOND)
return success, duration
def _query_seekable(pipeline):
query = Gst.Query.new_seeking(Gst.Format.TIME)
pipeline.query(query)
return query.parse_seeking()[1]
def _process(pipeline, timeout_ms):
bus = pipeline.get_bus()
tags = {}
mime = None
have_audio = False
missing_message = None
duration = None
types = (
Gst.MessageType.ELEMENT |
Gst.MessageType.APPLICATION |
Gst.MessageType.ERROR |
Gst.MessageType.EOS |
Gst.MessageType.ASYNC_DONE |
Gst.MessageType.DURATION_CHANGED |
Gst.MessageType.TAG
)
timeout = timeout_ms
start = int(time.time() * 1000)
while timeout > 0:
msg = bus.timed_pop_filtered(timeout * Gst.MSECOND, types)
if msg is None:
break
if logger.isEnabledFor(log.TRACE_LOG_LEVEL) and msg.get_structure():
debug_text = msg.get_structure().to_string()
if len(debug_text) > 77:
debug_text = debug_text[:77] + '...'
_trace('element %s: %s', msg.src.get_name(), debug_text)
if msg.type == Gst.MessageType.ELEMENT:
if GstPbutils.is_missing_plugin_message(msg):
missing_message = msg
elif msg.type == Gst.MessageType.APPLICATION:
if msg.get_structure().get_name() == 'have-type':
mime = msg.get_structure().get_value('caps').get_name()
if mime and (
mime.startswith('text/') or mime == 'application/xml'):
return tags, mime, have_audio, duration
elif msg.get_structure().get_name() == 'have-audio':
have_audio = True
elif msg.type == Gst.MessageType.ERROR:
error = encoding.locale_decode(msg.parse_error()[0])
if missing_message and not mime:
caps = missing_message.get_structure().get_value('detail')
mime = caps.get_structure(0).get_name()
return tags, mime, have_audio, duration
raise exceptions.ScannerError(error)
elif msg.type == Gst.MessageType.EOS:
return tags, mime, have_audio, duration
elif msg.type == Gst.MessageType.ASYNC_DONE:
success, duration = _query_duration(pipeline)
if tags and success:
return tags, mime, have_audio, duration
# Don't try workaround for non-seekable sources such as mmssrc:
if not _query_seekable(pipeline):
return tags, mime, have_audio, duration
return tags, mime, have_audio, duration
# Workaround for upstream bug which causes tags/duration to arrive
# after pre-roll. We get around this by starting to play the track
# and then waiting for a duration change.
# https://bugzilla.gnome.org/show_bug.cgi?id=763553
logger.debug('Using workaround for duration missing before play.')
result = pipeline.set_state(Gst.State.PLAYING)
if result == Gst.StateChangeReturn.FAILURE:
return tags, mime, have_audio, duration
elif msg.type == Gst.MessageType.DURATION_CHANGED and tags:
# VBR formats sometimes seem to not have a duration by the time we
# go back to paused. So just try to get it right away.
success, duration = _query_duration(pipeline)
pipeline.set_state(Gst.State.PAUSED)
if success:
return tags, mime, have_audio, duration
elif msg.type == Gst.MessageType.TAG:
taglist = msg.parse_tag()
# Note that this will only keep the last tag.
tags.update(tags_lib.convert_taglist(taglist))
timeout = timeout_ms - (int(time.time() * 1000) - start)
raise exceptions.ScannerError('Timeout after %dms' % timeout_ms)
if __name__ == '__main__':
import os
import sys
from mopidy.internal import path
logging.basicConfig(format='%(asctime)-15s %(levelname)s %(message)s',
level=log.TRACE_LOG_LEVEL)
if sys.argv[1] == '2':
scanner = Scanner2(5000)
else:
scanner = Scanner(5000)
for uri in sys.argv[2:]:
if not Gst.uri_is_valid(uri):
uri = path.path_to_uri(os.path.abspath(uri))
try:
result = scanner.scan(uri)
for key in ('uri', 'mime', 'duration', 'playable', 'seekable'):
print('{:<20} {}'.format(key, getattr(result, key)))
print('tags')
for tag, value in result.tags.items():
line = '{:<20} {}'.format(tag, value)
if len(line) > 77:
line = line[:77] + '...'
print(line)
except exceptions.ScannerError as error:
print('{}: {}'.format(uri, error))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment