Last active
December 19, 2021 11:39
-
-
Save Cilyan/3a9d0b43a29c6e37efbf to your computer and use it in GitHub Desktop.
Draws the envelop (waveform) of a track on a Gtk.DrawingArea (vertically), uses PyGI and GStreamer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
# Sources: | |
# | |
# https://stackoverflow.com/questions/9344888/getting-max-amplitude-for-an-audio-file-per-second | |
# https://git.gnome.org/browse/pitivi/tree/pitivi/timeline/previewers.py | |
# http://gstreamer.freedesktop.org/data/doc/gstreamer/head/gst-plugins-base-plugins/html/gst-plugins-base-plugins-audioconvert.html | |
# http://gstreamer.freedesktop.org/data/doc/gstreamer/head/gst-plugins-good-plugins/html/gst-plugins-good-plugins-level.html | |
# https://git.gnome.org/browse/pitivi/tree/pitivi/coptimizations/renderer.c | |
# | |
# https://git.gnome.org/browse/banshee/tree/src/Backends/Banshee.GStreamer/libbanshee/banshee-bpmdetector.c | |
from gi.repository import Gtk, Gst, GLib, GObject | |
import cairo | |
# Code for BPM analysis is left for possible further use but at the moment | |
# the information it provides is useless and the filter is unstable. | |
class SoundTrackOverlay(GObject.GObject): | |
__gsignals__ = { | |
'sampling_finished': (GObject.SIGNAL_RUN_FIRST, None, ()), | |
'title_found': (GObject.SIGNAL_RUN_FIRST, None, (str,)), | |
'duration_found': ( | |
GObject.SIGNAL_RUN_FIRST, None, (GObject.TYPE_UINT64,) | |
), | |
'error': (GObject.SIGNAL_RUN_FIRST, None, (str,)) | |
} | |
def __init__(self, drawingarea): | |
""" | |
Draws the envelop (waveform) of a sound track on GtkDrawingArea. | |
The sampling is done using GStreamer's level filter, then they are | |
normalised and plotted vertically on an internal buffer. The | |
drawing area is then updated from this buffer. | |
TODO: Follow progress of track playback. | |
TODO: Allow change of source file. | |
`drawingarea`: a GtkDrawingArea to paint on. | |
`fileuri`: URI to audio file to be rendered. | |
""" | |
super().__init__() | |
# Holds Peak samples | |
self.samples = [] | |
## Holds detected beat per minute | |
#self.bpms = [] | |
# Indicates when file is finished to be sampled + other state variables | |
self.is_sampled = False | |
self.track_duration = None | |
self.read_position = 0 | |
self.track_title = None | |
# Internal buffer (placeholder) | |
self.surface = None | |
# Input parameters | |
self.fileuri = None | |
self.drawingarea = drawingarea | |
# Connect to drawing area events | |
self.drawingarea.connect("draw", self.on_drawingarea_draw) | |
self.drawingarea.connect("configure-event", self.on_drawingarea_changed) | |
# Setup empty surface | |
self._setup_surface() | |
def load(self, fileuri): | |
self.fileuri = fileuri | |
self._setup_pipeline() | |
def _setup_surface(self): | |
""" | |
Recreate internal image buffer according to new drawing area | |
dimensions. | |
""" | |
if self.surface: | |
# Nicely close previous cairo surface | |
self.surface.finish() | |
self.height = self.drawingarea.get_allocated_height() | |
self.width = self.drawingarea.get_allocated_width() | |
self.surface = cairo.ImageSurface( | |
cairo.FORMAT_ARGB32, | |
self.width, | |
self.height | |
) | |
self._draw_surface() | |
def _setup_pipeline(self): | |
""" | |
Prepare GStreamer pipeline for sampling | |
""" | |
# Merge decoded audio into one channel and samples different | |
# characteristic values each 10ms | |
self.pipeline = Gst.parse_launch( | |
'uridecodebin uri="{fileuri}" ! audioconvert ! ' | |
'audio/x-raw,channels=1 !' | |
'level name=level interval=10000000 post-messages=true ! ' | |
#'bpmdetect name=bpmdetect ! ' | |
'fakesink qos=false name=faked"'.format(fileuri=self.fileuri) | |
) | |
self.level_element = self.pipeline.get_by_name('level') | |
#self.bpmdetect_element = self.pipeline.get_by_name('bpmdetect') | |
bus = self.pipeline.get_bus() | |
bus.add_signal_watch() | |
bus.connect("message", self.on_bus_message) | |
# Start sampling | |
self.pipeline.set_state(Gst.State.PLAYING) | |
def on_bus_message(self, bus, message): | |
""" | |
Catch events on the GStreamer pipeline: | |
- Levels events (samples) | |
- End of stream | |
- Errors | |
""" | |
# Samples from level element | |
if message.src is self.level_element: | |
structure = message.get_structure() | |
if structure: | |
peakdBs = structure.get_value("peak") | |
if peakdBs: | |
peakdBchan0 = peakdBs[0] | |
# Convert from dB to linear | |
peak = 10 ** (peakdBchan0 / 20) | |
self.samples.append(peak) | |
return | |
## Retrieve BPM information when they are emitted | |
#if message.type == Gst.MessageType.TAG: | |
# tag_list = message.parse_tag() | |
# if tag_list: | |
# # Tag processing is done with a foreach function | |
# def on_tag(tag_list, tag, data=None): | |
# if tag == "beats-per-minute": | |
# bpm = tag_list.get_value_index(tag, 0) | |
# self.bpms.append(bpm) | |
# tag_list.foreach(on_tag, None) | |
# return | |
# Retrieve tags information when they are emitted | |
if message.type == Gst.MessageType.TAG: | |
tag_list = message.parse_tag() | |
if tag_list: | |
# Tag processing is done with a foreach function | |
def on_tag(tag_list, tag, data=None): | |
if tag == "title": | |
title = tag_list.get_value_index(tag, 0) | |
self.track_title = title | |
self.emit("title_found", title) | |
elif tag == "duration": | |
duration = tag_list.get_value_index(tag, 0) | |
self.track_duration = duration | |
self.emit("duration_found", duration) | |
#elif tag == "beats-per-minute": | |
# bpm = tag_list.get_value_index(tag, 0) | |
# self.bpms.append(bpm) | |
tag_list.foreach(on_tag, None) | |
return | |
# End of stream | |
if message.type == Gst.MessageType.EOS: | |
# Report duration (position at end of stream) | |
success, duration = self.pipeline.query_position(Gst.Format.TIME) | |
if success: | |
self.track_duration = duration | |
self.emit("duration_found", duration) | |
# Stop pipeline | |
self.pipeline.set_state(Gst.State.NULL) | |
self.pipeline.get_state(Gst.CLOCK_TIME_NONE) | |
# Start processing of collected samples | |
self.is_sampled = True | |
self._normalise() | |
# Draw on internal buffer | |
self._draw_surface() | |
# Invalidate the drawing area to force redraw on the widget | |
self.drawingarea.queue_draw() | |
# Signal end of processing | |
self.emit('sampling_finished') | |
return | |
# Error | |
if message.type == Gst.MessageType.ERROR: | |
error, debuginfo = message.parse_error() | |
self.pipeline.set_state(Gst.State.NULL) | |
self.pipeline.get_state(Gst.CLOCK_TIME_NONE) | |
self.emit('error', error.message) | |
return | |
def _normalise(self): | |
""" | |
Normalise the samples | |
""" | |
# Do a ratio between 0 and the maximum value and get values between | |
# 0.0 and 1.0 | |
max_ = max(self.samples) | |
self.samples = [ sample/max_ for sample in self.samples ] | |
def _draw_surface(self): | |
""" | |
Draw symetric waveform on internal buffer from collected samples | |
""" | |
# If not ready, do nothing | |
if not self.is_sampled: | |
return | |
# Initialisations | |
samples = self.samples | |
width = self.width | |
height = self.height | |
# Middle of the drawing | |
middle = width/2 | |
# Half waveform amplitude: | |
# Leave some space (10px) from the borders of the area | |
halfwidth = (width-20)/2 | |
# Pixel/sample ratio | |
length = len(samples) | |
pixelsPerSample = float(height) / float(length) | |
ctx = cairo.Context(self.surface) | |
# There are usually much more samples than pixels, so we will draw the | |
# mean value of the samples that are placed on the same pixel | |
currentPixel = 0. | |
samplesInAccum = 0 | |
accum = 0. | |
x = 0. #< x is the current position in height, currentPixel is a helper | |
# The drawing is done from top to bottom for right side and from | |
# bottom to top for the left side. This produces one single closed | |
# polyline. Not sure if the double for loop can be avoided. | |
# Top to bottom right side, start from top middle. | |
ctx.move_to(middle, 0) | |
for sample in samples: | |
currentPixel += pixelsPerSample | |
samplesInAccum += 1 | |
accum += sample | |
# We advanced of one pixel | |
if currentPixel > 1.0: | |
# Do the mean of samples, this gives a number 0<..<1 | |
accum /= samplesInAccum | |
# Draw to computed amplitude from middle width | |
ctx.line_to(middle+(accum*halfwidth), x) | |
# Reset accumulators | |
accum = 0. | |
samplesInAccum = 0 | |
currentPixel -= 1.0 | |
# Advance a little bit on height axis | |
x += pixelsPerSample | |
# Reset | |
samplesInAccum = 0 | |
accum = 0. | |
x = float(height) | |
# Bottom to top left side, start from bottom middle | |
ctx.line_to(middle, height) | |
for sample in reversed(samples): | |
currentPixel += pixelsPerSample | |
samplesInAccum += 1 | |
accum += sample | |
# We advanced of one pixel | |
if currentPixel > 1.0: | |
# Do the mean of samples, this gives a number 0<..<1 | |
accum /= samplesInAccum | |
# Draw to computed amplitude from middle width | |
ctx.line_to(middle-(accum*halfwidth), x) | |
# Reset accumulators | |
accum = 0. | |
samplesInAccum = 0 | |
currentPixel -= 1.0 | |
# Step back a little bit on height axis | |
x -= pixelsPerSample | |
ctx.close_path() | |
# Draw contour in dark green | |
ctx.set_source_rgb(0.31, 0.60, 0.02) | |
ctx.set_line_width(2.0) | |
ctx.stroke_preserve() | |
# Fill using light green gradient | |
gradient = cairo.LinearGradient(0, 0, width, 0) | |
gradient.add_color_stop_rgb(0.15, 0.45, 0.82, 0.09) | |
gradient.add_color_stop_rgb(0.45, 0.42, 0.76, 0.08) | |
gradient.add_color_stop_rgb(0.55, 0.42, 0.76, 0.08) | |
gradient.add_color_stop_rgb(0.85, 0.45, 0.82, 0.09) | |
ctx.set_source(gradient) | |
ctx.fill() | |
def on_drawingarea_draw(self, widget, ctx, data=None): | |
""" | |
When drawing area needs to be re-drawn, copy the internal buffer | |
to the drawing area surface. | |
""" | |
# ctx is already clipped to the invalidated region | |
# The invalidated region has been already reset to the default style's | |
# background color by the GTK machinery. | |
# If sampling is not finished, leave the region with the default | |
# background color (empty) | |
if not self.is_sampled: | |
return | |
# Copy internal buffer to drawing area surface, preserving the | |
# transparency of the source surface. | |
ctx.set_operator(cairo.OPERATOR_OVER) | |
ctx.set_source_surface(self.surface, 0, 0) | |
ctx.paint() | |
# Draw the cursor at current position | |
if self.track_duration is None: | |
xpos = 0 | |
else: | |
xpos = int((self.read_position * self.height)/self.track_duration) | |
ctx.set_source_rgb(0.93, 0.83, 0.0) | |
ctx.set_line_width(1.0) | |
ctx.move_to(5, xpos+0.5) | |
ctx.line_to(self.height-5, xpos+0.5) | |
ctx.stroke() | |
ctx.set_source_rgb(0.77, 0.63, 0.0) | |
ctx.move_to(5, xpos+1.5) | |
ctx.line_to(self.height-5, xpos+1.5) | |
ctx.stroke() | |
def on_drawingarea_changed(self, widget, event, data=None): | |
""" | |
When configuration of the drawing area has changed (only size is | |
relevant for us), we need to redraw the envelop using the new | |
pixel per sample ratio. | |
""" | |
if event.width != self.width or event.height != self.height: | |
# Recreate internal buffer using new size and resolution | |
self._setup_surface() | |
# Forces the drwaing area to be updated (calls on_drawingarea_draw) | |
self.drawingarea.queue_draw() | |
def read_position_update(self, read_position): | |
# If the track duration is not known, do nothing special | |
if self.track_duration is None: | |
self.read_position = read_position | |
return | |
# Old position | |
xpos_old = int((self.read_position * self.height)/self.track_duration) | |
# New position | |
self.read_position = read_position | |
xpos_new = int((self.read_position * self.height)/self.track_duration) | |
# Fool proof the parameters | |
xpos_start = min(xpos_old, xpos_new) | |
xpos_end = max(xpos_old, xpos_new) + 1 | |
height = xpos_end - xpos_start + 1 | |
# Invalidate corresponding drawing area | |
self.drawingarea.queue_draw_area(0, xpos_start, self.width, height) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment