Skip to content

Instantly share code, notes, and snippets.

@sinkers
Created July 14, 2014 04:50
Show Gist options
  • Star 25 You must be signed in to star a gist
  • Fork 12 You must be signed in to fork a gist
  • Save sinkers/d647a80fdb180b4cc3a6 to your computer and use it in GitHub Desktop.
Save sinkers/d647a80fdb180b4cc3a6 to your computer and use it in GitHub Desktop.
Script for live video streaming monitoring with notifications
#!/usr/bin/python
'''
Script to monitor live streams and send an Amazon SNS if the stream is down (and possibly take restorative action)
Future:
Black detect: ffmpeg -i out.mp4 -vf blackdetect -f null -
Note the following doesn't seem to fully work without looking at the debug logs
On live ffmpeg -y -i rtmp://cp30129.live.edgefcs.net/live/videoops-videoops@50541 -vf blackdetect -t 10 -loglevel debug -f null -
'''
from boto import boto
import sys
import os
from subprocess import PIPE, Popen
from threading import Thread
import time
import logging
try:
from Queue import Queue, Empty
except ImportError:
from queue import Queue, Empty # python 3.x
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s %(message)s',
handlers=[logging.FileHandler("encoder.log"),
logging.StreamHandler()])
logging.info("Starting")
# You will need to go into your Amazon Console and create a new topic ARN for this
AWS_TOPIC_ARN = ""
AWS_KEY = ""
AWS_SECRET = ""
# Set how often you want to check in seconds
SLEEP_TIME = 30
FFPROBE = "/usr/local/bin/ffprobe"
# List of streams to check
CHECK_STREAMS = ['http://wowsyd.sinclairmediatech.com/live/canberra/playlist.m3u8', 'http://repackage.video.news.com.au/live/canberra/playlist.m3u8']
ON_POSIX = 'posix' in sys.builtin_module_names
def process_line(std, q):
partialLine = ""
tmpLines=[]
end_of_message = False
while (True):
data = std.read(10)
#print repr(data)
#break when there is no more data
if len(data) == 0:
end_of_message = True
#data needs to be added to previous line
if ((not "\n" in data) and (not end_of_message)):
partialLine += data
#lines are terminated in this string
else:
tmpLines = []
#split by \n
split = data.split("\n")
#add the rest of partial line to first item of array
if partialLine != "":
split[0] = partialLine + split[0]
partialLine = ""
#add every item apart from last to tmpLines array
if len(split) > 1:
for i in range(len(split)-1):
tmpLines.append(split[i])
#last item is '' if data string ends in \r
#last line is partial, save for temporary storage
if split[-1] != "":
partialLine = split[-1]
#last line is terminated
else:
tmpLines.append(split[-1])
#print split[0]
q.put(split[0])
if (end_of_message):
#print partialLine
break
def enqueue_output(stdout, queue):
#for line in iter(stdout.readline, b''):
# queue.put(line)
process_line(stdout, queue)
stdout.close()
def run(q, ffcmd, thread_name):
logging.info("Running " + ffcmd)
p = Popen(ffcmd,shell=True, stderr=PIPE, stdin=PIPE, bufsize=1, close_fds=ON_POSIX)
#q = Queue()
#t = Thread(target=enqueue_output, args=(p.stdout, q))
#t.daemon = True # thread dies with the program
#t.start()
t = Thread(target=enqueue_output, name=thread_name, args=(p.stderr, q))
t.daemon = True
t.start()
return p
def probe(stream):
probeq = Queue()
logging.info("Checking " + stream)
probeproc = run(probeq, FFPROBE + " " + stream, "probethread")
# Need to read from the queue until the queue is empty and process has exited
while (True):
#print "Queue not empty"
# print probeq.join()
try:
line = probeq.get_nowait()
logging.info(line)
'''
Possible error responses:
Server error: Failed to play stream
Input/output error
Need a regex to match for video found!
Sample: Stream #0:0: Video: h264 (Baseline), yuv420p, 640x360 [SAR 1:1 DAR 16:9], 655 kb/s, 25 tbr, 1k tbn, 50 tbc
Should also look for audio
'''
if ("Stream #0:0: Video" in line):
logging.info("Found stream " + stream)
logging.info(line)
return True
if ("Stream #0:1: Video" in line):
logging.info("Found stream " + stream)
logging.info(line)
return True
elif ("error" in line):
return False
elif (probeproc.poll() > 0):
return False
# Make sure we don't spin out of control
#time.sleep(1)
except Empty:
pass
return False
def send_message(msg):
sns = boto.connect_sns(aws_access_key_id=AWS_KEY, aws_secret_access_key=AWS_SECRET)
print sns
subj = "Problem with streaming"
res = sns.publish(AWS_TOPIC_ARN, msg, subj)
def live_probe():
pass
# Endless loop for monitoring
while(True):
for stream in CHECK_STREAMS:
if (probe(stream)):
pass
else:
logging.info("Error with " + stream + " sending alert")
'''
This needs to check if there has already been an error message sent
If so it shouldn't send another one for x mins
Also needs to send a message when the problem is cleared
'''
send_message(stream)
time.sleep(SLEEP_TIME)
@Gushtera
Copy link

Hello,
Im having some strange problem can you guide me where Im doing something wrong?

2016-06-15 17:35:28,512 Response headers: [('x-amzn-requestid', 'ab58745e-1265-5954-839d-3b857b95abdf'), ('date', 'Wed, 15 Jun 2016 14:35:27 GMT'), ('content-length', '144'), ('content-type', 'application/json')]
2016-06-15 17:35:28,513 {"Error":{"Code":"InvalidParameter","Message":"Invalid parameter: TopicArn","Type":"Sender"},"RequestId":"ab58745e-1265-5954-839d-3b857b95abdf"}
2016-06-15 17:35:28,513 400 Bad Request
2016-06-15 17:35:28,514 {"Error":{"Code":"InvalidParameter","Message":"Invalid parameter: TopicArn","Type":"Sender"},"RequestId":"ab58745e-1265-5954-839d-3b857b95abdf"}
Traceback (most recent call last):
File "livestream-monitoring.py", line 177, in
send_message(stream)
File "livestream-monitoring.py", line 159, in send_message
res = sns.publish(AWS_TOPIC_ARN, msg, subj)
File "/usr/lib/python2.7/site-packages/boto/sns/connection.py", line 290, in publish
return self._make_request('Publish', params, '/', 'POST')
File "/usr/lib/python2.7/site-packages/boto/sns/connection.py", line 765, in _make_request
raise self.ResponseError(response.status, response.reason, body)
boto.exception.BotoServerError: BotoServerError: 400 Bad Request
{"Error":{"Code":"InvalidParameter","Message":"Invalid parameter: TopicArn","Type":"Sender"},"RequestId":"ab58745e-1265-5954-839d-3b857b95abdf"}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment