# datagen.py
import numpy as np
import librosa


def load_audio_chunk(audio_path, start_frame, dur_s, sr):
    # Load a specific chunk of the audio file
    audio, _ = librosa.load(audio_path, sr=sr, mono=True,
                            offset=start_frame / sr, duration=dur_s)
    return audio


def preprocess_audio(audio_data):
    # Normalize audio to the range [-1, 1]
    max_val = np.max(np.abs(audio_data))
    return audio_data / max_val if max_val > 0 else audio_data


def gen_audio_clips(audio_path, offset_s=0, dur_s=None, verbose=0, *,
                    sr, chunklen_s, noise_floor_perc, noise_frac,
                    train_frames, train_skip, batch_count):
    # Note: offset_s and dur_s are accepted but not used below;
    # chunklen_s controls how much audio is loaded per outer iteration.
    total_samples = librosa.get_duration(path=audio_path) * sr
    start_frame = 0
    silence_eval_s = 0.2  # Length of each sub-chunk for evaluating the noise floor
    silence_eval_samples = int(sr * silence_eval_s)
    while start_frame + train_frames < total_samples:
        audio_chunk = load_audio_chunk(audio_path, start_frame, chunklen_s, sr)
        audio_chunk = preprocess_audio(audio_chunk)
        # Estimate the noise floor robustly: collect per-sub-chunk peak
        # amplitudes, sort them, and take the noise_floor_perc percentile
        max_amplitudes = []
        for j in range(0, len(audio_chunk), silence_eval_samples):
            chunk = audio_chunk[j:j + silence_eval_samples]
            max_amplitudes.append(np.max(np.abs(chunk)))
        max_amplitudes.sort()
        noise_floor_index = int(len(max_amplitudes) * noise_floor_perc / 100)
        noise_floor = max_amplitudes[noise_floor_index] * noise_frac
        if verbose > 0:
            print('')
            print(f"Noise floor perc: {float(noise_floor_perc):.1f}")
            print(f"Noise frac      : {noise_frac:.3f}")
            print(f"Amplitudes      : {max_amplitudes}")
            print(f"Noise floor idx : {noise_floor_index}")
            print(f"Noise floor     : {noise_floor}")
        batch_data = []
        noise_masks = []
        voice_masks = []
        for i in range(0, len(audio_chunk) - train_frames, train_skip):
            end_frame = i + train_frames
            if end_frame > len(audio_chunk):
                break
            batch = audio_chunk[i:end_frame].reshape(1, train_frames, 1)
            # Determine masks from the window's peak amplitude
            max_amplitude = np.max(np.abs(batch))
            is_noise = max_amplitude <= noise_floor
            noise_mask = 1.0 if is_noise else 0.0
            voice_mask = 0.0 if is_noise else 1.0
            batch_data.append(batch)
            noise_masks.append(noise_mask)
            voice_masks.append(voice_mask)
            # Yield a batch once the count reaches batch_count
            if len(batch_data) == batch_count:
                xd = np.array(batch_data)
                xn = np.array(noise_masks).reshape(-1, 1)
                xv = np.array(voice_masks).reshape(-1, 1)
                yield xd, xn, xv
                batch_data = []
                noise_masks = []
                voice_masks = []
        start_frame += train_frames - train_skip
        # Flush any partial batch remaining from this chunk
        if batch_data:
            xd = np.array(batch_data)
            xn = np.array(noise_masks).reshape(-1, 1)
            xv = np.array(voice_masks).reshape(-1, 1)
            yield xd, xn, xv
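

# usage_datagen.py (hypothetical, not part of the gist)
# A minimal sketch of driving gen_audio_clips directly. The path and every
# keyword value below are illustrative assumptions chosen for demonstration.
from datagen import gen_audio_clips

clips = gen_audio_clips('speech.wav',
                        sr=16000,             # target sample rate
                        chunklen_s=10.0,      # seconds loaded per outer chunk
                        noise_floor_perc=10,  # percentile of sub-chunk peaks
                        noise_frac=1.5,       # scale factor on that percentile
                        train_frames=512,     # samples per training window
                        train_skip=256,       # hop between windows
                        batch_count=8)        # windows per yielded batch
for xd, xn, xv in clips:
    # xd: (batch_count, 1, train_frames, 1); xn, xv: (batch_count, 1)
    print(xd.shape, xn.shape, xv.shape)
    break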


# model.py
from keras.models import Model
from keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D, Add, Multiply


def create_model(input_shape):
    # Main audio input
    audio_input = Input(shape=input_shape, name="audio_input")
    # Mask inputs
    noise_mask_input = Input(shape=(1,), name="noise_mask_input")
    voice_mask_input = Input(shape=(1,), name="voice_mask_input")
    # Shared encoder layer
    x = Conv2D(16, (1, 3), activation='relu', padding='same')(audio_input)
    x = MaxPooling2D((1, 2), padding='same')(x)
    # Noise branch
    noise_path = Conv2D(8, (1, 3), activation='relu', padding='same')(x)
    noise_path = MaxPooling2D((1, 2), padding='same')(noise_path)
    noise_path = UpSampling2D((1, 2))(noise_path)
    noise_path = UpSampling2D((1, 2))(noise_path)  # Upsample back to the input length
    noise_output = Conv2D(1, (1, 3), activation='sigmoid', padding='same')(noise_path)
    # Voice branch
    voice_path = Conv2D(8, (1, 3), activation='relu', padding='same')(x)
    voice_path = MaxPooling2D((1, 2), padding='same')(voice_path)
    voice_path = UpSampling2D((1, 2))(voice_path)
    voice_path = UpSampling2D((1, 2))(voice_path)  # Upsample back to the input length
    voice_output = Conv2D(1, (1, 3), activation='sigmoid', padding='same')(voice_path)
    # Apply masks (the (batch, 1) masks broadcast over the time axis)
    masked_noise_output = Multiply()([noise_output, noise_mask_input])
    masked_voice_output = Multiply()([voice_output, voice_mask_input])
    # Combine masked outputs
    combined_output = Add()([masked_noise_output, masked_voice_output])
    #model = Model(inputs=[audio_input, noise_mask_input, voice_mask_input], outputs=[combined_output, noise_output, voice_output])
    model = Model(inputs=[audio_input, noise_mask_input, voice_mask_input], outputs=[combined_output])
    model.summary()  # summary() prints itself and returns None, so don't wrap it in print()
    return model
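

# usage_model.py (hypothetical, not part of the gist)
# A quick smoke test of create_model, assuming train_frames=512 for
# illustration. Each MaxPooling2D halves the time axis once and the paired
# UpSampling2D layers double it back, so the output recovers the
# (1, train_frames, 1) input shape when train_frames is divisible by 4.
import numpy as np
from model import create_model

train_frames = 512  # assumed; must be divisible by 4 for the shapes to line up
model = create_model(input_shape=(1, train_frames, 1))
audio = np.random.rand(2, 1, train_frames, 1).astype(np.float32)
noise_mask = np.ones((2, 1), dtype=np.float32)   # treat both windows as noise
voice_mask = np.zeros((2, 1), dtype=np.float32)
out = model.predict([audio, noise_mask, voice_mask], verbose=0)
print(out.shape)  # (2, 1, 512, 1)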


# train.py
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
import logging
logging.getLogger('tensorflow').setLevel(logging.ERROR)
import tensorflow as tf
from stg import Settings
from datagen import gen_audio_clips
from utils import save_model
from model import create_model
from keras.optimizers import Adam
import numpy as np


class AudioDataGenerator(tf.keras.utils.Sequence):
    def __init__(self, *, audio_path, batch_size, steps_per_epoch,
                 epochs, audio_grabber_batch_size):
        self.audio_path = audio_path
        self.batch_size = batch_size
        self.steps_per_epoch = steps_per_epoch
        self.epochs = epochs
        self.audio_grabber_batch_size = audio_grabber_batch_size
        self.generator = self.create_generator()

    def create_generator(self):
        return gen_audio_clips(self.audio_path,
                               offset_s=0,
                               dur_s=Settings.lookahead_secs,
                               sr=Settings.sr,
                               chunklen_s=Settings.lookahead_secs,
                               noise_floor_perc=Settings.noise_floor_perc,
                               noise_frac=Settings.noise_frac,
                               train_frames=Settings.train_frames,
                               train_skip=Settings.train_skip,
                               batch_count=self.audio_grabber_batch_size)

    def __len__(self):
        return self.steps_per_epoch
    def __getitem__real(self, index):  # reserved for constant-getitem test
        bb = next(self.generator)
        # aa = bb[0][0].squeeze(axis=-1)
        aa = bb[0][0]
        a2 = bb[1][0][0]
        a3 = bb[2][0][0]
        for i in range(0, 3):
            print(bb[i].shape)
        for i in aa, a2, a3:
            print(i.shape)
        # import ipdb; ipdb.set_trace(context=16); pass
        return (aa, a2, a3), aa  # Return as ((inputs), output)
    def __getitem__fail(self, index):
        # Create dummy data with the correct shape and type
        batch_size = 1  # Adjust to the desired batch size
        train_frames = Settings.train_frames
        # Dummy audio data: shape (batch_size, 1, train_frames, 1)
        audio_data = np.random.random((batch_size, 1, train_frames, 1)).astype(np.float32)
        # Dummy noise and voice masks: shape (batch_size, 1)
        noise_mask = np.random.random((batch_size, 1)).astype(np.float32)
        voice_mask = np.random.random((batch_size, 1)).astype(np.float32)
        # Dummy output (same shape as the audio data)
        output_data = np.random.random((batch_size, 1, train_frames, 1)).astype(np.float32)
        # Return in the format required by the output_signature
        return (audio_data, noise_mask, voice_mask), output_data
    def __getitem__fail2(self, index):
        # Specify the batch size and dimensions
        batch_size = 1  # Should match the network and training configuration
        train_frames = Settings.train_frames  # Number of frames the model expects
        # Generate dummy audio data
        audio_data = np.random.rand(batch_size, 1, train_frames, 1).astype(np.float32)
        # Generate dummy masks
        noise_mask = np.random.rand(batch_size, 1).astype(np.float32)
        voice_mask = np.random.rand(batch_size, 1).astype(np.float32)
        # Generate dummy output data, matching the shape of audio_data
        output_data = np.random.rand(batch_size, 1, train_frames, 1).astype(np.float32)
        # Return packaged data as ((audio_data, noise_mask, voice_mask), output_data)
        return ((audio_data, noise_mask, voice_mask), output_data)
    def __getitem__(self, index):
        # NOTE: this early return is a debugging stub; everything below it
        # is unreachable until it is removed.
        return np.random.rand(2, 3)
        # Specify the batch size and dimensions
        batch_size = 1  # Should match the network and training configuration
        train_frames = Settings.train_frames  # Number of frames the model expects
        # Generate dummy audio data with shape (batch_size, 1, train_frames, 1)
        audio_data = np.random.rand(batch_size, 1, train_frames, 1).astype(np.float32)
        # Generate dummy masks with shape (batch_size, 1)
        noise_mask = np.random.rand(batch_size, 1).astype(np.float32)
        voice_mask = np.random.rand(batch_size, 1).astype(np.float32)
        # Generate dummy output data, matching the shape of audio_data
        output_data = np.random.rand(batch_size, 1, train_frames, 1).astype(np.float32)
        # Properly pack the data into the expected tuple structure
        input_tuple = (audio_data, noise_mask, voice_mask)
        return_tuple = (input_tuple, output_data)
        return (np.random.rand(2, 3),)  # Second debugging stub, also unreachable
        return return_tuple
    def on_epoch_end(self):
        self.generator = self.create_generator()  # Restart generator at the end of each epoch


# Create a TensorFlow dataset from the generator
def get_dataset(generator):
    output_signature = (
        (
            tf.TensorSpec(shape=(None, 1, Settings.train_frames, 1), dtype=tf.float32),  # Audio data
            tf.TensorSpec(shape=(None, 1), dtype=tf.float32),  # Noise mask
            tf.TensorSpec(shape=(None, 1), dtype=tf.float32),  # Voice mask
        ),
        tf.TensorSpec(shape=(None, 1, Settings.train_frames, 1), dtype=tf.float32),  # Output target
    )
    # NOTE: __getitem__ above currently returns a (2, 3) random array, which
    # does not match this signature; from_generator will fail until it yields
    # ((audio, noise_mask, voice_mask), output) tuples.
    return tf.data.Dataset.from_generator(generator.__getitem__,
                                          output_signature=output_signature,
                                          args=(0,))


def train_model(audio_path, epochs=50, batch_size=10, steps_per_epoch=100):
    model = create_model(input_shape=(1, Settings.train_frames, 1))
    model.compile(optimizer=Adam(), loss='mse')
    generator = AudioDataGenerator(
        audio_path=audio_path,
        batch_size=batch_size,
        steps_per_epoch=steps_per_epoch,
        epochs=epochs,
        audio_grabber_batch_size=1,
    )
    dataset = get_dataset(generator)
    model.fit(dataset, epochs=epochs, steps_per_epoch=steps_per_epoch)
    save_model(model, 'gen/model_final.keras')
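

# stg.py / utils.py (hypothetical stand-ins, not part of the gist)
# train.py imports Settings from stg and save_model from utils, but neither
# file is included here. These minimal sketches let the scripts run end to
# end; every value and the save_model behavior below are illustrative
# assumptions, not the author's originals.
import os


class Settings:
    sr = 16000             # assumed sample rate
    lookahead_secs = 10.0  # seconds of audio loaded per chunk
    noise_floor_perc = 10  # percentile used for the noise-floor estimate
    noise_frac = 1.5       # scale applied to that percentile
    train_frames = 512     # samples per training window
    train_skip = 256       # hop between windows


def save_model(model, path):
    # Keras models save directly to .keras files; just ensure the directory exists
    os.makedirs(os.path.dirname(path), exist_ok=True)
    model.save(path)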