Skip to content

Instantly share code, notes, and snippets.

@jeremyvisser
Created September 12, 2013 12:01
Show Gist options
  • Save jeremyvisser/6536274 to your computer and use it in GitHub Desktop.
Save jeremyvisser/6536274 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import sys
import time
import requests
from urllib.parse import urljoin
class HLSPipe:
    """HLSPipe — HTTP Live Streaming Pipe.

    Repeatedly polls an M3U8 media playlist over HTTP, downloads every media
    segment it has not already handled, and writes the raw segment bytes to
    an output stream.

    Example usage:

        h = HLSPipe('http://server/stream.m3u8', sys.stdout)
        h.run()

    Or from the command line:

        $ ./hlspipe.py http://server/stream.m3u8 > stream.ts
    """

    # Class-level fallbacks for the scalar settings. The two segment lists
    # (``queue`` and ``archive``) are deliberately NOT defined here: a list
    # on the class would be shared by every instance.
    url = ''
    output = None
    debug = False

    def __init__(self, url, output=sys.stdout, debug=False):
        """Store the settings and create empty per-instance segment lists.

        Parameters:
            url     a normal HTTP or HTTPS URL of the playlist
            output  a stream handle (sys.stdout works well); either a text
                    stream exposing ``.buffer`` or a binary stream
            debug   stored on the instance; no method of this class
                    currently reads it
        """
        self.url = url
        self.output = output
        self.debug = debug
        self.queue = []    # segments waiting to be downloaded, oldest first
        self.archive = []  # recently downloaded segments, to catch duplicates

    def _log(self, message):
        """Write one diagnostic line to standard error."""
        sys.stderr.write('%s\n' % message)

    def run(self):
        """Download the stream and write it to the output.

        Loops forever for a live playlist. Returns only once the playlist
        carries an ``#EXT-X-ENDLIST`` tag and every queued segment has been
        processed. Exceptions raised while fetching the playlist propagate
        to the caller.
        """
        while True:
            p = self.playlist()
            segments = p['segments']
            added = self.queue_segments(segments)

            if added == 0 and not self.queue:
                if '#EXT-X-ENDLIST' in p:
                    # Nothing new will ever be appended to an ended playlist.
                    self._log("Playlist ended, stopping.")
                    return
                # Nothing new to download -- must be running ahead. Sleep.
                self._log("Got no segments, sleeping...")
                time.sleep(self._poll_delay(p))

            # Process only about half a playlist's worth of segments before
            # polling again, so the playlist is refreshed regularly.
            budget = len(segments) / 2
            processed = 0
            while self.queue:
                self.process_segment()
                processed += 1
                if processed > budget:
                    break

    def _poll_delay(self, playlist):
        """Return the seconds to wait before re-polling an unchanged playlist.

        Half of ``#EXT-X-TARGETDURATION`` when that tag is present and holds
        an integer, otherwise 2 seconds.
        """
        try:
            return int(playlist['#EXT-X-TARGETDURATION']) / 2
        except (KeyError, ValueError):
            return 2

    def queue_segments(self, segments):
        """Queue every segment in ``segments`` that has not been seen yet.

        RFC states that the initial connection should discard all but the
        last three segments. Blissfully ignoring that.

        Parameters:
            segments  list of segment dicts as produced by playlist()

        Returns the number of segments added to the queue by this call.
        """
        num = 0
        for s in segments:
            # Don't add duplicate segments (pending or recently processed).
            if s not in self.queue and s not in self.archive:
                self._log("Queueing segment URL %s" % s['url'])
                self.queue.append(s)
                num += 1
        # Keep the duplicate-detection history bounded to roughly three
        # playlists' worth of segments.
        while len(self.archive) > len(segments) * 3:
            self.archive.pop(0)
        return num

    def process_segment(self):
        """Download the oldest queued segment and write it to the output.

        Does nothing when the queue is empty. The segment is moved to the
        archive before the download, so it is not queued again even if the
        download fails. A response with an HTTP error status is logged and
        skipped rather than written into the stream.
        """
        if not self.queue:
            return
        segment = self.queue.pop(0)
        self.archive.append(segment)
        self._log("Processing segment URL %s" % segment['url'])
        r = requests.get(segment['url'])
        if not r.ok:
            # An error page is not media data; keep it out of the output.
            self._log("Skipping segment URL %s (HTTP %s)"
                      % (segment['url'], r.status_code))
            return
        # Text streams such as sys.stdout expose their byte stream as
        # ``.buffer``; a binary stream is written to directly.
        sink = getattr(self.output, 'buffer', self.output)
        sink.write(r.content)

    def playlist(self):
        """Download and parse the EXTM3U playlist at ``self.url``.

        Returns the dict described in _parse_playlist(). Raises the
        exception from ``raise_for_status()`` when the server answers with
        an HTTP error status.
        """
        self._log("Polling playlist %s" % self.url)
        r = requests.get(self.url)
        r.raise_for_status()  # raise exception if it didn't work
        return self._parse_playlist(r.text)

    def _parse_playlist(self, text):
        """Parse playlist text into a dict.

        Fairly naive parsing, but on the other hand, being liberal in what
        you accept tends to be a virtue.

        The result always has a ``'segments'`` key: a list of dicts with
        ``'duration'`` (int, or float for decimal durations), ``'title'``
        and ``'url'`` (made absolute against ``self.url``). Every
        ``#EXT-X-...`` tag is added as ``tag name -> text after the first
        colon`` (an empty string for tags without a value).
        """
        output = {'segments': []}
        pending = None  # (duration, title) of an #EXTINF awaiting its URI

        for raw in text.replace('\r\n', '\n').split('\n'):
            line = raw.strip()
            if not line:
                continue

            # Media segment header, e.g. "#EXTINF:10," or "#EXTINF:9.009,Title"
            if line.startswith('#EXTINF:'):
                duration, _, title = line[len('#EXTINF:'):].partition(',')
                pending = (self._parse_duration(duration), title)
                continue

            # New-style tags, e.g. "#EXT-X-TARGETDURATION:10". partition()
            # copes with valueless tags and with values containing colons.
            if line.startswith('#EXT-X-'):
                key, _, value = line.partition(':')
                output[key] = value
                continue

            # "#EXTM3U" header and any other comment line.
            if line.startswith('#'):
                continue

            # A URI line; it only means something directly after an #EXTINF.
            if pending is not None:
                output['segments'].append({
                    'duration': pending[0],
                    'title': pending[1],
                    # urljoin() allows for relative URLs
                    'url': urljoin(self.url, line),
                })
                pending = None

        return output

    @staticmethod
    def _parse_duration(text):
        """Return an #EXTINF duration as int, or as float when decimal."""
        try:
            return int(text)
        except ValueError:
            return float(text)
if __name__ == '__main__':
    # Command-line entry point: pipe the playlist named by the first
    # argument to standard output until run() returns or the user interrupts.
    if len(sys.argv) < 2:
        # Without this check a missing argument surfaces as a bare IndexError.
        sys.stderr.write('usage: %s PLAYLIST_URL\n' % sys.argv[0])
        sys.exit(2)
    h = HLSPipe(url=sys.argv[1], output=sys.stdout)
    try:
        h.run()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop a live stream; exit quietly.
        pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment