Skip to content

Instantly share code, notes, and snippets.

@3dsf
Created June 26, 2021 04:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save 3dsf/7e17d8f69b078464d63f61ffd9a2fa78 to your computer and use it in GitHub Desktop.
Save 3dsf/7e17d8f69b078464d63f61ffd9a2fa78 to your computer and use it in GitHub Desktop.
ffmpeg-python tensorflow implementation --- go to processFrame()
#!/usr/bin/python3
#modified ffmpeg-python tensorflow implentation from https://github.com/kkroening/ffmpeg-python/blob/master/examples/tensorflow_stream.py
### pip install ffmpeg-python numpy
import os
import subprocess # required for ffmpeg-python
import logging as logger # you could clean out the logger if you like
import ffmpeg # ffmpeg-python
import numpy as np # required for ffmpeg-python
def parseFFmpegBanner(inputVideo):
process = subprocess.Popen(['ffmpeg', '-hide_banner', '-i', inputVideo, '-y' ], stdout=subprocess.PIPE, stderr=subprocess.STDOUT,universal_newlines=True)
for line in process.stdout:
print(line)
if ' Video:' in line:
l_split = line.split(',')
#print('---------printing line ", line)
for segment in l_split[1:]:
if 'fps' in segment:
s = segment.strip().split(' ')
fps = float(s[0])
if 'x' in segment:
s = segment.strip().split('x')
width = int(s[0])
s2 = s[1].split(' ')
height = int(s2[0])
if 'Duration:' in line:
s = line.split(',')
ss = s[0].split(' ')
sss = ss[3].strip().split(':')
seconds = float(sss[0])*60*60 + float(sss[1])*60 + float(sss[2])
if 'Audio:' in line:
AUDIO = True
print('fps = ', str(fps))
print('width = ', str(width))
print('height = ', str(height))
print('seconds = ', str(seconds))
print('AUDIO = ', AUDIO)
return fps, width, height, seconds, AUDIO
def readFrameAsNp(ffmpegDecode, width, height):
logger.debug('Reading frame')
# Note: RGB24 == 3 bytes per pixel.
frame_size = width * height * 3
in_bytes = ffmpegDecode.stdout.read(frame_size)
if len(in_bytes) == 0:
frame = None
else:
assert len(in_bytes) == frame_size
frame = (
np
.frombuffer(in_bytes, np.uint8)
.reshape([height, width, 3])
)
return frame
def writeFrameAsByte(ffmpegEncode, frame):
logger.debug('Writing frame')
ffmpegEncode.stdin.write(
frame
.astype(np.uint8)
.tobytes()
)
def vid2np(in_filename):
logger.info('vid2np() -- Decoding to pipe')
codec = 'h264'
args = (
ffmpeg
.input(in_filename,
**{'c:v': codec})
.output('pipe:', format='rawvideo', pix_fmt='rgb24')
.global_args("-hide_banner")
.compile()
)
return subprocess.Popen(args, stdout=subprocess.PIPE)
def np2vid(out_filename, fps_out, in_file, widthOut, heightOut):
logger.info('np2vid() encoding from pipe')
global AUDIO
codec = 'h264'
if AUDIO == True :
pipeline2 = ffmpeg.input(in_file)
audio = pipeline2.audio
args = (
ffmpeg
.input('pipe:', format='rawvideo', pix_fmt='rgb24',
s='{}x{}'.format(widthOut, heightOut),
framerate=fps_out )
.output(audio, out_filename , pix_fmt='yuv420p', **{'c:v': codec},
shortest=None, acodec='copy')
.global_args("-hide_banner")
.overwrite_output()
.compile()
)
else:
args = (
ffmpeg
.input('pipe:', format='rawvideo', pix_fmt='rgb24',
s='{}x{}'.format(width, height),
framerate=fps_out )
.output(out_filename , pix_fmt='yuv420p', **{'c:v': codec})
.global_args("-hide_banner")
.overwrite_output()
.compile()
)
return subprocess.Popen(args, stdin=subprocess.PIPE)
### This is where you do your magic
def processFrame(frame) :
global INCR
rawFrame = frame
processedFrame = rawFrame
return processedFrame
###
if __name__ == '__main__':
INCR = 0
inputVid = 'input.mp4' #change this
outputVid = 'output.mp4'
fps, width, height, seconds, AUDIO = parseFFmpegBanner(inputVid) ### fps, width, height, seconds, AUDIO
print(str(fps))
print(str(width))
print(str(fps))
print(str(seconds))
estimatedFrames = fps * seconds
ffmpegDecode = vid2np(inputVid)
ffmpegEncode = np2vid(outputVid, fps, inputVid, width, height)
while True:
in_frame = readFrameAsNp(ffmpegDecode, width, height)
if in_frame is None:
logger.info('End of input stream')
break
logger.debug('Processing frame')
out_frame = processFrame(in_frame)
writeFrameAsByte(ffmpegEncode, out_frame)
logger.info('Waiting for ffmpegDecode')
ffmpegDecode.wait()
logger.info('Waiting for ffmpegEncode')
ffmpegEncode.stdin.close()
ffmpegEncode.wait()
logger.info('Done')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment