Skip to content

Instantly share code, notes, and snippets.

@charneykaye
Last active December 8, 2023 06:18
Show Gist options
  • Save charneykaye/b1e21c7d048226c7d66fe948536cb487 to your computer and use it in GitHub Desktop.
Save charneykaye/b1e21c7d048226c7d66fe948536cb487 to your computer and use it in GitHub Desktop.
Java class to HLS live stream via inner ffmpeg
// by Charney Kaye <charneykaye.com>
package com.charneykaye;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.sound.sampled.AudioFileFormat;
import javax.sound.sampled.AudioFormat;
import javax.sound.sampled.AudioInputStream;
import javax.sound.sampled.AudioSystem;
import java.io.*;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
//
// See also: Supporting Classes
//
// AudioSample.java
// https://gist.github.com/charneykaye/1d67f4996469752572cce8d8d2f5979a
//
// StreamLogger.java
// https://gist.github.com/charneykaye/780b97305e0094b6126666859da08589
//
public class StreamEncoder {
private static final Logger LOG = LoggerFactory.getLogger(StreamEncoder.class);
private static final String THREAD_NAME = "ffmpeg";
private final AudioFormat format;
private final ConcurrentLinkedQueue<ByteBuffer> queue = new ConcurrentLinkedQueue<>();
private final String playlistPath;
private Process ffmpeg;
private volatile boolean active = true;
public StreamEncoder(
AudioFormat format,
String shipKey,
String tempFilePathPrefix, // eg /tmp/
int bitrate, // eg 128000
int hlsSegmentSeconds, // eg 10
int initialOffset
) {
this.format = format;
playlistPath = String.format("%s%s.m3u8", tempFilePathPrefix, shipKey);
CompletableFuture.runAsync(() -> {
final Thread currentThread = Thread.currentThread();
final String oldName = currentThread.getName();
currentThread.setName(THREAD_NAME);
try {
ProcessBuilder builder = new ProcessBuilder(List.of(
"ffmpeg",
"-v", "verbose",
"-i", "pipe:0",
"-f", "hls",
"-ac", "2",
"-c:a", "mp2",
"-b:a", kilos(bitrate),
"-minrate", kilos(bitrate),
"-maxrate", kilos(bitrate),
"-hls_playlist_type", "event",
"-hls_segment_filename", String.format("%s%s-%%d.ts", tempFilePathPrefix, shipKey),
"-hls_time", String.valueOf(hlsSegmentSeconds),
"-initial_offset", String.valueOf(initialOffset),
"-start_number", String.valueOf(initialOffset),
playlistPath
));
builder.redirectErrorStream(true);
ffmpeg = builder.start();
// Start consumer to read the error and output streams
CompletableFuture.runAsync(new StreamLogger(LOG, ffmpeg.getInputStream()));
// Start the audio stream of data to ffmpeg. Write the WAV header once
try (AudioInputStream ais = new AudioInputStream(new ByteArrayInputStream(new byte[0]), format, 0)) {
AudioSystem.write(ais, AudioFileFormat.Type.WAVE, ffmpeg.getOutputStream());
}
while (active) {
var bytes = queue.poll();
if (Objects.isNull(bytes)) continue;
if (!ffmpeg.isAlive()) {
LOG.error("Exited with code {}", ffmpeg.exitValue());
active = false;
continue;
}
ffmpeg.getOutputStream().write(bytes.array());
LOG.info("received {} bytes of audio data", bytes.array().length);
}
} catch (IOException e) {
LOG.error("Failed while streaming bytes to ffmpeg!", e);
} finally {
currentThread.setName(oldName);
if (Objects.nonNull(ffmpeg)) ffmpeg.destroy();
}
});
}
/**
Get the "kilos" representation of an integer, as in 128k for 128000
@param value for which to get kilos
@return kilos representation
*/
static String kilos(int value) {
return String.format("%d%s", (int) Math.floor((double) value / 1000), "k");
}
public double[][] append(double[][] samples) throws Exception {
if (!active)
return samples;
queue.add(AudioSample.byteBufferOf(format, samples));
return samples;
}
public void close() {
ffmpeg.destroy();
active = false;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment