Skip to content

Instantly share code, notes, and snippets.

@satori99
Last active April 17, 2024 09:36
Show Gist options
  • Star 24 You must be signed in to star a gist
  • Fork 5 You must be signed in to fork a gist
  • Save satori99/2cb06938bfe8532ecc00e60906509052 to your computer and use it in GitHub Desktop.
Save satori99/2cb06938bfe8532ecc00e60906509052 to your computer and use it in GitHub Desktop.
This is a proof-of-concept for using ffmpeg as a HTTP video stream proxy that can reduce the volume of ad-breaks
/**
* This is a proof-of-concept for using ffmpeg as a HTTP video stream proxy
* that can reduce ad volume.
*
* It only works on streams containing SCTE35 data packets.
* You can check a stream using:
*
* ffmpeg -hide_banner -i <SOURCE_URL> 2>&1 | grep scte_35
*
* Start the demo:
*
* node app.js
*
* Then open http://localhost:3000 in chrome or whatever...
*
* The volume will be reduced at the start of each ad-break, and restored when
* the break ends.
*
* NB: if using Windows, run this in a WSL bash prompt, because cmd.exe and
* PowerShell don't support unix domain sockets.
*/
// Channel 9 Sydney - 720p HLS stream.
// This URL is handed to createProcess() as the ffmpeg input for every incoming
// HTTP request.
const SOURCE_URL = 'https://9now-live.akamaized.net/hls/live/708951-b/ch9-syd/master1.m3u8'
const net = require( 'net' );
const http = require( 'http' );
const crypto = require( 'crypto' );
const { spawn } = require( 'child_process' );
const { Transform } = require( 'stream' );
// https://www.scte.org/SCTEDocs/Standards/ANSI_SCTE%2035%202019r1.pdf
// Section 9.6, page 29
/**
 * A parsed SCTE-35 splice_info_section (table id 0xfc).
 *
 * Instances are created with SpliceInfoSection.from(). Every field is a
 * read-only property named after the matching field of the specification.
 * 33-bit time fields (pts_adjustment, splice_time, duration) are converted
 * from 90 kHz ticks to seconds.
 */
class SpliceInfoSection {

  /**
   * Checks the CRC of a complete section.
   *
   * The MPEG-2 CRC-32 (polynomial 0x04c11db7, initial value 0xffffffff) is run
   * over every byte, including the trailing CRC_32 field; an intact section
   * leaves a remainder of zero.
   *
   * @param {Buffer} buffer - the complete section
   * @returns {boolean} true when the remainder is zero
   */
  static validate ( buffer ) {
    let crc = 0xffffffff;
    for ( let i = 0; i < buffer.length; i ++ ) {
      crc ^= buffer[ i ] << 24;
      for ( let k = 0; k < 8; k ++ )
        crc = crc & 0x80000000 ? ( crc << 1 ) ^ 0x04c11db7 : crc << 1;
    }
    return crc === 0;
  }

  /**
   * Reads a 33-bit 90 kHz timestamp whose top bit is the lowest bit of the
   * byte at `offset` and whose remaining 32 bits follow it.
   *
   * The top bit is applied by multiplication: JavaScript shift counts are
   * taken modulo 32, so `x << 32` would leave the bit in position 0.
   *
   * @private
   * @param {Buffer} buffer
   * @param {number} offset - index of the byte holding bit 32
   * @returns {number} the timestamp in seconds
   */
  static readTimestamp ( buffer, offset ) {
    const high = buffer[ offset ] & 0x01;
    const low = buffer.readUInt32BE( offset + 1 );
    return ( high * 0x100000000 + low ) / 90000;
  }

  /**
   * Reads a splice_time() structure.
   *
   * @private
   * @param {Buffer} buffer
   * @param {number} offset - index of the first byte of the structure
   * @returns {{value: number, size: number}} the time in seconds (-1 when
   *   time_specified_flag is clear) and the number of bytes the structure
   *   occupies (5 with a time, 1 without)
   */
  static readSpliceTime ( buffer, offset ) {
    const timeSpecified = ( buffer[ offset ] & 0x80 ) === 0x80;
    return timeSpecified
      ? { value: SpliceInfoSection.readTimestamp( buffer, offset ), size: 5 }
      : { value: -1, size: 1 };
  }

  /**
   * Parses a splice_insert() command body and defines its fields on `result`.
   *
   * @private
   * @param {SpliceInfoSection} result - object receiving the properties
   * @param {Buffer} buffer
   * @param {number} offset - index of the first byte of the command
   * @returns {number} index of the first byte after the command
   */
  static parseSpliceInsert ( result, buffer, offset ) {
    Object.defineProperties( result, {
      splice_event_id: {
        enumerable: true,
        value: buffer.readUInt32BE( offset )
      },
      splice_event_cancel_indicator: {
        enumerable: true,
        value: ( buffer[ offset + 4 ] & 0x80 ) === 0x80
      },
    } );
    offset += 5;

    // a cancelled event carries no further fields
    if ( result.splice_event_cancel_indicator )
      return offset;

    const flags = buffer[ offset ++ ];
    Object.defineProperties( result, {
      out_of_network_indicator: {
        value: ( flags & 0x80 ) === 0x80,
        enumerable: true
      },
      program_splice_flag: {
        value: ( flags & 0x40 ) === 0x40,
        enumerable: false
      },
      duration_flag: {
        value: ( flags & 0x20 ) === 0x20,
        enumerable: false
      },
      splice_immediate_flag: {
        value: ( flags & 0x10 ) === 0x10,
        enumerable: false
      },
    } );

    if ( result.program_splice_flag ) {
      if ( ! result.splice_immediate_flag ) {
        const time = SpliceInfoSection.readSpliceTime( buffer, offset );
        offset += time.size;
        Object.defineProperty( result, 'splice_time', {
          value: time.value,
          enumerable: true,
        } );
      }
    } else {
      const components = [];
      const componentCount = buffer[ offset ++ ];
      for ( let i = 0; i < componentCount; i ++ ) {
        const component = { tag: buffer[ offset ++ ] };
        // a component only carries a splice_time() when the splice is not immediate
        if ( ! result.splice_immediate_flag ) {
          const time = SpliceInfoSection.readSpliceTime( buffer, offset );
          offset += time.size;
          component.splice_time = time.value;
        }
        components.push( component );
      }
      Object.defineProperty( result, 'components', {
        value: Object.freeze( components ),
        enumerable: true
      } );
    }

    if ( result.duration_flag ) {
      // break_duration(): auto_return bit, 6 reserved bits, 33-bit duration
      Object.defineProperties( result, {
        auto_return: {
          value: ( buffer[ offset ] & 0x80 ) === 0x80,
          enumerable: true,
        },
        duration: {
          value: SpliceInfoSection.readTimestamp( buffer, offset ),
          enumerable: true
        },
      } );
      offset += 5;
    }

    Object.defineProperties( result, {
      unique_program_id: {
        value: ( buffer[ offset ] << 8 ) | buffer[ offset + 1 ],
        enumerable: true
      },
      avail_num: {
        value: buffer[ offset + 2 ],
        enumerable: false
      },
      avails_expected: {
        value: buffer[ offset + 3 ],
        enumerable: false
      },
    } );
    return offset + 4;
  }

  /**
   * Parses the descriptor loop that follows the splice command.
   *
   * descriptor_loop_length counts bytes, and each descriptor_length includes
   * the 4-byte identifier, so private_bytes holds descriptor_length - 4 bytes.
   *
   * @private
   * @param {Buffer} buffer
   * @param {number} offset - index of the descriptor_loop_length field
   * @returns {Array<Object>} the descriptors in the order they appear
   * @throws {Error} when a length field points outside the section
   */
  static parseDescriptors ( buffer, offset ) {
    const crcStart = buffer.length - 4;
    if ( offset + 2 > crcStart )
      throw new Error( 'invalid splice Info Section: bad splice command length' );
    const loopLength = ( buffer[ offset ] << 8 ) | buffer[ offset + 1 ];
    offset += 2;
    const loopEnd = offset + loopLength;
    if ( loopEnd > crcStart )
      throw new Error( 'invalid splice Info Section: bad descriptor loop length' );
    const descriptors = [];
    while ( offset < loopEnd ) {
      const descriptorLength = buffer[ offset + 1 ];
      const next = offset + 2 + descriptorLength;
      if ( descriptorLength < 4 || next > loopEnd )
        throw new Error( 'invalid splice Info Section: bad descriptor length' );
      descriptors.push( {
        splice_descriptor_tag: buffer[ offset ],
        descriptor_length: descriptorLength,
        identifier: buffer.readUInt32BE( offset + 2 ),
        private_bytes: Array.from( buffer.subarray( offset + 6, next ) )
      } );
      offset = next;
    }
    return descriptors;
  }

  /**
   * Parses a complete splice_info_section.
   *
   * Only the splice_insert command (type 0x05) is decoded; for any other
   * command type a message is logged and just the header, descriptors and CRC
   * are exposed. The parsed descriptors are available through the
   * non-enumerable `descriptors` property.
   *
   * @param {Buffer} buffer - the complete section, including its CRC_32
   * @returns {SpliceInfoSection}
   * @throws {Error} on a CRC mismatch, a wrong table id or inconsistent lengths
   */
  static from ( buffer ) {
    if ( ! SpliceInfoSection.validate( buffer ) )
      throw new Error( 'invalid splice info section: CRC error' );

    // 14 header bytes + descriptor_loop_length (2) + CRC_32 (4)
    if ( buffer.length < 20 )
      throw new Error( 'invalid splice Info Section: bad section length' );

    const result = Object.create( SpliceInfoSection.prototype, {
      table_id: {
        value: buffer[ 0 ],
        enumerable: false
      },
      section_syntax_indicator: {
        value: ( buffer[ 1 ] & 0x80 ) === 0x80,
        enumerable: false
      },
      private_indicator: {
        value: ( buffer[ 1 ] & 0x40 ) === 0x40,
        enumerable: false
      },
      section_length: {
        value: ( ( buffer[ 1 ] & 0x0f ) << 8 ) | buffer[ 2 ],
        enumerable: false
      },
      protocol_version: {
        value: buffer[ 3 ],
        enumerable: false
      },
      encrypted_packet: {
        value: ( buffer[ 4 ] & 0x80 ) === 0x80,
        enumerable: true
      },
      encryption_algorithm: {
        value: ( buffer[ 4 ] & 0x7e ) >> 1,
        enumerable: false
      },
      pts_adjustment: {
        value: SpliceInfoSection.readTimestamp( buffer, 4 ),
        enumerable: true
      },
      cw_index: {
        value: buffer[ 9 ],
        enumerable: false
      },
      tier: {
        value: ( buffer[ 10 ] << 4 ) | ( ( buffer[ 11 ] & 0xf0 ) >> 4 ),
        enumerable: false
      },
      splice_command_length: {
        value: ( ( buffer[ 11 ] & 0x0f ) << 8 ) | buffer[ 12 ],
        enumerable: false
      },
      splice_command_type: {
        value: buffer[ 13 ],
        enumerable: false
      },
    } );

    if ( result.table_id !== 0xfc )
      throw new Error( 'invalid splice Info Section: bad table id' );
    if ( result.section_length !== buffer.length - 3 )
      throw new Error( 'invalid splice Info Section: bad section length' );

    const commandStart = 14;
    let offset = commandStart;
    if ( result.splice_command_type === 0x05 ) {
      offset = SpliceInfoSection.parseSpliceInsert( result, buffer, offset );
    } else {
      console.error( 'Not a splice_insert command!' );
    }

    if ( result.splice_command_length !== 0xfff ) {
      // the declared length locates the descriptor loop for every command type
      offset = commandStart + result.splice_command_length;
    } else if ( result.splice_command_type === 0x06 ) {
      // length not declared (0xfff): a time_signal body is a single splice_time()
      offset += SpliceInfoSection.readSpliceTime( buffer, offset ).size;
    }

    const descriptors = SpliceInfoSection.parseDescriptors( buffer, offset );

    Object.defineProperties( result, {
      descriptors: {
        value: Object.freeze( descriptors ),
        enumerable: false
      },
      // CRC_32 always occupies the last four bytes of the section
      crc: {
        value: buffer.readUInt32BE( buffer.length - 4 ),
        enumerable: false
      },
    } );
    return result;
  }
}
/**
 * Object-mode transform stream that turns each incoming chunk into a
 * SpliceInfoSection.
 *
 * Chunks that cannot be parsed are logged and re-emitted through the
 * 'invalid' event instead of failing the stream.
 *
 * NOTE(review): every chunk is parsed on its own, so this assumes each chunk
 * holds exactly one complete section - confirm against the producer.
 */
class SpliceInfoStream extends Transform {

  /**
   * @param {Object} [options] - stream options; objectMode is always forced
   *   on. The object passed in is copied, never modified.
   */
  constructor ( options ) {
    super( Object.assign( {}, options, { objectMode: true } ) );
  }

  /**
   * Parses one chunk and pushes the resulting section downstream.
   *
   * @param {Buffer} buffer - the chunk to parse
   * @param {string} encoding - unused
   * @param {Function} cb - always called without an error
   */
  _transform ( buffer, encoding, cb ) {
    try {
      this.push( SpliceInfoSection.from( buffer ) );
    } catch ( err ) {
      console.error( 'SCTE35Stream:', err );
      this.emit( 'invalid', buffer );
    }
    cb();
  }
}
/**
 * Spawns the ffmpeg child process that remuxes `input` to `output`.
 *
 * Video is copied, audio is re-encoded through a `volume` filter, and the
 * data stream mapped as `0:2` is written to the child's stdout.
 *
 * The process is treated as started when it is still running 150 ms after
 * being spawned.
 *
 * @param {string} input - ffmpeg input (`-i`)
 * @param {string} output - ffmpeg output that ffmpeg listens on (`-listen 1`)
 * @returns {Promise<ChildProcess>} resolves with the running child process;
 *   rejects with an Error when the process cannot be spawned or exits within
 *   the start-up window (the message is then ffmpeg's stderr output)
 */
const createProcess = ( input, output ) => new Promise( ( resolve, reject ) => {
  const ffmpeg = spawn( 'ffmpeg', [
    '-i', input,
    '-hide_banner',
    '-loglevel', 'warning',
    '-threads', '2',
    '-analyzeduration', '5MB',
    '-probesize', '5MB',
    // video frames are not decoded
    '-map', '0:v', '-vcodec', 'copy', '-vsync', '0',
    // audio frames are decoded to alter the volume
    '-map', '0:a', '-filter:a', 'volume=1.0',
    '-acodec', 'aac', '-ab', '128k', '-ac', '2', '-async', '1',
    // send video and audio frames to unix domain socket
    '-f', 'mp4',
    // NOTE(review): -movflags is given three times; presumably only the last
    // value takes effect - confirm whether 'frag_keyframe+empty_moov' was meant.
    '-movflags', 'faststart',
    '-movflags', 'empty_moov',
    '-movflags', 'frag_keyframe',
    '-listen', '1', output,
    // send scte 35 data packets to stdout
    '-map', '0:2', '-dcodec', 'copy',
    '-f', 'data', 'pipe:1'
  ] );
  let timer = null;
  ffmpeg.once( 'error', err => {
    console.debug( 'ffmpeg error occurred' );
    clearTimeout( timer );
    ffmpeg.removeAllListeners( 'exit' );
    const failure = new Error( 'Failed to create ffmpeg child process' );
    // keep the underlying spawn error available to the caller
    failure.cause = err;
    reject( failure );
  } );
  ffmpeg.once( 'exit', ( code, signal ) => {
    console.debug( 'ffmpeg exit has occurred' );
    clearTimeout( timer );
    ffmpeg.removeAllListeners( 'error' );
    // Collect whatever ffmpeg wrote to stderr and reject once the stdio
    // streams are closed, so the promise settles even if nothing was written.
    const chunks = [];
    if ( ffmpeg.stderr )
      ffmpeg.stderr.on( 'data', chunk => chunks.push( chunk ) );
    ffmpeg.once( 'close', () => {
      const message = Buffer.concat( chunks ).toString( 'utf8' ).trim();
      reject( new Error( message || `ffmpeg exited early (code ${code}, signal ${signal})` ) );
    } );
  } );
  timer = setTimeout( () => {
    ffmpeg.removeAllListeners( 'error' );
    ffmpeg.removeAllListeners( 'exit' );
    resolve( ffmpeg );
  }, 150 );
} );
/**
 * HTTP request handler: starts one ffmpeg process per request and streams
 * its output to the client.
 *
 * ffmpeg writes the remuxed stream to a unix domain socket named after the
 * client's address and port; the handler connects to that socket and pipes it
 * into the response. Splice messages read from ffmpeg's stdout drive the
 * volume: 0.25 while out of network, 1.0 otherwise.
 *
 * @param {http.IncomingMessage} req
 * @param {http.ServerResponse} res
 */
const app = ( req, res ) => {
  const { remoteAddress, remotePort } = req.socket;
  console.log( 'new request', remoteAddress, remotePort );
  // create a hash from the remote conn. info to make a unique socket name
  const md5 = crypto.createHash( 'md5' );
  md5.update( `${remoteAddress}${remotePort}` );
  const socketPath = `/tmp/adsoft.${md5.digest('hex')}.sock`;
  // create ffmpeg child process
  createProcess( SOURCE_URL, 'unix:' + socketPath ).then( ffmpeg => {
    let socket = null;
    let connectTimer = null;

    // stops everything that belongs to this request; safe to call repeatedly
    const release = () => {
      clearTimeout( connectTimer );
      if ( socket ) socket.destroy();
      ffmpeg.kill();
    };

    ffmpeg.once( 'exit', () => console.debug( `ffmpeg child process ${ffmpeg.pid} has exited` ) );
    // without listeners, an 'error' on the child or its stdin would crash the server
    ffmpeg.on( 'error', err => console.error( 'ffmpeg error:', err ) );
    ffmpeg.stdin.on( 'error', err => console.error( 'ffmpeg stdin error:', err ) );
    ffmpeg.stderr.on( 'data', buffer => console.debug( buffer.toString( 'utf8' ) ) );

    // the client may have gone away while ffmpeg was starting
    if ( req.socket.destroyed ) {
      release();
      return;
    }
    res.on( 'close', () => {
      console.debug( 'connection closed' );
      release();
    } );
    res.on( 'finish', () => {
      console.debug( 'response finished' );
      release();
    } );

    const scte35Stream = new SpliceInfoStream();
    scte35Stream.on( 'data', info => {
      console.log( 'scte35 info:', info );
      // only messages that actually carry the indicator may change the volume
      if ( typeof info.out_of_network_indicator !== 'boolean' ) return;
      if ( ! ffmpeg.stdin.writable ) return;
      // set volume of stream
      ffmpeg.stdin.write( `Cvolume ${info.pts_adjustment} volume ${info.out_of_network_indicator?0.25:1.0}\n` );
    } );
    ffmpeg.stdout.pipe( scte35Stream );

    // wait for ffmpeg to create the socket file...
    connectTimer = setTimeout( () => {
      socket = net.createConnection( socketPath, () => {
        console.debug( 'connected to domain socket' );
        res.setHeader( 'Content-Type', 'video/mp4' );
        socket.pipe( res );
      } );
      socket.on( 'error', err => {
        console.error( 'unix socket error:', err );
        ffmpeg.kill();
        // pipe() does not end the response when its source fails
        if ( ! res.headersSent ) res.statusCode = 502;
        res.end();
      } );
    }, 500 );
  } ).catch( err => {
    console.error( err );
    if ( ! res.headersSent ) res.statusCode = 500;
    res.end( err.toString() );
  } );
}
// Serve the proxy on port 3000 of every IPv4 interface.
const server = http.createServer( app );
server.listen( 3000, '0.0.0.0', () => {
  console.log( 'Listening...' );
} );
//
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment