import webrtcvad
import csv
import time
import sys
import wave
import collections
import contextlib
from os import walk
from os.path import join, basename
from pydub import AudioSegment, scipy_effects
from itertools import takewhile
from json import dump
def read_wave(path):
    """Read a mono, 16-bit PCM .wav file.

    Args:
        path: Path of the .wav file to open.

    Returns:
        A 3-tuple ``(pcm_data, sample_rate, duration)``: the raw PCM bytes,
        the sample rate in Hz, and the recording length in seconds (frame
        count divided by sample rate).

    Raises:
        AssertionError: If the file is not single-channel, does not use
            2-byte samples, or has a sample rate other than 8000, 16000,
            32000 or 48000 Hz.
    """
    with contextlib.closing(wave.open(path, 'rb')) as wf:
        num_channels = wf.getnchannels()
        sample_width = wf.getsampwidth()
        sample_rate = wf.getframerate()
        num_frames = wf.getnframes()
        # Validate before reading the payload. AssertionError is raised
        # explicitly so the checks still run under ``python -O`` while the
        # exception type stays what a bare ``assert`` would produce.
        if num_channels != 1:
            raise AssertionError(
                "expected mono audio, got %d channels" % num_channels)
        if sample_width != 2:
            raise AssertionError(
                "expected 16-bit samples, got %d-byte samples" % sample_width)
        if sample_rate not in (8000, 16000, 32000, 48000):
            raise AssertionError(
                "unsupported sample rate: %d Hz" % sample_rate)
        pcm_data = wf.readframes(num_frames)
    return pcm_data, sample_rate, num_frames / float(sample_rate)
class Frame:
    """A slice of PCM audio together with its position in the recording.

    Attributes:
        bytes: The raw PCM data of this frame.
        timestamp: Offset of the frame from the start of the audio.
        duration: Length of the frame, in the same unit as ``timestamp``.
    """

    def __init__(self, bytes, timestamp, duration):
        # ``bytes`` shadows the builtin, but it is the name callers already
        # use for both the parameter and the attribute, so it is kept.
        self.bytes = bytes
        self.timestamp = timestamp
        self.duration = duration

    def __repr__(self):
        # Report the payload size rather than the payload itself so that
        # printing a list of frames stays readable.
        return "Frame(timestamp={!r}, duration={!r}, nbytes={})".format(
            self.timestamp, self.duration, len(self.bytes))
def frame_generator(frame_duration_ms, audio, sample_rate):
    """Generate fixed-length audio frames from PCM audio data.

    Args:
        frame_duration_ms: Desired frame duration in milliseconds.
        audio: The PCM data (a bytes-like, sliceable object). The frame size
            is computed for 2 bytes per sample.
        sample_rate: The audio sample rate, in Hz.

    Yields:
        Frame objects of the requested duration, with ``timestamp`` and
        ``duration`` expressed in seconds. Trailing data that does not fill
        a whole frame is dropped.

    Raises:
        ValueError: If the frame duration and sample rate give a frame of
            zero (or negative) size.
    """
    # Frame size in bytes: samples per frame times 2 bytes per sample.
    n = int(sample_rate * (frame_duration_ms / 1000.0) * 2)
    if n <= 0:
        # A zero-byte frame would never advance through the audio.
        raise ValueError(
            "frame_duration_ms and sample_rate must give a positive frame "
            "size, got %d bytes" % n)
    duration = (float(n) / sample_rate) / 2.0
    timestamp = 0.0
    # The "+ 1" keeps a final frame that ends exactly at the end of the data.
    for offset in range(0, len(audio) - n + 1, n):
        yield Frame(audio[offset:offset + n], timestamp, duration)
        timestamp += duration
def vad_collector(sample_rate, frame_duration_ms, padding_duration_ms, vad,
                  frames):
    """Group audio frames into voiced segments.

    Uses a padded, sliding window over the audio frames. When more than 90%
    of the frames in the window are voiced (as reported by the VAD), the
    collector triggers and starts collecting frames, beginning with those
    already in the window. It then waits until more than 90% of the frames
    in the window are unvoiced to detrigger and emit the segment.

    Because the window is flushed into the segment on triggering and the
    segment only ends once the window is mostly unvoiced, each segment
    carries a little unvoiced audio at its front and back.

    Args:
        sample_rate: The audio sample rate, in Hz.
        frame_duration_ms: The frame duration in milliseconds.
        padding_duration_ms: The window length, in milliseconds.
        vad: An object with an ``is_speech(bytes, sample_rate)`` method,
            e.g. an instance of webrtcvad.Vad.
        frames: A source of audio frames (sequence or generator); each
            frame must expose a ``bytes`` attribute.

    Yields:
        One list of frames per voiced segment, in input order.
    """
    num_padding_frames = int(padding_duration_ms / frame_duration_ms)
    # A bounded deque serves as the sliding window / ring buffer.
    ring_buffer = collections.deque(maxlen=num_padding_frames)
    threshold = 0.9 * ring_buffer.maxlen
    # Two states: TRIGGERED and NOTTRIGGERED. We start NOTTRIGGERED.
    triggered = False
    voiced_frames = []

    for frame in frames:
        is_speech = vad.is_speech(frame.bytes, sample_rate)

        if not triggered:
            ring_buffer.append((frame, is_speech))
            num_voiced = sum(1 for _, speech in ring_buffer if speech)
            # Mostly voiced window: enter TRIGGERED and seed the segment
            # with the audio that is already in the window.
            if num_voiced > threshold:
                triggered = True
                voiced_frames.extend(f for f, _ in ring_buffer)
                ring_buffer.clear()
        else:
            # TRIGGERED: collect the frame and keep tracking the window.
            voiced_frames.append(frame)
            ring_buffer.append((frame, is_speech))
            num_unvoiced = sum(1 for _, speech in ring_buffer if not speech)
            # Mostly unvoiced window: leave TRIGGERED and emit the segment.
            if num_unvoiced > threshold:
                triggered = False
                yield voiced_frames
                ring_buffer.clear()
                voiced_frames = []

    # Emit whatever was still being collected when the input ran out.
    if voiced_frames:
        yield voiced_frames
def knit_segments(segments, max_split=1.00):
    """Merge consecutive segments that are separated by a short silence.

    Args:
        segments: List of segments in chronological order, each a list of
            frames exposing ``timestamp`` and ``duration``.
        max_split: Largest gap (same unit as the frame timestamps) that is
            still bridged; segments closer than this are joined.

    Returns:
        A new list of segments. Joined segments contain the frames of all
        their parts in order. Empty segments are dropped. The input list
        and its segments are left unmodified.
    """
    knitted = []
    for seg in segments:
        if not seg:
            # Nothing to measure a gap against; skip it.
            continue
        if knitted:
            prev = knitted[-1]
            gap = seg[0].timestamp - (prev[-1].timestamp + prev[-1].duration)
            if gap < max_split:
                prev.extend(seg)
                continue
        # Copy so that later extensions never touch the caller's lists.
        knitted.append(list(seg))
    return knitted
def _segment_span(seg):
    """Return the time covered by a segment: end of last frame minus start."""
    return seg[-1].timestamp + seg[-1].duration - seg[0].timestamp


def reduce_to_longest(segments):
    """Keep only the longest voiced segment.

    Args:
        segments: List of segments, each a list of frames exposing
            ``timestamp`` and ``duration``.

    Returns:
        A one-element list holding the segment that covers the most time
        (the earliest one on ties), or an empty list when there is no
        non-empty segment to choose from.
    """
    candidates = [seg for seg in segments if seg]
    if not candidates:
        return []
    return [max(candidates, key=_segment_span)]
def utterance_length_of(segments, length):
    """Return the time from the first segment's start to the last one's end.

    Args:
        segments: List of segments, each a list of frames exposing
            ``timestamp`` and ``duration``.
        length: Total length of the recording, in the same unit as the
            frame timestamps; used as the end when no segment end is
            available.

    Returns:
        ``end - start``, where ``end`` falls back to ``length`` and
        ``start`` falls back to 0 when ``segments`` has no usable frames.
    """
    # Raised by an empty list, a None placeholder, or a non-frame entry.
    unusable = (IndexError, TypeError, AttributeError)
    try:
        last = segments[-1][-1]
        end = last.timestamp + last.duration
    except unusable:
        end = length
    try:
        start = segments[0][0].timestamp
    except unusable:
        start = 0
    return end - start
def extract_timings(fn, segments, length):
    """Build the timing record for one recording.

    Args:
        fn: Path of the audio file. Its basename must contain a ".": the
            part before the first "." is the participant, and the leading
            digits of the remainder are the item.
        segments: Voiced segments found in the file, each a list of frames
            exposing ``timestamp`` and ``duration`` in seconds.
        length: Full duration of the recording in seconds.

    Returns:
        A dict with the keys "Filename", "Leading", "Trailing", "Length",
        "isFiller", "Condition_Q", "Condition_GP", "Participant", "Item",
        "Utterance length" and "Reading". Durations are in milliseconds,
        rounded to two decimals. "Leading" is the string "NA" when the
        voiced part starts 150 ms or less into the recording.

    Raises:
        ValueError: If the basename of ``fn`` contains no ".".
    """
    bfn = basename(fn)
    length_ms = round(length * 1000, 2)

    # Start/end are tracked in seconds. Without usable segments, fall back
    # to treating the whole recording as the utterance.
    unusable = (IndexError, TypeError, AttributeError)
    try:
        last = segments[-1][-1]
        end = last.timestamp + last.duration
    except unusable:
        end = length
    try:
        start = segments[0][0].timestamp
    except unusable:
        start = 0

    utterance_length = end - start
    if not utterance_length > 0:
        # Degenerate span: report no utterance at all.
        utterance_length = 0
        start = 0
        end = 0

    leading = round(start * 1000, 2)
    # leading should be longer than fastest human RT to visual stimuli
    if not leading > 150:
        leading = "NA"
    trailing = round(length_ms - end * 1000, 2)

    # extract participant and item from filename
    p, item = bfn.split(".", 1)
    item = "".join(takewhile(str.isdigit, item))

    return {
        "Filename": bfn,
        "Leading": leading,
        "Trailing": trailing,
        "Length": length_ms,  # full file duration
        "isFiller": "F" in bfn,
        "Condition_Q": "Q" in bfn,
        "Condition_GP": "Y" in bfn,
        "Participant": p,
        "Item": item,
        "Utterance length": round(utterance_length * 1000, 2),
        "Reading": 1 if "r1" in bfn else 2,
    }
def _timing_problem(ct, segments, length):
    """Return why candidate timings are implausible, or None if they pass."""
    # should have captured exactly 1 utterance
    if len(segments) != 1:
        return "Wrong number of segments"
    # utterance must be longer than 2.5 s and shorter than 10 s
    if not 2500 < ct["Utterance length"]:
        return "Utterance too short"
    if not 10000 > ct["Utterance length"]:
        return "Utterance too long"
    # leading and trailing should not be NA
    if ct["Leading"] == "NA":
        return "Leading == NA"
    if ct["Trailing"] == "NA":
        return "Trailing == NA"
    # Leading must exceed 120 ms; leading and trailing must each stay
    # below 95% of the recording length.
    limit = 0.95 * length * 1000
    if not 120 < ct["Leading"]:
        return "Leading too short"
    if not limit > ct["Leading"]:
        return "Leading too long"
    if not ct["Trailing"] < limit:
        return "Trailing too long"
    return None


def _search_settings(fn, frame_dur):
    """Try VAD settings on one file until a candidate passes validation.

    Returns ``(timings, agg, hpf, error)``: the timing record (None on
    failure), the last aggressiveness and high-pass cut-off tried, and the
    last failure reason (None on success).
    """
    error = None
    # read_wave is faster than AudioSegment.from_file, so the unfiltered
    # audio is read with it, once, up front.
    try:
        plain = read_wave(fn)
    except Exception as exc:  # unreadable or unsupported as plain .wav
        print(f"Couldn't load {basename(fn)}.")
        plain = None
        error = str(exc)

    audioseg = None
    agg = hpf = None
    # if we fail with all HPF values, try less aggressive VAD
    for agg in (3, 2, 1, 0):
        vad = webrtcvad.Vad(agg)
        # if we fail, ramp up high-pass filter (HPF) cut-off
        for hpf in (0, 200, 400):
            if hpf > 0:
                # Only use slow AudioSegment if we need to HPF, and only
                # read it from file once.
                if audioseg is None:
                    try:
                        audioseg = AudioSegment.from_file(fn)
                    except Exception as exc:
                        # Nothing more can be tried for this file.
                        print(f"Couldn't load {basename(fn)}.")
                        return None, agg, hpf, str(exc)
                sample_rate = audioseg.frame_rate
                length = audioseg.duration_seconds
                # NOTE(review): the second argument is presumably the
                # filter order of pydub.scipy_effects -- confirm.
                audio = audioseg.high_pass_filter(hpf, 8).raw_data
            else:
                if plain is None:
                    continue
                audio, sample_rate, length = plain

            frames = list(frame_generator(frame_dur, audio, sample_rate))
            segments = list(vad_collector(
                sample_rate,
                frame_dur,
                frame_dur * 10,  # padding
                vad,
                frames,
            ))
            if len(segments) > 1:
                # multiple utterances; eliminate short gaps
                segments = knit_segments(segments)
            if len(segments) > 1:
                # still multiple utterances; pick longest
                segments = reduce_to_longest(segments)

            # get the candidate's timings and check them
            ct = extract_timings(fn, segments, length)
            error = _timing_problem(ct, segments, length)
            if error is None:
                # a candidate has survived, we're done!
                return ct, agg, hpf, None
    return None, agg, hpf, error


def main(args):
    """Run robust VAD over a folder of recordings and save the timings.

    Walks the target folder for files whose name contains "wav" but not
    "P", searches VAD settings for each, and writes the accepted timing
    records to a CSV file and a per-file processing log to a JSON file,
    both in the current working directory.

    Args:
        args: Command-line arguments; ``args[0]`` is the target folder
            containing the audio files. Defaults to "48k" in the CWD.
    """
    main_start = time.time()

    # target folder containing audio files should be provided by CL-arg;
    # if not, defaults to 48k in CWD
    try:
        fdir = args[0]
    except IndexError:
        fdir = "48k"

    # Derive quality (a path component containing "k") and subject (the
    # last component, unless it is the quality folder) from the path.
    # Empty components from leading/trailing slashes are ignored.
    parts = [d for d in fdir.split("/") if d]
    quality = "48k"
    subj = "all"
    dts = time.strftime("%Y-%m-%d_%Hh%Mm%Ss")
    for d in parts:
        if "k" in d:
            quality = d
    if parts and "k" not in parts[-1]:
        subj = parts[-1]

    # build a list of files across subdirs of fdir
    fns = [
        join(root, name)
        for root, _, files in walk(fdir)
        for name in files if "wav" in name and "P" not in name
    ]

    # define output file names; fall back to the short form when the path
    # gives nothing to name them after
    if parts:
        outf = f"rvad-{subj}-{quality}-{dts}.csv"
        logf = f"rvadlog-{subj}-{quality}-{dts}.json"
    else:
        outf = f"rvad-{dts}.csv"
        logf = f"rvadlog-{dts}.json"

    # initialize values
    timings = []
    meta = []
    c = 0  # number of files processed
    f = 0  # number of files that failed after all trials
    n = len(fns)  # total number of files
    frame_dur = 30  # width of "frames" of audio checked with VAD

    # script will report progress every ~rep_window sec
    rep_window = 15
    next_rep = time.time() + rep_window

    print(f"\nRunning robust VAD over {n} files.")
    print(f"\tWill save result to {outf}...")

    for fn in fns:
        iter_start = time.time()
        # c > 0 guards the average against division by zero.
        if c and time.time() > next_rep:
            avg_time = (time.time()-main_start) / c
            print(
                f"> {c} of {n} done ({(c/n) * 100:>.1f}%) with {f} failures " +
                f"in {time.time()-main_start:>.1f}s." +
                f"\n\tEstimated time to completion: {avg_time * (n-c):>.1f}s" +
                f"(est. total={avg_time*n:>.1f})."
            )
            next_rep = time.time()+rep_window

        ct, agg, hpf, error = _search_settings(fn, frame_dur)
        success = ct is not None

        meta.append({
            "file": basename(fn),
            "agg": agg,
            "hpf": hpf,
            "success": success,
            "error": "NA" if success else error,
            "processing time": round(time.time()-iter_start, 4)
        })
        if success:
            timings.append(ct)
        else:
            f += 1
        c += 1

    if timings:
        # newline="" lets the csv module control line endings itself.
        with open(outf, "w", newline="") as outcsv:
            writer = csv.DictWriter(outcsv, fieldnames=list(timings[0].keys()))
            writer.writeheader()
            writer.writerows(timings)
    else:
        # Without any record there are no column names to write.
        print("No file produced valid timings; no CSV written.")

    with open(logf, 'w') as outjson:
        dump(meta, outjson, indent=2)

    print(
        f"Completed {n} in {time.time()-main_start:>.2f}s with {f} failures.")
if __name__ == '__main__':
    # Pass only the user-supplied arguments; main() reads the target
    # folder from position 0.
    main(sys.argv[1:])