Created November 6, 2023 19:56
-
-
Save sergio11/d6746b53e20e44fe7794f23f609947f5 to your computer and use it in GitHub Desktop.
generate_song_operator.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os
import tempfile
from datetime import datetime

from airflow.utils.decorators import apply_defaults
from bson import ObjectId
from pydub import AudioSegment

from operators.base_custom_operator import BaseCustomOperator

class GenerateSongOperator(BaseCustomOperator):
    """
    Combines a melody and voice audio and stores the combined audio in MinIO.

    Workflow of :meth:`execute`:

    1. Pull ``song_id`` from the XCom return value of ``generate_voice_task``.
    2. Load the song document from MongoDB and read its ``melody_file_name``
       and ``voice_file_name`` fields.
    3. Download both objects from MinIO, decode them as WAV, mix them with
       pydub and export the result as MP4.
    4. Upload ``<song_id>_final_song.mp4`` to MinIO and mark the song
       document as ``final_song_generated``.

    The connection parameters below are not declared in this class; they are
    forwarded through ``**kwargs`` (presumably consumed by
    ``BaseCustomOperator`` -- verify against the base class).

    :param mongo_uri: MongoDB connection URI.
    :type mongo_uri: str
    :param mongo_db: MongoDB database name.
    :type mongo_db: str
    :param mongo_db_collection: MongoDB collection name.
    :type mongo_db_collection: str
    :param minio_endpoint: MinIO server endpoint.
    :type minio_endpoint: str
    :param minio_access_key: MinIO access key.
    :type minio_access_key: str
    :param minio_secret_key: MinIO secret key.
    :type minio_secret_key: str
    :param minio_bucket_name: MinIO bucket name.
    :type minio_bucket_name: str
    """

    # Gain in dB added to both tracks before mixing (pydub ``segment + n``).
    AMPLIFICATION_DB = 5.0
    # Fade-in / fade-out length in milliseconds applied to the voice track.
    VOICE_FADE_MS = 100
    # MIME type stored in MinIO for the exported MP4 audio ("audio/mpeg" is MP3).
    OUTPUT_CONTENT_TYPE = "audio/mp4"

    @apply_defaults
    def __init__(
        self,
        *args, **kwargs
    ):
        super().__init__(*args, **kwargs)

    def execute(self, context):
        """
        Mix the melody and voice of a song and publish the result.

        :param context: Airflow task context; ``context['task_instance']`` is
            used to pull the upstream XCom value.
        :return: ``{"song_id": <song_id as str>}`` for downstream tasks.
        :raises ValueError: if the upstream XCom carries no ``song_id``, the
            song document does not exist, or it lacks a melody/voice file name.
        :raises Exception: any MinIO, audio-processing or MongoDB error raised
            while generating the song is logged and re-raised unchanged.
        """
        self._log_to_mongodb("Starting execution of GenerateSongOperator", context, "INFO")

        # Retrieve song_id from the previous task using XCom
        voice_task_result = context['task_instance'].xcom_pull(task_ids='generate_voice_task')
        try:
            song_id = voice_task_result['song_id']
        except (TypeError, KeyError):
            # TypeError: XCom returned None / a non-mapping; KeyError: no song_id key.
            raise self._logged_error(
                "No song_id found in XCom of generate_voice_task", context) from None
        self._log_to_mongodb(f"Retrieved song_id: {song_id}", context, "INFO")

        # Get a reference to the MongoDB collection
        collection = self._get_mongodb_collection()
        song_info = collection.find_one({"_id": ObjectId(song_id)})
        if song_info is None:
            raise self._logged_error(f"Song document not found for song_id: {song_id}", context)
        melody_file_name = song_info.get("melody_file_name")
        voice_file_name = song_info.get("voice_file_name")
        if not melody_file_name or not voice_file_name:
            raise self._logged_error(
                f"Melody or voice file name missing for song_id: {song_id}", context)
        self._log_to_mongodb(f"Retrieved melody WAV and voice audio paths for song_id: {song_id}", context, "INFO")

        # Connect to MinIO and download the melody and voice audio
        minio_client = self._get_minio_client(context)
        # Every temporary file created below is recorded here and removed in
        # ``finally``; they are created with delete=False so they can be
        # reopened by path (pydub/ffmpeg, MinIO upload).
        temp_file_paths = []
        try:
            melody_temp_file_path = self._new_temp_file_path(".wav", temp_file_paths)
            self._download_minio_object(minio_client, melody_file_name, melody_temp_file_path)
            voice_temp_file_path = self._new_temp_file_path(".wav", temp_file_paths)
            self._download_minio_object(minio_client, voice_file_name, voice_temp_file_path)

            # Load the files using the file paths
            melody = AudioSegment.from_file(melody_temp_file_path, format="wav")
            voice = AudioSegment.from_file(voice_temp_file_path, format="wav")

            combined_audio = self._mix_melody_and_voice(melody, voice)
            self._log_to_mongodb("Audio files combined", context, "INFO")

            # Export the combined audio to a temporary MP4 file. pydub's
            # export() returns the file object it opened for the path; close
            # it so the descriptor is not leaked.
            combined_audio_temp_file_path = self._new_temp_file_path(".mp4", temp_file_paths)
            combined_audio.export(combined_audio_temp_file_path, format="mp4").close()

            final_song_name = f"{song_id}_final_song.mp4"
            # Store the generated file in MinIO
            self._store_file_in_minio(
                local_file_path=combined_audio_temp_file_path,
                minio_object_name=final_song_name,
                context=context,
                content_type=self.OUTPUT_CONTENT_TYPE)
            self._log_to_mongodb(f"Combined audio stored in MinIO for song_id: {song_id}", context, "INFO")

            # Update the document in MongoDB
            collection.update_one({"_id": ObjectId(song_id)}, {
                "$set": {
                    "final_song_name": final_song_name,
                    "song_status": "final_song_generated",
                    "final_song_generated_at": datetime.now()
                }
            })
            self._log_to_mongodb("GenerateSongOperator execution completed", context, "INFO")
        except Exception as e:
            self._log_to_mongodb(f"Error storing or retrieving audio from MinIO: {e}", context, "ERROR")
            raise
        finally:
            self._remove_temp_files(temp_file_paths)
        return {"song_id": str(song_id)}

    def _logged_error(self, message, context):
        """Log ``message`` at ERROR level to MongoDB and return a ValueError for it."""
        self._log_to_mongodb(message, context, "ERROR")
        return ValueError(message)

    @staticmethod
    def _new_temp_file_path(suffix, temp_file_paths):
        """Create an empty, closed temp file, record its path for cleanup, and return the path."""
        with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as temp_file:
            temp_file_paths.append(temp_file.name)
        return temp_file.name

    def _download_minio_object(self, minio_client, object_name, local_path):
        """Write the MinIO object ``object_name`` to ``local_path``, always releasing the response."""
        response = minio_client.get_object(self.minio_bucket_name, object_name)
        try:
            with open(local_path, "wb") as local_file:
                local_file.write(response.read())
        finally:
            # The MinIO SDK requires closing the response and returning its
            # connection to the pool; otherwise connections leak.
            response.close()
            response.release_conn()

    def _mix_melody_and_voice(self, melody, voice):
        """Return ``voice`` overlaid on ``melody`` after level, format and length matching."""
        # Try to normalize the voice in order to improve the audio quality
        voice = voice.normalize()
        voice = voice.fade_in(self.VOICE_FADE_MS).fade_out(self.VOICE_FADE_MS)
        melody = melody + self.AMPLIFICATION_DB
        voice = voice + self.AMPLIFICATION_DB

        # Resample the voice to match the melody's sample rate and channels
        voice = voice.set_frame_rate(melody.frame_rate)
        voice = voice.set_channels(melody.channels)

        # Pad the shorter track with silence so overlay() (which keeps the
        # length of the base segment) does not truncate the voice. The silence
        # uses the track's own frame rate; pydub's default of 11025 Hz would
        # otherwise force a resample when the segments are joined.
        length_difference_ms = len(voice) - len(melody)
        if length_difference_ms > 0:
            melody += AudioSegment.silent(duration=length_difference_ms, frame_rate=melody.frame_rate)
        elif length_difference_ms < 0:
            voice += AudioSegment.silent(duration=-length_difference_ms, frame_rate=voice.frame_rate)

        # Combine the melody and voice
        return melody.overlay(voice)

    @staticmethod
    def _remove_temp_files(temp_file_paths):
        """Delete the given temporary files on a best-effort basis."""
        for temp_file_path in temp_file_paths:
            try:
                os.remove(temp_file_path)
            except OSError:
                # Already gone or not removable: cleanup is best-effort and
                # must not replace the original exception or the task result.
                pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.