Skip to content

Instantly share code, notes, and snippets.

@sergio11
Created November 6, 2023 19:56
Show Gist options
  • Save sergio11/d6746b53e20e44fe7794f23f609947f5 to your computer and use it in GitHub Desktop.
Save sergio11/d6746b53e20e44fe7794f23f609947f5 to your computer and use it in GitHub Desktop.
generate_song_operator.py
import os
import shutil
import tempfile
from datetime import datetime

from airflow.utils.decorators import apply_defaults
from bson import ObjectId
from pydub import AudioSegment

from operators.base_custom_operator import BaseCustomOperator
class GenerateSongOperator(BaseCustomOperator):
    """
    Combines a melody and voice audio and stores the combined audio in MinIO.

    The song is identified by the ``song_id`` that the ``generate_voice_task``
    task pushes to XCom. The song's MongoDB document must contain
    ``melody_file_name`` and ``voice_file_name``, which name WAV objects in the
    configured MinIO bucket.

    The mixed result is exported as MP4, uploaded as
    ``<song_id>_final_song.mp4``, and the document is updated with
    ``final_song_name``, ``song_status`` and ``final_song_generated_at``.

    The connection settings below are not declared here. They are forwarded
    through ``**kwargs`` and presumably consumed by ``BaseCustomOperator``;
    confirm against the base class.

    :param mongo_uri: MongoDB connection URI.
    :type mongo_uri: str
    :param mongo_db: MongoDB database name.
    :type mongo_db: str
    :param mongo_db_collection: MongoDB collection name.
    :type mongo_db_collection: str
    :param minio_endpoint: MinIO server endpoint.
    :type minio_endpoint: str
    :param minio_access_key: MinIO access key.
    :type minio_access_key: str
    :param minio_secret_key: MinIO secret key.
    :type minio_secret_key: str
    :param minio_bucket_name: MinIO bucket name.
    :type minio_bucket_name: str
    """

    # Length of the fade-in and fade-out applied to the voice track, in milliseconds.
    FADE_DURATION_MS = 100
    # Gain added to both tracks before mixing, in dB (pydub's ``segment + n``).
    AMPLIFICATION_DB = 5.0

    @apply_defaults
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def execute(self, context):
        """
        Mix the melody and voice tracks of a song and publish the result.

        :param context: Airflow task context. Its ``task_instance`` is used to
            pull the ``song_id`` from the ``generate_voice_task`` XCom.
        :return: ``{"song_id": <song_id as str>}``.
        :raises ValueError: if no ``song_id`` was received, the song document
            does not exist, or it lacks a melody or voice file name.
        """
        self._log_to_mongodb("Starting execution of GenerateSongOperator", context, "INFO")

        # Retrieve song_id from the previous task using XCom.
        voice_task_result = context['task_instance'].xcom_pull(task_ids='generate_voice_task')
        song_id = voice_task_result.get('song_id') if voice_task_result else None
        if not song_id:
            self._fail_task("No song_id received from generate_voice_task", context)
        self._log_to_mongodb(f"Retrieved song_id: {song_id}", context, "INFO")

        # Look up the names of the melody and voice objects for this song.
        collection = self._get_mongodb_collection()
        song_info = collection.find_one({"_id": ObjectId(song_id)})
        if song_info is None:
            self._fail_task(f"Song document not found for song_id: {song_id}", context)
        melody_file_name = song_info.get("melody_file_name")
        voice_file_name = song_info.get("voice_file_name")
        if not melody_file_name or not voice_file_name:
            self._fail_task(
                f"Song document for song_id {song_id} is missing melody_file_name or voice_file_name",
                context)
        self._log_to_mongodb(f"Retrieved melody WAV and voice audio paths for song_id: {song_id}", context, "INFO")

        minio_client = self._get_minio_client(context)
        # Every temp file created below is recorded here and deleted in `finally`.
        temp_file_paths = []
        try:
            # Download the melody and voice audio from MinIO.
            melody_path = self._download_to_temp_file(
                minio_client, melody_file_name, ".wav", temp_file_paths)
            voice_path = self._download_to_temp_file(
                minio_client, voice_file_name, ".wav", temp_file_paths)

            combined_audio = self._mix_tracks(
                AudioSegment.from_file(melody_path, format="wav"),
                AudioSegment.from_file(voice_path, format="wav"),
            )
            self._log_to_mongodb("Audio files combined", context, "INFO")

            # pydub's export() opens the output path and returns the handle; close it
            # so the file is fully written and released before it is uploaded.
            combined_audio_path = self._new_temp_file_path(".mp4", temp_file_paths)
            combined_audio.export(combined_audio_path, format="mp4").close()
            final_song_name = f"{song_id}_final_song.mp4"

            # Store the generated file in MinIO. The payload is an MP4 container,
            # so advertise audio/mp4 (audio/mpeg denotes MPEG audio such as MP3).
            self._store_file_in_minio(
                local_file_path=combined_audio_path,
                minio_object_name=final_song_name,
                context=context,
                content_type="audio/mp4")
            self._log_to_mongodb(f"Combined audio stored in MinIO for song_id: {song_id}", context, "INFO")

            # Update the document in MongoDB.
            # NOTE(review): datetime.now() is naive local time; pymongo stores naive
            # datetimes as UTC. Confirm the worker runs in UTC or switch to an aware time.
            collection.update_one({"_id": ObjectId(song_id)}, {
                "$set": {
                    "final_song_name": final_song_name,
                    "song_status": "final_song_generated",
                    "final_song_generated_at": datetime.now()
                }
            })
            self._log_to_mongodb("GenerateSongOperator execution completed", context, "INFO")
        except Exception as e:
            self._log_to_mongodb(f"Error storing or retrieving audio from MinIO: {e}", context, "ERROR")
            raise
        finally:
            self._remove_temp_files(temp_file_paths)
        return {"song_id": str(song_id)}

    def _mix_tracks(self, melody, voice):
        """
        Overlay ``voice`` on ``melody`` and return the combined AudioSegment.

        The voice is normalized and faded in/out. Both tracks are boosted by
        ``AMPLIFICATION_DB``. The voice is resampled to the melody's frame rate
        and channel count, and the shorter track is padded with silence so
        nothing is truncated.
        """
        # Try to normalize the voice in order to improve the audio quality.
        voice = voice.normalize()
        voice = voice.fade_in(self.FADE_DURATION_MS).fade_out(self.FADE_DURATION_MS)
        # NOTE(review): normalize() peaks the voice just below 0 dBFS, so this boost
        # (plus summing in overlay) may clip. Consider tuning AMPLIFICATION_DB.
        melody = melody + self.AMPLIFICATION_DB
        voice = voice + self.AMPLIFICATION_DB
        # Resample the voice to the melody's sample rate and channel count.
        voice = voice.set_frame_rate(melody.frame_rate)
        voice = voice.set_channels(melody.channels)
        # overlay() keeps the length of the segment it is called on, so pad the
        # shorter track with silence to make both durations equal.
        if len(voice) > len(melody):
            melody += AudioSegment.silent(duration=len(voice) - len(melody))
        else:
            voice += AudioSegment.silent(duration=len(melody) - len(voice))
        return melody.overlay(voice)

    def _download_to_temp_file(self, minio_client, object_name, suffix, temp_file_paths):
        """
        Download ``object_name`` from the MinIO bucket into a new temp file.

        The new file's path is appended to ``temp_file_paths`` before any data
        is written, so the caller can clean it up even if the download fails.

        :return: Path of the temporary file holding the object's bytes.
        """
        temp_path = self._new_temp_file_path(suffix, temp_file_paths)
        response = minio_client.get_object(self.minio_bucket_name, object_name)
        try:
            with open(temp_path, "wb") as temp_file:
                # Stream in chunks instead of loading the whole object into memory.
                shutil.copyfileobj(response, temp_file)
        finally:
            # The MinIO SDK requires closing the response and releasing its
            # connection back to the pool.
            response.close()
            response.release_conn()
        return temp_path

    @staticmethod
    def _new_temp_file_path(suffix, temp_file_paths):
        """Create an empty, closed temp file, record its path in ``temp_file_paths``, and return the path."""
        with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as temp_file:
            temp_file_paths.append(temp_file.name)
        return temp_file.name

    @staticmethod
    def _remove_temp_files(paths):
        """Best-effort deletion of temporary files."""
        for path in paths:
            try:
                os.remove(path)
            except OSError:
                # Deliberately ignored: this runs in `finally`, and raising here
                # would replace the task's real result or exception.
                pass

    def _fail_task(self, message, context):
        """Log ``message`` to MongoDB at ERROR level, then raise it as a ValueError."""
        self._log_to_mongodb(message, context, "ERROR")
        raise ValueError(message)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment