Skip to content

Instantly share code, notes, and snippets.

@tanepiper
Created December 20, 2020 20:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tanepiper/873e5332a2f98ba0c0e5bf876b68b807 to your computer and use it in GitHub Desktop.
Save tanepiper/873e5332a2f98ba0c0e5bf876b68b807 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
"""
Video Transcoder Daemon
@author Tane Piper
The Transcoder Daemon is an application that runs as a background process
on a Unix server. Using web.py, it acts as a web application listening on
a set port. It takes incoming /queue requests which contain POST information.
A simple queue request just takes in a filename for input and output and some
quality information.
A complex request (yet to be implemented) takes a full range of settings in
order to enable more complex encoding in situations such as a file failing
to be read.
The main encoder is a third-party application, mencoder, and the daemon
creates threads to handle these requests.
Contents
0. DON'T TOUCH
1. Header Area
2. Thread and Queue Classes & Methods
3. Video Class
4. The Main Daemon class
5. Startup stuff
"""
"""
0. DON'T TOUCH
This section below is where we set up our imports and module information such
as the version and app name
"""
import logging, sys
import os, time, atexit
from signal import SIGTERM
import threading, Queue, time
import web
import subprocess
import urllib
import urllib2
import json
import optparse
# The version is kept as a tuple of strings so it can be joined into "0.3.0".
__version_info__ = ('0', '3', '0')
__version__ = '.'.join(__version_info__)
# Human-readable application name; main() uses it in the --version string.
# NOTE(review): as a module-level name this is unrelated to any object's
# __str__ method.
__str__ = "VidioWiki Video Transcoder Daemon"
# APP_ID names the logger and is used to build the log file and pid file paths.
APP_ID = "vwdaemon"
APP_LOGGING = "/var/log/%s.log" % APP_ID
PID_FILE = "/var/run/%s.pid" % APP_ID
# This is a hack to fix the race condition in the subprocess module: the
# module's private _cleanup hook is replaced with a no-op.
subprocess._cleanup = lambda: None
# URL table handed to web.run() together with globals(): the "/queue" path is
# paired with the name of the handler class, QueueIncoming.
URLS = (
"/queue", "QueueIncoming",
)
# One-pass mencoder command template. The placeholders are, in order:
# input file, output file, audio bitrate (abr:br), video bitrate
# (flv:vbitrate) and audio sample rate (srate). The trailing backslashes are
# string-literal line continuations, so the template is one command line.
ENCODER_ONE_PASS ="""
mencoder %s -o %s -of lavf -oac mp3lame -lameopts abr:br=%d -ovc lavc \
-lavcopts vcodec=flv:vbitrate=%d:mbd=2:mv0:trell:v4mv:cbp:last_pred=3 \
-srate %d
"""
# Multi-pass mencoder command template. The placeholders are, in order:
# input file, output file, audio bitrate (abr:br), video bitrate
# (flv:vbitrate), pass number (vpass) and audio sample rate (srate).
# NOTE(review): nothing in the visible file formats this template yet.
ENCODER_MULTI_PASS = """
mencoder %s -o %s -of lavf -oac mp3lame -lameopts abr:br=%d -ovc lavc \
-lavcopts vcodec=flv:vbitrate=%d:mbd=2:mv0:trell:v4mv:cbp:last_pred=3:vpass=%d \
-srate %d
"""
###############################################################################
"""
1. Header Area
"""
# The base path under which the video directories live
BASE_PATH = '/home/tanep/vwtranscoder/'
# The location of the incoming videos (default input directory for
# do_subprocess)
VIDEO_PATH = BASE_PATH + 'videos/'
# The location to put the output video (default output directory for
# do_subprocess)
OUTPUT_PATH = BASE_PATH + 'output/'
# The port this application runs on. Kept as a string because main() writes
# it into sys.argv.
PORT = "1337"
# The URL to post to when a video encode is complete
POST_URL = 'http://localhost'
# Thread count passed to start_threads() for BOTH pools, so the daemon runs
# MAX_THREADS encoder threads plus MAX_THREADS reporting threads.
MAX_THREADS = 4
# The logging level for output. Debugging is logging.DEBUG while production is
# logging.INFO
LOGGING_LEVEL = logging.DEBUG
"""
The values below are the default values for video encoding in one pass
"""
# Audio bitrate substituted into "abr:br=" in the encoder templates
DEFAULT_ABR_RB = 56
# Video bitrate substituted into "vbitrate=" in the encoder templates
DEFAULT_FLV_BITRATE = 500
# Pass number; only the multi-pass template has a vpass placeholder
DEFAULT_VPASS = 0
# Audio sample rate substituted into "-srate" in the encoder templates
DEFAULT_SRATE = 22050
###############################################################################
# TRY NOT TO TOUCH BELOW THIS LINE #
###############################################################################
"""
2. Thread and Queue Classes & Methods
"""
# Videos waiting to be encoded; consumed by process_incoming() threads.
task_queue = Queue.Queue()
# Videos whose encode attempt has finished; consumed by process_done() threads.
done_queue = Queue.Queue()
# Thread objects created by start_threads(), kept so the stop_*_threads()
# functions can wait for them.
task_pool = []
done_pool = []
# Every queue entry is a [flag, video] pair; the flag tells the consuming
# thread what to do with the entry.
FLAG_INIT = 0  # default flag of put() and initial loop state of the workers
FLAG_STOP = -1  # makes a worker thread leave its loop
FLAG_STAGE_1 = 1  # entry on task_queue: encode the video
FLAG_STAGE_2 = 2  # entry on done_queue: report the result
def start_threads(in_threads=5, out_threads=3):
    """
    Start the worker threads of the daemon.

    ``in_threads`` threads are started running process_incoming and recorded
    in task_pool, then ``out_threads`` threads are started running
    process_done and recorded in done_pool.
    """
    plan = (
        (in_threads, process_incoming, task_pool),
        (out_threads, process_done, done_pool),
    )
    for count, worker, pool in plan:
        for _ in range(count):
            new_thread = threading.Thread(target=worker)
            new_thread.start()
            pool.append(new_thread)
def stop_task_threads():
    """
    Stop every encoder thread recorded in task_pool.

    One FLAG_STOP entry is queued per thread, then each thread is joined, so
    this call returns only after all of them have finished. task_pool is
    empty afterwards.
    """
    # Enqueue the sentinel directly rather than through put(), so that no
    # video attribute is needed for a log line (the payload here is None).
    for _ in task_pool:
        task_queue.put([FLAG_STOP, None])
    # join() blocks until the thread has ended; this replaces polling the
    # pool once a second and deleting from the list while enumerating it.
    for worker in list(task_pool):
        worker.join()
        task_pool.remove(worker)
def stop_done_threads():
    """
    Stop every reporting thread recorded in done_pool.

    One FLAG_STOP entry is queued per thread, then each thread is joined, so
    this call returns only after all of them have finished. done_pool is
    empty afterwards.
    """
    # Enqueue the sentinel directly rather than through put(), so that no
    # video attribute is needed for a log line (the payload here is None).
    for _ in done_pool:
        done_queue.put([FLAG_STOP, None])
    # join() blocks until the thread has ended; this replaces polling the
    # pool once a second and deleting from the list while enumerating it.
    for worker in list(done_pool):
        worker.join()
        done_pool.remove(worker)
def put(target_queue, video, flag=FLAG_INIT):
    """
    Put ``[flag, video]`` on ``target_queue`` and log the operation.

    ``video`` may be None (the stop functions queue a FLAG_STOP entry without
    a video), so the name used in the log lines is looked up defensively.
    Failures are logged with their traceback and not re-raised.
    """
    # getattr keeps the log lines working when there is no video object;
    # reading video.in_file directly would raise for a None payload, both
    # here and again inside the error handler.
    name = getattr(video, "in_file", None)
    try:
        logger.debug("Putting item %s in queue at stage %d" % (name, flag))
        target_queue.put([flag, video])
    except Exception:
        logger.exception("Failed to put item %s into queue" % name)
def process_incoming():
    """
    Thread body for the encoder threads.

    Takes [flag, video] entries from task_queue until a FLAG_STOP entry
    arrives. FLAG_STAGE_1 entries are handed to do_subprocess; any other
    flag is logged as an error. An exception raised while handling one entry
    is logged and the loop carries on with the next entry.
    """
    flag = FLAG_INIT
    while flag != FLAG_STOP:
        try:
            # Get the flag and the video from the queue
            flag, video = task_queue.get()
            # A stop entry carries no video, so handle it before anything
            # touches video.in_file.
            if flag == FLAG_STOP:
                logger.debug("Stop flag received in incoming queue")
                break
            logger.debug("Processing %s at stage %d" % (video.in_file, flag))
            if flag == FLAG_STAGE_1:
                do_subprocess(video)
            else:
                logger.error("Unknown flag received in incoming queue")
        except Exception:
            # Keep the thread alive, but leave a trace of what went wrong.
            logger.exception("Error while processing an incoming queue item")
def process_done():
    """
    Thread body for the reporting threads.

    Takes [flag, video] entries from done_queue until a FLAG_STOP entry
    arrives. FLAG_STAGE_2 entries are handed to report_status; any other
    flag is logged as an error. An exception raised while handling one entry
    is logged and the loop carries on with the next entry.
    """
    flag = FLAG_INIT
    while flag != FLAG_STOP:
        try:
            # Get the flag and the video from the queue
            flag, video = done_queue.get()
            # A stop entry carries no video, so handle it before anything
            # touches video.in_file.
            if flag == FLAG_STOP:
                logger.debug("Stop flag received in done queue")
                break
            logger.debug("Processing %s at stage %d" % (video.in_file, flag))
            if flag == FLAG_STAGE_2:
                report_status(video)
            else:
                logger.error("Unknown flag received in done queue")
        except Exception:
            # Keep the thread alive, but leave a trace of what went wrong.
            logger.exception("Error while processing a done queue item")
def do_subprocess(video, report=True,
                  video_path=VIDEO_PATH, output_path=OUTPUT_PATH):
    """
    Encode one video with the one-pass mencoder command.

    video       -- object providing in_file, out_file, abr_rb, flv_bitrate
                   and srate; its ``success`` attribute is set here.
    report      -- when true, the video is queued on done_queue with
                   FLAG_STAGE_2 afterwards, whether or not encoding worked.
    video_path  -- directory prefix prepended to video.in_file.
    output_path -- directory prefix prepended to video.out_file.

    ``video.success`` is True only when the command ran and exited with
    status 0. Errors are logged, never raised to the caller.
    """
    # Function-scope import: the quoting helper lives in shlex on Python 3
    # and in pipes on Python 2.
    try:
        from shlex import quote
    except ImportError:
        from pipes import quote

    video.success = False
    # Bound before the try block so the error handler can always refer to it.
    command_line = None
    try:
        in_file = "%s%s" % (video_path, video.in_file)
        out_file = "%s%s" % (output_path, video.out_file)
        # File names originate from web requests and the command runs through
        # a shell, so they are quoted. The numeric settings may arrive as
        # strings from the request, hence the int() conversions for %d.
        command_line = ENCODER_ONE_PASS % (quote(in_file), quote(out_file),
                                           int(video.abr_rb),
                                           int(video.flv_bitrate),
                                           int(video.srate))
        logger.debug("%s" % command_line)
        proc = subprocess.Popen(command_line,
                                shell=True,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT,
                                close_fds=True)
        # communicate() drains the pipe and waits for the process to end.
        output = proc.communicate()[0]
        if proc.returncode == 0:
            video.success = True
        else:
            logger.error("Encoder exited with status %d for %s" %
                         (proc.returncode, video.in_file))
            # Only the tail of the output is logged; the full output of an
            # encoder run can be very long.
            logger.debug("Encoder output (tail): %r" % output[-500:])
    except Exception:
        logger.exception("There was an error processing the video using the following: %s" %
                         command_line)
    if report:
        put(done_queue, video, FLAG_STAGE_2)
def report_status(video):
    """
    POST the video's state (video.getObject(), url-encoded) to POST_URL.

    On failure the attempt counter is incremented, the thread sleeps for five
    seconds and the video is queued on done_queue again. Once
    video.report_attempt has reached 3 no further request is made and an
    error is logged instead.
    """
    if video.report_attempt >= 3:
        logger.error("Failed to report back video %s" % video.in_file)
        return
    try:
        data = urllib.urlencode(video.getObject())
        req = urllib2.Request(POST_URL, data)
        response = urllib2.urlopen(req)
        # The body is not needed; close the connection instead of leaking it.
        response.close()
        logger.debug("Successfully reported back %s" % video.in_file)
    except Exception:
        video.report_attempt += 1
        # Record why the attempt failed before scheduling the retry.
        logger.exception("Report attempt %d failed for %s" %
                         (video.report_attempt, video.in_file))
        time.sleep(5)
        put(done_queue, video, FLAG_STAGE_2)
###############################################################################
# 3. Web.py Classes and Methods #
###############################################################################
class QueueIncoming:
    """
    web.py handler for the /queue URL.

    Both GET and POST read the request parameters, build a Video from them
    and queue it on task_queue with FLAG_STAGE_1. The response body is always
    "Received"; failures are only logged.
    """

    def _enqueue(self, failure_message):
        """
        Shared implementation of GET and POST.

        failure_message -- text logged when reading the input, building the
                           Video or queueing it raises.
        """
        try:
            # Every encoding parameter gets a default so that a request which
            # only names the file is still valid.
            i = web.input(in_file="None", abr_rb=DEFAULT_ABR_RB,
                          flv_bitrate=DEFAULT_FLV_BITRATE,
                          vpass=DEFAULT_VPASS, srate=DEFAULT_SRATE)
            video = Video(in_file=i.in_file, abr_rb=i.abr_rb,
                          flv_bitrate=i.flv_bitrate, vpass=i.vpass,
                          srate=i.srate)
            put(task_queue, video, FLAG_STAGE_1)
        except Exception:
            logger.exception(failure_message)
        return "Received"

    def GET(self):
        """Queue the video described by the query string."""
        return self._enqueue("Get Failed")

    def POST(self):
        """Queue the video described by the POST data."""
        return self._enqueue("Post Failed")
"""
3. Video Class
"""
class Video:
    """
    One encoding job: the input file name plus the encoder settings.

    Required keyword arguments: in_file, abr_rb, flv_bitrate, vpass, srate
    (a missing one raises KeyError). ``out_file`` and ``in_ext`` are derived
    from ``in_file`` by splitInFile().
    """

    def __init__(self, **kwargs):
        self.in_file = kwargs['in_file']
        self.success = False
        self.report_attempt = 0
        self.abr_rb = kwargs['abr_rb']
        self.flv_bitrate = kwargs['flv_bitrate']
        self.vpass = kwargs['vpass']
        self.srate = kwargs['srate']
        self.splitInFile()

    def __str__(self):
        return "%s" % self.getObject()

    def getObject(self):
        """Return the job's state as a plain dictionary."""
        return {
            "in_file": self.in_file,
            "out_file": self.out_file,
            "success": self.success,
            "report_attempt": self.report_attempt,
            "abr_rb": self.abr_rb,
            "flv_bitrate": self.flv_bitrate,
            "vpass": self.vpass,
            "srate": self.srate,
        }

    def splitInFile(self):
        """
        Derive out_file and in_ext from in_file.

        out_file is in_file with its final extension replaced by ".flv";
        in_ext is that final extension without the dot ("" when in_file has
        none).
        """
        # splitext only removes the LAST extension, so "a.b.avi" becomes
        # "a.b.flv". Cutting at the first dot would map "a.1.avi" and
        # "a.2.avi" to the same output file.
        base, ext = os.path.splitext(self.in_file)
        self.out_file = base + ".flv"
        self.in_ext = ext[1:]
"""
4. The Main Daemon class
"""
class Daemon(object):
    """
    The base vm_encoder daemon
    Usage: subclass the Daemon class and override the run() method
    """

    def __init__(self, pidfile, stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'):
        """
        pidfile -- path of the file holding the daemon's process id.
        stdin, stdout, stderr -- paths the standard streams are redirected
        to once the process has been daemonized.
        """
        self.stdin = stdin
        self.stdout = stdout
        self.stderr = stderr
        self.pidfile = pidfile

    def _read_pid(self):
        """
        Return the pid stored in the pidfile, or None when the file cannot
        be read or does not contain an integer.
        """
        try:
            with open(self.pidfile, 'r') as pf:
                return int(pf.read().strip())
        except (IOError, ValueError):
            return None

    def daemonize(self):
        """
        do the UNIX double-fork magic, see Stevens' "Advanced
        Programming in the UNIX Environment" for details (ISBN 0201563177)
        http://www.erlenstar.demon.co.uk/unix/faq_2.html#SEC16
        """
        try:
            pid = os.fork()
            if pid > 0:
                # exit first parent
                sys.exit(0)
        except OSError as e:
            sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror))
            sys.exit(1)
        # decouple from parent environment
        os.chdir("/")
        os.setsid()
        os.umask(0)
        # do second fork
        try:
            pid = os.fork()
            if pid > 0:
                # exit from second parent
                sys.exit(0)
        except OSError as e:
            sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror))
            sys.exit(1)
        # redirect standard file descriptors
        sys.stdout.flush()
        sys.stderr.flush()
        si = open(self.stdin, 'r')
        so = open(self.stdout, 'a+')
        # Unbuffered, hence binary mode (text files cannot be unbuffered on
        # Python 3); only the file descriptor is used below.
        se = open(self.stderr, 'ab+', 0)
        os.dup2(si.fileno(), sys.stdin.fileno())
        os.dup2(so.fileno(), sys.stdout.fileno())
        os.dup2(se.fileno(), sys.stderr.fileno())
        # write pidfile
        atexit.register(self.delpid)
        with open(self.pidfile, 'w+') as pf:
            pf.write("%s\n" % os.getpid())

    def delpid(self):
        """Remove the pidfile if it is still there."""
        # stop() may already have removed it from another process.
        if os.path.exists(self.pidfile):
            os.remove(self.pidfile)

    def start(self):
        """
        Start the daemon

        Exits with status 1 when the pidfile already holds a pid; otherwise
        daemonizes the process and calls run().
        """
        # Check for a pidfile to see if the daemon already runs
        if self._read_pid():
            message = "pidfile %s already exist. Daemon already running?\n"
            sys.stderr.write(message % self.pidfile)
            sys.exit(1)
        # Start the daemon
        self.daemonize()
        self.run()

    def stop(self):
        """
        Stop the daemon

        Sends SIGTERM to the pid from the pidfile until the process is gone,
        then removes the pidfile. Returns quietly when no pid can be read;
        exits with status 1 when signalling fails for any other reason.
        """
        # Function-scope import: errno is not imported at module level.
        import errno

        # Get the pid from the pidfile
        pid = self._read_pid()
        if not pid:
            message = "pidfile %s does not exist. Daemon not running?\n"
            sys.stderr.write(message % self.pidfile)
            return  # not an error in a restart
        # Try killing the daemon process
        try:
            while 1:
                os.kill(pid, SIGTERM)
                time.sleep(0.1)
        except OSError as err:
            # ESRCH means the process no longer exists, i.e. it has stopped.
            # Checking the errno avoids matching the error message text.
            if err.errno == errno.ESRCH:
                if os.path.exists(self.pidfile):
                    os.remove(self.pidfile)
            else:
                sys.stdout.write("%s\n" % err)
                sys.exit(1)

    def restart(self):
        """
        Restart the daemon
        """
        self.stop()
        self.start()

    def run(self):
        """
        You should override this method when you subclass Daemon. It will be called
        after the process has been daemonized by start() or restart().
        """
"""
5. Startup stuff
"""
def setup_logging():
    """
    Build the application logger: a logger named APP_ID that writes to the
    APP_LOGGING file at LOGGING_LEVEL, one "time level message" line per
    record. The configured logger is returned.
    """
    app_logger = logging.getLogger(APP_ID)
    file_handler = logging.FileHandler(APP_LOGGING)
    file_handler.setFormatter(
        logging.Formatter('%(asctime)s %(levelname)s %(message)s'))
    app_logger.addHandler(file_handler)
    app_logger.setLevel(LOGGING_LEVEL)
    return app_logger


# Module-wide logger used by every function and class in this file.
logger = setup_logging()
class VideoQueue(Daemon):
    """
    This class overrides the daemon run method and allows us to execute code
    after the daemon has been created
    """

    def run(self):
        """
        In this method we create the threading and web.py app
        """
        try:
            start_threads(MAX_THREADS, MAX_THREADS)
            web.run(URLS, globals())
        except BaseException:
            # Deliberately broad, like the bare except it replaces: the
            # worker threads block on their queues, so on any exit path they
            # must be told to stop or the process could not terminate.
            logger.critical("Cannot start web app or threads, daemon not started",
                            exc_info=True)
            stop_task_threads()
            stop_done_threads()
            sys.exit(1)

    def start(self):
        """Log the start-up, then start the daemon."""
        logger.info("Starting Daemon")
        super(VideoQueue, self).start()

    def stop(self):
        """Log the shutdown, stop the worker threads, then stop the daemon."""
        logger.info("Stopping Daemon")
        # NOTE(review): the pools are only filled by start_threads(), which
        # runs inside run(); in a process that never called run() these two
        # calls find empty pools and do nothing.
        stop_task_threads()
        stop_done_threads()
        super(VideoQueue, self).stop()
def main():
    """
    We deal with the incoming arguments on startup

    Parses the command line and starts, stops or restarts the daemon. When
    no command option is given, the option parser reports an error and exits.
    """
    video_queue = VideoQueue(PID_FILE)
    p = optparse.OptionParser("usage: %prog [options] arg", version="%s %s" %
                              (__str__, __version__))
    p.add_option('-d', '--start', help="Starts the application as a Daemon",
                 action="store_true", default=False)
    p.add_option('-s', '--stop', help="Stops the application runnin as a Daemon",
                 action="store_true", default=False)
    p.add_option('-r', '--restart', help="Restarts Daemon", action="store_true",
                 default=False)
    p.add_option('-p', '--port', help="Allows you to overide the port the Daemon runs on",
                 default=PORT, type="string", dest="port")
    options, _ = p.parse_args()
    # Replace the arguments with just the port. Slice assignment also works
    # when the script was started without any argument, where assigning to
    # sys.argv[1] raised IndexError.
    # NOTE(review): presumably web.py takes its port from sys.argv[1] - confirm.
    sys.argv[1:] = [options.port]
    if options.start:
        video_queue.start()
    elif options.stop:
        video_queue.stop()
    elif options.restart:
        video_queue.restart()
    elif getattr(options, 'in_file', None):
        # NOTE(review): no option above defines in_file and do_offline is not
        # defined in the visible file, so this branch cannot be taken yet.
        # The getattr guard keeps the error branch below reachable.
        do_offline(options.in_file)
    else:
        p.error("Unknown command, use -h to get help")


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment