Last active
June 15, 2018 21:37
-
-
Save jasonbyrne/cc4a48a4e0ecbbafbace662aeb241f57 to your computer and use it in GitHub Desktop.
Watch an HLS stream on the best rendition and publish an event whenever a new segment arrives. Then optionally upload it to S3.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
let request = require('request'); | |
let validUrl = require('valid-url'); | |
class PubSub { | |
protected subscribers: { [key: string]: Function } = {}; | |
public subscribe(callback: Function): string { | |
let key: string = Date.now() + '_' + Math.ceil(Math.random() * 10000); | |
this.subscribers[key] = callback; | |
return key; | |
} | |
public unsubscribe(key: string): void { | |
delete this.subscribers[key]; | |
} | |
public publish(payload: any, thisObject?: any): void { | |
let me: PubSub = this, | |
keys: Array<string> = Object.keys(this.subscribers); | |
keys.forEach(function(key: string) { | |
if (thisObject) { | |
me.subscribers[key].call(thisObject, payload); | |
} | |
else { | |
me.subscribers[key](payload); | |
} | |
}); | |
} | |
} | |
export class Segment { | |
public rendition: Rendition; | |
public url: string; | |
public timestamp: number; | |
constructor(url: string) { | |
this.url = url; | |
this.timestamp = Date.now(); | |
} | |
public getFileName(): string { | |
return this.url.split('?')[0].substr(this.url.lastIndexOf('/') + 1); | |
} | |
} | |
class Rendition { | |
public bitRate: number; | |
public url: string; | |
public resolution: string; | |
constructor(url?: string) { | |
this.url = url; | |
} | |
} | |
class M3U8 { | |
protected lines: Array<M3U8Line> = []; | |
constructor(data: string) { | |
let me: M3U8 = this; | |
data.split("\n").forEach(function(line: string) { | |
me.lines.push(new M3U8Line(line)); | |
}); | |
} | |
public forEachLine(callback: Function): void { | |
this.lines.forEach(function(line: M3U8Line) { | |
callback(line); | |
}); | |
} | |
public getLine(i: number): M3U8Line { | |
return this.lines[i]; | |
} | |
public isPlaylist(): boolean { | |
let is: boolean = false; | |
this.lines.some(function(line: M3U8Line) { | |
if (line.isM3U8()) { | |
is = true; | |
return true; | |
} | |
return false; | |
}); | |
return is; | |
} | |
public isChunkList(): boolean { | |
let is: boolean = false; | |
this.lines.some(function(line: M3U8Line) { | |
if (line.isTS()) { | |
is = true; | |
return true; | |
} | |
return false; | |
}); | |
return is; | |
} | |
public getAllRenditions(): Array<Rendition> { | |
let renditions: Array<Rendition> = [], | |
rendition: Rendition = null; | |
this.lines.forEach(function(line: M3U8Line) { | |
if (line.isStreamInfo()) { | |
rendition = new Rendition(null); | |
rendition.bitRate = parseFloat(line.get('bandwidth')); | |
rendition.resolution = line.get('resolution'); | |
} | |
else if (line.isM3U8()) { | |
rendition.url = line.get('manifest'); | |
renditions.push(rendition); | |
} | |
}); | |
return renditions; | |
} | |
public getBestRendition(): Rendition { | |
let lastRendition: Rendition = null, | |
bestRendition: Rendition = null; | |
this.lines.forEach(function(line: M3U8Line) { | |
if (line.isStreamInfo()) { | |
lastRendition = new Rendition(); | |
lastRendition.bitRate = parseInt(line.get('bandwidth')); | |
lastRendition.resolution = line.get('resolution'); | |
} | |
else if (line.isM3U8()) { | |
if (bestRendition === null || lastRendition.bitRate > bestRendition.bitRate) { | |
bestRendition = new Rendition(); | |
bestRendition.bitRate = lastRendition.bitRate; | |
bestRendition.resolution = lastRendition.resolution; | |
bestRendition.url = line.get('manifest'); | |
} | |
lastRendition = null; | |
} | |
}); | |
return bestRendition; | |
} | |
} | |
class M3U8Line { | |
protected input: string = null; | |
protected properties: Object = {}; | |
constructor(line: string) { | |
this.input = line; | |
// What is this line? | |
if (this.isStreamInfo()) { | |
this.extractStreamInfo(); | |
} | |
else if (this.isM3U8()) { | |
this.set('manifest', this.input); | |
} | |
else if (this.isTS()) { | |
this.set('segment', this.input); | |
} | |
} | |
public get(key: string): string { | |
return this.properties[key]; | |
} | |
public set(key: string, value: string): void { | |
this.properties[key] = value; | |
} | |
public isMeta(): boolean { | |
return (this.input.charAt(0) == '#'); | |
} | |
public isStreamInfo(): boolean { | |
return !!(this.input.match(/^#EXT-X-STREAM-INF/)); | |
} | |
public isTS(): boolean { | |
return !!(this.input.match(/\.ts/)); | |
} | |
public isM3U8(): boolean { | |
return !!(this.input.match(/\.m3u8/)); | |
} | |
protected extractStreamInfo(): void { | |
// #EXT-X-STREAM-INF:BANDWIDTH=2128000,RESOLUTION=1280x720 | |
let me: M3U8Line = this, | |
line: string = this.input.substr(this.input.indexOf(':') + 1), | |
params: Array<string> = line.split(','); | |
params.forEach(function(param: string) { | |
let paramPair: Array<string> = param.split('='); | |
me.set(paramPair[0].toLocaleLowerCase(), paramPair[1]); | |
}); | |
} | |
} | |
export class Listener { | |
protected streamUrl: string = null; | |
protected rendition: Rendition = null; | |
protected lastUpdate: Date = null; | |
protected checkInterval = null; | |
protected checkFrequency = 10000; | |
protected segmentLog: Array<string> = []; | |
protected _onSegment: PubSub = new PubSub(); | |
constructor(url: string) { | |
// Check input | |
if (!validUrl.isUri(url)) { | |
throw Error("This is not a valid URL."); | |
} | |
this.setStreamUrl(url); | |
} | |
protected setStreamUrl(url: string): void { | |
this.streamUrl = url; | |
} | |
protected static isAbsolute(url: string): boolean { | |
return !!(url.match(/^https?:\/\//)); | |
} | |
protected static getBasePath(url: string): string { | |
return url.split('?')[0].substr(0, url.lastIndexOf('/') + 1); | |
} | |
protected static absolutify(url: string, basePath: string) { | |
if (!Listener.isAbsolute(url)) { | |
url = basePath + url; | |
} | |
return url; | |
} | |
protected read(): void { | |
let me: Listener = this; | |
request(this.streamUrl, function(err, res, body) { | |
let m3u8: M3U8 = new M3U8(body); | |
// If this is our playlist, containing other m3u8 renditions | |
if (m3u8.isPlaylist()) { | |
me.stop(); | |
me.rendition = m3u8.getBestRendition(); | |
me.rendition.url = Listener.absolutify( | |
me.rendition.url, | |
Listener.getBasePath(me.streamUrl) | |
); | |
me.setStreamUrl(me.rendition.url); | |
me.check(); | |
} | |
// If this is our chunklist, containing TS segments | |
else if (m3u8.isChunkList()) { | |
m3u8.forEachLine(function(line: M3U8Line) { | |
if (line.isTS()) { | |
let segmentUrl: string = Listener.absolutify(line.get('segment'), Listener.getBasePath(me.streamUrl)); | |
if (me.segmentLog.indexOf(segmentUrl) < 0) { | |
me.segmentLog.push(segmentUrl); | |
let segment: Segment = new Segment(segmentUrl); | |
segment.rendition = me.rendition; | |
me._onSegment.publish(segment); | |
} | |
} | |
}); | |
if (!me.checkInterval) { | |
me.checkInterval = setInterval(function() { | |
me.check.call(me); | |
}, me.checkFrequency); | |
} | |
} | |
// Don't know what the hell this is | |
else { | |
me.stop(); | |
} | |
}); | |
} | |
protected check(): void { | |
let me: Listener = this; | |
request.head(this.streamUrl, function(err, res) { | |
let fileDate: Date = new Date(res.headers['date']); | |
if (fileDate != me.lastUpdate) { | |
me.lastUpdate = fileDate; | |
me.read(); | |
} | |
}); | |
} | |
public onSegment(callback: Function): void { | |
this._onSegment.subscribe(callback); | |
} | |
public start(frequencyMilliseconds?: number): void { | |
this.stop(); | |
if (typeof frequencyMilliseconds != 'undefined') { | |
this.checkFrequency = frequencyMilliseconds; | |
} | |
this.check(); | |
} | |
public stop(): void { | |
clearInterval(this.checkInterval); | |
this.checkInterval = null; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Uploader } from "./lib/uploader"; | |
import { Listener, Segment } from "./lib/hls"; | |
let weatherChannel: string = 'http://cdnapi.kaltura.com/p/931702/sp/93170200/playManifest/entryId/1_oorxcge2/format/applehttp/protocol/http/uiConfId/28428751/a.m3u8?responseFormat=m3u8'; | |
let bucketName: string = 'stream-archive-input-test'; | |
let listener: Listener = new Listener(weatherChannel); | |
let uploader: Uploader = new Uploader(bucketName); | |
listener.onSegment(function(segment: Segment) { | |
uploader.pushQ(segment); | |
uploader.upload(); | |
}); | |
listener.start(7000); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Segment } from "./hls"; | |
// AWS init | |
let AWS = require('aws-sdk'); | |
AWS.config.loadFromPath('./config.json'); | |
let request = require('request'); | |
export class Uploader { | |
private isUploading: boolean = false; | |
private bucketName: string = null; | |
private uploadQ: Array<Segment> = []; | |
constructor(bucketName: string) { | |
this.bucketName = bucketName; | |
} | |
public pushQ(segment: Segment) { | |
this.uploadQ.push(segment); | |
} | |
public upload() { | |
let me = this; | |
if (this.uploadQ.length && !this.isUploading) { | |
this.isUploading = true; | |
let segment = this.uploadQ.shift(); | |
request(segment.url, function(err, res, body) { | |
let s3obj = new AWS.S3({ | |
params: { | |
"Bucket": me.bucketName, | |
"Key": segment.getFileName() | |
} | |
}); | |
console.log('Downloaded ' + segment.getFileName()); | |
s3obj.upload({ Body: body }) | |
.on('httpUploadProgress', function(evt) { | |
console.log('progress', segment.getFileName()); | |
}) | |
.send(function(err, data) { | |
if (err) { | |
console.log('error', err); | |
} | |
else { | |
console.log('success', data); | |
} | |
me.isUploading = false; | |
me.upload(); | |
}); | |
}); | |
} | |
}; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment