Skip to content

Instantly share code, notes, and snippets.

@roccomuso
Created February 11, 2021 10:26
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save roccomuso/e65e80aecbe700db2576e751fa1a22c4 to your computer and use it in GitHub Desktop.
Save roccomuso/e65e80aecbe700db2576e751fa1a22c4 to your computer and use it in GitHub Desktop.
AWS Kinesis Video Streams upload example in JS - read an RTSP stream with ffmpeg and upload it to Amazon Kinesis Video Streams using PutMedia
const ffmpeg = require('fluent-ffmpeg')
const parseUrl = require('url').parse
// Polyfill, modifying the global Object
require('es6-object-assign').polyfill()
global.Promise = require('es6-promise').Promise
const aws4 = require('aws4')
const axios = require('axios')
const CancelToken = axios.CancelToken
const fs = require('fs')
const PipeViewer = require('pv')
// Do-nothing placeholder used wherever an optional callback was not supplied.
function noop () {
  return undefined
}
// Milliseconds without an ffmpeg 'progress' event before getMkvStream kills the command.
const HANGING_TIMEOUT = 20000
/*
const STREAM_NAME = 'test-stream'
const DATA_ENDPOINT = 's-1e413xxx.kinesisvideo.us-east-1.amazonaws.com' // returned by getDataEndpoint
const pwd = {
endpoint: 'kinesisvideo.us-east-1.amazonaws.com',
region: 'us-east-1',
accessKeyId: " ... ",
secretAccessKey: " ... ",
}
*/
/**
 * Upload media to Kinesis Video Streams with a SigV4-signed PutMedia request.
 *
 * The request is signed with aws4 using an unsigned payload and sent with axios.
 * The response is read as a stream of JSON acknowledgements and watched for
 * `EventType === 'ERROR'`.
 *
 * @param {Object} params
 * @param {*} params.mkvFile - Request body handed to axios as `data`
 *   (startUpload passes a piped ffmpeg output stream).
 * @param {string} params.streamName - Sent as the `x-amzn-stream-name` header.
 * @param {string} params.dataEndpoint - Host name used for signing and for the request URL.
 * @param {Object} params.pwd - Credentials/options object passed to `aws4.sign`.
 * @param {function(function): void} [onStart] - Called with the axios cancel
 *   function once the response headers have arrived.
 * @param {function(Error=): void} [onFinish] - Called at most once: with an error
 *   when the request fails, the response stream errors or an ERROR
 *   acknowledgement arrives; without arguments when the response stream ends.
 */
function sendStream({mkvFile, streamName, dataEndpoint, pwd}, onStart, onFinish){
  const notify = onFinish || noop
  let done = false
  // Several failure paths can fire for one request (ERROR ack, stream error,
  // stream end, axios rejection); report only the first outcome.
  function finish(err){
    if (done) return
    done = true
    notify(err)
  }

  const opts = {
    service: 'kinesisvideo',
    host: dataEndpoint,
    path: '/putMedia',
    method: 'POST', // NB. required!
    headers: {
      'Content-Type': 'application/json',
      'x-amzn-stream-name': streamName,
      'x-amzn-fragment-timecode-type': 'ABSOLUTE',
      'x-amz-content-sha256': 'UNSIGNED-PAYLOAD'
    }
  }
  // The signed request is deliberately not logged: its Authorization header
  // (and any session token) is credential material.
  const signed = aws4.sign(opts, pwd)
  const reqUri = 'https://' + opts.host + opts.path
  let cancel

  axios({
    method: 'POST',
    timeout: 40 * 1000,
    url: reqUri,
    headers: signed.headers,
    data: mkvFile,
    responseType: 'stream',
    maxContentLength: Infinity, // required for stream!
    cancelToken: new CancelToken(function executor(c) {
      // The executor receives the cancel function; keep it for onStart.
      cancel = c
    })
  }).then(function(res) {
    onStart && onStart(cancel)

    // Chunk boundaries are arbitrary: a chunk may carry several
    // acknowledgements or only part of one, so buffer until objects are complete.
    let pending = ''
    res.data.on('data', function(chunk){
      const text = chunk.toString()
      console.log(text)
      const split = splitJsonObjects(pending + text)
      pending = split.rest
      split.objects.forEach(function(raw){
        let ack
        try {
          ack = JSON.parse(raw)
        } catch (parseErr) {
          // A throw inside this handler would escape the promise chain and
          // crash the process, so report and skip the bad acknowledgement.
          console.error('Skipping unparsable PutMedia acknowledgement: ' + raw)
          return
        }
        if (ack.EventType === 'ERROR') {
          // request aborted
          finish(new Error('Fragment event type: ERROR - ' + ack.ErrorCode))
        }
      })
    })
    res.data.on('error', finish)
    res.data.on('end', function(){
      finish()
    })
  }).catch(finish)
}

// Split text into the complete top-level `{...}` objects it contains; returns
// them as raw strings plus the trailing incomplete remainder (if any).
function splitJsonObjects(text){
  const objects = []
  let depth = 0
  let start = -1
  let inString = false
  let escaped = false
  for (let i = 0; i < text.length; i++) {
    const ch = text[i]
    if (inString) {
      // Braces inside string values must not affect the nesting depth.
      if (escaped) escaped = false
      else if (ch === '\\') escaped = true
      else if (ch === '"') inString = false
      continue
    }
    if (ch === '"' && depth > 0) {
      inString = true
    } else if (ch === '{') {
      if (depth === 0) start = i
      depth++
    } else if (ch === '}' && depth > 0) {
      depth--
      if (depth === 0) {
        objects.push(text.slice(start, i + 1))
        start = -1
      }
    }
  }
  return { objects: objects, rest: start === -1 ? '' : text.slice(start) }
}
/**
 * Transcode a source with ffmpeg and upload it to Kinesis Video Streams.
 *
 * The upload is stopped and ffmpeg is killed after 40 minutes; the resulting
 * ffmpeg error, like any other ffmpeg error, starts a new upload with the same
 * arguments.
 *
 * @param {Object} params
 * @param {string} params.rtspUri - Source handed to getMkvStream.
 * @param {string} params.streamName - Kinesis stream name, forwarded to sendStream.
 * @param {string} params.dataEndpoint - PutMedia data endpoint host, forwarded to sendStream.
 * @param {Object} params.pwd - Credentials object, forwarded to sendStream.
 */
function startUpload ({rtspUri, streamName, dataEndpoint, pwd}){
  let stopTimer = null
  let sourceEnded = false

  // get stream
  const ffmpegVideo = getMkvStream(rtspUri, null, function(err){
    sourceEnded = true
    // This attempt is over; its pending 40-minute stop timer must not outlive it.
    clearTimeout(stopTimer)
    // rtsp error
    if (err) {
      // Start again with the same credentials; leaving `pwd` out would make
      // every retry sign without the caller's credentials.
      startUpload({rtspUri, streamName, dataEndpoint, pwd})
    }
  })

  const pv = PipeViewer()
  pv.on('info', function(str){
    console.log('Speed: '+str.speed+' - Transferred: '+str.transferred)
  })

  // start send stream
  sendStream({mkvFile: ffmpegVideo.pipe(pv), streamName, dataEndpoint, pwd}, function started(cancel){
    // The source may already have ended before the response arrived.
    if (sourceEnded) return
    stopTimer = setTimeout(function () {
      cancel() // stop upload
      ffmpegVideo.kill() // stop stream and raise err cb above
    }, 40*60*1000) // 40 min
  }, function finished(err) {
    if (err) console.error(err)
  })
}
/**
 * Build and return a fluent-ffmpeg command that transcodes `uri` to Matroska
 * (libx264, 15 fps, no audio).
 *
 * A watchdog kills the command when no 'progress' event arrives for
 * HANGING_TIMEOUT milliseconds. It is armed by the first 'progress' event and
 * disarmed when the command ends or errors.
 *
 * @param {string} uri - Input passed to ffmpeg; when its protocol is `rtsp:`
 *   the `-rtsp_transport tcp` and `-re` input options are added.
 * @param {function(): void} [onStart] - Called when ffmpeg has been spawned.
 * @param {function(Error=): void} [onFinished] - Called without arguments on
 *   'end' and with the error on 'error'.
 * @returns {Object} The fluent-ffmpeg command (callers use `.pipe()` and `.kill()`).
 */
function getMkvStream(uri, onStart, onFinished){
  let processHanging = null

  // (Re)start the countdown that kills ffmpeg when progress stops.
  function armWatchdog(){
    clearTimeout(processHanging)
    processHanging = setTimeout(function(){
      console.error('Hanging for more than ' + (HANGING_TIMEOUT / 1000) + ' sec.')
      video.kill()
    }, HANGING_TIMEOUT) // hanging for more than x sec
  }

  const isRtsp = parseUrl(uri).protocol === 'rtsp:'
  const video = ffmpeg(uri) // also file '/media/desk/Test.mkv'
    //.videoBitrate(500)
    .inputOptions(isRtsp ? ['-rtsp_transport tcp', '-re'] : [])
    .format('matroska')
    .videoCodec('libx264').fps(15)
    .outputOptions(['-map 0:0', '-an', '-g 15', '-vsync 1', '-profile:v baseline',
      '-reset_timestamps 1',
      '-quality good',
      '-cpu-used 2',
      '-pix_fmt yuv420p'
    ])
    .on('progress', function(progress) {
      console.log('Processing:', progress)
      armWatchdog()
    })
    .on('start', function(commandLine) {
      console.log('Spawned Ffmpeg with command: ' + commandLine)
      onStart && onStart()
    })
    .on('end', function() {
      console.log('Finished processing')
      // Disarm the watchdog: otherwise it fires after a clean finish, logs a
      // false "Hanging" message and calls kill() on a finished command.
      clearTimeout(processHanging)
      onFinished && onFinished()
    })
    .on('error', function(err, stdout, stderr) {
      console.log('Cannot process video: ' + err.message)
      clearTimeout(processHanging)
      onFinished && onFinished(err) // eventually restart
    })
  return video
}
/*
// save stream on file:
// getMkvStream('rtsp://admin:@192.168.69.103:554/live1.sdp', null, console.error).pipe(require('fs').createWriteStream('./test.mkv'))
// Kinesis upload USAGE:
startUpload({
rtspUri: 'rtsp://admin:@192.168.88.9:554/live1.sdp',
streamName: 'demo-stream',
dataEndpoint: 's-4010bf70.kinesisvideo.us-west-2.amazonaws.com',
pwd: {
endpoint: 'kinesisvideo.us-west-2.amazonaws.com',
region: 'us-west-2',
accessKeyId: " ... ",
secretAccessKey: " ... ",
}
})
// this spawns ffmpeg and uploads the stream
*/
@Trungmaster5
Copy link

Thank you for sharing this great solution.

@caesarisme
Copy link

Thank you so much

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment