Created
January 26, 2021 04:50
-
-
Save gai6948/57545d500ee5d28084e5fe8e40a19399 to your computer and use it in GitHub Desktop.
Sample code for parsing video frames in KVS using the KVS Java Parser Library
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example.videomonitoring.kvsframeparser; | |
import java.nio.ByteBuffer; | |
import java.nio.charset.StandardCharsets; | |
import java.util.Optional; | |
import javax.imageio.ImageIO; | |
import java.awt.image.BufferedImage; | |
import java.awt.image.DataBufferByte; | |
import java.io.ByteArrayOutputStream; | |
import java.io.File; | |
import java.io.FileOutputStream; | |
import java.io.IOException; | |
import java.io.OutputStream; | |
import com.amazonaws.kinesisvideo.parser.mkv.Frame; | |
import com.amazonaws.kinesisvideo.parser.mkv.FrameProcessException; | |
import com.amazonaws.kinesisvideo.parser.utilities.FragmentMetadata; | |
import com.amazonaws.kinesisvideo.parser.utilities.H264FrameDecoder; | |
import com.amazonaws.kinesisvideo.parser.utilities.MkvTrackMetadata; | |
import com.amazonaws.kinesisvideo.parser.utilities.FragmentMetadataVisitor.MkvTagProcessor; | |
import com.amazonaws.services.kinesis.AmazonKinesis; | |
import com.amazonaws.services.kinesis.model.PutRecordRequest; | |
// import com.google.common.primitives.Longs; | |
import com.amazonaws.services.kinesis.model.PutRecordResult; | |
import lombok.extern.slf4j.Slf4j; | |
/** | |
* A class provided to the KVS Parser Library that captures the frame and its | |
* producer timestamp, pass them as bytestring to a Kinesis Data Stream | |
* | |
* @param kinesisClient The Kinesis Data Stream Async Client | |
*/ | |
@Slf4j | |
public class KvsH264FrameProcessor extends H264FrameDecoder { | |
final AmazonKinesis kinesisClient; | |
final String dataStreamName; | |
final String videoStreamName; | |
final int processFPS; | |
static long lastProcessedFrameTimeStamp = 0; | |
public KvsH264FrameProcessor(AmazonKinesis kinesisClient, String dataStreamName, String videoStreamName, int processFPS) { | |
this.kinesisClient = kinesisClient; | |
this.dataStreamName = dataStreamName; | |
this.videoStreamName = videoStreamName; | |
this.processFPS = processFPS; | |
} | |
@Override | |
public void process(Frame frame, MkvTrackMetadata trackMetadata, Optional<FragmentMetadata> fragmentMetadata, | |
Optional<MkvTagProcessor> tagProcessor) throws FrameProcessException { | |
// Calculating timestamp of the frame | |
int frameTimeDelta = frame.getTimeCode(); | |
long fragmentStartTime = fragmentMetadata.get().getProducerSideTimestampMillis(); | |
long frameTimeStamp = fragmentStartTime + frameTimeDelta; | |
// Process frame at specified fps, ignore frame that is 1000/fps ms from latest processed timestamp | |
if (frameTimeStamp > lastProcessedFrameTimeStamp + 1000/processFPS) { | |
lastProcessedFrameTimeStamp = frameTimeStamp; | |
// Obtain size of the frame | |
int frameWidth = trackMetadata.getPixelWidth().get().intValue(); | |
int frameHeight = trackMetadata.getPixelHeight().get().intValue(); | |
// Decode frame with H264 codecs | |
final BufferedImage bufferedImage = decodeH264Frame(frame, trackMetadata); | |
// byte[] frameData = ((DataBufferByte)bufferedImage.getData().getDataBuffer()).getData(); | |
// Put record into Kinesis Data Stream | |
try { | |
byte[] frameData = toByteArray(bufferedImage); | |
putRecord(frameData, frameTimeStamp, frameWidth, frameHeight); | |
} catch (IOException e) { | |
System.out.println("Exception: " + e); | |
} | |
// DEBUG ONLY | |
// try { | |
// byte[] frameData = toByteArray(bufferedImage); | |
// String filepath = "/Users/gaiuscyw/Projects/video-monitoring-solution/cdk/src/lambda/ppe-detector-function/raw-java-frame.png"; | |
// File file = new File(filepath); | |
// OutputStream os = new FileOutputStream(file); | |
// os.write(frameData); | |
// os.close(); | |
// System.out.println("Frame written"); | |
// System.out.println(frameData.length); | |
// } catch (IOException e) { | |
// System.out.println("Exception: " + e); | |
// } | |
} | |
} | |
private byte[] toByteArray(BufferedImage img) throws IOException { | |
ByteArrayOutputStream baos = new ByteArrayOutputStream(); | |
ImageIO.write(img, "png", baos); | |
byte[] bytes = baos.toByteArray(); | |
return bytes; | |
} | |
private void putRecord(byte[] frameData, long frameTimeStamp, int frameWidth, int frameHeight) { | |
// Embed timestamp, frame dimension along with frame data in a blob and put to Kinesis | |
// First 13 bytes belong to timestamp (utf-8 encoded) | |
String timeStamp = String.valueOf(frameTimeStamp); | |
byte[] timestampByte = timeStamp.getBytes(StandardCharsets.UTF_8); | |
// Width and Height of the frame each takes 4 bytes | |
ByteBuffer widthBuffer = ByteBuffer.allocate(4); | |
ByteBuffer heightBuffer = ByteBuffer.allocate(4); | |
widthBuffer.putInt(frameWidth); | |
heightBuffer.putInt(frameHeight); | |
byte[] widthByte = widthBuffer.array(); | |
byte[] heightByte = heightBuffer.array(); | |
System.out.println(frameWidth); | |
System.out.println(frameHeight); | |
// Package the blob | |
byte[] data = new byte[timestampByte.length + widthByte.length + heightByte.length + frameData.length]; | |
int offset = 0; | |
System.arraycopy(timestampByte, 0, data, offset, timestampByte.length); | |
offset += timestampByte.length; | |
System.arraycopy(widthByte, 0, data, offset, widthByte.length); | |
offset += widthByte.length; | |
System.arraycopy(heightByte, 0, data, offset, heightByte.length); | |
offset += heightByte.length; | |
System.arraycopy(frameData, 0, data, offset, frameData.length); | |
PutRecordRequest request = new PutRecordRequest() | |
.withStreamName(dataStreamName) | |
.withPartitionKey(videoStreamName) | |
.withSequenceNumberForOrdering(timeStamp) | |
.withData(ByteBuffer.wrap(data)); | |
try { | |
PutRecordResult kinesisResult = kinesisClient.putRecord(request); | |
log.info("Frame of size {} KB and with producer timestamp {} put to Kinesis Data Stream, shard {} sequence number {}", (frameData.length)/1000, frameTimeStamp, kinesisResult.getShardId(), kinesisResult.getSequenceNumber()); | |
} catch (Exception e) { | |
log.error("Error putting record into Kinesis: {}", e.getMessage()); | |
} | |
} | |
@Override | |
public void close() { | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I recently used Python to produce video into KVS using the GStreamer plugin included in the C++ SDK. I highly suggest you try the C++ SDK first, because the Java SDK is also built on top of the C++ SDK. If your camera works with V4L2 or GStreamer, then it should work.