Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save sonipranjal/5ef17518adda4f9bf551eb29ddf91d7c to your computer and use it in GitHub Desktop.
Save sonipranjal/5ef17518adda4f9bf551eb29ddf91d7c to your computer and use it in GitHub Desktop.
import shutil, tempfile, modal, uuid, os, boto3, ffmpeg, logging
from enum import Enum
from pydantic import BaseModel
from typing import Optional, Dict, List, Any
from concurrent.futures import ThreadPoolExecutor, as_completed
app_image = (
modal.Image.debian_slim()
.apt_install("ffmpeg")
.pip_install("ffmpeg-python", "boto3")
)
stub = modal.Stub("coc-video-pipeline", image=app_image, secrets=[modal.Secret.from_name("coc-video-pipeline")])
logging.basicConfig(level=logging.INFO)
with app_image.imports():
CLOUDFLARE_ACCOUNT_ID = os.getenv("CLOUDFLARE_ACCOUNT_ID")
CLOUDFLARE_R2_ACCESS_KEY = os.getenv("CLOUDFLARE_R2_ACCESS_KEY")
CLOUDFLARE_R2_SECRET = os.getenv("CLOUDFLARE_R2_SECRET")
CLOUDFLARE_R2_BUCKET_NAME = os.getenv("CLOUDFLARE_R2_BUCKET_NAME")
CLOUDFLARE_R2_DOMAIN = os.getenv("CLOUDFLARE_R2_DOMAIN")
s3_client = boto3.client(
service_name="s3",
endpoint_url=f"https://{CLOUDFLARE_ACCOUNT_ID}.r2.cloudflarestorage.com",
aws_access_key_id=CLOUDFLARE_R2_ACCESS_KEY,
aws_secret_access_key=CLOUDFLARE_R2_SECRET,
region_name="auto"
)
bucket_name = CLOUDFLARE_R2_BUCKET_NAME
class RequestBody(BaseModel):
video_key_r2: str
@stub.function(timeout=4000)
def process_video_in_background(video_key_r2:str):
base_path = f"processed-videos/{video_key_r2.rsplit('.', 1)[0]}-{uuid.uuid4()}"
def download_file_from_r2(key):
temp_dir = 'temp'
if not os.path.exists(temp_dir):
os.makedirs(temp_dir)
temp_file_path = os.path.join(temp_dir, f"{uuid.uuid4()}.tmp")
with open(temp_file_path, 'wb') as f:
s3_client.download_fileobj(bucket_name, key, f)
return temp_file_path
def upload_file_to_r2(file_path, key):
try:
with open(file_path, 'rb') as f:
s3_client.upload_fileobj(f, bucket_name, key)
url = f"https://{CLOUDFLARE_R2_DOMAIN}/{key}"
logging.info(f"File uploaded successfully: {url}")
return url
except Exception as e:
logging.error(f"Failed to upload file {file_path} to R2: {e}")
return None
def convert_video(input_path, resolution, res_label):
temp_dir = tempfile.mkdtemp()
output_path = f'{temp_dir}/index.m3u8'
hls_base_url = f"https://{CLOUDFLARE_R2_DOMAIN}/{base_path}/{res_label}/"
try:
ffmpeg.input(input_path).output(
output_path,
format='hls',
hls_time=10,
hls_list_size=0,
start_number=0,
vcodec='libx264',
s=resolution,
acodec='aac',
audio_bitrate='320k',
ar='48000',
hls_base_url=hls_base_url
).run(overwrite_output=True)
print("FFmpeg processing completed successfully.")
return temp_dir
except Exception as e:
# If ffmpeg fails, log the error and clean up the directory
logging.error(f"Error during video conversion: {e}")
shutil.rmtree(temp_dir) # Delete the temp directory since conversion failed
return None
def convert_and_upload(input_path, res_label, res_value):
output_dir = convert_video(input_path, res_value, res_label)
print("uploading to r2 baby!!", output_dir)
playlist_url = None
with ThreadPoolExecutor() as executor:
futures = []
for root, dirs, files in os.walk(output_dir):
for file in files:
full_path = os.path.join(root, file)
file_key = f"{base_path}/{res_label}/{file}"
if 'index_vtt.m3u8' not in file and file.endswith('.m3u8'):
is_playlist = True
else:
is_playlist = False
future = executor.submit(upload_file_to_r2, full_path, file_key)
futures.append((future, is_playlist))
for future, is_playlist in futures:
result = future.result()
if result and is_playlist:
playlist_url = result
elif result:
print(f"Uploaded: {result}")
else:
print("Failed to upload some files.")
return res_label, playlist_url
video_urls = {}
try:
input_path = download_file_from_r2(video_key_r2)
resolutions = {'1080p': '1920x1080', '720p': '1280x720'}
video_urls = {}
with ThreadPoolExecutor(max_workers=len(resolutions)) as executor:
future_to_label = {executor.submit(convert_and_upload, input_path, label, value): label for label, value in resolutions.items()}
for future in as_completed(future_to_label):
res_label = future_to_label[future]
try:
_, playlist_url = future.result()
if playlist_url:
video_urls[res_label] = playlist_url
except Exception as exc:
logging.error(f'Error processing resolution {res_label}: {exc}')
# Clean up the local file
os.remove(input_path)
except Exception as e:
return {"error": str(e)}
print("processed successfully", video_urls)
@stub.function()
@modal.web_endpoint(method="POST")
def process(video: RequestBody):
process_video_in_background.spawn(video_key_r2=video.video_key_r2)
return {"message": "Video is being processed!"}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment