Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Draws the envelop (waveform) of a track on a Gtk.DrawingArea (vertically), uses PyGI and GStreamer
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Sources:
#
# https://stackoverflow.com/questions/9344888/getting-max-amplitude-for-an-audio-file-per-second
# https://git.gnome.org/browse/pitivi/tree/pitivi/timeline/previewers.py
# http://gstreamer.freedesktop.org/data/doc/gstreamer/head/gst-plugins-base-plugins/html/gst-plugins-base-plugins-audioconvert.html
# http://gstreamer.freedesktop.org/data/doc/gstreamer/head/gst-plugins-good-plugins/html/gst-plugins-good-plugins-level.html
# https://git.gnome.org/browse/pitivi/tree/pitivi/coptimizations/renderer.c
#
# https://git.gnome.org/browse/banshee/tree/src/Backends/Banshee.GStreamer/libbanshee/banshee-bpmdetector.c
from gi.repository import Gtk, Gst, GLib, GObject
import cairo
# Code for BPM analysis is left for possible further use but at the moment
# the information it provides is useless and the filter is unstable.
class SoundTrackOverlay(GObject.GObject):
__gsignals__ = {
'sampling_finished': (GObject.SIGNAL_RUN_FIRST, None, ()),
'title_found': (GObject.SIGNAL_RUN_FIRST, None, (str,)),
'duration_found': (
GObject.SIGNAL_RUN_FIRST, None, (GObject.TYPE_UINT64,)
),
'error': (GObject.SIGNAL_RUN_FIRST, None, (str,))
}
def __init__(self, drawingarea):
"""
Draws the envelop (waveform) of a sound track on GtkDrawingArea.
The sampling is done using GStreamer's level filter, then they are
normalised and plotted vertically on an internal buffer. The
drawing area is then updated from this buffer.
TODO: Follow progress of track playback.
TODO: Allow change of source file.
`drawingarea`: a GtkDrawingArea to paint on.
`fileuri`: URI to audio file to be rendered.
"""
super().__init__()
# Holds Peak samples
self.samples = []
## Holds detected beat per minute
#self.bpms = []
# Indicates when file is finished to be sampled + other state variables
self.is_sampled = False
self.track_duration = None
self.read_position = 0
self.track_title = None
# Internal buffer (placeholder)
self.surface = None
# Input parameters
self.fileuri = None
self.drawingarea = drawingarea
# Connect to drawing area events
self.drawingarea.connect("draw", self.on_drawingarea_draw)
self.drawingarea.connect("configure-event", self.on_drawingarea_changed)
# Setup empty surface
self._setup_surface()
def load(self, fileuri):
self.fileuri = fileuri
self._setup_pipeline()
def _setup_surface(self):
"""
Recreate internal image buffer according to new drawing area
dimensions.
"""
if self.surface:
# Nicely close previous cairo surface
self.surface.finish()
self.height = self.drawingarea.get_allocated_height()
self.width = self.drawingarea.get_allocated_width()
self.surface = cairo.ImageSurface(
cairo.FORMAT_ARGB32,
self.width,
self.height
)
self._draw_surface()
def _setup_pipeline(self):
"""
Prepare GStreamer pipeline for sampling
"""
# Merge decoded audio into one channel and samples different
# characteristic values each 10ms
self.pipeline = Gst.parse_launch(
'uridecodebin uri="{fileuri}" ! audioconvert ! '
'audio/x-raw,channels=1 !'
'level name=level interval=10000000 post-messages=true ! '
#'bpmdetect name=bpmdetect ! '
'fakesink qos=false name=faked"'.format(fileuri=self.fileuri)
)
self.level_element = self.pipeline.get_by_name('level')
#self.bpmdetect_element = self.pipeline.get_by_name('bpmdetect')
bus = self.pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", self.on_bus_message)
# Start sampling
self.pipeline.set_state(Gst.State.PLAYING)
def on_bus_message(self, bus, message):
"""
Catch events on the GStreamer pipeline:
- Levels events (samples)
- End of stream
- Errors
"""
# Samples from level element
if message.src is self.level_element:
structure = message.get_structure()
if structure:
peakdBs = structure.get_value("peak")
if peakdBs:
peakdBchan0 = peakdBs[0]
# Convert from dB to linear
peak = 10 ** (peakdBchan0 / 20)
self.samples.append(peak)
return
## Retrieve BPM information when they are emitted
#if message.type == Gst.MessageType.TAG:
# tag_list = message.parse_tag()
# if tag_list:
# # Tag processing is done with a foreach function
# def on_tag(tag_list, tag, data=None):
# if tag == "beats-per-minute":
# bpm = tag_list.get_value_index(tag, 0)
# self.bpms.append(bpm)
# tag_list.foreach(on_tag, None)
# return
# Retrieve tags information when they are emitted
if message.type == Gst.MessageType.TAG:
tag_list = message.parse_tag()
if tag_list:
# Tag processing is done with a foreach function
def on_tag(tag_list, tag, data=None):
if tag == "title":
title = tag_list.get_value_index(tag, 0)
self.track_title = title
self.emit("title_found", title)
elif tag == "duration":
duration = tag_list.get_value_index(tag, 0)
self.track_duration = duration
self.emit("duration_found", duration)
#elif tag == "beats-per-minute":
# bpm = tag_list.get_value_index(tag, 0)
# self.bpms.append(bpm)
tag_list.foreach(on_tag, None)
return
# End of stream
if message.type == Gst.MessageType.EOS:
# Report duration (position at end of stream)
success, duration = self.pipeline.query_position(Gst.Format.TIME)
if success:
self.track_duration = duration
self.emit("duration_found", duration)
# Stop pipeline
self.pipeline.set_state(Gst.State.NULL)
self.pipeline.get_state(Gst.CLOCK_TIME_NONE)
# Start processing of collected samples
self.is_sampled = True
self._normalise()
# Draw on internal buffer
self._draw_surface()
# Invalidate the drawing area to force redraw on the widget
self.drawingarea.queue_draw()
# Signal end of processing
self.emit('sampling_finished')
return
# Error
if message.type == Gst.MessageType.ERROR:
error, debuginfo = message.parse_error()
self.pipeline.set_state(Gst.State.NULL)
self.pipeline.get_state(Gst.CLOCK_TIME_NONE)
self.emit('error', error.message)
return
def _normalise(self):
"""
Normalise the samples
"""
# Do a ratio between 0 and the maximum value and get values between
# 0.0 and 1.0
max_ = max(self.samples)
self.samples = [ sample/max_ for sample in self.samples ]
def _draw_surface(self):
"""
Draw symetric waveform on internal buffer from collected samples
"""
# If not ready, do nothing
if not self.is_sampled:
return
# Initialisations
samples = self.samples
width = self.width
height = self.height
# Middle of the drawing
middle = width/2
# Half waveform amplitude:
# Leave some space (10px) from the borders of the area
halfwidth = (width-20)/2
# Pixel/sample ratio
length = len(samples)
pixelsPerSample = float(height) / float(length)
ctx = cairo.Context(self.surface)
# There are usually much more samples than pixels, so we will draw the
# mean value of the samples that are placed on the same pixel
currentPixel = 0.
samplesInAccum = 0
accum = 0.
x = 0. #< x is the current position in height, currentPixel is a helper
# The drawing is done from top to bottom for right side and from
# bottom to top for the left side. This produces one single closed
# polyline. Not sure if the double for loop can be avoided.
# Top to bottom right side, start from top middle.
ctx.move_to(middle, 0)
for sample in samples:
currentPixel += pixelsPerSample
samplesInAccum += 1
accum += sample
# We advanced of one pixel
if currentPixel > 1.0:
# Do the mean of samples, this gives a number 0<..<1
accum /= samplesInAccum
# Draw to computed amplitude from middle width
ctx.line_to(middle+(accum*halfwidth), x)
# Reset accumulators
accum = 0.
samplesInAccum = 0
currentPixel -= 1.0
# Advance a little bit on height axis
x += pixelsPerSample
# Reset
samplesInAccum = 0
accum = 0.
x = float(height)
# Bottom to top left side, start from bottom middle
ctx.line_to(middle, height)
for sample in reversed(samples):
currentPixel += pixelsPerSample
samplesInAccum += 1
accum += sample
# We advanced of one pixel
if currentPixel > 1.0:
# Do the mean of samples, this gives a number 0<..<1
accum /= samplesInAccum
# Draw to computed amplitude from middle width
ctx.line_to(middle-(accum*halfwidth), x)
# Reset accumulators
accum = 0.
samplesInAccum = 0
currentPixel -= 1.0
# Step back a little bit on height axis
x -= pixelsPerSample
ctx.close_path()
# Draw contour in dark green
ctx.set_source_rgb(0.31, 0.60, 0.02)
ctx.set_line_width(2.0)
ctx.stroke_preserve()
# Fill using light green gradient
gradient = cairo.LinearGradient(0, 0, width, 0)
gradient.add_color_stop_rgb(0.15, 0.45, 0.82, 0.09)
gradient.add_color_stop_rgb(0.45, 0.42, 0.76, 0.08)
gradient.add_color_stop_rgb(0.55, 0.42, 0.76, 0.08)
gradient.add_color_stop_rgb(0.85, 0.45, 0.82, 0.09)
ctx.set_source(gradient)
ctx.fill()
def on_drawingarea_draw(self, widget, ctx, data=None):
"""
When drawing area needs to be re-drawn, copy the internal buffer
to the drawing area surface.
"""
# ctx is already clipped to the invalidated region
# The invalidated region has been already reset to the default style's
# background color by the GTK machinery.
# If sampling is not finished, leave the region with the default
# background color (empty)
if not self.is_sampled:
return
# Copy internal buffer to drawing area surface, preserving the
# transparency of the source surface.
ctx.set_operator(cairo.OPERATOR_OVER)
ctx.set_source_surface(self.surface, 0, 0)
ctx.paint()
# Draw the cursor at current position
if self.track_duration is None:
xpos = 0
else:
xpos = int((self.read_position * self.height)/self.track_duration)
ctx.set_source_rgb(0.93, 0.83, 0.0)
ctx.set_line_width(1.0)
ctx.move_to(5, xpos+0.5)
ctx.line_to(self.height-5, xpos+0.5)
ctx.stroke()
ctx.set_source_rgb(0.77, 0.63, 0.0)
ctx.move_to(5, xpos+1.5)
ctx.line_to(self.height-5, xpos+1.5)
ctx.stroke()
def on_drawingarea_changed(self, widget, event, data=None):
"""
When configuration of the drawing area has changed (only size is
relevant for us), we need to redraw the envelop using the new
pixel per sample ratio.
"""
if event.width != self.width or event.height != self.height:
# Recreate internal buffer using new size and resolution
self._setup_surface()
# Forces the drwaing area to be updated (calls on_drawingarea_draw)
self.drawingarea.queue_draw()
def read_position_update(self, read_position):
# If the track duration is not known, do nothing special
if self.track_duration is None:
self.read_position = read_position
return
# Old position
xpos_old = int((self.read_position * self.height)/self.track_duration)
# New position
self.read_position = read_position
xpos_new = int((self.read_position * self.height)/self.track_duration)
# Fool proof the parameters
xpos_start = min(xpos_old, xpos_new)
xpos_end = max(xpos_old, xpos_new) + 1
height = xpos_end - xpos_start + 1
# Invalidate corresponding drawing area
self.drawingarea.queue_draw_area(0, xpos_start, self.width, height)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.