@misiek08
Created February 14, 2024 00:53
ffmpeg alternative input switching from c8e298ab-9614-2cf6-e2c8-0e2729ae5afd@vivanet.hu patch
/*
* Copyright (c) 2018 Bodecs Bela
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
/**
* @file
* API utility for automatic failover switching between main input and secondary
* input in case of input unavailability
* @example alternative_input.c
*/
/**
* Motivation: let's say you have an unreliable live stream source and you want to
* transcode its first video and audio stream in real time,
* but you want to survive the occasions when
* the source is unavailable. So you use a secondary live source, but
* the transition should occur seamlessly without breaking/restarting
* the transcoding process.
*
* You may have a main source as an flv format rtmp://<server>/<stream> or
* an mpegts format udp://<multicast_address:port>/ or an
* hls format http://<server>/stream.m3u8 or whatever similar.
*
* Your original ffmpeg command line may look like this:
* ffmpeg -f <input_format_name> -i <main_input_url> -map 0:v:0 -map 0:a:0
* -c:v libx264 -s 640x360 -r 25 -pix_fmt yuv420p
* -c:a aac -ac 2 -ar 44100
* -f hls out.m3u8
*
* Should the source be unavailable, you may want to use a secondary source to show a
* color-bar screen with silent audio. To achieve this we virtually cut your
* original ffmpeg command into two halves and insert the alternative_input handler
* between them.
*
* Here is the modified output handler command line: (command#1)
* ffmpeg -y -f nut -listen 1 -i unix:output.unix
* -c:v libx264 -s 640x360 -r 25 -pix_fmt yuv420p
* -c:a aac -ac 2 -ar 44100
* -f hls out.m3u8
*
* here is the modified main input producer command line: (command#2)
* ffmpeg -y -f <input_format_name> -i <main_input_url> -map 0:v:0 -map 0:a:0
* -c:v rawvideo -s 640x360 -r 25 -pix_fmt yuv420p
* -c:a pcm_s32le -ac 2 -ar 44100
* -write_index 0 -f nut -listen 1 unix:input_main.unix
*
* here is the secondary input producer command line: (command#3)
* ffmpeg -y -re -f lavfi
* -i "aevalsrc=exprs=0:nb_samples=1024:sample_rate=44100:channel_layout=stereo, \
* aformat=sample_fmts=s32"
* -re -f lavfi -i "smptehdbars=size=640x360:rate=25, format=pix_fmts=yuv420p"
* -c:v rawvideo -c:a pcm_s32le
* -map 1 -map 0
* -write_index 0 -f nut -listen 1 unix:input_second.unix
*
* and finally the alternative input handler command line: (command#4)
* alternative_input -im unix:input_main.unix -ifm nut
* -is unix:input_second.unix -ifs nut
* -o unix:output.unix -of nut
* -timeout 150
*
* How to test:
* start the modified output handler (command#1), then in a separate window
* start the alternative input handler (command#4), then in a separate window
* start the main input producer (command#2) and then in a separate window
* start the secondary input producer (command#3). The output handler
* will now produce the main input. Now stop the main input producer,
* e.g. by pressing q in its window. The output handler now produces the
* secondary source (SMPTE color bars on screen and silence as audio).
* Now start the main input producer again. After a successful start
* the output handler will produce the main input again.
*
* Some suggestions:
* - use a long analyze duration (-analyzeduration 10000000) option
* on the main input to reliably collect all input info
* - all corresponding elementary streams on the inputs of the alternative
* input handler must have matching properties regarding
* stream type, pix format, pix size, audio sample rate
* - the expected input format of the alternative input handler is always
* intra-only video, and the audio format is pcm_s32le
* - the number of elementary streams in the inputs is unlimited
* - at the beginning, first start the output handler, then the alternative input
* handler, then the main input and then the secondary input, because the
* alternative input handler will stop immediately if the output is not
* writable, but it keeps trying to open the inputs continuously
* - at the beginning no output will be produced until both the
* main and the second input have been opened
* - the alternative input handler's output video codec is rawvideo and
* its output audio codec is pcm_s32le
* - the nut muxer/demuxer format was tested successfully for output/input;
* other formats may work (e.g. avi, with its limitations);
* nut has a new option to lower the format overhead: -syncpoints none
* - only the unix protocol was tested successfully for input/output
* - an unavailable input will be tested for re-opening every 1000 ms,
* the secondary input as well
* - when the main input becomes available again, switching back occurs
*
*
* Description of command line parameters of alternative input handler:
* -im url of primary/main input
* -ifm (optional) format name of primary input
* -is url of secondary input
* -ifs (optional) format name of secondary input
* -o url of output
* -of (optional) output format name
* -timeout (optional) if main input is not available for this time period,
* switching to the second input will occur (default value 100ms),
* value expressed in milliseconds
* -loglevel (optional) info|debug|warning|error (default level is info)
* -dsc (optional) internally inputs are consumed in real-time fashion;
* if data arrives quicker than real time according to the incoming timestamps,
* reading will be slowed down. If consecutive timestamps differ by more
* than this threshold value, the input data will be treated as discontinuous.
* Value expressed in microseconds, default value is 3000000
* -sf (optional) path of state file to write
*
*
* State file structure
* o:?:z:y 0:x 1:x
*
* There are 3 groups of data, separated by space: output, main input, second input
*
* ?: index of current input switched to output: _ - none (at starting), 0 - main, 1 - secondary
* z: number of failover switches
* y: current output state period length since last input source switching in milliseconds
* x: status of input: 1 - ok, 0 - error
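*
* A minimal sketch of how a monitoring tool might read one state file line,
* assuming the line follows the layout documented above exactly. The StateLine
* type and parse_state_line() are hypothetical names used only for
* illustration; they are not part of this program.
*
```c
#include <stdio.h>

// Hypothetical holder for one parsed state line.
typedef struct StateLine {
    int active_input;     // -1 = none yet ('_'), 0 = main, 1 = secondary
    int failover_count;   // z: number of failover switches
    long long period_ms;  // y: time since the last input switch, in ms
    int main_ok;          // x of the "0:" group: 1 = ok, 0 = error
    int second_ok;        // x of the "1:" group: 1 = ok, 0 = error
} StateLine;

// Returns 0 on success, -1 if the line does not match "o:?:z:y 0:x 1:x".
static int parse_state_line(const char *line, StateLine *out)
{
    char active;

    if (sscanf(line, "o:%c:%d:%lld 0:%d 1:%d", &active, &out->failover_count,
               &out->period_ms, &out->main_ok, &out->second_ok) != 5)
        return -1;

    if (active == '_')
        out->active_input = -1;
    else if (active == '0' || active == '1')
        out->active_input = active - '0';
    else
        return -1;
    return 0;
}
```
*
* For instance, the line "o:1:3:1500 0:0 1:1" would be read as: secondary
* input active, 3 failover switches so far, 1500 ms since the last switch,
* main input in error and secondary input ok.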
*
*/
#include <unistd.h>
#include <libavcodec/avcodec.h>
#include <libavformat/avformat.h>
#include <libavfilter/buffersink.h>
#include <libavfilter/buffersrc.h>
#include <libavutil/opt.h>
#include <libavutil/channel_layout.h>
#include <libavutil/frame.h>
#include <libavutil/time.h>
#include <libavutil/mathematics.h>
#include <pthread.h>
#include <signal.h>
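#include <stdarg.h>
#include <inttypes.h> // PRId64
#include <stdio.h> // snprintf
#include <stdlib.h> // strtoll
#include <string.h> // strcmp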
// how often to retry opening an input that is unavailable
#define INPUT_TRYING_INTERVAL_USEC 1000000
#define DEFAULT_INPUT_TIMEOUT_MSEC 100
#define DEFAULT_LOG_LEVEL AV_LOG_INFO
#define MAIN_INPUT_INDEX 0
#define SECOND_INPUT_INDEX 1
#define NB_INPUTS 2
#define DEFAULT_INPUT_STREAM_TIME_DISCONTINUITY_THRESHOLD_US 3000000
#define DEFAULT_OUTPUT_AUDIO_CODEC_NAME "pcm_s32le"
#define DEFAULT_EXPECTED_AUDIO_SAMPLE_FORMAT AV_SAMPLE_FMT_S32
#define DEFAULT_OUTPUT_VIDEO_CODEC_NAME "rawvideo"
typedef struct InputStreamStatData {
// these fields are written/read by input handler threads
int64_t first_pts; // pts of first encoded active input's frame since the last open in its own input stream timebase
int64_t nb_frames; // nb of forwarded/encoded frames of current active input
} InputStreamStatData;
typedef struct OutputStreamStatData {
int64_t last_pts; // last encoded output frame end pts (pts + dur) in output stream timebase
int64_t first_pts;
int64_t pts_delta; // to adjust by this value the encoded frames pts in output stream timebase
int64_t nb_frames; // total output frames
} OutputStreamStatData;
typedef struct AppContext {
char *input_filenames[NB_INPUTS]; // e.g. "unix:doc/examples/input_main.unix";
char *input_format_names[NB_INPUTS]; // e.g "nut"
AVCodecContext **dec_ctx[NB_INPUTS]; // infinitely many streams in each input
AVFormatContext *input_fmt_ctx[NB_INPUTS];
char *output_filename;
char *output_format_name;
AVCodecContext **enc_ctx; // infinitely many streams as in input
AVFormatContext *output_fmt_ctx;
InputStreamStatData *input_stream_data;
OutputStreamStatData *output_stream_data;
int input_failover_counter; // main->second switchings
pthread_mutex_t encoder_mutex;
int thread_id[NB_INPUTS];
int input_timeout_ms;
int input_stream_time_discnt_thrshd_us;
int64_t start; // start wallclock time of this program
int64_t current_source_index_state_time;
volatile sig_atomic_t input_source_index;
volatile sig_atomic_t to_exit;
volatile sig_atomic_t input_has_new_frame[NB_INPUTS];
char * state_file;
pthread_t input_threads[NB_INPUTS]; // each input has its own reading thread
} AppContext;
static AppContext app_ctx = { {NULL, NULL}, {NULL, NULL}, {NULL, NULL}, {NULL, NULL},
NULL, NULL, NULL, NULL, NULL, NULL,
0, PTHREAD_MUTEX_INITIALIZER,
{MAIN_INPUT_INDEX, SECOND_INPUT_INDEX}, DEFAULT_INPUT_TIMEOUT_MSEC,
DEFAULT_INPUT_STREAM_TIME_DISCONTINUITY_THRESHOLD_US, 0,
0, 0, 0, {0, 0}, NULL };
static const char *output_audio_codec_name = DEFAULT_OUTPUT_AUDIO_CODEC_NAME;
static const char *output_video_codec_name = DEFAULT_OUTPUT_VIDEO_CODEC_NAME;
static void timed_log(int level, const char *fmt, ...)
{
char timed_fmt[2048];
int64_t now_us = av_gettime();
va_list vl;
va_start(vl, fmt);
if (snprintf(timed_fmt, sizeof(timed_fmt), "[%"PRId64"--%"PRId64"] %s", now_us, now_us - app_ctx.start, fmt) > 0)
av_vlog(NULL, level, timed_fmt, vl);
va_end(vl);
}
static int open_single_input(int input_index)
{
int ret, i;
AVInputFormat *input_format = NULL;
AVDictionary * input_options = NULL;
AVFormatContext * input_fmt_ctx = NULL;
if (app_ctx.input_format_names[input_index]) {
if (!(input_format = av_find_input_format(app_ctx.input_format_names[input_index]))) {
timed_log(AV_LOG_ERROR, "Input #%d Unknown input format: '%s'\n", input_index,
app_ctx.input_format_names[input_index]);
return AVERROR(EINVAL);
}
}
av_dict_set(&input_options, "rw_timeout", "2000000", 0);
av_dict_set(&input_options, "timeout", "2000", 0);
if (!(app_ctx.input_fmt_ctx[input_index] = avformat_alloc_context()))
return AVERROR(ENOMEM);
// try to open input several times
while (!app_ctx.to_exit) {
if ((ret = avformat_open_input(&app_ctx.input_fmt_ctx[input_index],
app_ctx.input_filenames[input_index],
input_format, &input_options)) >= 0) {
timed_log(AV_LOG_INFO, "Input #%d File successfully opened: %s\n",
input_index, app_ctx.input_filenames[input_index]);
break;
}
timed_log(AV_LOG_ERROR, "Input #%d Cannot open input file %s, %s\n",
input_index, app_ctx.input_filenames[input_index], av_err2str(ret));
av_usleep(INPUT_TRYING_INTERVAL_USEC);
}
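av_dict_free(&input_options); // release options not consumed by the demuxer
input_fmt_ctx = app_ctx.input_fmt_ctx[input_index];
if (!input_fmt_ctx) // exit was requested before the input could be opened
return AVERROR_EXIT;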
if ((ret = avformat_find_stream_info(input_fmt_ctx, NULL)) < 0) {
timed_log(AV_LOG_ERROR, "Input #%d Cannot find stream information\n", input_index);
return ret;
}
app_ctx.dec_ctx[input_index] = av_mallocz_array(input_fmt_ctx->nb_streams,
sizeof(*app_ctx.dec_ctx[input_index]));
if (!app_ctx.dec_ctx[input_index]) {
timed_log(AV_LOG_ERROR, "Could not allocate decoding context array for Input #%d\n", input_index);
return AVERROR(ENOMEM);
}
// creating decoding context for each input stream
for (i = 0; i < input_fmt_ctx->nb_streams; i++) {
AVStream *stream = input_fmt_ctx->streams[i];
AVCodec *dec = avcodec_find_decoder(stream->codecpar->codec_id);
AVCodecContext *codec_ctx;
if (!dec) {
timed_log(AV_LOG_ERROR, "Input #%d Failed to find decoder for elementary stream index #%u\n",
input_index, i);
return AVERROR_DECODER_NOT_FOUND;
}
codec_ctx = avcodec_alloc_context3(dec);
if (!codec_ctx) {
timed_log(AV_LOG_ERROR, "Input #%d Failed to allocate the decoder context for "
"elementary stream index #%u\n", input_index, i);
return AVERROR(ENOMEM);
}
ret = avcodec_parameters_to_context(codec_ctx, stream->codecpar);
if (ret < 0) {
timed_log(AV_LOG_ERROR,
"Input #%d Failed to copy decoder parameters to decoder context for stream #%u\n",
input_index, i);
return ret;
}
av_opt_set_int(codec_ctx, "refcounted_frames", 1, 0);
/* Reencode video and audio streams and only remux subtitles, data streams etc. */
if (codec_ctx->codec_type == AVMEDIA_TYPE_VIDEO || codec_ctx->codec_type == AVMEDIA_TYPE_AUDIO) {
if (codec_ctx->codec_type == AVMEDIA_TYPE_VIDEO)
codec_ctx->framerate = av_guess_frame_rate(input_fmt_ctx, stream, NULL);
/* Open decoder */
ret = avcodec_open2(codec_ctx, dec, NULL);
if (ret < 0) {
timed_log(AV_LOG_ERROR, "Input #%d Failed to open decoder for elementary stream #%u\n",
input_index, i);
return ret;
}
} else if (codec_ctx->codec_type == AVMEDIA_TYPE_UNKNOWN) {
timed_log(AV_LOG_FATAL, "Input #%d Elementary stream #%d is of unknown type, cannot proceed\n",
input_index, i);
return AVERROR(EINVAL);
}
app_ctx.dec_ctx[input_index][i] = codec_ctx;
}
av_dump_format(input_fmt_ctx, 0, app_ctx.input_filenames[input_index], 0);
return 0;
}
static int try_to_reopen_input(int input_source_index)
{
int ret;
while (!app_ctx.to_exit) {
if ((ret = open_single_input(input_source_index)) >= 0) { //
timed_log(AV_LOG_INFO, "Input #%d Successful reopening\n", input_source_index);
// intentionally do not dry the output pipeline here
// but remain in its current state to use other realtime stream as secondary input
return 0;
}
av_usleep(INPUT_TRYING_INTERVAL_USEC);
}
return AVERROR(EIO);
}
// frame may be NULL when the encoder pipeline is being dried (flushed)
static int encode_frame(AVFrame *frame, int stream_index, int input_source_index)
{
int ret;
AVCodecContext * enc_ctx = app_ctx.enc_ctx[stream_index];
AVPacket *output_packet;
output_packet = av_packet_alloc();
if (!output_packet) {
timed_log(AV_LOG_ERROR, "Input #%d Stream #%d could not allocate output packet\n",
input_source_index, stream_index);
return AVERROR(ENOMEM);
}
/* send the frame to the encoder */
if (frame) { // frame may be NULL
OutputStreamStatData * st_data = &app_ctx.output_stream_data[stream_index];
st_data->last_pts = frame->pts;
if (!st_data->nb_frames)
st_data->first_pts = frame->pts;
st_data->nb_frames++;
// add calculated frame duration to input frame pts
if (enc_ctx->codec_type == AVMEDIA_TYPE_AUDIO && frame->sample_rate)
// calculate frame duration by number of audio samples
st_data->last_pts += av_rescale_q(frame->nb_samples, av_make_q(1, frame->sample_rate), enc_ctx->time_base);
else if (enc_ctx->codec_type == AVMEDIA_TYPE_VIDEO && st_data->nb_frames >= 2)
// use overall mean frame duration (curr_pts/nb_frames-1) * nb_frames
st_data->last_pts = av_rescale(frame->pts - st_data->first_pts, st_data->nb_frames, st_data->nb_frames - 1);
timed_log(AV_LOG_DEBUG, "Input #%d Stream #%d Send frame for encoding, pts: %3"PRId64"\n",
input_source_index, stream_index, frame->pts);
}
ret = avcodec_send_frame(enc_ctx, frame);
if (ret == AVERROR(EAGAIN)) {
// encoder cannot accept input right now; ret < 0 skips the receive loop below
} else if (ret < 0) {
timed_log(AV_LOG_ERROR, "Input #%d Error sending a frame for encoding: %s\n",
input_source_index, av_err2str(ret));
av_packet_free(&output_packet); // do not leak the packet on the error path
return ret;
}
while (ret >= 0) {
ret = avcodec_receive_packet(enc_ctx, output_packet);
if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF)
break; // no more packets for now; the packet is freed after the loop
else if (ret < 0) {
timed_log(AV_LOG_ERROR, "Input #%d Stream #%d Error during encoding: %s\n",
input_source_index, stream_index, av_err2str(ret));
break;
}
timed_log(AV_LOG_DEBUG, "Input #%d Stream #%d Write output packet, pts: %"PRId64" (size=%d)\n",
input_source_index, stream_index, output_packet->pts, output_packet->size);
output_packet->stream_index = stream_index;
ret = av_interleaved_write_frame(app_ctx.output_fmt_ctx, output_packet);
if (ret < 0) {
timed_log(AV_LOG_ERROR, "Input #%d Stream #%d Error muxing packet, %s\n",
input_source_index, stream_index, av_err2str(ret));
break;
}
av_packet_unref(output_packet);
}
av_packet_free(&output_packet);
return ret;
}
// packet may be NULL (when drying the pipeline), so stream_index is passed separately
static int handle_received_packet(AVPacket *packet, int stream_index, int input_source_index)
{
int ret = 0;
int64_t new_pts = 0;
AVCodecContext * dec_ctx = app_ctx.dec_ctx[input_source_index][stream_index];
AVFormatContext * input_fmt_ctx = app_ctx.input_fmt_ctx[input_source_index];
AVFrame *frame = av_frame_alloc();
if (!frame) {
timed_log(AV_LOG_ERROR, "Input #%d Stream #%d Could not allocate frame\n",
input_source_index, stream_index);
return AVERROR(ENOMEM);
}
if (packet) {
timed_log(AV_LOG_DEBUG, "Input #%d Stream #%d packet received, pts: %3"PRId64", size: %d\n",
input_source_index, stream_index, packet->pts, packet->size);
}
ret = avcodec_send_packet(dec_ctx, packet);
if (ret == AVERROR(EAGAIN)) {
// nothing to do
} else if (ret == AVERROR_EOF) {
timed_log(AV_LOG_INFO, "Input #%d Stream #%d avcodec_send_packet returned: %s\n",
input_source_index, stream_index, av_err2str(ret));
} else if (ret < 0) {
timed_log(AV_LOG_ERROR, "Input #%d Stream #%d Error while sending a packet to decoder: %s\n",
input_source_index, stream_index, av_err2str(ret));
av_frame_free(&frame);
return ret;
}
while (ret >= 0) {
ret = avcodec_receive_frame(dec_ctx, frame);
if (ret == AVERROR(EAGAIN))
break;
else if (ret == AVERROR_EOF) {
timed_log(AV_LOG_INFO, "Input #%d Stream #%d avcodec_receive_frame returned: %s\n",
input_source_index, stream_index, av_err2str(ret));
break;
} else if (ret < 0) {
timed_log(AV_LOG_ERROR, "Input #%d Stream #%d Error while receiving a frame from decoder: %s\n",
input_source_index, stream_index, av_err2str(ret));
av_frame_free(&frame);
return ret;
}
app_ctx.input_has_new_frame[input_source_index] = 1;
timed_log(AV_LOG_DEBUG, "Input #%d Set input_has_new_frame flag\n", input_source_index);
if (app_ctx.input_source_index == input_source_index && !pthread_mutex_trylock(&app_ctx.encoder_mutex) ) {
InputStreamStatData * in_st_data = &app_ctx.input_stream_data[stream_index];
if (in_st_data->first_pts == AV_NOPTS_VALUE) {
in_st_data->first_pts = frame->pts;
in_st_data->nb_frames = 1;
} else {
int64_t avg_delta_frame_pts = (frame->pts - in_st_data->first_pts) / (double)in_st_data->nb_frames;
int64_t avg_delta_frame_pts_time = av_rescale_q(avg_delta_frame_pts, dec_ctx->time_base, AV_TIME_BASE_Q);
if (in_st_data->nb_frames > 25 && dec_ctx->codec_type == AVMEDIA_TYPE_VIDEO)
timed_log(AV_LOG_DEBUG, "Input #%d stream #%d stream fps: %0.2f, nb_frames: %"PRId64"\n",
input_source_index, stream_index,
(double)1000000/avg_delta_frame_pts_time, in_st_data->nb_frames);
else
timed_log(AV_LOG_DEBUG, "Input #%d stream #%d nb_frames: %"PRId64"\n",
input_source_index, stream_index, in_st_data->nb_frames);
in_st_data->nb_frames ++;
}
new_pts = av_rescale_q_rnd(frame->pts - in_st_data->first_pts,
input_fmt_ctx->streams[stream_index]->time_base,
app_ctx.output_fmt_ctx->streams[stream_index]->time_base,
AV_ROUND_NEAR_INF|AV_ROUND_PASS_MINMAX);
new_pts += app_ctx.output_stream_data[stream_index].pts_delta;
timed_log(AV_LOG_DEBUG, "Input #%d Stream #%d frame received and sending for encoding, "
"pts: %"PRId64" => %"PRId64"\n", input_source_index,
stream_index, frame->pts, new_pts);
frame->pts = new_pts;
ret = encode_frame(frame, stream_index, input_source_index);
if (ret < 0 && ret != AVERROR(EAGAIN)) {
app_ctx.to_exit = 1;
timed_log(AV_LOG_INFO, "encoding terminating\n");
}
pthread_mutex_unlock(&app_ctx.encoder_mutex);
} else
ret = 0;
av_frame_unref(frame);
}
av_frame_free(&frame);
return ret;
}
static void print_usage(const char * program_name)
{
av_log(NULL, AV_LOG_ERROR, "usage: %s -im <primary/main input> [-ifm <format name of primary input>] "
"-is <secondary input> [-ifs <format name of secondary input>] "
"-o <output> [-of <output format name>] [-sf <state_file>] "
"[-timeout <input msec>] [-loglevel info|debug|warning|error] "
"[-dsc <input discontinuity threshold usec>]\n", program_name);
}
static int read_parameters(int argc, char **argv)
{
int i;
for (i = 1; i < argc; i++) {
if (!strcmp(argv[i], "-im") && i+1 < argc) {
app_ctx.input_filenames[MAIN_INPUT_INDEX] = argv[++i];
} else if (!strcmp(argv[i], "-ifm") && i+1 < argc) {
app_ctx.input_format_names[MAIN_INPUT_INDEX] = argv[++i];
} else if (!strcmp(argv[i], "-is") && i+1 < argc) {
app_ctx.input_filenames[SECOND_INPUT_INDEX] = argv[++i];
} else if (!strcmp(argv[i], "-ifs") && i+1 < argc) {
app_ctx.input_format_names[SECOND_INPUT_INDEX] = argv[++i];
} else if (!strcmp(argv[i], "-o") && i+1 < argc) {
app_ctx.output_filename = argv[++i];
} else if (!strcmp(argv[i], "-of") && i+1 < argc) {
app_ctx.output_format_name = argv[++i];
} else if (!strcmp(argv[i], "-sf") && i+1 < argc) {
app_ctx.state_file = argv[++i];
} else if (!strcmp(argv[i], "-loglevel") && i+1 < argc) {
i++;
if (!strcmp(argv[i], "info")) {
av_log_set_level(AV_LOG_INFO);
} else if (!strcmp(argv[i], "error")) {
av_log_set_level(AV_LOG_ERROR);
} else if (!strcmp(argv[i], "warning")) {
av_log_set_level(AV_LOG_WARNING);
} else if (!strcmp(argv[i], "debug")) {
av_log_set_level(AV_LOG_DEBUG);
} else {
timed_log(AV_LOG_ERROR,
"Unexpected loglevel value: %s\n", argv[i]);
return AVERROR(EINVAL);
}
} else if (!strcmp(argv[i], "-timeout") && i+1 < argc) {
char * tail = NULL;
app_ctx.input_timeout_ms = strtoll(argv[++i], &tail, 10);
if (*tail || app_ctx.input_timeout_ms < 1) {
timed_log(AV_LOG_ERROR,
"Invalid or negative value '%s' for input timeout checking interval\n", argv[i]);
return AVERROR(EINVAL);
}
} else if (!strcmp(argv[i], "-dsc") && i+1 < argc) {
char * tail = NULL;
app_ctx.input_stream_time_discnt_thrshd_us = strtoll(argv[++i], &tail, 10);
if (*tail || app_ctx.input_stream_time_discnt_thrshd_us < 1) {
timed_log(AV_LOG_ERROR,
"Invalid or negative value '%s' for input time discontinuity interval\n", argv[i]);
return AVERROR(EINVAL);
}
} else {
timed_log(AV_LOG_ERROR, "unknown option, or missing parameter: %s\n", argv[i]);
print_usage(argv[0]);
return AVERROR(EINVAL);
}
}
if (!app_ctx.input_filenames[MAIN_INPUT_INDEX] ||
!app_ctx.input_filenames[SECOND_INPUT_INDEX] ||
!app_ctx.output_filename) {
print_usage(argv[0]);
return AVERROR(EINVAL);
}
return 0;
}
static int check_input_streams_matching(void)
{
int i;
if (app_ctx.input_fmt_ctx[MAIN_INPUT_INDEX]->nb_streams != app_ctx.input_fmt_ctx[SECOND_INPUT_INDEX]->nb_streams) {
timed_log(AV_LOG_ERROR, "First input has #%d streams but secondary input has #%d streams, "
"but stream numbers should be matching, so aborting\n",
app_ctx.input_fmt_ctx[MAIN_INPUT_INDEX]->nb_streams,
app_ctx.input_fmt_ctx[SECOND_INPUT_INDEX]->nb_streams);
return AVERROR(EINVAL);
}
for (i = 0; i < app_ctx.input_fmt_ctx[MAIN_INPUT_INDEX]->nb_streams; i++) {
AVCodecContext * main_dec_ctx = app_ctx.dec_ctx[MAIN_INPUT_INDEX][i];
AVCodecContext * second_dec_ctx = app_ctx.dec_ctx[SECOND_INPUT_INDEX][i];
if (main_dec_ctx->codec_type != second_dec_ctx->codec_type) {
timed_log(AV_LOG_ERROR, "Mismatching stream types at #%d elementary stream, aborting\n", i);
return AVERROR(EINVAL);
}
if (main_dec_ctx->codec_type == AVMEDIA_TYPE_VIDEO) {
if (main_dec_ctx->width != second_dec_ctx->width) {
timed_log(AV_LOG_ERROR, "at stream #%d video width mismatch: %d != %d\n", i,
main_dec_ctx->width, second_dec_ctx->width);
return AVERROR(EINVAL);
}
if (main_dec_ctx->height != second_dec_ctx->height) {
timed_log(AV_LOG_ERROR, "at stream #%d video height mismatch: %d != %d\n", i,
main_dec_ctx->height, second_dec_ctx->height);
return AVERROR(EINVAL);
}
if (main_dec_ctx->pix_fmt != second_dec_ctx->pix_fmt) {
timed_log(AV_LOG_ERROR, "at stream #%d video pix_fmt mismatch: %d != %d\n", i,
main_dec_ctx->pix_fmt, second_dec_ctx->pix_fmt);
return AVERROR(EINVAL);
}
// TODO: check more video parameters
}
if (main_dec_ctx->codec_type == AVMEDIA_TYPE_AUDIO) {
if (main_dec_ctx->channels != second_dec_ctx->channels) {
timed_log(AV_LOG_ERROR, "at stream #%d audio channel number mismatch: %d != %d\n", i,
main_dec_ctx->channels, second_dec_ctx->channels);
return AVERROR(EINVAL);
}
if (main_dec_ctx->channel_layout != second_dec_ctx->channel_layout) {
timed_log(AV_LOG_ERROR, "at stream #%d audio channel layout mismatch: %"PRIu64" != %"PRIu64"\n",
i, main_dec_ctx->channel_layout, second_dec_ctx->channel_layout);
return AVERROR(EINVAL);
}
if (main_dec_ctx->sample_rate != second_dec_ctx->sample_rate) {
timed_log(AV_LOG_ERROR, "at stream #%d audio sample rate mismatch: %d != %d\n", i,
main_dec_ctx->sample_rate, second_dec_ctx->sample_rate);
return AVERROR(EINVAL);
}
if (main_dec_ctx->sample_fmt != DEFAULT_EXPECTED_AUDIO_SAMPLE_FORMAT) {
timed_log(AV_LOG_ERROR, "at elementary stream #%d audio sample format is not as expected (%d)\n",
i, DEFAULT_EXPECTED_AUDIO_SAMPLE_FORMAT);
return AVERROR(EINVAL);
}
if (main_dec_ctx->sample_fmt != second_dec_ctx->sample_fmt) {
timed_log(AV_LOG_ERROR, "at elementary stream #%d audio sample format mismatch: %d != %d\n",
i, main_dec_ctx->sample_fmt, second_dec_ctx->sample_fmt);
return AVERROR(EINVAL);
}
// TODO: check more audio parameters
}
}
return 0;
}
static int allocate_arrays(void)
{
int nb_streams = app_ctx.input_fmt_ctx[MAIN_INPUT_INDEX]->nb_streams;
app_ctx.enc_ctx = av_mallocz_array(nb_streams, sizeof(*app_ctx.enc_ctx));
if (!app_ctx.enc_ctx) {
timed_log(AV_LOG_ERROR,"Could not allocate encoder context list\n");
return AVERROR(ENOMEM);
}
app_ctx.input_stream_data = av_mallocz_array(nb_streams, sizeof(*app_ctx.input_stream_data));
if (!app_ctx.input_stream_data) {
timed_log(AV_LOG_ERROR,"Could not allocate input_stream_data list\n");
return AVERROR(ENOMEM);
}
app_ctx.output_stream_data = av_mallocz_array(nb_streams, sizeof(*app_ctx.output_stream_data));
if (!app_ctx.output_stream_data) {
timed_log(AV_LOG_ERROR,"Could not allocate output_stream_data list\n");
return AVERROR(ENOMEM);
}
return 0;
}
static int open_output (void)
{
int i, ret;
AVDictionary * output_options = NULL;
AVOutputFormat * output_format = NULL;
AVStream * out_stream;
AVStream * in_stream;
AVCodecContext * dec_ctx = NULL, * enc_ctx = NULL;
AVCodec * output_video_codec, * output_audio_codec;
AVFormatContext * input_fmt_ctx = app_ctx.input_fmt_ctx[MAIN_INPUT_INDEX];
if (app_ctx.output_format_name) {
if (!(output_format = av_guess_format(app_ctx.output_format_name, NULL, NULL))) {
timed_log(AV_LOG_ERROR, "Unknown output format: '%s'\n", app_ctx.output_format_name);
return AVERROR(EINVAL);
}
}
// allocate the output media context
ret = avformat_alloc_output_context2(&app_ctx.output_fmt_ctx, output_format, NULL,
app_ctx.output_filename);
if (ret < 0 || !app_ctx.output_fmt_ctx) {
timed_log(AV_LOG_ERROR,"Could not deduce output format for %s.\n", app_ctx.output_filename);
return AVERROR(EINVAL);
}
if ((ret = allocate_arrays()) < 0)
return ret;
// find the video encoder for output
output_video_codec = avcodec_find_encoder_by_name(output_video_codec_name);
if (!output_video_codec) {
timed_log(AV_LOG_ERROR, "Output video codec '%s' not found\n", output_video_codec_name);
return AVERROR_ENCODER_NOT_FOUND;
}
// find the audio encoder for output
output_audio_codec = avcodec_find_encoder_by_name(output_audio_codec_name);
if (!output_audio_codec) {
timed_log(AV_LOG_ERROR, "Output audio codec '%s' not found\n", output_audio_codec_name);
return AVERROR_ENCODER_NOT_FOUND;
}
// creating encoding context for each input stream based on main input format
for (i = 0; i < input_fmt_ctx->nb_streams; i++) {
app_ctx.input_stream_data[i].first_pts = AV_NOPTS_VALUE;
app_ctx.output_stream_data[i].first_pts = AV_NOPTS_VALUE;
app_ctx.output_stream_data[i].last_pts = AV_NOPTS_VALUE;
app_ctx.output_stream_data[i].pts_delta = 0;
app_ctx.output_stream_data[i].nb_frames = 0;
in_stream = input_fmt_ctx->streams[i];
dec_ctx = app_ctx.dec_ctx[MAIN_INPUT_INDEX][i]; // based on main input
out_stream = avformat_new_stream(app_ctx.output_fmt_ctx, NULL);
if (!out_stream) {
timed_log(AV_LOG_ERROR, "Failed allocating output stream\n");
return AVERROR_UNKNOWN;
}
enc_ctx = NULL;
if (dec_ctx->codec_type == AVMEDIA_TYPE_VIDEO) {
// create the context for video encoder
enc_ctx = avcodec_alloc_context3(output_video_codec);
if (!enc_ctx) {
timed_log(AV_LOG_ERROR, "Could not allocate output video codec context\n");
return AVERROR(ENOMEM);
}
enc_ctx->height = dec_ctx->height;
enc_ctx->width = dec_ctx->width;
enc_ctx->sample_aspect_ratio = dec_ctx->sample_aspect_ratio;
enc_ctx->pix_fmt = dec_ctx->pix_fmt;
// TODO: check whether pix_format is included in output_video_codec->pix_fmts,
// supported format list of video codec
enc_ctx->time_base = av_inv_q(dec_ctx->framerate);
enc_ctx->gop_size = 0; // intra only, but it is useless in case of rawvideo
av_opt_set_int(enc_ctx, "refcounted_frames", 1, 0);
ret = avcodec_open2(enc_ctx, output_video_codec, NULL);
if (ret < 0) {
timed_log(AV_LOG_ERROR, "Could not open output video codec: %s\n", av_err2str(ret));
return ret;
}
} else if (dec_ctx->codec_type == AVMEDIA_TYPE_AUDIO) {
// create the context for audio encoder
enc_ctx = avcodec_alloc_context3(output_audio_codec);
if (!enc_ctx) {
timed_log(AV_LOG_ERROR, "Could not allocate output audio codec context\n");
return AVERROR(ENOMEM);
}
enc_ctx->sample_rate = dec_ctx->sample_rate;
enc_ctx->channel_layout = dec_ctx->channel_layout;
enc_ctx->channels = dec_ctx->channels;
// TODO: check by av_get_channel_layout_nb_channels(enc_ctx->channel_layout);
enc_ctx->sample_fmt = DEFAULT_EXPECTED_AUDIO_SAMPLE_FORMAT; // encoder->sample_fmts[0];
enc_ctx->time_base = (AVRational){1, enc_ctx->sample_rate};
ret = avcodec_open2(enc_ctx, output_audio_codec, NULL);
if (ret < 0) {
timed_log(AV_LOG_ERROR, "Could not open output audio codec: %s\n", av_err2str(ret));
return ret;
}
}
if (dec_ctx->codec_type == AVMEDIA_TYPE_VIDEO || dec_ctx->codec_type == AVMEDIA_TYPE_AUDIO) {
ret = avcodec_parameters_from_context(out_stream->codecpar, enc_ctx);
if (ret < 0) {
timed_log(AV_LOG_ERROR, "Failed to copy encoder parameters to output stream #%u\n", i);
return ret;
}
if (app_ctx.output_fmt_ctx->oformat->flags & AVFMT_GLOBALHEADER)
enc_ctx->flags |= AV_CODEC_FLAG_GLOBAL_HEADER;
out_stream->time_base = enc_ctx->time_base; // hint for the muxer
app_ctx.enc_ctx[i] = enc_ctx;
} else if (dec_ctx->codec_type == AVMEDIA_TYPE_UNKNOWN) {
timed_log(AV_LOG_FATAL, "Elementary stream #%d is of unknown type, cannot proceed\n", i);
return AVERROR_INVALIDDATA;
} else {
// this stream will be remuxed only
ret = avcodec_parameters_copy(out_stream->codecpar, in_stream->codecpar);
if (ret < 0) {
timed_log(AV_LOG_ERROR, "Copying codec parameters for elementary stream #%u failed\n", i);
return ret;
}
out_stream->time_base = in_stream->time_base;
}
app_ctx.enc_ctx[i] = enc_ctx;
}
av_dump_format(app_ctx.output_fmt_ctx, 0, app_ctx.output_filename, 1);
// open the output file, if needed by the format
if (!(app_ctx.output_fmt_ctx->oformat->flags & AVFMT_NOFILE)) {
ret = avio_open2(&app_ctx.output_fmt_ctx->pb, app_ctx.output_filename,
AVIO_FLAG_WRITE, NULL, &output_options);
if (ret < 0) {
timed_log(AV_LOG_ERROR, "Could not open '%s': %s\n",
app_ctx.output_filename, av_err2str(ret));
return ret;
}
}
// Write the stream header, if any
ret = avformat_write_header(app_ctx.output_fmt_ctx, &output_options);
if (ret < 0) {
timed_log(AV_LOG_ERROR, "Error occurred when opening output file: %s\n", av_err2str(ret));
return ret;
}
return 0;
}
static int calculate_new_ts_delta_values(void)
{
int i;
int64_t max_last_pts = AV_NOPTS_VALUE;
int max_index = -1;
// find the max last_pts, this will be the old output duration
for(i = 0; i < app_ctx.output_fmt_ctx->nb_streams; i++) {
if (app_ctx.output_stream_data[i].last_pts == AV_NOPTS_VALUE)
continue;
if (max_index == -1) {
max_index = i;
continue;
}
if (av_compare_ts(app_ctx.output_stream_data[i].last_pts,
app_ctx.output_fmt_ctx->streams[i]->time_base,
app_ctx.output_stream_data[max_index].last_pts,
app_ctx.output_fmt_ctx->streams[max_index]->time_base) > 0)
max_index = i;
}
if (max_index == -1) {
timed_log(AV_LOG_ERROR, "could not calculate new max pts\n");
return AVERROR(EINVAL);
}
// save here because we will clear somewhere in the next for loop
max_last_pts = app_ctx.output_stream_data[max_index].last_pts;
// calculate new delta by adding the max and then rescaling to new input time base
for(i = 0; i < app_ctx.output_fmt_ctx->nb_streams; i++) {
app_ctx.output_stream_data[i].pts_delta = av_rescale_q_rnd(max_last_pts,
app_ctx.output_fmt_ctx->streams[max_index]->time_base,
app_ctx.output_fmt_ctx->streams[i]->time_base,
AV_ROUND_NEAR_INF | AV_ROUND_PASS_MINMAX);
app_ctx.input_stream_data[i].first_pts = AV_NOPTS_VALUE;
}
return 0;
}
static int dry_current_input_pipeline(int input_source_index)
{
int i, ret;
for (i = 0; i < app_ctx.output_fmt_ctx->nb_streams; i++)
if ((ret = handle_received_packet(NULL, i, input_source_index)) < 0)
if (ret != AVERROR(EAGAIN) && ret != AVERROR_EOF)
timed_log(AV_LOG_WARNING, "Input #%d stream #%d problem on draining the pipeline: %s\n",
input_source_index, i, av_err2str(ret));
return 0;
}
static int handle_input(int input_source_index)
{
int i, ret, eof_input = 0, error_input = 0, input_reopen_counter = 0;
AVPacket input_packet;
int64_t dts_delta_time = 0;
timed_log(AV_LOG_INFO, "Input #%d thread started\n", input_source_index);
while (!app_ctx.to_exit) { // almost forever
int to_set_dts_delta_time = 1;
// read packets continuously from input
while (!app_ctx.to_exit) {
ret = av_read_frame(app_ctx.input_fmt_ctx[input_source_index], &input_packet);
if (ret < 0) {
if (ret == AVERROR_EOF) {
eof_input = 1;
timed_log(AV_LOG_INFO, "input #%d eof detected by av_read_frame\n",
input_source_index);
} else {
error_input = 1;
timed_log(AV_LOG_ERROR, "input #%d av_read_frame returned: %s\n",
input_source_index, av_err2str(ret));
}
break;
}
if (input_packet.stream_index >= app_ctx.input_fmt_ctx[input_source_index]->nb_streams)
timed_log(AV_LOG_WARNING, "Input #%d unexpected stream index: %d\n",
input_source_index, input_packet.stream_index);
else {
// ensuring realtime processing
if (input_packet.dts != AV_NOPTS_VALUE) {
int64_t dts_time = av_rescale_q(input_packet.dts,
app_ctx.input_fmt_ctx[input_source_index]->streams[input_packet.stream_index]->time_base,
AV_TIME_BASE_Q);
int64_t now_us = av_gettime_relative();
int64_t sleep_us = dts_time - now_us + dts_delta_time;
if (to_set_dts_delta_time) {
to_set_dts_delta_time = 0;
dts_delta_time = now_us - dts_time;
sleep_us = 0;
}
if (FFABS(sleep_us) > app_ctx.input_stream_time_discnt_thrshd_us) { // abs() would truncate the int64_t value to int
timed_log(AV_LOG_INFO,
"Input #%d time discontinuity detected: %"PRIi64"us (limit: %dus), packet wallclock timestamp: %"PRIi64
", delta: %"PRIi64"us\n",
input_source_index, sleep_us, app_ctx.input_stream_time_discnt_thrshd_us, dts_time, dts_delta_time);
sleep_us = 0;
dts_delta_time = now_us - dts_time;
}
if (sleep_us > app_ctx.input_stream_time_discnt_thrshd_us) {
timed_log(AV_LOG_WARNING, "Input %d Too long sleeping time: %"PRIi64", truncate to %d\n",
input_source_index, sleep_us, app_ctx.input_stream_time_discnt_thrshd_us);
sleep_us = app_ctx.input_stream_time_discnt_thrshd_us;
}
if (sleep_us > 0) {
timed_log(AV_LOG_DEBUG, "Input #%d sleeping %"PRIi64"us to simulate realtime receiving\n",
input_source_index, sleep_us);
// sleep in chunks of half the input timeout, then sleep the remainder
for(;sleep_us > app_ctx.input_timeout_ms * 500; sleep_us -= app_ctx.input_timeout_ms * 500) // 500 = 1000/2
av_usleep(app_ctx.input_timeout_ms * 500);
av_usleep(sleep_us);
}
}
if (app_ctx.dec_ctx[input_source_index][input_packet.stream_index]->codec_type == AVMEDIA_TYPE_VIDEO ||
app_ctx.dec_ctx[input_source_index][input_packet.stream_index]->codec_type == AVMEDIA_TYPE_AUDIO) {
if ((ret = handle_received_packet(&input_packet, input_packet.stream_index, input_source_index)) < 0)
if (ret != AVERROR(EAGAIN) && ret != AVERROR_EOF) {
av_packet_unref(&input_packet); // do not leak the packet when leaving the read loop
break;
}
} else if (app_ctx.input_source_index == input_source_index && !pthread_mutex_trylock(&app_ctx.encoder_mutex) ) {
app_ctx.input_has_new_frame[input_source_index] = 1;
/* remux this frame without reencoding */
av_packet_rescale_ts(&input_packet,
app_ctx.input_fmt_ctx[input_source_index]->streams[input_packet.stream_index]->time_base,
app_ctx.output_fmt_ctx->streams[input_packet.stream_index]->time_base);
ret = av_interleaved_write_frame(app_ctx.output_fmt_ctx, &input_packet);
pthread_mutex_unlock(&app_ctx.encoder_mutex);
if (ret < 0) {
app_ctx.to_exit = 1;
break;
}
}
}
av_packet_unref(&input_packet);
}
if (!app_ctx.to_exit && (eof_input || error_input)) {
timed_log(AV_LOG_INFO, "Going to reopen Input #%d, occasion: #%d\n",
input_source_index, ++input_reopen_counter);
// dry current pipeline
dry_current_input_pipeline(input_source_index);
// close input
for (i = 0; i < app_ctx.output_fmt_ctx->nb_streams; i++)
avcodec_free_context(&app_ctx.dec_ctx[input_source_index][i]);
avformat_close_input(&app_ctx.input_fmt_ctx[input_source_index]);
eof_input = 0;
error_input = 0;
if (try_to_reopen_input(input_source_index) < 0) {
break;
}
}
}
if (!app_ctx.to_exit && eof_input) {
// dry current pipeline
dry_current_input_pipeline(input_source_index);
}
return 0;
}
static void *threaded_input_handler(void * arg)
{
int input_source_index = *(int *)arg;
handle_input(input_source_index);
pthread_exit(NULL);
}
static int write_out_new_state_log(void)
{
char tmp_state_file[1024];
int ret, i;
FILE * outfile;
if (!app_ctx.state_file || !app_ctx.state_file[0])
return 0; // no state file was specified
ret = snprintf(tmp_state_file, sizeof(tmp_state_file), "%s.tmp", app_ctx.state_file);
if (ret <= 0 || ret >= (int)sizeof(tmp_state_file)) { // encoding error or truncated file name
timed_log(AV_LOG_ERROR, "could not create state file name\n");
return AVERROR(EINVAL);
}
outfile = fopen(tmp_state_file, "wb");
if (!outfile) {
ret = AVERROR(errno); // av_err2str() expects an AVERROR code, not a raw errno
timed_log(AV_LOG_ERROR, "could not open tmp state file: %s\n", av_err2str(ret));
return ret;
}
if (app_ctx.input_source_index == -1)
ret = fprintf(outfile, "o:_:");
else
ret = fprintf(outfile, "o:%d:", app_ctx.input_source_index);
if (ret < 0) {
ret = AVERROR(errno); // fprintf() returns a negative count, not an AVERROR code
timed_log(AV_LOG_ERROR, "Could not write into tmp state file (%s): %s\n",
tmp_state_file, av_err2str(ret));
fclose(outfile);
return ret;
}
ret = fprintf(outfile, "%d:%"PRId64, app_ctx.input_failover_counter,
(av_gettime() - app_ctx.current_source_index_state_time)/1000);
if (ret < 0) {
ret = AVERROR(errno); // fprintf() returns a negative count, not an AVERROR code
timed_log(AV_LOG_ERROR, "Could not write into tmp state file (%s): %s\n",
tmp_state_file, av_err2str(ret));
fclose(outfile);
return ret;
}
for (i = 0; i < NB_INPUTS; i++) {
ret = fprintf(outfile, " %d:%d", i, !app_ctx.input_has_new_frame[i]?0:1);
if (ret < 0) {
ret = AVERROR(errno); // fprintf() returns a negative count, not an AVERROR code
timed_log(AV_LOG_ERROR, "Could not write into tmp state file (%s): %s\n",
tmp_state_file, av_err2str(ret));
fclose(outfile);
return ret;
}
}
ret = fprintf(outfile, "\n");
if (ret < 0) {
ret = AVERROR(errno); // fprintf() returns a negative count, not an AVERROR code
timed_log(AV_LOG_ERROR, "Could not write into tmp state file (%s): %s\n",
tmp_state_file, av_err2str(ret));
fclose(outfile);
return ret;
}
fclose(outfile);
if (rename(tmp_state_file, app_ctx.state_file) < 0) {
ret = AVERROR(errno); // av_err2str() expects an AVERROR code, not a raw errno
timed_log(AV_LOG_ERROR, "Could not rename state file (%s => %s): %s\n",
tmp_state_file, app_ctx.state_file, av_err2str(ret));
return ret;
}
return 0;
}
static void main_loop(void)
{
int i;
int64_t last_input_check_time = av_gettime_relative();
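// the state log measures elapsed time with av_gettime(), so store a wall-clock
// value (not av_gettime_relative()) and set it before the first log is written
app_ctx.current_source_index_state_time = av_gettime();
write_out_new_state_log();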
while (!app_ctx.to_exit) { // almost forever
int64_t now_us = av_gettime_relative();
int64_t check_interval = now_us - last_input_check_time;
if (check_interval > app_ctx.input_timeout_ms * 1000) {
last_input_check_time = now_us;
if (app_ctx.input_source_index == MAIN_INPUT_INDEX && app_ctx.input_has_new_frame[MAIN_INPUT_INDEX]) {
// normal case
timed_log(AV_LOG_DEBUG, "Checking running main input: ok, in last %"PRIi64"us \n", check_interval);
} else if (app_ctx.input_source_index != MAIN_INPUT_INDEX && app_ctx.input_has_new_frame[MAIN_INPUT_INDEX]) {
if (!pthread_mutex_lock(&app_ctx.encoder_mutex)) {
if (app_ctx.input_source_index >= 0) {
timed_log(AV_LOG_INFO, "#%d switching back to main input because new frame arrived\n",
app_ctx.input_failover_counter);
calculate_new_ts_delta_values();
} else
timed_log(AV_LOG_INFO, "Switching to main input\n");
app_ctx.input_source_index = MAIN_INPUT_INDEX;
app_ctx.current_source_index_state_time = av_gettime();
pthread_mutex_unlock(&app_ctx.encoder_mutex);
} else
timed_log(AV_LOG_ERROR, "Could not lock encoder_mutex for input switching\n");
} else if (app_ctx.input_source_index != SECOND_INPUT_INDEX && app_ctx.input_has_new_frame[SECOND_INPUT_INDEX]) {
if (!pthread_mutex_lock(&app_ctx.encoder_mutex)) {
if (app_ctx.input_source_index >= 0) {
app_ctx.input_failover_counter++;
timed_log(AV_LOG_INFO, "#%d switching to second input, no new frame on Input #%d in last %"PRIi64"us\n",
app_ctx.input_failover_counter, MAIN_INPUT_INDEX, check_interval);
calculate_new_ts_delta_values();
} else
timed_log(AV_LOG_INFO, "Switching to second input\n");
app_ctx.input_source_index = SECOND_INPUT_INDEX;
app_ctx.current_source_index_state_time = av_gettime();
pthread_mutex_unlock(&app_ctx.encoder_mutex);
} else
timed_log(AV_LOG_ERROR, "Could not lock encoder_mutex for input switching\n");
}
write_out_new_state_log();
for (i = 0; i < NB_INPUTS; i++)
app_ctx.input_has_new_frame[i] = 0;
}
av_usleep(app_ctx.input_timeout_ms * 250); // 250 = 1000 / 4
}
}
int main(int argc, char **argv)
{
int ret, i, k;
pthread_attr_t attr;
app_ctx.start = av_gettime();
av_log_set_level(DEFAULT_LOG_LEVEL);
// read and check command line parameters
if (read_parameters(argc, argv) < 0)
exit(1);
avformat_network_init();
avfilter_register_all();
app_ctx.input_source_index = -1; // none
app_ctx.to_exit = 0;
// For portability, explicitly create threads in a joinable state
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
for (k = 0; k < NB_INPUTS && !app_ctx.to_exit; k++) {
if ((ret = open_single_input(k)) < 0) // open input
goto end;
app_ctx.input_has_new_frame[k] = 0;
if ((ret = pthread_create(&app_ctx.input_threads[k], &attr, threaded_input_handler, (void *) &app_ctx.thread_id[k]))) {
timed_log(AV_LOG_ERROR, "return code from #%d pthread_create() is %d\n", k, ret);
goto end;
}
}
if ((ret = check_input_streams_matching()) < 0)
goto end;
if (open_output() < 0)
goto end;
main_loop();
if (!pthread_mutex_lock(&app_ctx.encoder_mutex)) {
app_ctx.input_source_index = -1;
pthread_mutex_unlock(&app_ctx.encoder_mutex);
}
av_write_trailer(app_ctx.output_fmt_ctx);
if (!(app_ctx.output_fmt_ctx->oformat->flags & AVFMT_NOFILE))
avio_closep(&app_ctx.output_fmt_ctx->pb);
end:
app_ctx.to_exit = 1;
// wait for all input threads to terminate
for (k = 0; k < NB_INPUTS; k++)
pthread_join(app_ctx.input_threads[k], NULL);
for (k = 0; k < NB_INPUTS; k++) {
for (i = 0; app_ctx.output_fmt_ctx &&
i < app_ctx.output_fmt_ctx->nb_streams; i++)
avcodec_free_context(&app_ctx.dec_ctx[k][i]);
avformat_close_input(&app_ctx.input_fmt_ctx[k]);
}
for (i = 0; app_ctx.output_fmt_ctx && i < app_ctx.output_fmt_ctx->nb_streams; i++)
avcodec_free_context(&app_ctx.enc_ctx[i]);
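// avformat_close_input() is meant for demuxers; for the muxer, close the I/O
// context (a no-op if it was already closed above) and free the context
if (app_ctx.output_fmt_ctx && !(app_ctx.output_fmt_ctx->oformat->flags & AVFMT_NOFILE))
avio_closep(&app_ctx.output_fmt_ctx->pb);
avformat_free_context(app_ctx.output_fmt_ctx);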
avformat_network_deinit();
pthread_mutex_destroy(&app_ctx.encoder_mutex);
exit(0);
}