Last active
May 4, 2022 17:22
-
-
Save marcusmueller/71c370fdf1ed74e48d5bc5c928ef0f81 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
options: | |
parameters: | |
author: "Marcus M\xFCller" | |
catch_exceptions: 'True' | |
category: '[GRC Hier Blocks]' | |
cmake_opt: '' | |
comment: '' | |
copyright: '2022' | |
description: '' | |
gen_cmake: 'On' | |
gen_linking: dynamic | |
generate_options: no_gui | |
hier_block_src_path: '.:' | |
id: audio_detector | |
max_nouts: '0' | |
output_language: python | |
placement: (0,0) | |
qt_qss_theme: '' | |
realtime_scheduling: '' | |
run: 'True' | |
run_command: '{python} -u {filename}' | |
run_options: prompt | |
sizing_mode: fixed | |
thread_safe_setters: '' | |
title: Audio Activity Detector | |
window_size: (1000,1000) | |
states: | |
bus_sink: false | |
bus_source: false | |
bus_structure: null | |
coordinate: [8, 8] | |
rotation: 0 | |
state: enabled | |
blocks: | |
- name: samp_rate | |
id: variable | |
parameters: | |
comment: "sampling rate\n\nShould be \nsupported by audio\nsystem. 48000 is\n\ | |
generally a good\nchoice." | |
value: 48e3 | |
states: | |
bus_sink: false | |
bus_source: false | |
bus_structure: null | |
coordinate: [200, 12.0] | |
rotation: 0 | |
state: enabled | |
- name: audio_source_0 | |
id: audio_source | |
parameters: | |
affinity: '' | |
alias: '' | |
comment: 'Get samples from | |
sound system | |
(i.e., microphone)' | |
device_name: '' | |
maxoutbuf: '0' | |
minoutbuf: '0' | |
num_outputs: '1' | |
ok_to_block: 'True' | |
samp_rate: int(samp_rate) | |
states: | |
bus_sink: false | |
bus_source: false | |
bus_structure: null | |
coordinate: [40, 260.0] | |
rotation: 0 | |
state: true | |
- name: blocks_multiply_xx_0 | |
id: blocks_multiply_xx | |
parameters: | |
affinity: '' | |
alias: '' | |
comment: 'square the samples | |
(convert to power) | |
(uncalibrated!)' | |
maxoutbuf: '0' | |
minoutbuf: '0' | |
num_inputs: '2' | |
type: float | |
vlen: '1' | |
states: | |
bus_sink: false | |
bus_source: false | |
bus_structure: null | |
coordinate: [312, 248.0] | |
rotation: 0 | |
state: true | |
- name: blocks_threshold_ff_0 | |
id: blocks_threshold_ff | |
parameters: | |
affinity: '' | |
alias: '' | |
comment: "Convert values between\n0 and 1 to either 0 or 1, \ndepending on whether\n\ | |
they are below or above\nsome level." | |
high: level | |
init: '0' | |
low: level/2 | |
maxoutbuf: '0' | |
minoutbuf: '0' | |
states: | |
bus_sink: false | |
bus_source: false | |
bus_structure: null | |
coordinate: [848, 244.0] | |
rotation: 0 | |
state: true | |
- name: command | |
id: parameter | |
parameters: | |
alias: '' | |
comment: "The command to \nbe executed upon\nsignal detection" | |
hide: none | |
label: Command to be executed | |
short_id: c | |
type: str | |
value: /bin/true | |
states: | |
bus_sink: false | |
bus_source: false | |
bus_structure: null | |
coordinate: [544, 12.0] | |
rotation: 0 | |
state: true | |
- name: cooldown | |
id: parameter | |
parameters: | |
alias: '' | |
comment: "The time until the detector is \n\"re-armed\"" | |
hide: none | |
label: Cooldown Time [s] | |
short_id: T | |
type: eng_float | |
value: '1.0' | |
states: | |
bus_sink: false | |
bus_source: false | |
bus_structure: null | |
coordinate: [776, 12.0] | |
rotation: 0 | |
state: true | |
- name: epy_block_0 | |
id: epy_block | |
parameters: | |
_source_code: "\"\"\"\nAn activity detector that executes external programs and\ | |
\ has a cooldown period\n\"\"\"\n\nimport numpy as np\nfrom gnuradio import\ | |
\ gr\nimport shlex\nimport subprocess\n\n\nclass blk(gr.sync_block\n \ | |
\ ): # other base classes are basic_block, decim_block, interp_block\n \ | |
\ \"\"\"A simple command executor with a cooldown period\"\"\"\n def __init__(self,\n\ | |
\ command=\"touch /tmp/it_happened\",\n sampling_rate=1.0,\n\ | |
\ cooldown=1.0): # only default arguments here\n \"\"\ | |
\"arguments to this function show up as parameters in GRC\"\"\"\n gr.sync_block.__init__(\n\ | |
\ self,\n name='Embedded Python Block', # will show up\ | |
\ in GRC\n in_sig=[np.float32],\n out_sig=[])\n \ | |
\ self.cooldown_samples = int(cooldown * sampling_rate)\n self.cooldown_left\ | |
\ = 0\n if not command:\n self.command = None\n else:\n\ | |
\ self.command = shlex.split(command)\n\n print(f\"Set up\ | |
\ for a cooldown of {self.cooldown_samples:d} samples\")\n\n def work(self,\ | |
\ input_items, output_items):\n # Check whether we're still cooling down\ | |
\ from the last activation\n if self.cooldown_left > 0:\n \ | |
\ # OK, then just consume as much input as we have, up to the full remainder\ | |
\ of the cooldown period\n to_consume = min(self.cooldown_left, len(input_items[0]))\n\ | |
\ self.cooldown_left -= to_consume\n return to_consume\n\ | |
\n # we don't have any cooldown left, so let's look for a value > 0.5\n\ | |
\ index = np.argwhere(input_items[0] > 0.5)\n if len(index) >\ | |
\ 0:\n first = index[0]\n self.cooldown_left = self.cooldown_samples\n\ | |
\ print(\"activated\")\n if self.command:\n \ | |
\ subprocess.Popen(self.command)\n return first\n # We're\ | |
\ neither in cooldown, nor have we detected any high values \u2013 just consume\ | |
\ all data\n return len(input_items[0])\n" | |
affinity: '' | |
alias: '' | |
command: command | |
comment: "The block with the logic.\n\nThis calls the command specified\nunder\ | |
\ \"Command\", but only if it\nhasn't happened within the last\n \"cooldown\"\ | |
\ seconds." | |
cooldown: cooldown | |
maxoutbuf: '0' | |
minoutbuf: '0' | |
sampling_rate: '2000' | |
states: | |
_io_cache: ('Embedded Python Block', 'blk', [('command', "'touch /tmp/it_happened'"), | |
('sampling_rate', '1.0'), ('cooldown', '1.0')], [('0', 'float', 1)], [], 'A | |
simple command executor with a cooldown period', ['command']) | |
bus_sink: false | |
bus_source: false | |
bus_structure: null | |
coordinate: [1040, 244.0] | |
rotation: 0 | |
state: true | |
- name: level | |
id: parameter | |
parameters: | |
alias: '' | |
comment: 'Sets the detection threshold. | |
Range: 0.0 to 1.0 | |
Adjust experimentally!' | |
hide: none | |
label: Threshold Level | |
short_id: l | |
type: eng_float | |
value: '0.5' | |
states: | |
bus_sink: false | |
bus_source: false | |
bus_structure: null | |
coordinate: [344, 12.0] | |
rotation: 0 | |
state: true | |
- name: low_pass_filter_0 | |
id: low_pass_filter | |
parameters: | |
affinity: '' | |
alias: '' | |
beta: '6.76' | |
comment: "Filter the power signal:\n1. cut off any energy above 1000 Hz\n (loudness\ | |
\ shouldn't change *that* fast),\n really make sure nothing at 2000 Hz or\n\ | |
\ above makes it through\n2. reduce the sampling rate to 2000 Hz:\n for\ | |
\ every 24 input power values we get\n one \"smoothed\" output value" | |
cutoff_freq: '1000' | |
decim: int(samp_rate/2000) | |
gain: '1' | |
interp: '1' | |
maxoutbuf: '0' | |
minoutbuf: '0' | |
samp_rate: samp_rate | |
type: fir_filter_fff | |
width: '1000' | |
win: window.WIN_HAMMING | |
states: | |
bus_sink: false | |
bus_source: false | |
bus_structure: null | |
coordinate: [536, 212.0] | |
rotation: 0 | |
state: true | |
connections: | |
- [audio_source_0, '0', blocks_multiply_xx_0, '0'] | |
- [audio_source_0, '0', blocks_multiply_xx_0, '1'] | |
- [blocks_multiply_xx_0, '0', low_pass_filter_0, '0'] | |
- [blocks_threshold_ff_0, '0', epy_block_0, '0'] | |
- [low_pass_filter_0, '0', blocks_threshold_ff_0, '0'] | |
metadata: | |
file_format: 1 | |
grc_version: v3.11.0.0git-70-gf44384ae |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
# | |
# SPDX-License-Identifier: GPL-3.0 | |
# | |
# GNU Radio Python Flow Graph | |
# Title: Audio Activity Detector | |
# Author: Marcus Müller | |
# Copyright: 2022 | |
# GNU Radio version: v3.11.0.0git-70-gf44384ae | |
from packaging.version import Version as StrictVersion | |
if __name__ == '__main__':
    # When run as a script on Linux, try to call Xlib's XInitThreads() before
    # any Qt import happens. This is best effort: a failure only prints a
    # warning and start-up continues.
    import ctypes
    import sys
    if sys.platform.startswith('linux'):
        try:
            x11 = ctypes.cdll.LoadLibrary('libX11.so')
            x11.XInitThreads()
        except Exception:
            # `Exception` rather than a bare `except:` so that
            # KeyboardInterrupt / SystemExit are not swallowed here.
            print("Warning: failed to XInitThreads()")
from PyQt5 import Qt | |
from gnuradio import qtgui | |
from gnuradio.filter import firdes | |
import sip | |
from gnuradio import audio | |
from gnuradio import blocks | |
from gnuradio import filter | |
from gnuradio import gr | |
from gnuradio.fft import window | |
import sys | |
import signal | |
from argparse import ArgumentParser | |
from gnuradio.eng_arg import eng_float, intx | |
from gnuradio import eng_notation | |
from gnuradio.qtgui import Range, RangeWidget | |
from PyQt5 import QtCore | |
import audio_detector_epy_block_0 as epy_block_0 # embedded python block | |
from gnuradio import qtgui | |
class audio_detector(gr.top_block, Qt.QWidget):
    """Qt top block that detects loud audio and feeds a command/cooldown block.

    Signal chain, as wired in the "Connections" section below:

        audio source -> multiply (sample times itself) -> decimating low-pass
        FIR filter -> threshold -> embedded Python block

    The threshold output is additionally displayed in a QT GUI time sink, and
    a slider widget adjusts the threshold level at runtime.
    """

    def __init__(self):
        """Build the Qt window, create all blocks and connect them."""
        gr.top_block.__init__(self, "Audio Activity Detector", catch_exceptions=True)
        Qt.QWidget.__init__(self)
        self.setWindowTitle("Audio Activity Detector")
        qtgui.util.check_set_qss()
        try:
            self.setWindowIcon(Qt.QIcon.fromTheme('gnuradio-grc'))
        except Exception:
            # Best effort: the window icon is cosmetic only.
            pass
        self.top_scroll_layout = Qt.QVBoxLayout()
        self.setLayout(self.top_scroll_layout)
        self.top_scroll = Qt.QScrollArea()
        self.top_scroll.setFrameStyle(Qt.QFrame.NoFrame)
        self.top_scroll_layout.addWidget(self.top_scroll)
        self.top_scroll.setWidgetResizable(True)
        self.top_widget = Qt.QWidget()
        self.top_scroll.setWidget(self.top_widget)
        self.top_layout = Qt.QVBoxLayout(self.top_widget)
        self.top_grid_layout = Qt.QGridLayout()
        self.top_layout.addLayout(self.top_grid_layout)

        self.settings = Qt.QSettings("GNU Radio", "audio_detector")
        try:
            if StrictVersion(Qt.qVersion()) < StrictVersion("5.0.0"):
                self.restoreGeometry(self.settings.value("geometry").toByteArray())
            else:
                self.restoreGeometry(self.settings.value("geometry"))
        except Exception:
            # Best effort: if no usable geometry is stored, keep the default
            # window geometry instead of failing at start-up.
            pass

        ##################################################
        # Variables
        ##################################################
        self.samp_rate = samp_rate = 48e3
        self.level = level = 0.5
        # Decimation factor of the low-pass filter. It is fixed at
        # construction time, so it is remembered for set_samp_rate().
        self._decimation = decimation = int(samp_rate / 2000)
        # Sample rate of everything downstream of the low-pass filter
        # (threshold, time sink, embedded Python block).
        detector_rate = samp_rate / decimation

        ##################################################
        # Blocks
        ##################################################
        self._level_range = Range(0, 1, 1e-2, 0.5, 200)
        self._level_win = RangeWidget(self._level_range, self.set_level, "Loudness level", "counter_slider", float, QtCore.Qt.Horizontal)
        self.top_layout.addWidget(self._level_win)
        # The time sink is fed from the threshold block, i.e. after the
        # decimating filter, so it is given the decimated rate rather than the
        # audio rate; otherwise its time axis would be wrong by the
        # decimation factor.
        self.qtgui_time_sink_x_0 = qtgui.time_sink_f(
            1024,  # size
            detector_rate,  # samp_rate
            "",  # name
            1,  # number of inputs
            None  # parent
        )
        self.qtgui_time_sink_x_0.set_update_time(0.10)
        self.qtgui_time_sink_x_0.set_y_axis(-1, 1)
        self.qtgui_time_sink_x_0.set_y_label('Amplitude', "")
        self.qtgui_time_sink_x_0.enable_tags(True)
        self.qtgui_time_sink_x_0.set_trigger_mode(qtgui.TRIG_MODE_FREE, qtgui.TRIG_SLOPE_POS, 0.0, 0, 0, "")
        self.qtgui_time_sink_x_0.enable_autoscale(False)
        self.qtgui_time_sink_x_0.enable_grid(False)
        self.qtgui_time_sink_x_0.enable_axis_labels(True)
        self.qtgui_time_sink_x_0.enable_control_panel(False)
        self.qtgui_time_sink_x_0.enable_stem_plot(False)

        # Per-line display settings; only index 0 is used because the sink
        # has a single input.
        labels = ['Signal 1', 'Signal 2', 'Signal 3', 'Signal 4', 'Signal 5',
                  'Signal 6', 'Signal 7', 'Signal 8', 'Signal 9', 'Signal 10']
        widths = [1, 1, 1, 1, 1,
                  1, 1, 1, 1, 1]
        colors = ['blue', 'red', 'green', 'black', 'cyan',
                  'magenta', 'yellow', 'dark red', 'dark green', 'dark blue']
        alphas = [1.0, 1.0, 1.0, 1.0, 1.0,
                  1.0, 1.0, 1.0, 1.0, 1.0]
        styles = [1, 1, 1, 1, 1,
                  1, 1, 1, 1, 1]
        markers = [-1, -1, -1, -1, -1,
                   -1, -1, -1, -1, -1]

        for i in range(1):
            if len(labels[i]) == 0:
                self.qtgui_time_sink_x_0.set_line_label(i, "Data {0}".format(i))
            else:
                self.qtgui_time_sink_x_0.set_line_label(i, labels[i])
            self.qtgui_time_sink_x_0.set_line_width(i, widths[i])
            self.qtgui_time_sink_x_0.set_line_color(i, colors[i])
            self.qtgui_time_sink_x_0.set_line_style(i, styles[i])
            self.qtgui_time_sink_x_0.set_line_marker(i, markers[i])
            self.qtgui_time_sink_x_0.set_line_alpha(i, alphas[i])

        self._qtgui_time_sink_x_0_win = sip.wrapinstance(self.qtgui_time_sink_x_0.qwidget(), Qt.QWidget)
        self.top_layout.addWidget(self._qtgui_time_sink_x_0_win)
        self.low_pass_filter_0 = filter.fir_filter_fff(
            decimation,
            firdes.low_pass(
                1,
                samp_rate,
                1000,
                1000,
                window.WIN_HAMMING,
                6.76))
        # The embedded block counts its cooldown in samples, so it needs the
        # rate of the stream it actually receives (the decimated one).
        self.epy_block_0 = epy_block_0.blk(command=None, sampling_rate=detector_rate, cooldown=2)
        self.blocks_threshold_ff_0 = blocks.threshold_ff(level/2, level, 0)
        self.blocks_multiply_xx_0 = blocks.multiply_vff(1)
        self.audio_source_0 = audio.source(int(samp_rate), '', True)

        ##################################################
        # Connections
        ##################################################
        self.connect((self.audio_source_0, 0), (self.blocks_multiply_xx_0, 0))
        self.connect((self.audio_source_0, 0), (self.blocks_multiply_xx_0, 1))
        self.connect((self.blocks_multiply_xx_0, 0), (self.low_pass_filter_0, 0))
        self.connect((self.blocks_threshold_ff_0, 0), (self.epy_block_0, 0))
        self.connect((self.blocks_threshold_ff_0, 0), (self.qtgui_time_sink_x_0, 0))
        self.connect((self.low_pass_filter_0, 0), (self.blocks_threshold_ff_0, 0))

    def closeEvent(self, event):
        """Save the window geometry, stop the flow graph and accept the close."""
        self.settings = Qt.QSettings("GNU Radio", "audio_detector")
        self.settings.setValue("geometry", self.saveGeometry())

        self.stop()
        self.wait()

        event.accept()

    def get_samp_rate(self):
        """Return the currently stored audio sample rate."""
        return self.samp_rate

    def set_samp_rate(self, samp_rate):
        """Store a new sample rate and update the blocks that can follow it.

        Only the low-pass filter taps and the time sink's rate are updated.
        The filter's decimation factor, the audio source and the embedded
        Python block keep the values they were constructed with.
        """
        self.samp_rate = samp_rate
        self.low_pass_filter_0.set_taps(firdes.low_pass(1, self.samp_rate, 1000, 1000, window.WIN_HAMMING, 6.76))
        # The time sink sits after the decimating filter.
        self.qtgui_time_sink_x_0.set_samp_rate(self.samp_rate / self._decimation)

    def get_level(self):
        """Return the current threshold level."""
        return self.level

    def set_level(self, level):
        """Set the threshold: upper bound `level`, lower bound `level/2`."""
        self.level = level
        self.blocks_threshold_ff_0.set_hi(self.level)
        self.blocks_threshold_ff_0.set_lo(self.level/2)
def main(top_block_cls=audio_detector, options=None):
    """Create the Qt application, run the flow graph and block until it exits.

    `top_block_cls` is instantiated without arguments, started and shown.
    SIGINT and SIGTERM stop the flow graph and quit the Qt application.
    `options` is accepted but not used here.
    """
    qt_version = StrictVersion(Qt.qVersion())
    if StrictVersion("4.5.0") <= qt_version < StrictVersion("5.0.0"):
        Qt.QApplication.setGraphicsSystem(
            gr.prefs().get_string('qtgui', 'style', 'raster'))

    app = Qt.QApplication(sys.argv)

    flowgraph = top_block_cls()
    flowgraph.start()
    flowgraph.show()

    def shutdown(sig=None, frame=None):
        # Stop the flow graph cleanly before leaving the Qt event loop.
        flowgraph.stop()
        flowgraph.wait()
        Qt.QApplication.quit()

    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, shutdown)

    # A timer firing a no-op every 500 ms; presumably this periodically hands
    # control back to the Python interpreter so the signal handlers above can
    # run while the Qt event loop is active.
    wakeup = Qt.QTimer()
    wakeup.start(500)
    wakeup.timeout.connect(lambda: None)

    app.exec_()


if __name__ == '__main__':
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
An activity detector that executes external programs and has a cooldown period | |
""" | |
import numpy as np | |
from gnuradio import gr | |
import shlex | |
import subprocess | |
class blk(gr.sync_block):  # other base classes are basic_block, decim_block, interp_block
    """A simple command executor with a cooldown period

    The block has one float32 input and no outputs. While armed, it looks for
    the first input sample greater than 0.5; when it finds one it prints
    "activated", starts the configured command (if any) and then skips
    `cooldown * sampling_rate` samples before looking again.
    """

    def __init__(self,
                 command="touch /tmp/it_happened",
                 sampling_rate=1.0,
                 cooldown=1.0):  # only default arguments here
        """arguments to this function show up as parameters in GRC

        Args:
            command: command line to run on activation; it is split with
                `shlex.split` and started without a shell. An empty or
                `None` value disables command execution.
            sampling_rate: sample rate of the input stream, used to convert
                `cooldown` into a number of samples.
            cooldown: time in seconds to ignore the input after an
                activation.
        """
        gr.sync_block.__init__(
            self,
            name='Embedded Python Block',  # will show up in GRC
            in_sig=[np.float32],
            out_sig=[])
        self.cooldown_samples = int(cooldown * sampling_rate)
        self.cooldown_left = 0
        if not command:
            self.command = None
        else:
            self.command = shlex.split(command)

        print(f"Set up for a cooldown of {self.cooldown_samples:d} samples")

    def work(self, input_items, output_items):
        """Consume input, triggering the command on the first sample > 0.5.

        Returns:
            The number of input items consumed, always as a plain `int`.

        Raises:
            OSError: propagated from `subprocess.Popen` if the command
                cannot be started.
        """
        samples = input_items[0]

        # Check whether we're still cooling down from the last activation
        if self.cooldown_left > 0:
            # OK, then just consume as much input as we have, up to the full
            # remainder of the cooldown period
            to_consume = min(self.cooldown_left, len(samples))
            self.cooldown_left -= to_consume
            return to_consume

        # we don't have any cooldown left, so let's look for a value > 0.5.
        # flatnonzero yields a flat index array (argwhere would yield an
        # (N, 1) array, making the "first index" a one-element array).
        hits = np.flatnonzero(samples > 0.5)
        if hits.size == 0:
            # We're neither in cooldown, nor have we detected any high
            # values - just consume all data
            return len(samples)

        self.cooldown_left = self.cooldown_samples
        print("activated")
        if self.command:
            subprocess.Popen(self.command)
        # Consume everything up to AND including the triggering sample.
        # Leaving it unconsumed would, with a zero-length cooldown and a hit
        # at index 0, return 0 forever and re-run the command on every call.
        return int(hits[0]) + 1
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
An activity detector that executes external programs and has a cooldown period | |
""" | |
import numpy as np | |
from gnuradio import gr | |
import shlex | |
import subprocess | |
class blk(gr.sync_block):  # other base classes are basic_block, decim_block, interp_block
    """A simple command executor with a cooldown period

    The block has one float32 input and no outputs. While armed, it looks for
    the first input sample greater than 0.5; when it finds one it prints
    "activated", starts the configured command (if any) and then skips
    `cooldown * sampling_rate` samples before looking again.
    """

    def __init__(self,
                 command="touch /tmp/it_happened",
                 sampling_rate=1.0,
                 cooldown=1.0):  # only default arguments here
        """arguments to this function show up as parameters in GRC

        Args:
            command: command line to run on activation; it is split with
                `shlex.split` and started without a shell. An empty or
                `None` value disables command execution.
            sampling_rate: sample rate of the input stream, used to convert
                `cooldown` into a number of samples.
            cooldown: time in seconds to ignore the input after an
                activation.
        """
        gr.sync_block.__init__(
            self,
            name='Embedded Python Block',  # will show up in GRC
            in_sig=[np.float32],
            out_sig=[])
        self.cooldown_samples = int(cooldown * sampling_rate)
        self.cooldown_left = 0
        if not command:
            self.command = None
        else:
            self.command = shlex.split(command)

        print(f"Set up for a cooldown of {self.cooldown_samples:d} samples")

    def work(self, input_items, output_items):
        """Consume input, triggering the command on the first sample > 0.5.

        Returns:
            The number of input items consumed, always as a plain `int`.

        Raises:
            OSError: propagated from `subprocess.Popen` if the command
                cannot be started.
        """
        samples = input_items[0]

        # Check whether we're still cooling down from the last activation
        if self.cooldown_left > 0:
            # OK, then just consume as much input as we have, up to the full
            # remainder of the cooldown period
            to_consume = min(self.cooldown_left, len(samples))
            self.cooldown_left -= to_consume
            return to_consume

        # we don't have any cooldown left, so let's look for a value > 0.5.
        # flatnonzero yields a flat index array (argwhere would yield an
        # (N, 1) array, making the "first index" a one-element array).
        hits = np.flatnonzero(samples > 0.5)
        if hits.size == 0:
            # We're neither in cooldown, nor have we detected any high
            # values - just consume all data
            return len(samples)

        self.cooldown_left = self.cooldown_samples
        print("activated")
        if self.command:
            subprocess.Popen(self.command)
        # Consume everything up to AND including the triggering sample.
        # Leaving it unconsumed would, with a zero-length cooldown and a hit
        # at index 0, return 0 forever and re-run the command on every call.
        return int(hits[0]) + 1
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment