Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

Last active June 15, 2018 21:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jasonbyrne/cc4a48a4e0ecbbafbace662aeb241f57 to your computer and use it in GitHub Desktop.
Save jasonbyrne/cc4a48a4e0ecbbafbace662aeb241f57 to your computer and use it in GitHub Desktop.
Watch an HLS stream on the best rendition and publish an event whenever a new segment arrives. Then optionally upload it to S3.
let request = require('request');
let validUrl = require('valid-url');
class PubSub {
protected subscribers: { [key: string]: Function } = {};
public subscribe(callback: Function): string {
let key: string = + '_' + Math.ceil(Math.random() * 10000);
this.subscribers[key] = callback;
return key;
public unsubscribe(key: string): void {
delete this.subscribers[key];
public publish(payload: any, thisObject?: any): void {
let me: PubSub = this,
keys: Array<string> = Object.keys(this.subscribers);
keys.forEach(function(key: string) {
if (thisObject) {
me.subscribers[key].call(thisObject, payload);
else {
export class Segment {
public rendition: Rendition;
public url: string;
public timestamp: number;
constructor(url: string) {
this.url = url;
this.timestamp =;
public getFileName(): string {
return this.url.split('?')[0].substr(this.url.lastIndexOf('/') + 1);
class Rendition {
public bitRate: number;
public url: string;
public resolution: string;
constructor(url?: string) {
this.url = url;
class M3U8 {
protected lines: Array<M3U8Line> = [];
constructor(data: string) {
let me: M3U8 = this;
data.split("\n").forEach(function(line: string) {
me.lines.push(new M3U8Line(line));
public forEachLine(callback: Function): void {
this.lines.forEach(function(line: M3U8Line) {
public getLine(i: number): M3U8Line {
return this.lines[i];
public isPlaylist(): boolean {
let is: boolean = false;
this.lines.some(function(line: M3U8Line) {
if (line.isM3U8()) {
is = true;
return true;
return false;
return is;
public isChunkList(): boolean {
let is: boolean = false;
this.lines.some(function(line: M3U8Line) {
if (line.isTS()) {
is = true;
return true;
return false;
return is;
public getAllRenditions(): Array<Rendition> {
let renditions: Array<Rendition> = [],
rendition: Rendition = null;
this.lines.forEach(function(line: M3U8Line) {
if (line.isStreamInfo()) {
rendition = new Rendition(null);
rendition.bitRate = parseFloat(line.get('bandwidth'));
rendition.resolution = line.get('resolution');
else if (line.isM3U8()) {
rendition.url = line.get('manifest');
return renditions;
public getBestRendition(): Rendition {
let lastRendition: Rendition = null,
bestRendition: Rendition = null;
this.lines.forEach(function(line: M3U8Line) {
if (line.isStreamInfo()) {
lastRendition = new Rendition();
lastRendition.bitRate = parseInt(line.get('bandwidth'));
lastRendition.resolution = line.get('resolution');
else if (line.isM3U8()) {
if (bestRendition === null || lastRendition.bitRate > bestRendition.bitRate) {
bestRendition = new Rendition();
bestRendition.bitRate = lastRendition.bitRate;
bestRendition.resolution = lastRendition.resolution;
bestRendition.url = line.get('manifest');
lastRendition = null;
return bestRendition;
class M3U8Line {
protected input: string = null;
protected properties: Object = {};
constructor(line: string) {
this.input = line;
// What is this line?
if (this.isStreamInfo()) {
else if (this.isM3U8()) {
this.set('manifest', this.input);
else if (this.isTS()) {
this.set('segment', this.input);
public get(key: string): string {
public set(key: string, value: string): void {[key] = value;
public isMeta(): boolean {
return (this.input.charAt(0) == '#');
public isStreamInfo(): boolean {
return !!(this.input.match(/^#EXT-X-STREAM-INF/));
public isTS(): boolean {
return !!(this.input.match(/\.ts/));
public isM3U8(): boolean {
return !!(this.input.match(/\.m3u8/));
protected extractStreamInfo(): void {
let me: M3U8Line = this,
line: string = this.input.substr(this.input.indexOf(':') + 1),
params: Array<string> = line.split(',');
params.forEach(function(param: string) {
let paramPair: Array<string> = param.split('=');
me.set(paramPair[0].toLocaleLowerCase(), paramPair[1]);
export class Listener {
protected streamUrl: string = null;
protected rendition: Rendition = null;
protected lastUpdate: Date = null;
protected checkInterval = null;
protected checkFrequency = 10000;
protected segmentLog: Array<string> = [];
protected _onSegment: PubSub = new PubSub();
constructor(url: string) {
// Check input
if (!validUrl.isUri(url)) {
throw Error("This is not a valid URL.");
protected setStreamUrl(url: string): void {
this.streamUrl = url;
protected static isAbsolute(url: string): boolean {
return !!(url.match(/^https?:\/\//));
protected static getBasePath(url: string): string {
return url.split('?')[0].substr(0, url.lastIndexOf('/') + 1);
protected static absolutify(url: string, basePath: string) {
if (!Listener.isAbsolute(url)) {
url = basePath + url;
return url;
protected read(): void {
let me: Listener = this;
request(this.streamUrl, function(err, res, body) {
let m3u8: M3U8 = new M3U8(body);
// If this is our playlist, containing other m3u8 renditions
if (m3u8.isPlaylist()) {
me.rendition = m3u8.getBestRendition();
me.rendition.url = Listener.absolutify(
// If this is our chunklist, containing TS segments
else if (m3u8.isChunkList()) {
m3u8.forEachLine(function(line: M3U8Line) {
if (line.isTS()) {
let segmentUrl: string = Listener.absolutify(line.get('segment'), Listener.getBasePath(me.streamUrl));
if (me.segmentLog.indexOf(segmentUrl) < 0) {
let segment: Segment = new Segment(segmentUrl);
segment.rendition = me.rendition;
if (!me.checkInterval) {
me.checkInterval = setInterval(function() {;
}, me.checkFrequency);
// Don't know what the hell this is
else {
protected check(): void {
let me: Listener = this;
request.head(this.streamUrl, function(err, res) {
let fileDate: Date = new Date(res.headers['date']);
if (fileDate != me.lastUpdate) {
me.lastUpdate = fileDate;;
public onSegment(callback: Function): void {
public start(frequencyMilliseconds?: number): void {
if (typeof frequencyMilliseconds != 'undefined') {
this.checkFrequency = frequencyMilliseconds;
public stop(): void {
this.checkInterval = null;
import { Uploader } from "./lib/uploader";
import { Listener, Segment } from "./lib/hls";
let weatherChannel: string = '';
let bucketName: string = 'stream-archive-input-test';
let listener: Listener = new Listener(weatherChannel);
let uploader: Uploader = new Uploader(bucketName);
listener.onSegment(function(segment: Segment) {
import { Segment } from "./hls";
// AWS init
let AWS = require('aws-sdk');
let request = require('request');
export class Uploader {
private isUploading: boolean = false;
private bucketName: string = null;
private uploadQ: Array<Segment> = [];
constructor(bucketName: string) {
this.bucketName = bucketName;
public pushQ(segment: Segment) {
public upload() {
let me = this;
if (this.uploadQ.length && !this.isUploading) {
this.isUploading = true;
let segment = this.uploadQ.shift();
request(segment.url, function(err, res, body) {
let s3obj = new AWS.S3({
params: {
"Bucket": me.bucketName,
"Key": segment.getFileName()
console.log('Downloaded ' + segment.getFileName());
s3obj.upload({ Body: body })
.on('httpUploadProgress', function(evt) {
console.log('progress', segment.getFileName());
.send(function(err, data) {
if (err) {
console.log('error', err);
else {
console.log('success', data);
me.isUploading = false;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment