Skip to content

Instantly share code, notes, and snippets.

@gai6948
Created February 2, 2021 12:44
Show Gist options
  • Save gai6948/d0bc35a9875578585747ad1cf9a358e5 to your computer and use it in GitHub Desktop.
Save gai6948/d0bc35a9875578585747ad1cf9a358e5 to your computer and use it in GitHub Desktop.
Java example that parses frames from a Kinesis Video Stream using the official KVS parser library; used in conjunction with KvsH264FrameProcessor.
package com.example.videomonitoring.kvsframeparser;
import java.io.IOException;
import java.io.InputStream;
import java.util.Optional;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.kinesisvideo.parser.examples.ContinuousGetMediaWorker;
import com.amazonaws.kinesisvideo.parser.mkv.MkvElementVisitException;
import com.amazonaws.kinesisvideo.parser.utilities.FragmentMetadataVisitor;
import com.amazonaws.kinesisvideo.parser.utilities.FrameVisitor;
import com.amazonaws.kinesisvideo.parser.utilities.consumer.FragmentMetadataCallback;
import com.amazonaws.kinesisvideo.parser.utilities.consumer.GetMediaResponseStreamConsumer;
import com.amazonaws.kinesisvideo.parser.utilities.consumer.GetMediaResponseStreamConsumerFactory;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesisvideo.AmazonKinesisVideo;
import com.amazonaws.services.kinesisvideo.AmazonKinesisVideoClientBuilder;
import com.amazonaws.services.kinesisvideo.model.StartSelector;
import com.amazonaws.services.kinesisvideo.model.StartSelectorType;
// import software.amazon.codeguruprofilerjavaagent.Profiler;
/**
 * Command-line entry point that continuously reads a Kinesis Video Stream and feeds every
 * fragment to a {@link KvsH264FrameProcessor}.
 *
 * <p>Configuration is taken from these environment variables, all of which are required:
 * <ul>
 *   <li>{@code CAMERA_NAME} - name of the Kinesis Video Stream to read; also passed to the frame
 *       processor as the video stream name</li>
 *   <li>{@code DATA_STREAM_NAME} - name of the Kinesis Data Stream passed to the frame processor</li>
 *   <li>{@code AWS_DEFAULT_REGION} - AWS region used for both service clients</li>
 *   <li>{@code PROCESS_RATE_IN_FPS} - positive integer rate passed to the frame processor</li>
 * </ul>
 */
public class KvsConsumer {

    /**
     * Builds the AWS clients and the GetMedia worker, then runs the worker on the calling thread.
     *
     * @param args ignored
     * @throws IllegalStateException if a required environment variable is missing or blank
     * @throws IllegalArgumentException if {@code PROCESS_RATE_IN_FPS} is not a positive integer
     */
    public static void main(String[] args) {
        // Fail fast on bad configuration instead of surfacing a NumberFormatException or a
        // null stream name deep inside the SDK / parser library.
        final String videoStreamName = requireEnv("CAMERA_NAME");
        final String dataStreamName = requireEnv("DATA_STREAM_NAME");
        final String region = requireEnv("AWS_DEFAULT_REGION");
        final int processFPS = parsePositiveInt("PROCESS_RATE_IN_FPS", requireEnv("PROCESS_RATE_IN_FPS"));

        // final String profilerGroupName = System.getenv("PROFILERGROUP_NAME");
        // Profiler.builder().profilingGroupName(profilerGroupName).withHeapSummary(true).build().start();

        final AWSCredentialsProvider credentialsProvider = new DefaultAWSCredentialsProviderChain();
        final AmazonKinesisVideo amazonKinesisVideo = AmazonKinesisVideoClientBuilder.standard()
                .withRegion(region)
                .build();
        final AmazonKinesis kinesisClient = AmazonKinesisClientBuilder.standard().withRegion(region).build();

        // Each consumer created by this factory wires a fresh KvsH264FrameProcessor into a
        // FrameVisitor every time process(...) is invoked.
        final GetMediaResponseStreamConsumerFactory consumerFactory = new GetMediaResponseStreamConsumerFactory() {
            @Override
            public GetMediaResponseStreamConsumer createConsumer() throws IOException {
                return new GetMediaResponseStreamConsumer() {
                    @Override
                    public void process(InputStream inputStream, FragmentMetadataCallback callback)
                            throws MkvElementVisitException, IOException {
                        processWithFragmentEndCallbacks(inputStream, callback,
                                FrameVisitor.create(
                                        new KvsH264FrameProcessor(kinesisClient, dataStreamName, videoStreamName,
                                                processFPS),
                                        Optional.of(new FragmentMetadataVisitor.BasicMkvTagProcessor())));
                    }
                };
            }
        };

        // Start reading from "now" rather than from stored history.
        final ContinuousGetMediaWorker getMediaWorker = ContinuousGetMediaWorker.create(
                Regions.fromName(region),
                credentialsProvider,
                videoStreamName,
                new StartSelector().withStartSelectorType(StartSelectorType.NOW),
                amazonKinesisVideo,
                consumerFactory);

        // Long running task
        getMediaWorker.run();
    }

    /** Returns the value of environment variable {@code name}, rejecting missing or blank values. */
    private static String requireEnv(String name) {
        final String value = System.getenv(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Required environment variable " + name + " is not set");
        }
        return value;
    }

    /** Parses {@code raw} (the value of variable {@code name}) as an integer strictly greater than zero. */
    private static int parsePositiveInt(String name, String raw) {
        final int value;
        try {
            value = Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    "Environment variable " + name + " must be an integer but was '" + raw + "'", e);
        }
        if (value <= 0) {
            throw new IllegalArgumentException(
                    "Environment variable " + name + " must be greater than zero but was " + value);
        }
        return value;
    }
}
package com.example.videomonitoring.kvsframeparser;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import javax.imageio.ImageIO;
import java.awt.image.BufferedImage;
import java.awt.image.DataBufferByte;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import com.amazonaws.kinesisvideo.parser.mkv.Frame;
import com.amazonaws.kinesisvideo.parser.mkv.FrameProcessException;
import com.amazonaws.kinesisvideo.parser.utilities.FragmentMetadata;
import com.amazonaws.kinesisvideo.parser.utilities.H264FrameDecoder;
import com.amazonaws.kinesisvideo.parser.utilities.MkvTrackMetadata;
import com.amazonaws.kinesisvideo.parser.utilities.FragmentMetadataVisitor.MkvTagProcessor;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.PutRecordRequest;
// import com.google.common.primitives.Longs;
import com.amazonaws.services.kinesis.model.PutRecordResult;
import lombok.extern.slf4j.Slf4j;
/**
* A class provided to the KVS Parser Library that captures the frame and its
* producer timestamp, pass them as bytestring to a Kinesis Data Stream
*
* @param kinesisClient The Kinesis Data Stream Async Client
*/
@Slf4j
public class KvsH264FrameProcessor extends H264FrameDecoder {
final AmazonKinesis kinesisClient;
final String dataStreamName;
final String videoStreamName;
final int processFPS;
static long lastProcessedFrameTimeStamp = 0;
public KvsH264FrameProcessor(AmazonKinesis kinesisClient, String dataStreamName, String videoStreamName, int processFPS) {
this.kinesisClient = kinesisClient;
this.dataStreamName = dataStreamName;
this.videoStreamName = videoStreamName;
this.processFPS = processFPS;
}
@Override
public void process(Frame frame, MkvTrackMetadata trackMetadata, Optional<FragmentMetadata> fragmentMetadata,
Optional<MkvTagProcessor> tagProcessor) throws FrameProcessException {
// Calculating timestamp of the frame
int frameTimeDelta = frame.getTimeCode();
long fragmentStartTime = fragmentMetadata.get().getProducerSideTimestampMillis();
long frameTimeStamp = fragmentStartTime + frameTimeDelta;
// Process frame at specified fps, ignore frame that is 1000/fps ms from latest processed timestamp
if (frameTimeStamp > lastProcessedFrameTimeStamp + 1000/processFPS) {
lastProcessedFrameTimeStamp = frameTimeStamp;
// Obtain size of the frame
int frameWidth = trackMetadata.getPixelWidth().get().intValue();
int frameHeight = trackMetadata.getPixelHeight().get().intValue();
// Decode frame with H264 codecs
final BufferedImage bufferedImage = decodeH264Frame(frame, trackMetadata);
// byte[] frameData = ((DataBufferByte)bufferedImage.getData().getDataBuffer()).getData();
// Put record into Kinesis Data Stream
try {
byte[] frameData = toByteArray(bufferedImage);
putRecord(frameData, frameTimeStamp, frameWidth, frameHeight);
} catch (IOException e) {
System.out.println("Exception: " + e);
}
// DEBUG ONLY
// try {
// byte[] frameData = toByteArray(bufferedImage);
// String filepath = "/Users/gaiuscyw/Projects/video-monitoring-solution/cdk/src/lambda/ppe-detector-function/raw-java-frame.png";
// File file = new File(filepath);
// OutputStream os = new FileOutputStream(file);
// os.write(frameData);
// os.close();
// System.out.println("Frame written");
// System.out.println(frameData.length);
// } catch (IOException e) {
// System.out.println("Exception: " + e);
// }
}
}
private byte[] toByteArray(BufferedImage img) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ImageIO.write(img, "png", baos);
byte[] bytes = baos.toByteArray();
return bytes;
}
private void putRecord(byte[] frameData, long frameTimeStamp, int frameWidth, int frameHeight) {
// Embed timestamp, frame dimension along with frame data in a blob and put to Kinesis
// First 13 bytes belong to timestamp (utf-8 encoded)
String timeStamp = String.valueOf(frameTimeStamp);
byte[] timestampByte = timeStamp.getBytes(StandardCharsets.UTF_8);
// Width and Height of the frame each takes 4 bytes
ByteBuffer widthBuffer = ByteBuffer.allocate(4);
ByteBuffer heightBuffer = ByteBuffer.allocate(4);
widthBuffer.putInt(frameWidth);
heightBuffer.putInt(frameHeight);
byte[] widthByte = widthBuffer.array();
byte[] heightByte = heightBuffer.array();
System.out.println(frameWidth);
System.out.println(frameHeight);
// Package the blob
byte[] data = new byte[timestampByte.length + widthByte.length + heightByte.length + frameData.length];
int offset = 0;
System.arraycopy(timestampByte, 0, data, offset, timestampByte.length);
offset += timestampByte.length;
System.arraycopy(widthByte, 0, data, offset, widthByte.length);
offset += widthByte.length;
System.arraycopy(heightByte, 0, data, offset, heightByte.length);
offset += heightByte.length;
System.arraycopy(frameData, 0, data, offset, frameData.length);
PutRecordRequest request = new PutRecordRequest()
.withStreamName(dataStreamName)
.withPartitionKey(videoStreamName)
.withSequenceNumberForOrdering(timeStamp)
.withData(ByteBuffer.wrap(data));
try {
PutRecordResult kinesisResult = kinesisClient.putRecord(request);
log.info("Frame of size {} KB and with producer timestamp {} put to Kinesis Data Stream, shard {} sequence number {}", (frameData.length)/1000, frameTimeStamp, kinesisResult.getShardId(), kinesisResult.getSequenceNumber());
} catch (Exception e) {
log.error("Error putting record into Kinesis: {}", e.getMessage());
}
}
@Override
public void close() {
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment