Skip to content

Instantly share code, notes, and snippets.

@gai6948
Created January 26, 2021 04:50
Show Gist options
  • Save gai6948/57545d500ee5d28084e5fe8e40a19399 to your computer and use it in GitHub Desktop.
Save gai6948/57545d500ee5d28084e5fe8e40a19399 to your computer and use it in GitHub Desktop.
Sample code parsing video frame in KVS using KVS Java Parser Library
package com.example.videomonitoring.kvsframeparser;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import javax.imageio.ImageIO;
import java.awt.image.BufferedImage;
import java.awt.image.DataBufferByte;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import com.amazonaws.kinesisvideo.parser.mkv.Frame;
import com.amazonaws.kinesisvideo.parser.mkv.FrameProcessException;
import com.amazonaws.kinesisvideo.parser.utilities.FragmentMetadata;
import com.amazonaws.kinesisvideo.parser.utilities.H264FrameDecoder;
import com.amazonaws.kinesisvideo.parser.utilities.MkvTrackMetadata;
import com.amazonaws.kinesisvideo.parser.utilities.FragmentMetadataVisitor.MkvTagProcessor;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.PutRecordRequest;
// import com.google.common.primitives.Longs;
import com.amazonaws.services.kinesis.model.PutRecordResult;
import lombok.extern.slf4j.Slf4j;
/**
* A class provided to the KVS Parser Library that captures the frame and its
* producer timestamp, pass them as bytestring to a Kinesis Data Stream
*
* @param kinesisClient The Kinesis Data Stream Async Client
*/
@Slf4j
public class KvsH264FrameProcessor extends H264FrameDecoder {
final AmazonKinesis kinesisClient;
final String dataStreamName;
final String videoStreamName;
final int processFPS;
static long lastProcessedFrameTimeStamp = 0;
public KvsH264FrameProcessor(AmazonKinesis kinesisClient, String dataStreamName, String videoStreamName, int processFPS) {
this.kinesisClient = kinesisClient;
this.dataStreamName = dataStreamName;
this.videoStreamName = videoStreamName;
this.processFPS = processFPS;
}
@Override
public void process(Frame frame, MkvTrackMetadata trackMetadata, Optional<FragmentMetadata> fragmentMetadata,
Optional<MkvTagProcessor> tagProcessor) throws FrameProcessException {
// Calculating timestamp of the frame
int frameTimeDelta = frame.getTimeCode();
long fragmentStartTime = fragmentMetadata.get().getProducerSideTimestampMillis();
long frameTimeStamp = fragmentStartTime + frameTimeDelta;
// Process frame at specified fps, ignore frame that is 1000/fps ms from latest processed timestamp
if (frameTimeStamp > lastProcessedFrameTimeStamp + 1000/processFPS) {
lastProcessedFrameTimeStamp = frameTimeStamp;
// Obtain size of the frame
int frameWidth = trackMetadata.getPixelWidth().get().intValue();
int frameHeight = trackMetadata.getPixelHeight().get().intValue();
// Decode frame with H264 codecs
final BufferedImage bufferedImage = decodeH264Frame(frame, trackMetadata);
// byte[] frameData = ((DataBufferByte)bufferedImage.getData().getDataBuffer()).getData();
// Put record into Kinesis Data Stream
try {
byte[] frameData = toByteArray(bufferedImage);
putRecord(frameData, frameTimeStamp, frameWidth, frameHeight);
} catch (IOException e) {
System.out.println("Exception: " + e);
}
// DEBUG ONLY
// try {
// byte[] frameData = toByteArray(bufferedImage);
// String filepath = "/Users/gaiuscyw/Projects/video-monitoring-solution/cdk/src/lambda/ppe-detector-function/raw-java-frame.png";
// File file = new File(filepath);
// OutputStream os = new FileOutputStream(file);
// os.write(frameData);
// os.close();
// System.out.println("Frame written");
// System.out.println(frameData.length);
// } catch (IOException e) {
// System.out.println("Exception: " + e);
// }
}
}
private byte[] toByteArray(BufferedImage img) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ImageIO.write(img, "png", baos);
byte[] bytes = baos.toByteArray();
return bytes;
}
private void putRecord(byte[] frameData, long frameTimeStamp, int frameWidth, int frameHeight) {
// Embed timestamp, frame dimension along with frame data in a blob and put to Kinesis
// First 13 bytes belong to timestamp (utf-8 encoded)
String timeStamp = String.valueOf(frameTimeStamp);
byte[] timestampByte = timeStamp.getBytes(StandardCharsets.UTF_8);
// Width and Height of the frame each takes 4 bytes
ByteBuffer widthBuffer = ByteBuffer.allocate(4);
ByteBuffer heightBuffer = ByteBuffer.allocate(4);
widthBuffer.putInt(frameWidth);
heightBuffer.putInt(frameHeight);
byte[] widthByte = widthBuffer.array();
byte[] heightByte = heightBuffer.array();
System.out.println(frameWidth);
System.out.println(frameHeight);
// Package the blob
byte[] data = new byte[timestampByte.length + widthByte.length + heightByte.length + frameData.length];
int offset = 0;
System.arraycopy(timestampByte, 0, data, offset, timestampByte.length);
offset += timestampByte.length;
System.arraycopy(widthByte, 0, data, offset, widthByte.length);
offset += widthByte.length;
System.arraycopy(heightByte, 0, data, offset, heightByte.length);
offset += heightByte.length;
System.arraycopy(frameData, 0, data, offset, frameData.length);
PutRecordRequest request = new PutRecordRequest()
.withStreamName(dataStreamName)
.withPartitionKey(videoStreamName)
.withSequenceNumberForOrdering(timeStamp)
.withData(ByteBuffer.wrap(data));
try {
PutRecordResult kinesisResult = kinesisClient.putRecord(request);
log.info("Frame of size {} KB and with producer timestamp {} put to Kinesis Data Stream, shard {} sequence number {}", (frameData.length)/1000, frameTimeStamp, kinesisResult.getShardId(), kinesisResult.getSequenceNumber());
} catch (Exception e) {
log.error("Error putting record into Kinesis: {}", e.getMessage());
}
}
@Override
public void close() {
}
}
@gai6948
Copy link
Author

gai6948 commented Mar 31, 2021

Thanks for the quick response.
My Requirement is I need to read frame by frame data when live streaming happens in Kinesis video stream.
any suggestion it will be helpful to me

I am solving the same exact problem, my solution is to have a container running a consumer process as long as there is video in KVS.
What I wrote was just two Java classes, the one above (KvsH264FrameProcessor.java) and this one KvsConsumer.java

Congrats for the efforts you put to solve it.
thanks for providing consumer class and i will also try it

Please let me know the update on this once it is successfully implemented

I have recently used Python to produce video into KVS, using the Gstreamer plugin included in the C++ SDK, I highly suggest you try with the C++ SDK first, because the Java SDK is also built on top of the C++ SDK. If your camera works with V4L2 or GStreamer, then it should work.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment