Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@jasonbyrne
Last active June 15, 2018 21:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jasonbyrne/cc4a48a4e0ecbbafbace662aeb241f57 to your computer and use it in GitHub Desktop.
Save jasonbyrne/cc4a48a4e0ecbbafbace662aeb241f57 to your computer and use it in GitHub Desktop.
Watch an HLS stream on the best rendition and publish an event whenever a new segment arrives. Then optionally upload it to S3.
let request = require('request');
let validUrl = require('valid-url');
class PubSub {
protected subscribers: { [key: string]: Function } = {};
public subscribe(callback: Function): string {
let key: string = Date.now() + '_' + Math.ceil(Math.random() * 10000);
this.subscribers[key] = callback;
return key;
}
public unsubscribe(key: string): void {
delete this.subscribers[key];
}
public publish(payload: any, thisObject?: any): void {
let me: PubSub = this,
keys: Array<string> = Object.keys(this.subscribers);
keys.forEach(function(key: string) {
if (thisObject) {
me.subscribers[key].call(thisObject, payload);
}
else {
me.subscribers[key](payload);
}
});
}
}
export class Segment {
public rendition: Rendition;
public url: string;
public timestamp: number;
constructor(url: string) {
this.url = url;
this.timestamp = Date.now();
}
public getFileName(): string {
return this.url.split('?')[0].substr(this.url.lastIndexOf('/') + 1);
}
}
class Rendition {
public bitRate: number;
public url: string;
public resolution: string;
constructor(url?: string) {
this.url = url;
}
}
class M3U8 {
protected lines: Array<M3U8Line> = [];
constructor(data: string) {
let me: M3U8 = this;
data.split("\n").forEach(function(line: string) {
me.lines.push(new M3U8Line(line));
});
}
public forEachLine(callback: Function): void {
this.lines.forEach(function(line: M3U8Line) {
callback(line);
});
}
public getLine(i: number): M3U8Line {
return this.lines[i];
}
public isPlaylist(): boolean {
let is: boolean = false;
this.lines.some(function(line: M3U8Line) {
if (line.isM3U8()) {
is = true;
return true;
}
return false;
});
return is;
}
public isChunkList(): boolean {
let is: boolean = false;
this.lines.some(function(line: M3U8Line) {
if (line.isTS()) {
is = true;
return true;
}
return false;
});
return is;
}
public getAllRenditions(): Array<Rendition> {
let renditions: Array<Rendition> = [],
rendition: Rendition = null;
this.lines.forEach(function(line: M3U8Line) {
if (line.isStreamInfo()) {
rendition = new Rendition(null);
rendition.bitRate = parseFloat(line.get('bandwidth'));
rendition.resolution = line.get('resolution');
}
else if (line.isM3U8()) {
rendition.url = line.get('manifest');
renditions.push(rendition);
}
});
return renditions;
}
public getBestRendition(): Rendition {
let lastRendition: Rendition = null,
bestRendition: Rendition = null;
this.lines.forEach(function(line: M3U8Line) {
if (line.isStreamInfo()) {
lastRendition = new Rendition();
lastRendition.bitRate = parseInt(line.get('bandwidth'));
lastRendition.resolution = line.get('resolution');
}
else if (line.isM3U8()) {
if (bestRendition === null || lastRendition.bitRate > bestRendition.bitRate) {
bestRendition = new Rendition();
bestRendition.bitRate = lastRendition.bitRate;
bestRendition.resolution = lastRendition.resolution;
bestRendition.url = line.get('manifest');
}
lastRendition = null;
}
});
return bestRendition;
}
}
class M3U8Line {
protected input: string = null;
protected properties: Object = {};
constructor(line: string) {
this.input = line;
// What is this line?
if (this.isStreamInfo()) {
this.extractStreamInfo();
}
else if (this.isM3U8()) {
this.set('manifest', this.input);
}
else if (this.isTS()) {
this.set('segment', this.input);
}
}
public get(key: string): string {
return this.properties[key];
}
public set(key: string, value: string): void {
this.properties[key] = value;
}
public isMeta(): boolean {
return (this.input.charAt(0) == '#');
}
public isStreamInfo(): boolean {
return !!(this.input.match(/^#EXT-X-STREAM-INF/));
}
public isTS(): boolean {
return !!(this.input.match(/\.ts/));
}
public isM3U8(): boolean {
return !!(this.input.match(/\.m3u8/));
}
protected extractStreamInfo(): void {
// #EXT-X-STREAM-INF:BANDWIDTH=2128000,RESOLUTION=1280x720
let me: M3U8Line = this,
line: string = this.input.substr(this.input.indexOf(':') + 1),
params: Array<string> = line.split(',');
params.forEach(function(param: string) {
let paramPair: Array<string> = param.split('=');
me.set(paramPair[0].toLocaleLowerCase(), paramPair[1]);
});
}
}
export class Listener {
protected streamUrl: string = null;
protected rendition: Rendition = null;
protected lastUpdate: Date = null;
protected checkInterval = null;
protected checkFrequency = 10000;
protected segmentLog: Array<string> = [];
protected _onSegment: PubSub = new PubSub();
constructor(url: string) {
// Check input
if (!validUrl.isUri(url)) {
throw Error("This is not a valid URL.");
}
this.setStreamUrl(url);
}
protected setStreamUrl(url: string): void {
this.streamUrl = url;
}
protected static isAbsolute(url: string): boolean {
return !!(url.match(/^https?:\/\//));
}
protected static getBasePath(url: string): string {
return url.split('?')[0].substr(0, url.lastIndexOf('/') + 1);
}
protected static absolutify(url: string, basePath: string) {
if (!Listener.isAbsolute(url)) {
url = basePath + url;
}
return url;
}
protected read(): void {
let me: Listener = this;
request(this.streamUrl, function(err, res, body) {
let m3u8: M3U8 = new M3U8(body);
// If this is our playlist, containing other m3u8 renditions
if (m3u8.isPlaylist()) {
me.stop();
me.rendition = m3u8.getBestRendition();
me.rendition.url = Listener.absolutify(
me.rendition.url,
Listener.getBasePath(me.streamUrl)
);
me.setStreamUrl(me.rendition.url);
me.check();
}
// If this is our chunklist, containing TS segments
else if (m3u8.isChunkList()) {
m3u8.forEachLine(function(line: M3U8Line) {
if (line.isTS()) {
let segmentUrl: string = Listener.absolutify(line.get('segment'), Listener.getBasePath(me.streamUrl));
if (me.segmentLog.indexOf(segmentUrl) < 0) {
me.segmentLog.push(segmentUrl);
let segment: Segment = new Segment(segmentUrl);
segment.rendition = me.rendition;
me._onSegment.publish(segment);
}
}
});
if (!me.checkInterval) {
me.checkInterval = setInterval(function() {
me.check.call(me);
}, me.checkFrequency);
}
}
// Don't know what the hell this is
else {
me.stop();
}
});
}
protected check(): void {
let me: Listener = this;
request.head(this.streamUrl, function(err, res) {
let fileDate: Date = new Date(res.headers['date']);
if (fileDate != me.lastUpdate) {
me.lastUpdate = fileDate;
me.read();
}
});
}
public onSegment(callback: Function): void {
this._onSegment.subscribe(callback);
}
public start(frequencyMilliseconds?: number): void {
this.stop();
if (typeof frequencyMilliseconds != 'undefined') {
this.checkFrequency = frequencyMilliseconds;
}
this.check();
}
public stop(): void {
clearInterval(this.checkInterval);
this.checkInterval = null;
}
}
import { Uploader } from "./lib/uploader";
import { Listener, Segment } from "./lib/hls";
let weatherChannel: string = 'http://cdnapi.kaltura.com/p/931702/sp/93170200/playManifest/entryId/1_oorxcge2/format/applehttp/protocol/http/uiConfId/28428751/a.m3u8?responseFormat=m3u8';
let bucketName: string = 'stream-archive-input-test';
let listener: Listener = new Listener(weatherChannel);
let uploader: Uploader = new Uploader(bucketName);
listener.onSegment(function(segment: Segment) {
uploader.pushQ(segment);
uploader.upload();
});
listener.start(7000);
import { Segment } from "./hls";
// AWS init
let AWS = require('aws-sdk');
AWS.config.loadFromPath('./config.json');
let request = require('request');
export class Uploader {
private isUploading: boolean = false;
private bucketName: string = null;
private uploadQ: Array<Segment> = [];
constructor(bucketName: string) {
this.bucketName = bucketName;
}
public pushQ(segment: Segment) {
this.uploadQ.push(segment);
}
public upload() {
let me = this;
if (this.uploadQ.length && !this.isUploading) {
this.isUploading = true;
let segment = this.uploadQ.shift();
request(segment.url, function(err, res, body) {
let s3obj = new AWS.S3({
params: {
"Bucket": me.bucketName,
"Key": segment.getFileName()
}
});
console.log('Downloaded ' + segment.getFileName());
s3obj.upload({ Body: body })
.on('httpUploadProgress', function(evt) {
console.log('progress', segment.getFileName());
})
.send(function(err, data) {
if (err) {
console.log('error', err);
}
else {
console.log('success', data);
}
me.isUploading = false;
me.upload();
});
});
}
};
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment