Skip to content

Instantly share code, notes, and snippets.

@aromanenko-dev
Created November 27, 2019 11:09
Show Gist options
  • Save aromanenko-dev/b99ad6eb9d7b6de31b8e9607b37aaefb to your computer and use it in GitHub Desktop.
diff -u sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/AmazonKinesisMock.java
--- sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java 2019-10-26 20:23:24.000000000 -0700
+++ sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/AmazonKinesisMock.java 2019-10-27 12:35:16.000000000 -0700
@@ -15,83 +15,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.io.kinesis;
+package org.apache.beam.sdk.io.kinesis2;
import static java.lang.Integer.parseInt;
import static java.lang.Math.min;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists.transform;
import static org.apache.commons.lang.builder.HashCodeBuilder.reflectionHashCode;
-import com.amazonaws.AmazonWebServiceRequest;
-import com.amazonaws.ResponseMetadata;
-import com.amazonaws.http.HttpResponse;
-import com.amazonaws.http.SdkHttpMetadata;
-import com.amazonaws.regions.Region;
-import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
-import com.amazonaws.services.kinesis.AmazonKinesis;
-import com.amazonaws.services.kinesis.model.AddTagsToStreamRequest;
-import com.amazonaws.services.kinesis.model.AddTagsToStreamResult;
-import com.amazonaws.services.kinesis.model.CreateStreamRequest;
-import com.amazonaws.services.kinesis.model.CreateStreamResult;
-import com.amazonaws.services.kinesis.model.DecreaseStreamRetentionPeriodRequest;
-import com.amazonaws.services.kinesis.model.DecreaseStreamRetentionPeriodResult;
-import com.amazonaws.services.kinesis.model.DeleteStreamRequest;
-import com.amazonaws.services.kinesis.model.DeleteStreamResult;
-import com.amazonaws.services.kinesis.model.DeregisterStreamConsumerRequest;
-import com.amazonaws.services.kinesis.model.DeregisterStreamConsumerResult;
-import com.amazonaws.services.kinesis.model.DescribeLimitsRequest;
-import com.amazonaws.services.kinesis.model.DescribeLimitsResult;
-import com.amazonaws.services.kinesis.model.DescribeStreamConsumerRequest;
-import com.amazonaws.services.kinesis.model.DescribeStreamConsumerResult;
-import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
-import com.amazonaws.services.kinesis.model.DescribeStreamResult;
-import com.amazonaws.services.kinesis.model.DescribeStreamSummaryRequest;
-import com.amazonaws.services.kinesis.model.DescribeStreamSummaryResult;
-import com.amazonaws.services.kinesis.model.DisableEnhancedMonitoringRequest;
-import com.amazonaws.services.kinesis.model.DisableEnhancedMonitoringResult;
-import com.amazonaws.services.kinesis.model.EnableEnhancedMonitoringRequest;
-import com.amazonaws.services.kinesis.model.EnableEnhancedMonitoringResult;
-import com.amazonaws.services.kinesis.model.GetRecordsRequest;
-import com.amazonaws.services.kinesis.model.GetRecordsResult;
-import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
-import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
-import com.amazonaws.services.kinesis.model.IncreaseStreamRetentionPeriodRequest;
-import com.amazonaws.services.kinesis.model.IncreaseStreamRetentionPeriodResult;
-import com.amazonaws.services.kinesis.model.ListShardsRequest;
-import com.amazonaws.services.kinesis.model.ListShardsResult;
-import com.amazonaws.services.kinesis.model.ListStreamConsumersRequest;
-import com.amazonaws.services.kinesis.model.ListStreamConsumersResult;
-import com.amazonaws.services.kinesis.model.ListStreamsRequest;
-import com.amazonaws.services.kinesis.model.ListStreamsResult;
-import com.amazonaws.services.kinesis.model.ListTagsForStreamRequest;
-import com.amazonaws.services.kinesis.model.ListTagsForStreamResult;
-import com.amazonaws.services.kinesis.model.MergeShardsRequest;
-import com.amazonaws.services.kinesis.model.MergeShardsResult;
-import com.amazonaws.services.kinesis.model.PutRecordRequest;
-import com.amazonaws.services.kinesis.model.PutRecordResult;
-import com.amazonaws.services.kinesis.model.PutRecordsRequest;
-import com.amazonaws.services.kinesis.model.PutRecordsResult;
-import com.amazonaws.services.kinesis.model.Record;
-import com.amazonaws.services.kinesis.model.RegisterStreamConsumerRequest;
-import com.amazonaws.services.kinesis.model.RegisterStreamConsumerResult;
-import com.amazonaws.services.kinesis.model.RemoveTagsFromStreamRequest;
-import com.amazonaws.services.kinesis.model.RemoveTagsFromStreamResult;
-import com.amazonaws.services.kinesis.model.Shard;
-import com.amazonaws.services.kinesis.model.ShardIteratorType;
-import com.amazonaws.services.kinesis.model.SplitShardRequest;
-import com.amazonaws.services.kinesis.model.SplitShardResult;
-import com.amazonaws.services.kinesis.model.StartStreamEncryptionRequest;
-import com.amazonaws.services.kinesis.model.StartStreamEncryptionResult;
-import com.amazonaws.services.kinesis.model.StopStreamEncryptionRequest;
-import com.amazonaws.services.kinesis.model.StopStreamEncryptionResult;
-import com.amazonaws.services.kinesis.model.StreamDescription;
-import com.amazonaws.services.kinesis.model.UpdateShardCountRequest;
-import com.amazonaws.services.kinesis.model.UpdateShardCountResult;
-import com.amazonaws.services.kinesis.producer.IKinesisProducer;
-import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
-import com.amazonaws.services.kinesis.waiters.AmazonKinesisWaiters;
import java.io.Serializable;
-import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
@@ -100,9 +31,69 @@
import org.apache.commons.lang.builder.EqualsBuilder;
import org.joda.time.Instant;
import org.mockito.Mockito;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.http.SdkHttpResponse;
+import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.AddTagsToStreamRequest;
+import software.amazon.awssdk.services.kinesis.model.AddTagsToStreamResponse;
+import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
+import software.amazon.awssdk.services.kinesis.model.CreateStreamResponse;
+import software.amazon.awssdk.services.kinesis.model.DecreaseStreamRetentionPeriodRequest;
+import software.amazon.awssdk.services.kinesis.model.DecreaseStreamRetentionPeriodResponse;
+import software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest;
+import software.amazon.awssdk.services.kinesis.model.DeleteStreamResponse;
+import software.amazon.awssdk.services.kinesis.model.DescribeLimitsRequest;
+import software.amazon.awssdk.services.kinesis.model.DescribeLimitsResponse;
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamConsumerRequest;
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamConsumerResponse;
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest;
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse;
+import software.amazon.awssdk.services.kinesis.model.DisableEnhancedMonitoringRequest;
+import software.amazon.awssdk.services.kinesis.model.DisableEnhancedMonitoringResponse;
+import software.amazon.awssdk.services.kinesis.model.EnableEnhancedMonitoringRequest;
+import software.amazon.awssdk.services.kinesis.model.EnableEnhancedMonitoringResponse;
+import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
+import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
+import software.amazon.awssdk.services.kinesis.model.IncreaseStreamRetentionPeriodRequest;
+import software.amazon.awssdk.services.kinesis.model.IncreaseStreamRetentionPeriodResponse;
+import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
+import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
+import software.amazon.awssdk.services.kinesis.model.ListStreamConsumersRequest;
+import software.amazon.awssdk.services.kinesis.model.ListStreamConsumersResponse;
+import software.amazon.awssdk.services.kinesis.model.ListStreamsRequest;
+import software.amazon.awssdk.services.kinesis.model.ListStreamsResponse;
+import software.amazon.awssdk.services.kinesis.model.ListTagsForStreamRequest;
+import software.amazon.awssdk.services.kinesis.model.ListTagsForStreamResponse;
+import software.amazon.awssdk.services.kinesis.model.MergeShardsRequest;
+import software.amazon.awssdk.services.kinesis.model.MergeShardsResponse;
+import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
+import software.amazon.awssdk.services.kinesis.model.PutRecordResponse;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;
+import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.awssdk.services.kinesis.model.RegisterStreamConsumerRequest;
+import software.amazon.awssdk.services.kinesis.model.RegisterStreamConsumerResponse;
+import software.amazon.awssdk.services.kinesis.model.RemoveTagsFromStreamRequest;
+import software.amazon.awssdk.services.kinesis.model.RemoveTagsFromStreamResponse;
+import software.amazon.awssdk.services.kinesis.model.Shard;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+import software.amazon.awssdk.services.kinesis.model.SplitShardRequest;
+import software.amazon.awssdk.services.kinesis.model.SplitShardResponse;
+import software.amazon.awssdk.services.kinesis.model.StartStreamEncryptionRequest;
+import software.amazon.awssdk.services.kinesis.model.StartStreamEncryptionResponse;
+import software.amazon.awssdk.services.kinesis.model.StopStreamEncryptionRequest;
+import software.amazon.awssdk.services.kinesis.model.StopStreamEncryptionResponse;
+import software.amazon.awssdk.services.kinesis.model.UpdateShardCountRequest;
+import software.amazon.awssdk.services.kinesis.model.UpdateShardCountResponse;
-/** Mock implemenation of {@link AmazonKinesis} for testing. */
-class AmazonKinesisMock implements AmazonKinesis {
+/** Mock implementation of {@link KinesisClient} for testing. */
+class AmazonKinesisMock implements KinesisClient {
static class TestData implements Serializable {
@@ -124,11 +115,12 @@
}
public Record convertToRecord() {
- return new Record()
- .withApproximateArrivalTimestamp(arrivalTimestamp.toDate())
- .withData(ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8)))
- .withSequenceNumber(sequenceNumber)
- .withPartitionKey("");
+ return Record.builder()
+ .approximateArrivalTimestamp(TimeUtil.toJava(arrivalTimestamp))
+ .data(SdkBytes.fromByteArray(data.getBytes(StandardCharsets.UTF_8)))
+ .sequenceNumber(sequenceNumber)
+ .partitionKey("")
+ .build();
}
@Override
@@ -167,7 +159,7 @@
}
@Override
- public AmazonKinesis getKinesisClient() {
+ public KinesisClient getKinesisClient() {
return new AmazonKinesisMock(
shardedData.stream()
.map(testDatas -> transform(testDatas, TestData::convertToRecord))
@@ -176,13 +168,13 @@
}
@Override
- public AmazonCloudWatch getCloudWatchClient() {
- return Mockito.mock(AmazonCloudWatch.class);
+ public KinesisAsyncClient getKinesisAsyncClient() {
+ return Mockito.mock(KinesisAsyncClient.class);
}
@Override
- public IKinesisProducer createKinesisProducer(KinesisProducerConfiguration config) {
- throw new RuntimeException("Not implemented");
+ public CloudWatchClient getCloudWatchClient() {
+ return Mockito.mock(CloudWatchClient.class);
}
}
@@ -195,286 +187,199 @@
}
@Override
- public GetRecordsResult getRecords(GetRecordsRequest getRecordsRequest) {
+ public String serviceName() {
+ return null;
+ }
+
+ @Override
+ public void close() {}
+
+ @Override
+ public GetRecordsResponse getRecords(GetRecordsRequest getRecordsRequest) {
List<String> shardIteratorParts =
- Splitter.on(':').splitToList(getRecordsRequest.getShardIterator());
+ Splitter.on(':').splitToList(getRecordsRequest.shardIterator());
int shardId = parseInt(shardIteratorParts.get(0));
int startingRecord = parseInt(shardIteratorParts.get(1));
List<Record> shardData = shardedData.get(shardId);
int toIndex = min(startingRecord + numberOfRecordsPerGet, shardData.size());
int fromIndex = min(startingRecord, toIndex);
- return new GetRecordsResult()
- .withRecords(shardData.subList(fromIndex, toIndex))
- .withNextShardIterator(String.format("%s:%s", shardId, toIndex))
- .withMillisBehindLatest(0L);
+ return GetRecordsResponse.builder()
+ .records(shardData.subList(fromIndex, toIndex))
+ .nextShardIterator(String.format("%s:%s", shardId, toIndex))
+ .millisBehindLatest(0L)
+ .build();
}
@Override
- public GetShardIteratorResult getShardIterator(GetShardIteratorRequest getShardIteratorRequest) {
- ShardIteratorType shardIteratorType =
- ShardIteratorType.fromValue(getShardIteratorRequest.getShardIteratorType());
+ public GetShardIteratorResponse getShardIterator(
+ GetShardIteratorRequest getShardIteratorRequest) {
+ ShardIteratorType shardIteratorType = getShardIteratorRequest.shardIteratorType();
String shardIterator;
if (shardIteratorType == ShardIteratorType.TRIM_HORIZON) {
- shardIterator = String.format("%s:%s", getShardIteratorRequest.getShardId(), 0);
+ shardIterator = String.format("%s:%s", getShardIteratorRequest.shardId(), 0);
} else {
throw new RuntimeException("Not implemented");
}
- return new GetShardIteratorResult().withShardIterator(shardIterator);
+ return GetShardIteratorResponse.builder().shardIterator(shardIterator).build();
}
@Override
- public DescribeStreamResult describeStream(String streamName, String exclusiveStartShardId) {
+ public DescribeStreamResponse describeStream(DescribeStreamRequest describeStreamRequest) {
int nextShardId = 0;
- if (exclusiveStartShardId != null) {
- nextShardId = parseInt(exclusiveStartShardId) + 1;
+ if (describeStreamRequest.exclusiveStartShardId() != null) {
+ nextShardId = parseInt(describeStreamRequest.exclusiveStartShardId()) + 1;
}
boolean hasMoreShards = nextShardId + 1 < shardedData.size();
List<Shard> shards = new ArrayList<>();
if (nextShardId < shardedData.size()) {
- shards.add(new Shard().withShardId(Integer.toString(nextShardId)));
+ shards.add(Shard.builder().shardId(Integer.toString(nextShardId)).build());
}
- HttpResponse response = new HttpResponse(null, null);
- response.setStatusCode(200);
- DescribeStreamResult result = new DescribeStreamResult();
- result.setSdkHttpMetadata(SdkHttpMetadata.from(response));
- result.withStreamDescription(
- new StreamDescription()
- .withHasMoreShards(hasMoreShards)
- .withShards(shards)
- .withStreamName(streamName));
- return result;
+ DescribeStreamResponse.Builder builder =
+ DescribeStreamResponse.builder()
+ .streamDescription(
+ s ->
+ s.hasMoreShards(hasMoreShards)
+ .shards(shards)
+ .streamName(describeStreamRequest.streamName()));
+ builder.sdkHttpResponse(SdkHttpResponse.builder().statusCode(200).build());
+ return builder.build();
}
@Override
- public void setEndpoint(String endpoint) {}
-
- @Override
- public void setRegion(Region region) {}
-
- @Override
- public AddTagsToStreamResult addTagsToStream(AddTagsToStreamRequest addTagsToStreamRequest) {
+ public AddTagsToStreamResponse addTagsToStream(AddTagsToStreamRequest addTagsToStreamRequest) {
throw new RuntimeException("Not implemented");
}
@Override
- public CreateStreamResult createStream(CreateStreamRequest createStreamRequest) {
+ public CreateStreamResponse createStream(CreateStreamRequest createStreamRequest) {
throw new RuntimeException("Not implemented");
}
@Override
- public CreateStreamResult createStream(String streamName, Integer shardCount) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public DecreaseStreamRetentionPeriodResult decreaseStreamRetentionPeriod(
+ public DecreaseStreamRetentionPeriodResponse decreaseStreamRetentionPeriod(
DecreaseStreamRetentionPeriodRequest decreaseStreamRetentionPeriodRequest) {
throw new RuntimeException("Not implemented");
}
@Override
- public DeleteStreamResult deleteStream(DeleteStreamRequest deleteStreamRequest) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public DeleteStreamResult deleteStream(String streamName) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public DeregisterStreamConsumerResult deregisterStreamConsumer(
- DeregisterStreamConsumerRequest deregisterStreamConsumerRequest) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public DescribeLimitsResult describeLimits(DescribeLimitsRequest describeLimitsRequest) {
+ public DeleteStreamResponse deleteStream(DeleteStreamRequest deleteStreamRequest) {
throw new RuntimeException("Not implemented");
}
@Override
- public DescribeStreamResult describeStream(DescribeStreamRequest describeStreamRequest) {
+ public DescribeLimitsResponse describeLimits(DescribeLimitsRequest describeLimitsRequest) {
throw new RuntimeException("Not implemented");
}
@Override
- public DescribeStreamResult describeStream(String streamName) {
- return describeStream(streamName, null);
- }
-
- @Override
- public DescribeStreamResult describeStream(
- String streamName, Integer limit, String exclusiveStartShardId) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public DescribeStreamConsumerResult describeStreamConsumer(
+ public DescribeStreamConsumerResponse describeStreamConsumer(
DescribeStreamConsumerRequest describeStreamConsumerRequest) {
throw new RuntimeException("Not implemented");
}
@Override
- public DescribeStreamSummaryResult describeStreamSummary(
+ public DescribeStreamSummaryResponse describeStreamSummary(
DescribeStreamSummaryRequest describeStreamSummaryRequest) {
throw new RuntimeException("Not implemented");
}
@Override
- public DisableEnhancedMonitoringResult disableEnhancedMonitoring(
+ public DisableEnhancedMonitoringResponse disableEnhancedMonitoring(
DisableEnhancedMonitoringRequest disableEnhancedMonitoringRequest) {
throw new RuntimeException("Not implemented");
}
@Override
- public EnableEnhancedMonitoringResult enableEnhancedMonitoring(
+ public EnableEnhancedMonitoringResponse enableEnhancedMonitoring(
EnableEnhancedMonitoringRequest enableEnhancedMonitoringRequest) {
throw new RuntimeException("Not implemented");
}
@Override
- public GetShardIteratorResult getShardIterator(
- String streamName, String shardId, String shardIteratorType) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public GetShardIteratorResult getShardIterator(
- String streamName, String shardId, String shardIteratorType, String startingSequenceNumber) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public IncreaseStreamRetentionPeriodResult increaseStreamRetentionPeriod(
+ public IncreaseStreamRetentionPeriodResponse increaseStreamRetentionPeriod(
IncreaseStreamRetentionPeriodRequest increaseStreamRetentionPeriodRequest) {
throw new RuntimeException("Not implemented");
}
@Override
- public ListShardsResult listShards(ListShardsRequest listShardsRequest) {
+ public ListShardsResponse listShards(ListShardsRequest listShardsRequest) {
throw new RuntimeException("Not implemented");
}
@Override
- public ListStreamConsumersResult listStreamConsumers(
+ public ListStreamConsumersResponse listStreamConsumers(
ListStreamConsumersRequest listStreamConsumersRequest) {
throw new RuntimeException("Not implemented");
}
@Override
- public ListStreamsResult listStreams(ListStreamsRequest listStreamsRequest) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public ListStreamsResult listStreams() {
+ public ListStreamsResponse listStreams(ListStreamsRequest listStreamsRequest) {
throw new RuntimeException("Not implemented");
}
@Override
- public ListStreamsResult listStreams(String exclusiveStartStreamName) {
+ public ListStreamsResponse listStreams() {
throw new RuntimeException("Not implemented");
}
@Override
- public ListStreamsResult listStreams(Integer limit, String exclusiveStartStreamName) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public ListTagsForStreamResult listTagsForStream(
+ public ListTagsForStreamResponse listTagsForStream(
ListTagsForStreamRequest listTagsForStreamRequest) {
throw new RuntimeException("Not implemented");
}
@Override
- public MergeShardsResult mergeShards(MergeShardsRequest mergeShardsRequest) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public MergeShardsResult mergeShards(
- String streamName, String shardToMerge, String adjacentShardToMerge) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public PutRecordResult putRecord(PutRecordRequest putRecordRequest) {
+ public MergeShardsResponse mergeShards(MergeShardsRequest mergeShardsRequest) {
throw new RuntimeException("Not implemented");
}
@Override
- public PutRecordResult putRecord(String streamName, ByteBuffer data, String partitionKey) {
+ public PutRecordResponse putRecord(PutRecordRequest putRecordRequest) {
throw new RuntimeException("Not implemented");
}
@Override
- public PutRecordResult putRecord(
- String streamName, ByteBuffer data, String partitionKey, String sequenceNumberForOrdering) {
+ public PutRecordsResponse putRecords(PutRecordsRequest putRecordsRequest) {
throw new RuntimeException("Not implemented");
}
@Override
- public PutRecordsResult putRecords(PutRecordsRequest putRecordsRequest) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public RegisterStreamConsumerResult registerStreamConsumer(
+ public RegisterStreamConsumerResponse registerStreamConsumer(
RegisterStreamConsumerRequest registerStreamConsumerRequest) {
throw new RuntimeException("Not implemented");
}
@Override
- public RemoveTagsFromStreamResult removeTagsFromStream(
+ public RemoveTagsFromStreamResponse removeTagsFromStream(
RemoveTagsFromStreamRequest removeTagsFromStreamRequest) {
throw new RuntimeException("Not implemented");
}
@Override
- public SplitShardResult splitShard(SplitShardRequest splitShardRequest) {
+ public SplitShardResponse splitShard(SplitShardRequest splitShardRequest) {
throw new RuntimeException("Not implemented");
}
@Override
- public SplitShardResult splitShard(
- String streamName, String shardToSplit, String newStartingHashKey) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public StartStreamEncryptionResult startStreamEncryption(
+ public StartStreamEncryptionResponse startStreamEncryption(
StartStreamEncryptionRequest startStreamEncryptionRequest) {
throw new RuntimeException("Not implemented");
}
@Override
- public StopStreamEncryptionResult stopStreamEncryption(
+ public StopStreamEncryptionResponse stopStreamEncryption(
StopStreamEncryptionRequest stopStreamEncryptionRequest) {
throw new RuntimeException("Not implemented");
}
@Override
- public UpdateShardCountResult updateShardCount(UpdateShardCountRequest updateShardCountRequest) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public void shutdown() {}
-
- @Override
- public ResponseMetadata getCachedResponseMetadata(AmazonWebServiceRequest request) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public AmazonKinesisWaiters waiters() {
+ public UpdateShardCountResponse updateShardCount(
+ UpdateShardCountRequest updateShardCountRequest) {
throw new RuntimeException("Not implemented");
}
}
diff -u sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/CustomOptionalTest.java
--- sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java 2019-10-09 22:46:48.000000000 -0700
+++ sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/CustomOptionalTest.java 2019-10-25 12:08:13.000000000 -0700
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.io.kinesis;
+package org.apache.beam.sdk.io.kinesis2;
import com.google.common.testing.EqualsTester;
import java.util.NoSuchElementException;
diff -u sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/DynamicCheckpointGeneratorTest.java
--- sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java 2019-10-24 21:25:10.000000000 -0700
+++ sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/DynamicCheckpointGeneratorTest.java 2019-10-27 12:35:16.000000000 -0700
@@ -15,22 +15,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.io.kinesis;
+package org.apache.beam.sdk.io.kinesis2;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.when;
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
-import com.amazonaws.services.kinesis.model.Shard;
import java.util.Set;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
-import org.mockito.runners.MockitoJUnitRunner;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import software.amazon.awssdk.services.kinesis.model.Shard;
+import software.amazon.kinesis.common.InitialPositionInStream;
/** * */
-@RunWith(MockitoJUnitRunner.class)
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(Shard.class)
public class DynamicCheckpointGeneratorTest {
@Mock private SimplifiedKinesisClient kinesisClient;
@@ -39,16 +41,16 @@
@Test
public void shouldMapAllShardsToCheckpoints() throws Exception {
- when(shard1.getShardId()).thenReturn("shard-01");
- when(shard2.getShardId()).thenReturn("shard-02");
- when(shard3.getShardId()).thenReturn("shard-03");
+ when(shard1.shardId()).thenReturn("shard-01");
+ when(shard2.shardId()).thenReturn("shard-02");
+ when(shard3.shardId()).thenReturn("shard-03");
Set<Shard> shards = Sets.newHashSet(shard1, shard2, shard3);
StartingPoint startingPoint = new StartingPoint(InitialPositionInStream.LATEST);
when(startingPointShardsFinder.findShardsAtStartingPoint(
kinesisClient, "stream", startingPoint))
.thenReturn(shards);
DynamicCheckpointGenerator underTest =
- new DynamicCheckpointGenerator("stream", startingPoint, startingPointShardsFinder);
+ new DynamicCheckpointGenerator("stream", null, startingPoint, startingPointShardsFinder);
KinesisReaderCheckpoint checkpoint = underTest.generate(kinesisClient);
@@ -57,9 +59,9 @@
@Test
public void shouldMapAllValidShardsToCheckpoints() throws Exception {
- when(shard1.getShardId()).thenReturn("shard-01");
- when(shard2.getShardId()).thenReturn("shard-02");
- when(shard3.getShardId()).thenReturn("shard-03");
+ when(shard1.shardId()).thenReturn("shard-01");
+ when(shard2.shardId()).thenReturn("shard-02");
+ when(shard3.shardId()).thenReturn("shard-03");
String streamName = "stream";
Set<Shard> shards = Sets.newHashSet(shard1, shard2);
StartingPoint startingPoint = new StartingPoint(InitialPositionInStream.LATEST);
@@ -68,11 +70,11 @@
.thenReturn(shards);
DynamicCheckpointGenerator underTest =
- new DynamicCheckpointGenerator(streamName, startingPoint, startingPointShardsFinder);
+ new DynamicCheckpointGenerator(streamName, null, startingPoint, startingPointShardsFinder);
KinesisReaderCheckpoint checkpoint = underTest.generate(kinesisClient);
assertThat(checkpoint)
.hasSize(2)
- .doesNotContain(new ShardCheckpoint(streamName, shard3.getShardId(), startingPoint));
+ .doesNotContain(new ShardCheckpoint(streamName, shard3.shardId(), null, startingPoint));
}
}
diff -u sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/KinesisIOIT.java
--- sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java 2019-10-09 22:46:48.000000000 -0700
+++ sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/KinesisIOIT.java 2019-10-27 12:35:16.000000000 -0700
@@ -15,16 +15,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.io.kinesis;
+package org.apache.beam.sdk.io.kinesis2;
import com.amazonaws.regions.Regions;
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Random;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.io.common.TestRow;
+import org.apache.beam.sdk.io.kinesis.KinesisPartitioner;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -40,6 +40,8 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.kinesis.common.InitialPositionInStream;
/**
* Integration test, that writes and reads data to and from real Kinesis. You need to provide {@link
@@ -79,7 +81,7 @@
.apply("Prepare Kinesis input records", ParDo.of(new ConvertToBytes()))
.apply(
"Write to Kinesis",
- KinesisIO.write()
+ org.apache.beam.sdk.io.kinesis.KinesisIO.write()
.withStreamName(options.getAwsKinesisStream())
.withPartitioner(new RandomPartitioner())
.withAWSClientsProvider(
@@ -99,7 +101,7 @@
.withAWSClientsProvider(
options.getAwsAccessKey(),
options.getAwsSecretKey(),
- Regions.fromName(options.getAwsKinesisRegion()))
+ Region.of(options.getAwsKinesisRegion()))
.withMaxNumRecords(numberOfRows)
// to prevent endless running in case of error
.withMaxReadTime(Duration.standardMinutes(10))
diff -u sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/KinesisMockReadTest.java
--- sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java 2019-10-09 22:46:48.000000000 -0700
+++ sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/KinesisMockReadTest.java 2019-10-27 12:35:16.000000000 -0700
@@ -15,11 +15,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.io.kinesis;
+package org.apache.beam.sdk.io.kinesis2;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists.newArrayList;
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import java.util.List;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -32,6 +31,7 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import software.amazon.kinesis.common.InitialPositionInStream;
/** Tests {@link AmazonKinesisMock}. */
@RunWith(JUnit4.class)
Only in sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/: KinesisMockWriteTest.java
Only in sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/: KinesisProducerMock.java
diff -u sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/KinesisReaderCheckpointTest.java
--- sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java 2019-10-09 22:46:48.000000000 -0700
+++ sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/KinesisReaderCheckpointTest.java 2019-10-26 15:27:02.000000000 -0700
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.io.kinesis;
+package org.apache.beam.sdk.io.kinesis2;
import static java.util.Arrays.asList;
import static org.assertj.core.api.Assertions.assertThat;
diff -u sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/KinesisReaderTest.java
--- sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java 2019-10-22 13:43:07.000000000 -0700
+++ sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/KinesisReaderTest.java 2019-10-26 15:28:03.000000000 -0700
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.io.kinesis;
+package org.apache.beam.sdk.io.kinesis2;
import static java.util.Arrays.asList;
import static org.assertj.core.api.Assertions.assertThat;
diff -u sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/KinesisRecordCoderTest.java
--- sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java 2019-10-09 22:46:48.000000000 -0700
+++ sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/KinesisRecordCoderTest.java 2019-10-27 12:35:16.000000000 -0700
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.io.kinesis;
+package org.apache.beam.sdk.io.kinesis2;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
diff -u sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisServiceMock.java sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/KinesisServiceMock.java
--- sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisServiceMock.java 2019-10-09 22:46:48.000000000 -0700
+++ sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/KinesisServiceMock.java 2019-10-25 12:08:13.000000000 -0700
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.io.kinesis;
+package org.apache.beam.sdk.io.kinesis2;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists.newArrayList;
diff -u sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/KinesisTestOptions.java
--- sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java 2019-10-09 22:46:48.000000000 -0700
+++ sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/KinesisTestOptions.java 2019-10-25 12:08:13.000000000 -0700
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.io.kinesis;
+package org.apache.beam.sdk.io.kinesis2;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
diff -u sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/RecordFilterTest.java
--- sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java 2019-10-09 22:46:48.000000000 -0700
+++ sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/RecordFilterTest.java 2019-10-26 15:29:02.000000000 -0700
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.io.kinesis;
+package org.apache.beam.sdk.io.kinesis2;
import static org.mockito.Mockito.when;
diff -u sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/ShardCheckpointTest.java
--- sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java 2019-10-09 22:46:48.000000000 -0700
+++ sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/ShardCheckpointTest.java 2019-10-27 12:35:17.000000000 -0700
@@ -15,22 +15,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.io.kinesis;
+package org.apache.beam.sdk.io.kinesis2;
-import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream.LATEST;
-import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream.TRIM_HORIZON;
-import static com.amazonaws.services.kinesis.model.ShardIteratorType.AFTER_SEQUENCE_NUMBER;
-import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_SEQUENCE_NUMBER;
-import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_TIMESTAMP;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import static software.amazon.awssdk.services.kinesis.model.ShardIteratorType.AFTER_SEQUENCE_NUMBER;
+import static software.amazon.awssdk.services.kinesis.model.ShardIteratorType.AT_SEQUENCE_NUMBER;
+import static software.amazon.awssdk.services.kinesis.model.ShardIteratorType.AT_TIMESTAMP;
+import static software.amazon.kinesis.common.InitialPositionInStream.LATEST;
+import static software.amazon.kinesis.common.InitialPositionInStream.TRIM_HORIZON;
-import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
-import com.amazonaws.services.kinesis.model.ShardIteratorType;
import java.io.IOException;
import org.joda.time.DateTime;
import org.joda.time.Instant;
@@ -39,6 +37,8 @@
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
/** */
@RunWith(MockitoJUnitRunner.class)
@@ -83,12 +83,12 @@
@Test
public void testComparisonWithExtendedSequenceNumber() {
assertThat(
- new ShardCheckpoint("", "", new StartingPoint(LATEST))
+ new ShardCheckpoint("", "", null, new StartingPoint(LATEST))
.isBeforeOrAt(recordWith(new ExtendedSequenceNumber("100", 0L))))
.isTrue();
assertThat(
- new ShardCheckpoint("", "", new StartingPoint(TRIM_HORIZON))
+ new ShardCheckpoint("", "", null, new StartingPoint(TRIM_HORIZON))
.isBeforeOrAt(recordWith(new ExtendedSequenceNumber("100", 0L))))
.isTrue();
@@ -147,7 +147,7 @@
private ShardCheckpoint checkpoint(
ShardIteratorType iteratorType, String sequenceNumber, Long subSequenceNumber) {
return new ShardCheckpoint(
- STREAM_NAME, SHARD_ID, iteratorType, sequenceNumber, subSequenceNumber);
+ STREAM_NAME, SHARD_ID, null, iteratorType, sequenceNumber, subSequenceNumber);
}
private KinesisRecord recordWith(Instant approximateArrivalTimestamp) {
@@ -157,6 +157,6 @@
}
private ShardCheckpoint checkpoint(ShardIteratorType iteratorType, Instant timestamp) {
- return new ShardCheckpoint(STREAM_NAME, SHARD_ID, iteratorType, timestamp);
+ return new ShardCheckpoint(STREAM_NAME, SHARD_ID, null, iteratorType, timestamp);
}
}
diff -u sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/ShardReadersPoolTest.java
--- sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java 2019-10-22 13:43:07.000000000 -0700
+++ sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/ShardReadersPoolTest.java 2019-10-27 12:35:17.000000000 -0700
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.io.kinesis;
+package org.apache.beam.sdk.io.kinesis2;
import static java.util.Collections.singletonList;
import static org.assertj.core.api.Assertions.assertThat;
diff -u sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/ShardRecordsIteratorTest.java
--- sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java 2019-10-09 22:46:48.000000000 -0700
+++ sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/ShardRecordsIteratorTest.java 2019-10-27 12:35:16.000000000 -0700
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.io.kinesis;
+package org.apache.beam.sdk.io.kinesis2;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
@@ -24,7 +24,6 @@
import static org.mockito.Matchers.anyListOf;
import static org.mockito.Mockito.when;
-import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import java.io.IOException;
import java.util.Collections;
import org.joda.time.Duration;
@@ -36,6 +35,7 @@
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
+import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
/** Tests {@link ShardRecordsIterator}. */
@RunWith(MockitoJUnitRunner.Silent.class)
diff -u sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/SimplifiedKinesisClientTest.java
--- sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java 2019-10-09 22:46:48.000000000 -0700
+++ sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/SimplifiedKinesisClientTest.java 2019-10-27 12:35:17.000000000 -0700
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.io.kinesis;
+package org.apache.beam.sdk.io.kinesis2;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;
@@ -25,25 +25,6 @@
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
-import com.amazonaws.AmazonServiceException;
-import com.amazonaws.AmazonServiceException.ErrorType;
-import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
-import com.amazonaws.services.cloudwatch.model.Datapoint;
-import com.amazonaws.services.cloudwatch.model.GetMetricStatisticsRequest;
-import com.amazonaws.services.cloudwatch.model.GetMetricStatisticsResult;
-import com.amazonaws.services.kinesis.AmazonKinesis;
-import com.amazonaws.services.kinesis.model.DescribeStreamResult;
-import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
-import com.amazonaws.services.kinesis.model.GetRecordsRequest;
-import com.amazonaws.services.kinesis.model.GetRecordsResult;
-import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
-import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
-import com.amazonaws.services.kinesis.model.LimitExceededException;
-import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
-import com.amazonaws.services.kinesis.model.Record;
-import com.amazonaws.services.kinesis.model.Shard;
-import com.amazonaws.services.kinesis.model.ShardIteratorType;
-import com.amazonaws.services.kinesis.model.StreamDescription;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
@@ -56,6 +37,27 @@
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.core.exception.SdkClientException;
+import software.amazon.awssdk.core.exception.SdkServiceException;
+import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;
+import software.amazon.awssdk.services.cloudwatch.model.Datapoint;
+import software.amazon.awssdk.services.cloudwatch.model.GetMetricStatisticsRequest;
+import software.amazon.awssdk.services.cloudwatch.model.GetMetricStatisticsResponse;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
+import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
+import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
+import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
+import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
+import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException;
+import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.awssdk.services.kinesis.model.Shard;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
/** * */
@RunWith(MockitoJUnitRunner.class)
@@ -68,19 +70,21 @@
private static final String SHARD_ITERATOR = "iterator";
private static final String SEQUENCE_NUMBER = "abc123";
- @Mock private AmazonKinesis kinesis;
- @Mock private AmazonCloudWatch cloudWatch;
+ @Mock private KinesisClient kinesis;
+ @Mock private KinesisAsyncClient kinesisAsync;
+ @Mock private CloudWatchClient cloudWatch;
@InjectMocks private SimplifiedKinesisClient underTest;
@Test
public void shouldReturnIteratorStartingWithSequenceNumber() throws Exception {
when(kinesis.getShardIterator(
- new GetShardIteratorRequest()
- .withStreamName(STREAM)
- .withShardId(SHARD_1)
- .withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
- .withStartingSequenceNumber(SEQUENCE_NUMBER)))
- .thenReturn(new GetShardIteratorResult().withShardIterator(SHARD_ITERATOR));
+ GetShardIteratorRequest.builder()
+ .streamName(STREAM)
+ .shardId(SHARD_1)
+ .shardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
+ .startingSequenceNumber(SEQUENCE_NUMBER)
+ .build()))
+ .thenReturn(GetShardIteratorResponse.builder().shardIterator(SHARD_ITERATOR).build());
String stream =
underTest.getShardIterator(
@@ -93,12 +97,13 @@
public void shouldReturnIteratorStartingWithTimestamp() throws Exception {
Instant timestamp = Instant.now();
when(kinesis.getShardIterator(
- new GetShardIteratorRequest()
- .withStreamName(STREAM)
- .withShardId(SHARD_1)
- .withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
- .withTimestamp(timestamp.toDate())))
- .thenReturn(new GetShardIteratorResult().withShardIterator(SHARD_ITERATOR));
+ GetShardIteratorRequest.builder()
+ .streamName(STREAM)
+ .shardId(SHARD_1)
+ .shardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
+ .timestamp(TimeUtil.toJava(timestamp))
+ .build()))
+ .thenReturn(GetShardIteratorResponse.builder().shardIterator(SHARD_ITERATOR).build());
String stream =
underTest.getShardIterator(
@@ -110,31 +115,30 @@
@Test
public void shouldHandleExpiredIterationExceptionForGetShardIterator() {
shouldHandleGetShardIteratorError(
- new ExpiredIteratorException(""), ExpiredIteratorException.class);
+ ExpiredIteratorException.builder().build(), ExpiredIteratorException.class);
}
@Test
public void shouldHandleLimitExceededExceptionForGetShardIterator() {
shouldHandleGetShardIteratorError(
- new LimitExceededException(""), TransientKinesisException.class);
+ LimitExceededException.builder().build(), TransientKinesisException.class);
}
@Test
public void shouldHandleProvisionedThroughputExceededExceptionForGetShardIterator() {
shouldHandleGetShardIteratorError(
- new ProvisionedThroughputExceededException(""), TransientKinesisException.class);
+ ProvisionedThroughputExceededException.builder().build(), TransientKinesisException.class);
}
@Test
public void shouldHandleServiceErrorForGetShardIterator() {
shouldHandleGetShardIteratorError(
- newAmazonServiceException(ErrorType.Service), TransientKinesisException.class);
+ SdkServiceException.builder().build(), TransientKinesisException.class);
}
@Test
public void shouldHandleClientErrorForGetShardIterator() {
- shouldHandleGetShardIteratorError(
- newAmazonServiceException(ErrorType.Client), RuntimeException.class);
+ shouldHandleGetShardIteratorError(SdkClientException.builder().build(), RuntimeException.class);
}
@Test
@@ -145,10 +149,11 @@
private void shouldHandleGetShardIteratorError(
Exception thrownException, Class<? extends Exception> expectedExceptionClass) {
GetShardIteratorRequest request =
- new GetShardIteratorRequest()
- .withStreamName(STREAM)
- .withShardId(SHARD_1)
- .withShardIteratorType(ShardIteratorType.LATEST);
+ GetShardIteratorRequest.builder()
+ .streamName(STREAM)
+ .shardId(SHARD_1)
+ .shardIteratorType(ShardIteratorType.LATEST)
+ .build();
when(kinesis.getShardIterator(request)).thenThrow(thrownException);
@@ -164,19 +169,24 @@
@Test
public void shouldListAllShards() throws Exception {
- Shard shard1 = new Shard().withShardId(SHARD_1);
- Shard shard2 = new Shard().withShardId(SHARD_2);
- Shard shard3 = new Shard().withShardId(SHARD_3);
- when(kinesis.describeStream(STREAM, null))
+ Shard shard1 = Shard.builder().shardId(SHARD_1).build();
+ Shard shard2 = Shard.builder().shardId(SHARD_2).build();
+ Shard shard3 = Shard.builder().shardId(SHARD_3).build();
+ when(kinesis.describeStream(
+ DescribeStreamRequest.builder().streamName(STREAM).exclusiveStartShardId(null).build()))
.thenReturn(
- new DescribeStreamResult()
- .withStreamDescription(
- new StreamDescription().withShards(shard1, shard2).withHasMoreShards(true)));
- when(kinesis.describeStream(STREAM, SHARD_2))
+ DescribeStreamResponse.builder()
+ .streamDescription(s -> s.shards(shard1, shard2).hasMoreShards(true))
+ .build());
+ when(kinesis.describeStream(
+ DescribeStreamRequest.builder()
+ .streamName(STREAM)
+ .exclusiveStartShardId(SHARD_2)
+ .build()))
.thenReturn(
- new DescribeStreamResult()
- .withStreamDescription(
- new StreamDescription().withShards(shard3).withHasMoreShards(false)));
+ DescribeStreamResponse.builder()
+ .streamDescription(s -> s.shards(shard3).hasMoreShards(false))
+ .build());
List<Shard> shards = underTest.listShards(STREAM);
@@ -185,30 +195,31 @@
@Test
public void shouldHandleExpiredIterationExceptionForShardListing() {
- shouldHandleShardListingError(new ExpiredIteratorException(""), ExpiredIteratorException.class);
+ shouldHandleShardListingError(
+ ExpiredIteratorException.builder().build(), ExpiredIteratorException.class);
}
@Test
public void shouldHandleLimitExceededExceptionForShardListing() {
- shouldHandleShardListingError(new LimitExceededException(""), TransientKinesisException.class);
+ shouldHandleShardListingError(
+ LimitExceededException.builder().build(), TransientKinesisException.class);
}
@Test
public void shouldHandleProvisionedThroughputExceededExceptionForShardListing() {
shouldHandleShardListingError(
- new ProvisionedThroughputExceededException(""), TransientKinesisException.class);
+ ProvisionedThroughputExceededException.builder().build(), TransientKinesisException.class);
}
@Test
public void shouldHandleServiceErrorForShardListing() {
shouldHandleShardListingError(
- newAmazonServiceException(ErrorType.Service), TransientKinesisException.class);
+ SdkServiceException.builder().build(), TransientKinesisException.class);
}
@Test
public void shouldHandleClientErrorForShardListing() {
- shouldHandleShardListingError(
- newAmazonServiceException(ErrorType.Client), RuntimeException.class);
+ shouldHandleShardListingError(SdkClientException.builder().build(), RuntimeException.class);
}
@Test
@@ -218,7 +229,7 @@
private void shouldHandleShardListingError(
Exception thrownException, Class<? extends Exception> expectedExceptionClass) {
- when(kinesis.describeStream(STREAM, null)).thenThrow(thrownException);
+ when(kinesis.describeStream(any(DescribeStreamRequest.class))).thenThrow(thrownException);
try {
underTest.listShards(STREAM);
failBecauseExceptionWasNotThrown(expectedExceptionClass);
@@ -236,8 +247,10 @@
Minutes periodTime = Minutes.minutesBetween(countSince, countTo);
GetMetricStatisticsRequest metricStatisticsRequest =
underTest.createMetricStatisticsRequest(STREAM, countSince, countTo, periodTime);
- GetMetricStatisticsResult result =
- new GetMetricStatisticsResult().withDatapoints(new Datapoint().withSum(1.0));
+ GetMetricStatisticsResponse result =
+ GetMetricStatisticsResponse.builder()
+ .datapoints(Datapoint.builder().sum(1.0).build())
+ .build();
when(cloudWatch.getMetricStatistics(metricStatisticsRequest)).thenReturn(result);
@@ -253,12 +266,13 @@
Minutes periodTime = Minutes.minutesBetween(countSince, countTo);
GetMetricStatisticsRequest metricStatisticsRequest =
underTest.createMetricStatisticsRequest(STREAM, countSince, countTo, periodTime);
- GetMetricStatisticsResult result =
- new GetMetricStatisticsResult()
- .withDatapoints(
- new Datapoint().withSum(1.0),
- new Datapoint().withSum(3.0),
- new Datapoint().withSum(2.0));
+ GetMetricStatisticsResponse result =
+ GetMetricStatisticsResponse.builder()
+ .datapoints(
+ Datapoint.builder().sum(1.0).build(),
+ Datapoint.builder().sum(3.0).build(),
+ Datapoint.builder().sum(2.0).build())
+ .build();
when(cloudWatch.getMetricStatistics(metricStatisticsRequest)).thenReturn(result);
@@ -281,25 +295,24 @@
@Test
public void shouldHandleLimitExceededExceptionForGetBacklogBytes() {
shouldHandleGetBacklogBytesError(
- new LimitExceededException(""), TransientKinesisException.class);
+ LimitExceededException.builder().build(), TransientKinesisException.class);
}
@Test
public void shouldHandleProvisionedThroughputExceededExceptionForGetBacklogBytes() {
shouldHandleGetBacklogBytesError(
- new ProvisionedThroughputExceededException(""), TransientKinesisException.class);
+ ProvisionedThroughputExceededException.builder().build(), TransientKinesisException.class);
}
@Test
public void shouldHandleServiceErrorForGetBacklogBytes() {
shouldHandleGetBacklogBytesError(
- newAmazonServiceException(ErrorType.Service), TransientKinesisException.class);
+ SdkServiceException.builder().build(), TransientKinesisException.class);
}
@Test
public void shouldHandleClientErrorForGetBacklogBytes() {
- shouldHandleGetBacklogBytesError(
- newAmazonServiceException(ErrorType.Client), RuntimeException.class);
+ shouldHandleGetBacklogBytesError(SdkClientException.builder().build(), RuntimeException.class);
}
@Test
@@ -326,22 +339,19 @@
}
}
- private AmazonServiceException newAmazonServiceException(ErrorType errorType) {
- AmazonServiceException exception = new AmazonServiceException("");
- exception.setErrorType(errorType);
- return exception;
- }
-
@Test
public void shouldReturnLimitedNumberOfRecords() throws Exception {
final Integer limit = 100;
doAnswer(
- (Answer<GetRecordsResult>)
+ (Answer<GetRecordsResponse>)
invocation -> {
GetRecordsRequest request = (GetRecordsRequest) invocation.getArguments()[0];
- List<Record> records = generateRecords(request.getLimit());
- return new GetRecordsResult().withRecords(records).withMillisBehindLatest(1000L);
+ List<Record> records = generateRecords(request.limit());
+ return GetRecordsResponse.builder()
+ .records(records)
+ .millisBehindLatest(1000L)
+ .build();
})
.when(kinesis)
.getRecords(any(GetRecordsRequest.class));
@@ -356,10 +366,11 @@
byte[] value = new byte[1024];
Arrays.fill(value, (byte) i);
records.add(
- new Record()
- .withSequenceNumber(String.valueOf(i))
- .withPartitionKey("key")
- .withData(ByteBuffer.wrap(value)));
+ Record.builder()
+ .sequenceNumber(String.valueOf(i))
+ .partitionKey("key")
+ .data(SdkBytes.fromByteBuffer(ByteBuffer.wrap(value)))
+ .build());
}
return records;
}
diff -u sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/StartingPointShardsFinderTest.java sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/StartingPointShardsFinderTest.java
--- sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/StartingPointShardsFinderTest.java 2019-10-09 22:46:48.000000000 -0700
+++ sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/StartingPointShardsFinderTest.java 2019-10-27 12:34:56.000000000 -0700
@@ -15,16 +15,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.io.kinesis;
+package org.apache.beam.sdk.io.kinesis2;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
-import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
-import com.amazonaws.services.kinesis.model.Shard;
-import com.amazonaws.services.kinesis.model.ShardIteratorType;
import java.util.Collections;
import java.util.List;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
@@ -32,6 +28,10 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import software.amazon.awssdk.services.kinesis.model.Shard;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+import software.amazon.kinesis.common.InitialPositionInStream;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
/** Tests StartingPointShardsFinder. */
@RunWith(JUnit4.class)
@@ -61,18 +61,30 @@
*/
private final Shard shard00 = createClosedShard("0000");
private final Shard shard01 = createClosedShard("0001");
- private final Shard shard02 = createClosedShard("0002").withParentShardId("0000");
- private final Shard shard03 = createClosedShard("0003").withParentShardId("0000");
- private final Shard shard04 = createClosedShard("0004").withParentShardId("0001");
- private final Shard shard05 = createClosedShard("0005").withParentShardId("0001");
+ private final Shard shard02 = createClosedShard("0002").toBuilder().parentShardId("0000").build();
+ private final Shard shard03 = createClosedShard("0003").toBuilder().parentShardId("0000").build();
+ private final Shard shard04 = createClosedShard("0004").toBuilder().parentShardId("0001").build();
+ private final Shard shard05 = createClosedShard("0005").toBuilder().parentShardId("0001").build();
private final Shard shard06 =
- createClosedShard("0006").withParentShardId("0003").withAdjacentParentShardId("0004");
- private final Shard shard07 = createClosedShard("0007").withParentShardId("0006");
- private final Shard shard08 = createClosedShard("0008").withParentShardId("0006");
+ createClosedShard("0006")
+ .toBuilder()
+ .parentShardId("0003")
+ .adjacentParentShardId("0004")
+ .build();
+ private final Shard shard07 = createClosedShard("0007").toBuilder().parentShardId("0006").build();
+ private final Shard shard08 = createClosedShard("0008").toBuilder().parentShardId("0006").build();
private final Shard shard09 =
- createOpenShard("0009").withParentShardId("0002").withAdjacentParentShardId("0007");
+ createOpenShard("0009")
+ .toBuilder()
+ .parentShardId("0002")
+ .adjacentParentShardId("0007")
+ .build();
private final Shard shard10 =
- createOpenShard("0010").withParentShardId("0008").withAdjacentParentShardId("0005");
+ createOpenShard("0010")
+ .toBuilder()
+ .parentShardId("0008")
+ .adjacentParentShardId("0005")
+ .build();
private final List<Shard> allShards =
ImmutableList.of(
@@ -191,7 +203,11 @@
// given
StartingPoint latestStartingPoint = new StartingPoint(InitialPositionInStream.LATEST);
Shard closedShard10 =
- createClosedShard("0010").withParentShardId("0008").withAdjacentParentShardId("0005");
+ createClosedShard("0010")
+ .toBuilder()
+ .parentShardId("0008")
+ .adjacentParentShardId("0005")
+ .build();
List<Shard> shards =
ImmutableList.of(
shard00,
@@ -213,14 +229,14 @@
}
private Shard createClosedShard(String shardId) {
- Shard shard = new Shard().withShardId(shardId);
+ Shard shard = Shard.builder().shardId(shardId).build();
activeAtPoint(shard, ShardIteratorType.TRIM_HORIZON);
expiredAtPoint(shard, ShardIteratorType.LATEST);
return shard;
}
private Shard createOpenShard(String shardId) {
- Shard shard = new Shard().withShardId(shardId);
+ Shard shard = Shard.builder().shardId(shardId).build();
activeAtPoint(shard, ShardIteratorType.TRIM_HORIZON);
activeAtPoint(shard, ShardIteratorType.LATEST);
return shard;
@@ -237,13 +253,13 @@
private void activeAtTimestamp(Shard shard, Instant startTimestamp) {
prepareShard(
shard,
- "timestampIterator-" + shard.getShardId(),
+ "timestampIterator-" + shard.shardId(),
ShardIteratorType.AT_TIMESTAMP,
startTimestamp);
}
private void activeAtPoint(Shard shard, ShardIteratorType shardIteratorType) {
- prepareShard(shard, shardIteratorType.toString() + shard.getShardId(), shardIteratorType, null);
+ prepareShard(shard, shardIteratorType.toString() + shard.shardId(), shardIteratorType, null);
}
private void prepareShard(
@@ -252,28 +268,23 @@
ShardIteratorType shardIteratorType,
Instant startTimestamp) {
try {
- String shardIterator = shardIteratorType + shard.getShardId() + "-current";
+ String shardIterator = shardIteratorType + shard.shardId() + "-current";
if (shardIteratorType == ShardIteratorType.AT_TIMESTAMP) {
when(kinesis.getShardIterator(
- STREAM_NAME,
- shard.getShardId(),
- ShardIteratorType.AT_TIMESTAMP,
- null,
- startTimestamp))
+ STREAM_NAME, shard.shardId(), ShardIteratorType.AT_TIMESTAMP, null, startTimestamp))
.thenReturn(shardIterator);
} else {
- when(kinesis.getShardIterator(
- STREAM_NAME, shard.getShardId(), shardIteratorType, null, null))
+ when(kinesis.getShardIterator(STREAM_NAME, shard.shardId(), shardIteratorType, null, null))
.thenReturn(shardIterator);
}
GetKinesisRecordsResult result =
new GetKinesisRecordsResult(
- Collections.<UserRecord>emptyList(),
+ Collections.<KinesisClientRecord>emptyList(),
nextIterator,
0,
STREAM_NAME,
- shard.getShardId());
- when(kinesis.getRecords(shardIterator, STREAM_NAME, shard.getShardId())).thenReturn(result);
+ shard.shardId());
+ when(kinesis.getRecords(shardIterator, STREAM_NAME, shard.shardId())).thenReturn(result);
} catch (TransientKinesisException e) {
throw new RuntimeException(e);
}
diff -u sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/WatermarkPolicyTest.java sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/WatermarkPolicyTest.java
--- sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/WatermarkPolicyTest.java 2019-10-09 22:46:48.000000000 -0700
+++ sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/WatermarkPolicyTest.java 2019-10-25 12:08:13.000000000 -0700
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.io.kinesis;
+package org.apache.beam.sdk.io.kinesis2;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment