Skip to content

Instantly share code, notes, and snippets.

@jboone
Last active March 16, 2022 21:50
Show Gist options
  • Save jboone/836db0cbc8c5cce45ba8a9849c822567 to your computer and use it in GitHub Desktop.
Save jboone/836db0cbc8c5cce45ba8a9849c822567 to your computer and use it in GitHub Desktop.
Goertzel-based DTMF detection.
#!/usr/bin/env python3
import sys
import wave
import numpy
import scipy.signal
def detect_tone(x, f, fs, step_size):
# Apply Goertzel algorithm
# Use a window of a nice round multiple of the period of the frequency we're detecting.
n = 200 #int(round(fs / f * 16))
# Calculate coefficients to multiply the sample window against
w0 = 2 * numpy.pi * f / fs
coeffs = numpy.exp(
numpy.arange(n, dtype=numpy.complex64) * w0 * -1j
) / n
# Output array
output_len = len(x) // step_size
result = numpy.zeros((output_len,), dtype=numpy.float32)
# Step through samples, taking a window each step and applying the Goertzel coefficients.
for i in range(output_len):
chunk = x[i*step_size:i*step_size+n]
if len(chunk) == n:
result[i] = numpy.abs(numpy.dot(coeffs, chunk))
return result
# Expected WAV file sampling rate.
fs = 8000
# Time resolution of our detections. We do a detection at every `step` samples.
step_size = 20
# Read WAV file and check that code assumptions are true of the file.
wav = wave.open(sys.argv[1], mode='rb')
assert(wav.getparams()[:3] == (1, 2, fs))
assert(wav.getparams()[4] == 'NONE')
frames = wav.getparams()[3]
# Convert the WAV int16s into floats scaled to +/-1 full-scale.
samples_bytes = wav.readframes(frames)
samples = numpy.frombuffer(samples_bytes, dtype=numpy.int16)
samples = numpy.array(samples, dtype=numpy.float32)
samples /= 32768
# For each DTMF tone, do detection on the entire WAV file.
mag_map = {}
for f in (697, 770, 852, 941, 1209, 1336, 1477, 1633):
tone_mag = detect_tone(samples, f, fs, step_size)
assert(tone_mag.dtype == numpy.float32)
tone_mag.tofile('{:04d}.f32'.format(f))
mag_map[f] = tone_mag
# Grab the lengths of each detection magnitude array. They should all be the same length.
mag_lengths = [len(d) for d in mag_map.values()]
mag_length = mag_lengths[0]
# Across all frequencies in the low DTMF band, get the maximum magnitude and normalize.
# Then build a [n][4] array where the first dimension is time (in steps) and the
# second dimension is the magnitudes of the four frequencies.
low_band_max = max([max(mag_map[f]) for f in (697, 770, 852, 941)])
low_band = numpy.zeros((mag_length, 4), dtype=numpy.float32)
low_band[:,0] = mag_map[697] / low_band_max
low_band[:,1] = mag_map[770] / low_band_max
low_band[:,2] = mag_map[852] / low_band_max
low_band[:,3] = mag_map[941] / low_band_max
low_band.tofile('lo_x_4.f32')
# Do same for high band as for low band, above.
high_band_max = max([max(mag_map[f]) for f in (1209, 1336, 1477, 1633)])
high_band = numpy.zeros((mag_length, 4), dtype=numpy.float32)
high_band[:,0] = mag_map[1209] / high_band_max
high_band[:,1] = mag_map[1336] / high_band_max
high_band[:,2] = mag_map[1477] / high_band_max
high_band[:,3] = mag_map[1633] / high_band_max
high_band.tofile('hi_x_4.f32')
# Table to map low and high frequency indices to DTMF characters.
dtmf_char_map = (
('1', '4', '7', '*'), # column 1209
('2', '5', '8', '0'), # column 1336
('3', '6', '9', '#'), # column 1477
('A', 'B', 'C', 'D'), # column 1633
)
def detect_digits(samples, detect_threshold=0.25):
# Detection threshold: At least one frequency in the band must be above this
# threshold to be considered a valid DTMF character event.
# Walk through the time steps from the DTMF detection arrays.
raw = []
detections = []
dtmf_char_last = None
dtmf_char_time_start = 0
for step, pair in enumerate(zip(low_band, high_band)):
time_samples = step * step_size
time_seconds = time_samples / fs
lo, hi = pair
# Identify strongest frequency in the band, and if that frequency crosses the threshold.
lo_max = max(lo)
lo_arg = numpy.argmax(lo)
lo_det = lo_max >= detect_threshold
hi_max = max(hi)
hi_arg = numpy.argmax(hi)
hi_det = hi_max >= detect_threshold
# If we detect a frequency in both the low and high bands, we can decode a DTMF character.
dtmf_char = dtmf_char_map[hi_arg][lo_arg] if lo_det and hi_det else None
if dtmf_char_last != dtmf_char:
duration = time_seconds - dtmf_char_time_start
detection = {
'timestamp': dtmf_char_time_start,
'duration': duration,
'character': dtmf_char_last,
}
detections.append(detection)
dtmf_char_last = dtmf_char
dtmf_char_time_start = time_seconds
raw.append(dtmf_char if dtmf_char is not None else '_')
# TODO: The last detected tone is not printed out.
return {
'raw': ''.join(raw),
'detections': detections,
}
def print_detections(detections, minimum_duration=0.0, output_breaks=True):
for detection in detections:
if output_breaks or (detection['character'] is not None):
if detection['duration'] >= minimum_duration:
dtmf_char_str = detection['character'] if detection['character'] is not None else ' '
print('{:8.4f} {:8.4f} {:s}'.format(detection['timestamp'], detection['duration'], dtmf_char_str))
detections_result = detect_digits(samples)
raw = detections_result['raw']
print(raw)
detections = detections_result['detections']
for minimum_duration in numpy.arange(0, 0.03, 0.0025):
chars = [detection['character'] for detection in detections if detection['duration'] >= minimum_duration and detection['character'] is not None]
chars_str = ''.join(chars)
print('>={:6.4f} {:s}'.format(minimum_duration, chars_str))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment