Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@marcusmueller
Last active May 4, 2022 17:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save marcusmueller/71c370fdf1ed74e48d5bc5c928ef0f81 to your computer and use it in GitHub Desktop.
Save marcusmueller/71c370fdf1ed74e48d5bc5c928ef0f81 to your computer and use it in GitHub Desktop.
# Flow-graph-wide settings. `parameters` and `states` must be nested under
# `options`; at column 0 they would parse as separate, null-valued top-level
# keys that collide with the same key names used by every block below.
options:
  parameters:
    author: "Marcus M\xFCller"
    catch_exceptions: 'True'
    category: '[GRC Hier Blocks]'
    cmake_opt: ''
    comment: ''
    copyright: '2022'
    description: ''
    gen_cmake: 'On'
    gen_linking: dynamic
    generate_options: no_gui
    hier_block_src_path: '.:'
    # `id` is the flow graph's identifier; `title` is its human-readable name.
    id: audio_detector
    max_nouts: '0'
    output_language: python
    placement: (0,0)
    qt_qss_theme: ''
    realtime_scheduling: ''
    run: 'True'
    run_command: '{python} -u {filename}'
    run_options: prompt
    sizing_mode: fixed
    thread_safe_setters: ''
    title: Audio Activity Detector
    window_size: (1000,1000)
  states:
    bus_sink: false
    bus_source: false
    bus_structure: null
    coordinate: [8, 8]
    rotation: 0
    state: enabled
blocks:
# Variable holding the audio sampling rate used by the source and the filter.
- name: samp_rate
  id: variable
  parameters:
    comment: "sampling rate\n\nShould be \nsupported by audio\nsystem. 48000 is\n\
      generally a good\nchoice."
    # Quoted on purpose: a bare 48e3 is a string under YAML 1.1 but a float
    # under the YAML 1.2 core schema. Every other parameter value in this file
    # is a string expression, so keep this one a string for every loader.
    value: '48e3'
  states:
    bus_sink: false
    bus_source: false
    bus_structure: null
    coordinate: [200, 12.0]
    rotation: 0
    state: enabled
# Audio input block; its rate is the integer form of the samp_rate variable.
- name: audio_source_0
  id: audio_source
  parameters:
    affinity: ''
    alias: ''
    # Literal block scalar so the three short lines stay three lines; a quoted
    # scalar merely continued on the next line would be folded into one line.
    comment: |-
      Get samples from
      sound system
      (i.e., microphone)
    device_name: ''
    maxoutbuf: '0'
    minoutbuf: '0'
    num_outputs: '1'
    ok_to_block: 'True'
    samp_rate: int(samp_rate)
  states:
    bus_sink: false
    bus_source: false
    bus_structure: null
    coordinate: [40, 260.0]
    rotation: 0
    state: true
# Two-input float multiplier. Both inputs are fed from the same audio stream
# (see `connections`), so the output is the squared sample value.
- name: blocks_multiply_xx_0
  id: blocks_multiply_xx
  parameters:
    affinity: ''
    alias: ''
    # Literal block scalar keeps the line breaks of the on-canvas comment.
    comment: |-
      square the samples
      (convert to power)
      (uncalibrated!)
    maxoutbuf: '0'
    minoutbuf: '0'
    num_inputs: '2'
    type: float
    vlen: '1'
  states:
    bus_sink: false
    bus_source: false
    bus_structure: null
    coordinate: [312, 248.0]
    rotation: 0
    state: true
# Threshold block driven by the `level` parameter: the high threshold is
# `level`, the low threshold is half of it, and the initial output is 0.
- name: blocks_threshold_ff_0
  id: blocks_threshold_ff
  parameters:
    affinity: ''
    alias: ''
    comment: "Convert values between\n0 and 1 to either 0 or 1, \ndepending on whether\n\
      they are below or above\nsome level."
    high: level
    init: '0'
    low: level/2
    maxoutbuf: '0'
    minoutbuf: '0'
  states:
    bus_sink: false
    bus_source: false
    bus_structure: null
    coordinate: [848, 244.0]
    rotation: 0
    state: true
# String parameter (short option `c`): the command the detector block runs.
# The default /bin/true makes an activation a harmless no-op.
- name: command
  id: parameter
  parameters:
    alias: ''
    comment: "The command to \nbe executed upon\nsignal detection"
    hide: none
    label: Command to be executed
    short_id: c
    type: str
    value: /bin/true
  states:
    bus_sink: false
    bus_source: false
    bus_structure: null
    coordinate: [544, 12.0]
    rotation: 0
    state: true
# Float parameter (short option `T`): cooldown time in seconds, default 1.0.
- name: cooldown
  id: parameter
  parameters:
    alias: ''
    comment: "The time until the detector is \n\"re-armed\""
    hide: none
    label: Cooldown Time [s]
    short_id: T
    type: eng_float
    value: '1.0'
  states:
    bus_sink: false
    bus_source: false
    bus_structure: null
    coordinate: [776, 12.0]
    rotation: 0
    state: true
# Embedded Python block holding the detection / cooldown / command logic.
- name: epy_block_0
  id: epy_block
  parameters:
    # The block's Python source, kept as a literal block scalar so that the
    # indentation Python depends on is stored verbatim and stays reviewable.
    _source_code: |
      """
      An activity detector that executes external programs and has a cooldown period
      """

      import numpy as np
      from gnuradio import gr
      import shlex
      import subprocess


      class blk(gr.sync_block
                ):  # other base classes are basic_block, decim_block, interp_block
          """A simple command executor with a cooldown period"""
          def __init__(self,
                       command="touch /tmp/it_happened",
                       sampling_rate=1.0,
                       cooldown=1.0):  # only default arguments here
              """arguments to this function show up as parameters in GRC"""
              gr.sync_block.__init__(
                  self,
                  name='Embedded Python Block',  # will show up in GRC
                  in_sig=[np.float32],
                  out_sig=[])
              self.cooldown_samples = int(cooldown * sampling_rate)
              self.cooldown_left = 0
              if not command:
                  self.command = None
              else:
                  self.command = shlex.split(command)

              print(f"Set up for a cooldown of {self.cooldown_samples:d} samples")

          def work(self, input_items, output_items):
              # Check whether we're still cooling down from the last activation
              if self.cooldown_left > 0:
                  # OK, then just consume as much input as we have, up to the full remainder of the cooldown period
                  to_consume = min(self.cooldown_left, len(input_items[0]))
                  self.cooldown_left -= to_consume
                  return to_consume

              # we don't have any cooldown left, so let's look for a value > 0.5
              index = np.argwhere(input_items[0] > 0.5)
              if len(index) > 0:
                  # argwhere returns one row per hit; unpack to a plain Python int
                  first = int(index[0][0])
                  self.cooldown_left = self.cooldown_samples
                  print("activated")
                  if self.command:
                      subprocess.Popen(self.command)
                  # consume the triggering sample as well, so that we always make
                  # progress (even if the trigger is the very first sample and the
                  # cooldown is 0 samples long)
                  return first + 1
              # We're neither in cooldown, nor have we detected any high values – just consume
              # all data
              return len(input_items[0])
    affinity: ''
    alias: ''
    command: command
    comment: "The block with the logic.\n\nThis calls the command specified\nunder\
      \ \"Command\", but only if it\nhasn't happened within the last\n \"cooldown\"\
      \ seconds."
    cooldown: cooldown
    maxoutbuf: '0'
    minoutbuf: '0'
    # Used by the block to convert the cooldown time into a sample count.
    # NOTE(review): hard-coded; presumably meant to equal the low-pass filter's
    # output rate (samp_rate / decim) - confirm when changing samp_rate.
    sampling_rate: '2000'
  states:
    _io_cache: ('Embedded Python Block', 'blk', [('command', "'touch /tmp/it_happened'"),
      ('sampling_rate', '1.0'), ('cooldown', '1.0')], [('0', 'float', 1)], [], 'A
      simple command executor with a cooldown period', ['command'])
    bus_sink: false
    bus_source: false
    bus_structure: null
    coordinate: [1040, 244.0]
    rotation: 0
    state: true
# Float parameter (short option `l`): detection threshold, default 0.5.
- name: level
  id: parameter
  parameters:
    alias: ''
    # Literal block scalar keeps the three lines of the on-canvas comment apart.
    comment: |-
      Sets the detection threshold.
      Range: 0.0 to 1.0
      Adjust experimentally!
    hide: none
    label: Threshold Level
    short_id: l
    type: eng_float
    value: '0.5'
  states:
    bus_sink: false
    bus_source: false
    bus_structure: null
    coordinate: [344, 12.0]
    rotation: 0
    state: true
# Decimating float FIR low-pass: 1000 Hz cutoff, 1000 Hz transition width,
# Hamming window, decimation factor int(samp_rate/2000).
- name: low_pass_filter_0
  id: low_pass_filter
  parameters:
    affinity: ''
    alias: ''
    beta: '6.76'
    comment: "Filter the power signal:\n1. cut off any energy above 1000 Hz\n (loudness\
      \ shouldn't change *that* fast),\n really make sure nothing at 2000 Hz or\n\
      \ above makes it through\n2. reduce the sampling rate to 2000 Hz:\n for\
      \ every 24 input power values we get\n one \"smoothed\" output value"
    cutoff_freq: '1000'
    decim: int(samp_rate/2000)
    gain: '1'
    interp: '1'
    maxoutbuf: '0'
    minoutbuf: '0'
    samp_rate: samp_rate
    type: fir_filter_fff
    width: '1000'
    win: window.WIN_HAMMING
  states:
    bus_sink: false
    bus_source: false
    bus_structure: null
    coordinate: [536, 212.0]
    rotation: 0
    state: true
# Wiring of the flow graph. Each entry reads
# [source block, source port, destination block, destination port].
connections:
# The audio stream feeds both multiplier inputs, i.e. it is multiplied by itself.
- [audio_source_0, '0', blocks_multiply_xx_0, '0']
- [audio_source_0, '0', blocks_multiply_xx_0, '1']
# Squared samples go through the decimating low-pass filter ...
- [blocks_multiply_xx_0, '0', low_pass_filter_0, '0']
# ... whose output is thresholded, and the thresholded stream drives the
# embedded Python block (entries are listed alphabetically, not in signal order).
- [blocks_threshold_ff_0, '0', epy_block_0, '0']
- [low_pass_filter_0, '0', blocks_threshold_ff_0, '0']
# File-format bookkeeping; both keys belong under `metadata`.
metadata:
  file_format: 1
  grc_version: v3.11.0.0git-70-gf44384ae
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# SPDX-License-Identifier: GPL-3.0
#
# GNU Radio Python Flow Graph
# Title: Audio Activity Detector
# Author: Marcus Müller
# Copyright: 2022
# GNU Radio version: v3.11.0.0git-70-gf44384ae
from packaging.version import Version as StrictVersion
if __name__ == '__main__':
    # When run as a script on Linux, call Xlib's XInitThreads() before the
    # GUI modules below are imported.
    import ctypes
    import sys
    if sys.platform.startswith('linux'):
        try:
            x11 = ctypes.cdll.LoadLibrary('libX11.so')
            x11.XInitThreads()
        except Exception:
            # Best effort only: a missing library or symbol is reported and
            # start-up continues. `Exception` (rather than a bare `except`)
            # keeps KeyboardInterrupt and SystemExit propagating.
            print("Warning: failed to XInitThreads()")
from PyQt5 import Qt
from gnuradio import qtgui
from gnuradio.filter import firdes
import sip
from gnuradio import audio
from gnuradio import blocks
from gnuradio import filter
from gnuradio import gr
from gnuradio.fft import window
import sys
import signal
from argparse import ArgumentParser
from gnuradio.eng_arg import eng_float, intx
from gnuradio import eng_notation
from gnuradio.qtgui import Range, RangeWidget
from PyQt5 import QtCore
import audio_detector_epy_block_0 as epy_block_0 # embedded python block
from gnuradio import qtgui
class audio_detector(gr.top_block, Qt.QWidget):
    """Qt top block of the "Audio Activity Detector" flow graph.

    Signal chain, as wired in ``__init__``: the audio source is multiplied
    with itself, low-pass filtered and decimated by ``int(samp_rate/2000)``,
    and thresholded; the thresholded stream feeds both the embedded Python
    block and a Qt time sink. A range widget labelled "Loudness level"
    adjusts the threshold at runtime through ``set_level``.
    """

    def __init__(self):
        """Build the Qt window, create all blocks and connect them."""
        gr.top_block.__init__(self, "Audio Activity Detector", catch_exceptions=True)
        Qt.QWidget.__init__(self)
        self.setWindowTitle("Audio Activity Detector")
        qtgui.util.check_set_qss()
        try:
            self.setWindowIcon(Qt.QIcon.fromTheme('gnuradio-grc'))
        except Exception:
            # The icon is cosmetic; carry on without it. Catching `Exception`
            # instead of everything lets KeyboardInterrupt/SystemExit through.
            pass
        self.top_scroll_layout = Qt.QVBoxLayout()
        self.setLayout(self.top_scroll_layout)
        self.top_scroll = Qt.QScrollArea()
        self.top_scroll.setFrameStyle(Qt.QFrame.NoFrame)
        self.top_scroll_layout.addWidget(self.top_scroll)
        self.top_scroll.setWidgetResizable(True)
        self.top_widget = Qt.QWidget()
        self.top_scroll.setWidget(self.top_widget)
        self.top_layout = Qt.QVBoxLayout(self.top_widget)
        self.top_grid_layout = Qt.QGridLayout()
        self.top_layout.addLayout(self.top_grid_layout)

        self.settings = Qt.QSettings("GNU Radio", "audio_detector")

        try:
            if StrictVersion(Qt.qVersion()) < StrictVersion("5.0.0"):
                self.restoreGeometry(self.settings.value("geometry").toByteArray())
            else:
                self.restoreGeometry(self.settings.value("geometry"))
        except Exception:
            # Restoring the saved window geometry is best effort; on failure
            # the window simply keeps its default geometry.
            pass

        ##################################################
        # Variables
        ##################################################
        self.samp_rate = samp_rate = 48e3
        self.level = level = 0.5

        ##################################################
        # Blocks
        ##################################################
        self._level_range = Range(0, 1, 1e-2, 0.5, 200)
        self._level_win = RangeWidget(self._level_range, self.set_level, "Loudness level", "counter_slider", float, QtCore.Qt.Horizontal)
        self.top_layout.addWidget(self._level_win)
        self.qtgui_time_sink_x_0 = qtgui.time_sink_f(
            1024, #size
            samp_rate, #samp_rate
            "", #name
            1, #number of inputs
            None # parent
        )
        self.qtgui_time_sink_x_0.set_update_time(0.10)
        self.qtgui_time_sink_x_0.set_y_axis(-1, 1)

        self.qtgui_time_sink_x_0.set_y_label('Amplitude', "")

        self.qtgui_time_sink_x_0.enable_tags(True)
        self.qtgui_time_sink_x_0.set_trigger_mode(qtgui.TRIG_MODE_FREE, qtgui.TRIG_SLOPE_POS, 0.0, 0, 0, "")
        self.qtgui_time_sink_x_0.enable_autoscale(False)
        self.qtgui_time_sink_x_0.enable_grid(False)
        self.qtgui_time_sink_x_0.enable_axis_labels(True)
        self.qtgui_time_sink_x_0.enable_control_panel(False)
        self.qtgui_time_sink_x_0.enable_stem_plot(False)

        # Per-line styling tables; only index 0 is used because the sink has
        # a single input.
        labels = ['Signal 1', 'Signal 2', 'Signal 3', 'Signal 4', 'Signal 5',
            'Signal 6', 'Signal 7', 'Signal 8', 'Signal 9', 'Signal 10']
        widths = [1, 1, 1, 1, 1,
            1, 1, 1, 1, 1]
        colors = ['blue', 'red', 'green', 'black', 'cyan',
            'magenta', 'yellow', 'dark red', 'dark green', 'dark blue']
        alphas = [1.0, 1.0, 1.0, 1.0, 1.0,
            1.0, 1.0, 1.0, 1.0, 1.0]
        styles = [1, 1, 1, 1, 1,
            1, 1, 1, 1, 1]
        markers = [-1, -1, -1, -1, -1,
            -1, -1, -1, -1, -1]

        for i in range(1):
            if len(labels[i]) == 0:
                self.qtgui_time_sink_x_0.set_line_label(i, "Data {0}".format(i))
            else:
                self.qtgui_time_sink_x_0.set_line_label(i, labels[i])
            self.qtgui_time_sink_x_0.set_line_width(i, widths[i])
            self.qtgui_time_sink_x_0.set_line_color(i, colors[i])
            self.qtgui_time_sink_x_0.set_line_style(i, styles[i])
            self.qtgui_time_sink_x_0.set_line_marker(i, markers[i])
            self.qtgui_time_sink_x_0.set_line_alpha(i, alphas[i])

        self._qtgui_time_sink_x_0_win = sip.wrapinstance(self.qtgui_time_sink_x_0.qwidget(), Qt.QWidget)
        self.top_layout.addWidget(self._qtgui_time_sink_x_0_win)
        self.low_pass_filter_0 = filter.fir_filter_fff(
            int(samp_rate/2000),
            firdes.low_pass(
                1,
                samp_rate,
                1000,
                1000,
                window.WIN_HAMMING,
                6.76))
        # NOTE(review): sampling_rate is hard-coded to 2000; it equals the
        # filter's output rate only while samp_rate / int(samp_rate/2000) is
        # 2000. command=None means no external command is configured here.
        self.epy_block_0 = epy_block_0.blk(command=None, sampling_rate=2000, cooldown=2)
        self.blocks_threshold_ff_0 = blocks.threshold_ff(level/2, level, 0)
        self.blocks_multiply_xx_0 = blocks.multiply_vff(1)
        self.audio_source_0 = audio.source(int(samp_rate), '', True)

        ##################################################
        # Connections
        ##################################################
        self.connect((self.audio_source_0, 0), (self.blocks_multiply_xx_0, 0))
        self.connect((self.audio_source_0, 0), (self.blocks_multiply_xx_0, 1))
        self.connect((self.blocks_multiply_xx_0, 0), (self.low_pass_filter_0, 0))
        self.connect((self.blocks_threshold_ff_0, 0), (self.epy_block_0, 0))
        self.connect((self.blocks_threshold_ff_0, 0), (self.qtgui_time_sink_x_0, 0))
        self.connect((self.low_pass_filter_0, 0), (self.blocks_threshold_ff_0, 0))

    def closeEvent(self, event):
        """Save the window geometry, stop the flow graph and accept the event."""
        self.settings = Qt.QSettings("GNU Radio", "audio_detector")
        self.settings.setValue("geometry", self.saveGeometry())
        self.stop()
        self.wait()

        event.accept()

    def get_samp_rate(self):
        """Return the current sampling rate."""
        return self.samp_rate

    def set_samp_rate(self, samp_rate):
        """Store a new sampling rate and push it to the filter taps and the sink.

        Only the taps are recomputed; the filter's decimation factor chosen in
        ``__init__`` is not touched here.
        """
        self.samp_rate = samp_rate
        self.low_pass_filter_0.set_taps(firdes.low_pass(1, self.samp_rate, 1000, 1000, window.WIN_HAMMING, 6.76))
        self.qtgui_time_sink_x_0.set_samp_rate(self.samp_rate)

    def get_level(self):
        """Return the current threshold level."""
        return self.level

    def set_level(self, level):
        """Store a new level; the threshold block gets hi=level, lo=level/2."""
        self.level = level
        self.blocks_threshold_ff_0.set_hi(self.level)
        self.blocks_threshold_ff_0.set_lo(self.level/2)
def main(top_block_cls=audio_detector, options=None):
    """Create the Qt application, start the flow graph and run the event loop.

    ``top_block_cls`` is instantiated without arguments; ``options`` is
    accepted but not used. SIGINT and SIGTERM stop the flow graph, wait for
    it to finish and quit the application.
    """
    legacy_floor = StrictVersion("4.5.0")
    qt5_floor = StrictVersion("5.0.0")
    if legacy_floor <= StrictVersion(Qt.qVersion()) < qt5_floor:
        # Only for Qt versions in [4.5.0, 5.0.0): pick the graphics system
        # from the GNU Radio preferences.
        Qt.QApplication.setGraphicsSystem(
            gr.prefs().get_string('qtgui', 'style', 'raster'))

    application = Qt.QApplication(sys.argv)

    flowgraph = top_block_cls()
    flowgraph.start()
    flowgraph.show()

    def shutdown(sig=None, frame=None):
        flowgraph.stop()
        flowgraph.wait()
        Qt.QApplication.quit()

    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, shutdown)

    # A no-op timer firing every 500 ms; presumably it gives the Python
    # interpreter a chance to run the signal handlers while the Qt event
    # loop is blocking - TODO confirm.
    wakeup = Qt.QTimer()
    wakeup.start(500)
    wakeup.timeout.connect(lambda: None)

    application.exec_()


if __name__ == '__main__':
    main()
"""
An activity detector that executes external programs and has a cooldown period
"""
import numpy as np
from gnuradio import gr
import shlex
import subprocess
class blk(gr.sync_block):  # other base classes are basic_block, decim_block, interp_block
    """A simple command executor with a cooldown period

    The block has one float32 input and no output. When an input sample
    exceeds 0.5 it prints "activated", starts the configured command (if any)
    and then ignores the next ``cooldown_samples`` input samples.
    """

    def __init__(self,
                 command="touch /tmp/it_happened",
                 sampling_rate=1.0,
                 cooldown=1.0):  # only default arguments here
        """arguments to this function show up as parameters in GRC

        command: command line to run on activation; it is split with
            ``shlex.split`` and started without a shell. An empty or None
            value disables command execution.
        sampling_rate: multiplied with ``cooldown`` to obtain the cooldown
            length in samples.
        cooldown: cooldown length; the product ``cooldown * sampling_rate``
            is truncated to an integer number of samples.
        """
        gr.sync_block.__init__(
            self,
            name='Embedded Python Block',  # will show up in GRC
            in_sig=[np.float32],
            out_sig=[])
        self.cooldown_samples = int(cooldown * sampling_rate)
        # Number of input samples still to be skipped after an activation.
        self.cooldown_left = 0
        if not command:
            self.command = None
        else:
            self.command = shlex.split(command)

        print(f"Set up for a cooldown of {self.cooldown_samples:d} samples")

    def work(self, input_items, output_items):
        """Scan the input for activity and return the number of items consumed.

        The return value is always a plain ``int``.
        """
        samples = input_items[0]

        # Check whether we're still cooling down from the last activation
        if self.cooldown_left > 0:
            # OK, then just consume as much input as we have, up to the full
            # remainder of the cooldown period
            to_consume = min(self.cooldown_left, len(samples))
            self.cooldown_left -= to_consume
            return to_consume

        # we don't have any cooldown left, so let's look for a value > 0.5
        index = np.argwhere(samples > 0.5)
        if len(index) > 0:
            # argwhere returns one row per hit, so index[0] is itself an
            # array; unpack it to a plain Python int before returning it.
            first = int(index[0][0])
            self.cooldown_left = self.cooldown_samples
            print("activated")
            if self.command:
                subprocess.Popen(self.command)
            # Consume up to and including the triggering sample. Returning
            # `first` alone yields 0 when the trigger is the very first
            # sample; with a cooldown of 0 samples the same sample would then
            # be detected again on every call and the command respawned
            # without ever making progress.
            return first + 1

        # We're neither in cooldown, nor have we detected any high values –
        # just consume all data
        return len(samples)
"""
An activity detector that executes external programs and has a cooldown period
"""
import numpy as np
from gnuradio import gr
import shlex
import subprocess
class blk(gr.sync_block):  # other base classes are basic_block, decim_block, interp_block
    """A simple command executor with a cooldown period

    The block has one float32 input and no output. When an input sample
    exceeds 0.5 it prints "activated", starts the configured command (if any)
    and then ignores the next ``cooldown_samples`` input samples.
    """

    def __init__(self,
                 command="touch /tmp/it_happened",
                 sampling_rate=1.0,
                 cooldown=1.0):  # only default arguments here
        """arguments to this function show up as parameters in GRC

        command: command line to run on activation; it is split with
            ``shlex.split`` and started without a shell. An empty or None
            value disables command execution.
        sampling_rate: multiplied with ``cooldown`` to obtain the cooldown
            length in samples.
        cooldown: cooldown length; the product ``cooldown * sampling_rate``
            is truncated to an integer number of samples.
        """
        gr.sync_block.__init__(
            self,
            name='Embedded Python Block',  # will show up in GRC
            in_sig=[np.float32],
            out_sig=[])
        self.cooldown_samples = int(cooldown * sampling_rate)
        # Number of input samples still to be skipped after an activation.
        self.cooldown_left = 0
        if not command:
            self.command = None
        else:
            self.command = shlex.split(command)

        print(f"Set up for a cooldown of {self.cooldown_samples:d} samples")

    def work(self, input_items, output_items):
        """Scan the input for activity and return the number of items consumed.

        The return value is always a plain ``int``.
        """
        samples = input_items[0]

        # Check whether we're still cooling down from the last activation
        if self.cooldown_left > 0:
            # OK, then just consume as much input as we have, up to the full
            # remainder of the cooldown period
            to_consume = min(self.cooldown_left, len(samples))
            self.cooldown_left -= to_consume
            return to_consume

        # we don't have any cooldown left, so let's look for a value > 0.5
        index = np.argwhere(samples > 0.5)
        if len(index) > 0:
            # argwhere returns one row per hit, so index[0] is itself an
            # array; unpack it to a plain Python int before returning it.
            first = int(index[0][0])
            self.cooldown_left = self.cooldown_samples
            print("activated")
            if self.command:
                subprocess.Popen(self.command)
            # Consume up to and including the triggering sample. Returning
            # `first` alone yields 0 when the trigger is the very first
            # sample; with a cooldown of 0 samples the same sample would then
            # be detected again on every call and the command respawned
            # without ever making progress.
            return first + 1

        # We're neither in cooldown, nor have we detected any high values –
        # just consume all data
        return len(samples)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment