Skip to content

Instantly share code, notes, and snippets.

@peterw
Created January 9, 2023 06:53
Show Gist options
  • Save peterw/c9c79858da1b8a2baa53c83008aefed1 to your computer and use it in GitHub Desktop.
Save peterw/c9c79858da1b8a2baa53c83008aefed1 to your computer and use it in GitHub Desktop.
from moviepy.editor import VideoFileClip, ImageClip, CompositeVideoClip
import moviepy.video.fx.all as vfx
import os
import mux_python
from mux_python.rest import ApiException
import requests
import uuid
from moviepy.editor import *
from moviepy.video.compositing.concatenate import concatenate_videoclips
from time import sleep
import re
import json
import gizeh
import tempfile
import os
def lambda_handler(event, context):
with tempfile.TemporaryDirectory() as tmpdir:
VID1 = os.path.join(tmpdir, "loom.mp4")
VID2 = os.path.join(tmpdir, "loom1.mp4")
PIC1 = os.path.join(tmpdir, "image1.png")
PIC2 = os.path.join(tmpdir, "image2.png")
MASK_1_PNG = os.path.join(tmpdir, "mask1.png")
MASK_2_PNG = os.path.join(tmpdir, "mask2.png")
FINAL_VID = os.path.join(tmpdir, "final.mp4")
def generate_video():
print('starting to process the video with moviepy')
# load video 1
vid1 = VideoFileClip(VID1)#.subclip(0, 4)
vid1 = vid1.resize(0.6) # 70% of its size
# load video 2
vid2 = VideoFileClip(VID2)#.subclip(0, 4)
vid2 = vid2.resize(height=vid1.h)
def create_circle(size, name):
WIDTH, HEIGHT = size# vid1.size
surface = gizeh.Surface(WIDTH, HEIGHT)
circle = gizeh.circle(r=min(WIDTH, HEIGHT)//2, fill=(2,2,2), xy=(WIDTH//2, HEIGHT//2))
circle.draw(surface)
# save to png
surface.write_to_png(name)
''' CREATE MASKS AND ADD IT TO VIDEOS '''
# generate mask png with correct size
create_circle(vid1.size, MASK_1_PNG)
create_circle(vid2.size, MASK_2_PNG)
''' FOR FIRST VIDEO '''
# load mask image
MASK_1 = ImageClip(MASK_1_PNG, transparent=True,ismask=True)
MASK_1 = MASK_1.set_duration(vid1.duration) # set duration
MASK_1 = MASK_1.set_fps(15) # set fps
# convert mask img to vido clip
MASK_1 = CompositeVideoClip([MASK_1], bg_color=(0),
size=vid1.size, ismask=True)
# set mask to video
vid1.mask = MASK_1
vid1 = concatenate_videoclips([vid1], method='compose') # combine with video
# vid1.preview(audio=False)
''' FOR SECOND VIDEO '''
# load mask image
MASK_2 = ImageClip(MASK_2_PNG, transparent=True,ismask=True)
MASK_2 = MASK_2.set_duration(vid2.duration) # set duration
MASK_2 = MASK_2.set_fps(15) # set fps
# convert mask img to vido clip
MASK_2 = CompositeVideoClip([MASK_2], bg_color=(0),
size=vid2.size, ismask=True)
# set mask to video
vid2.mask = MASK_2
vid2 = concatenate_videoclips([vid2], method='compose') # combine with video
# vid2.preview(audio=False)
''' ADD IMAGE TO THE VIDEO '''
image_clip1 = ImageClip(PIC1, duration=vid1.duration)#.resize(0.5)
image_clip2 = ImageClip(PIC2, duration=vid2.duration)#.resize(0.5)
image_clip2 = image_clip2.resize(height=image_clip1.h)
result1 = CompositeVideoClip([image_clip1, vid1.set_position(("right", "bottom")) ], use_bgclip=True)
result2 = CompositeVideoClip([image_clip2, vid2.set_position(("right", "bottom")) ], use_bgclip=True)
# add black area to left and right side
def resize_vids_if_not_same_size():
# add black area to left and right side
m = abs(image_clip1.w - image_clip2.w)
if m%2 == 1: l,r = (m//2,m//2 + 1)
else: l,r = (m//2,m//2)
if image_clip1.w > image_clip2.w:
result2 = result2.margin(left=l, right=r)
else:
result1 = result1.margin(left=l, right=r)
print(result1.size, result2.size)
# merge both the videos (vid1 and vid2 must be of same size/resolution before joining them)
final = concatenate_videoclips([result1, result2])
# final.preview(audio=False)
final.write_videofile(FINAL_VID)
def download_pic(url, file_name):
print('downloading picture ' + url)
response = requests.get(url)
if response.status_code == 200:
with open(file_name, "w+b") as file:
response = requests.get(url)
file.write(response.content)
else:
print('picture url was ERRORED')
def download_loom(videoLink, vid_name):
linkList = re.split("/", videoLink)
videoId = linkList[-1]
transcodedURL = "https://www.loom.com/api/campaigns/sessions/" + \
videoId + "/transcoded-url"
postRequest = requests.post(transcodedURL)
deserialize = json.loads(postRequest.text)
videoURL = deserialize['url']
print("Downloading...")
downloadVideo = requests.get(videoURL)
with open(vid_name, 'wb') as f:
for chunk in downloadVideo.iter_content(chunk_size=255):
if chunk:
f.write(chunk)
print('successfully downloaded loom video')
pic_url = 'https://api.apiflash.com/v1/urltoimage?access_key=02e0b0305fdc47d49c6a3bb7179f9358&url=https%3A%2F%2Ftwitter.com'
pic2_url = 'https://api.apiflash.com/v1/urltoimage?access_key=02e0b0305fdc47d49c6a3bb7179f9358&url=https%3A%2F%2Freddit.com'
download_pic(pic_url, PIC1)
download_pic(pic2_url, PIC2)
download_loom('https://www.loom.com/share/2ff13eb033584295b6389de7cdd1ae54', VID1)
download_loom('https://www.loom.com/share/ac0957d8a75a4f9ba0b010be1ea457ae', VID2)
generate_video()
return {
'statusCode': 200,
'body': json.dumps(str(requests.get('https://google.com')))
}
@arfathyahiya
Copy link

Yo! Found your gist from upwork, can you share what error you are getting?

@DICOT4
Copy link

DICOT4 commented Jan 9, 2023

First look issue -
Lambda doesn't have a directory structure like your local system.

# Change directory to /tmp folder
os.chdir('/tmp') _# This is important

Let's discuss this further. Here's my Upwork profile.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment