Created
November 27, 2019 11:09
-
-
Save aromanenko-dev/b99ad6eb9d7b6de31b8e9607b37aaefb to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff -u sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/AmazonKinesisMock.java | |
--- sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java 2019-10-26 20:23:24.000000000 -0700 | |
+++ sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/AmazonKinesisMock.java 2019-10-27 12:35:16.000000000 -0700 | |
@@ -15,83 +15,14 @@ | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
-package org.apache.beam.sdk.io.kinesis; | |
+package org.apache.beam.sdk.io.kinesis2; | |
import static java.lang.Integer.parseInt; | |
import static java.lang.Math.min; | |
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists.transform; | |
import static org.apache.commons.lang.builder.HashCodeBuilder.reflectionHashCode; | |
-import com.amazonaws.AmazonWebServiceRequest; | |
-import com.amazonaws.ResponseMetadata; | |
-import com.amazonaws.http.HttpResponse; | |
-import com.amazonaws.http.SdkHttpMetadata; | |
-import com.amazonaws.regions.Region; | |
-import com.amazonaws.services.cloudwatch.AmazonCloudWatch; | |
-import com.amazonaws.services.kinesis.AmazonKinesis; | |
-import com.amazonaws.services.kinesis.model.AddTagsToStreamRequest; | |
-import com.amazonaws.services.kinesis.model.AddTagsToStreamResult; | |
-import com.amazonaws.services.kinesis.model.CreateStreamRequest; | |
-import com.amazonaws.services.kinesis.model.CreateStreamResult; | |
-import com.amazonaws.services.kinesis.model.DecreaseStreamRetentionPeriodRequest; | |
-import com.amazonaws.services.kinesis.model.DecreaseStreamRetentionPeriodResult; | |
-import com.amazonaws.services.kinesis.model.DeleteStreamRequest; | |
-import com.amazonaws.services.kinesis.model.DeleteStreamResult; | |
-import com.amazonaws.services.kinesis.model.DeregisterStreamConsumerRequest; | |
-import com.amazonaws.services.kinesis.model.DeregisterStreamConsumerResult; | |
-import com.amazonaws.services.kinesis.model.DescribeLimitsRequest; | |
-import com.amazonaws.services.kinesis.model.DescribeLimitsResult; | |
-import com.amazonaws.services.kinesis.model.DescribeStreamConsumerRequest; | |
-import com.amazonaws.services.kinesis.model.DescribeStreamConsumerResult; | |
-import com.amazonaws.services.kinesis.model.DescribeStreamRequest; | |
-import com.amazonaws.services.kinesis.model.DescribeStreamResult; | |
-import com.amazonaws.services.kinesis.model.DescribeStreamSummaryRequest; | |
-import com.amazonaws.services.kinesis.model.DescribeStreamSummaryResult; | |
-import com.amazonaws.services.kinesis.model.DisableEnhancedMonitoringRequest; | |
-import com.amazonaws.services.kinesis.model.DisableEnhancedMonitoringResult; | |
-import com.amazonaws.services.kinesis.model.EnableEnhancedMonitoringRequest; | |
-import com.amazonaws.services.kinesis.model.EnableEnhancedMonitoringResult; | |
-import com.amazonaws.services.kinesis.model.GetRecordsRequest; | |
-import com.amazonaws.services.kinesis.model.GetRecordsResult; | |
-import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; | |
-import com.amazonaws.services.kinesis.model.GetShardIteratorResult; | |
-import com.amazonaws.services.kinesis.model.IncreaseStreamRetentionPeriodRequest; | |
-import com.amazonaws.services.kinesis.model.IncreaseStreamRetentionPeriodResult; | |
-import com.amazonaws.services.kinesis.model.ListShardsRequest; | |
-import com.amazonaws.services.kinesis.model.ListShardsResult; | |
-import com.amazonaws.services.kinesis.model.ListStreamConsumersRequest; | |
-import com.amazonaws.services.kinesis.model.ListStreamConsumersResult; | |
-import com.amazonaws.services.kinesis.model.ListStreamsRequest; | |
-import com.amazonaws.services.kinesis.model.ListStreamsResult; | |
-import com.amazonaws.services.kinesis.model.ListTagsForStreamRequest; | |
-import com.amazonaws.services.kinesis.model.ListTagsForStreamResult; | |
-import com.amazonaws.services.kinesis.model.MergeShardsRequest; | |
-import com.amazonaws.services.kinesis.model.MergeShardsResult; | |
-import com.amazonaws.services.kinesis.model.PutRecordRequest; | |
-import com.amazonaws.services.kinesis.model.PutRecordResult; | |
-import com.amazonaws.services.kinesis.model.PutRecordsRequest; | |
-import com.amazonaws.services.kinesis.model.PutRecordsResult; | |
-import com.amazonaws.services.kinesis.model.Record; | |
-import com.amazonaws.services.kinesis.model.RegisterStreamConsumerRequest; | |
-import com.amazonaws.services.kinesis.model.RegisterStreamConsumerResult; | |
-import com.amazonaws.services.kinesis.model.RemoveTagsFromStreamRequest; | |
-import com.amazonaws.services.kinesis.model.RemoveTagsFromStreamResult; | |
-import com.amazonaws.services.kinesis.model.Shard; | |
-import com.amazonaws.services.kinesis.model.ShardIteratorType; | |
-import com.amazonaws.services.kinesis.model.SplitShardRequest; | |
-import com.amazonaws.services.kinesis.model.SplitShardResult; | |
-import com.amazonaws.services.kinesis.model.StartStreamEncryptionRequest; | |
-import com.amazonaws.services.kinesis.model.StartStreamEncryptionResult; | |
-import com.amazonaws.services.kinesis.model.StopStreamEncryptionRequest; | |
-import com.amazonaws.services.kinesis.model.StopStreamEncryptionResult; | |
-import com.amazonaws.services.kinesis.model.StreamDescription; | |
-import com.amazonaws.services.kinesis.model.UpdateShardCountRequest; | |
-import com.amazonaws.services.kinesis.model.UpdateShardCountResult; | |
-import com.amazonaws.services.kinesis.producer.IKinesisProducer; | |
-import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration; | |
-import com.amazonaws.services.kinesis.waiters.AmazonKinesisWaiters; | |
import java.io.Serializable; | |
-import java.nio.ByteBuffer; | |
import java.nio.charset.StandardCharsets; | |
import java.util.ArrayList; | |
import java.util.List; | |
@@ -100,9 +31,69 @@ | |
import org.apache.commons.lang.builder.EqualsBuilder; | |
import org.joda.time.Instant; | |
import org.mockito.Mockito; | |
+import software.amazon.awssdk.core.SdkBytes; | |
+import software.amazon.awssdk.http.SdkHttpResponse; | |
+import software.amazon.awssdk.services.cloudwatch.CloudWatchClient; | |
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; | |
+import software.amazon.awssdk.services.kinesis.KinesisClient; | |
+import software.amazon.awssdk.services.kinesis.model.AddTagsToStreamRequest; | |
+import software.amazon.awssdk.services.kinesis.model.AddTagsToStreamResponse; | |
+import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest; | |
+import software.amazon.awssdk.services.kinesis.model.CreateStreamResponse; | |
+import software.amazon.awssdk.services.kinesis.model.DecreaseStreamRetentionPeriodRequest; | |
+import software.amazon.awssdk.services.kinesis.model.DecreaseStreamRetentionPeriodResponse; | |
+import software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest; | |
+import software.amazon.awssdk.services.kinesis.model.DeleteStreamResponse; | |
+import software.amazon.awssdk.services.kinesis.model.DescribeLimitsRequest; | |
+import software.amazon.awssdk.services.kinesis.model.DescribeLimitsResponse; | |
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamConsumerRequest; | |
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamConsumerResponse; | |
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; | |
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse; | |
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest; | |
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse; | |
+import software.amazon.awssdk.services.kinesis.model.DisableEnhancedMonitoringRequest; | |
+import software.amazon.awssdk.services.kinesis.model.DisableEnhancedMonitoringResponse; | |
+import software.amazon.awssdk.services.kinesis.model.EnableEnhancedMonitoringRequest; | |
+import software.amazon.awssdk.services.kinesis.model.EnableEnhancedMonitoringResponse; | |
+import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; | |
+import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; | |
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; | |
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse; | |
+import software.amazon.awssdk.services.kinesis.model.IncreaseStreamRetentionPeriodRequest; | |
+import software.amazon.awssdk.services.kinesis.model.IncreaseStreamRetentionPeriodResponse; | |
+import software.amazon.awssdk.services.kinesis.model.ListShardsRequest; | |
+import software.amazon.awssdk.services.kinesis.model.ListShardsResponse; | |
+import software.amazon.awssdk.services.kinesis.model.ListStreamConsumersRequest; | |
+import software.amazon.awssdk.services.kinesis.model.ListStreamConsumersResponse; | |
+import software.amazon.awssdk.services.kinesis.model.ListStreamsRequest; | |
+import software.amazon.awssdk.services.kinesis.model.ListStreamsResponse; | |
+import software.amazon.awssdk.services.kinesis.model.ListTagsForStreamRequest; | |
+import software.amazon.awssdk.services.kinesis.model.ListTagsForStreamResponse; | |
+import software.amazon.awssdk.services.kinesis.model.MergeShardsRequest; | |
+import software.amazon.awssdk.services.kinesis.model.MergeShardsResponse; | |
+import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; | |
+import software.amazon.awssdk.services.kinesis.model.PutRecordResponse; | |
+import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest; | |
+import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse; | |
+import software.amazon.awssdk.services.kinesis.model.Record; | |
+import software.amazon.awssdk.services.kinesis.model.RegisterStreamConsumerRequest; | |
+import software.amazon.awssdk.services.kinesis.model.RegisterStreamConsumerResponse; | |
+import software.amazon.awssdk.services.kinesis.model.RemoveTagsFromStreamRequest; | |
+import software.amazon.awssdk.services.kinesis.model.RemoveTagsFromStreamResponse; | |
+import software.amazon.awssdk.services.kinesis.model.Shard; | |
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; | |
+import software.amazon.awssdk.services.kinesis.model.SplitShardRequest; | |
+import software.amazon.awssdk.services.kinesis.model.SplitShardResponse; | |
+import software.amazon.awssdk.services.kinesis.model.StartStreamEncryptionRequest; | |
+import software.amazon.awssdk.services.kinesis.model.StartStreamEncryptionResponse; | |
+import software.amazon.awssdk.services.kinesis.model.StopStreamEncryptionRequest; | |
+import software.amazon.awssdk.services.kinesis.model.StopStreamEncryptionResponse; | |
+import software.amazon.awssdk.services.kinesis.model.UpdateShardCountRequest; | |
+import software.amazon.awssdk.services.kinesis.model.UpdateShardCountResponse; | |
-/** Mock implemenation of {@link AmazonKinesis} for testing. */ | |
-class AmazonKinesisMock implements AmazonKinesis { | |
+/** Mock implementation of {@link KinesisClient} for testing. */ | |
+class AmazonKinesisMock implements KinesisClient { | |
static class TestData implements Serializable { | |
@@ -124,11 +115,12 @@ | |
} | |
public Record convertToRecord() { | |
- return new Record() | |
- .withApproximateArrivalTimestamp(arrivalTimestamp.toDate()) | |
- .withData(ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8))) | |
- .withSequenceNumber(sequenceNumber) | |
- .withPartitionKey(""); | |
+ return Record.builder() | |
+ .approximateArrivalTimestamp(TimeUtil.toJava(arrivalTimestamp)) | |
+ .data(SdkBytes.fromByteArray(data.getBytes(StandardCharsets.UTF_8))) | |
+ .sequenceNumber(sequenceNumber) | |
+ .partitionKey("") | |
+ .build(); | |
} | |
@Override | |
@@ -167,7 +159,7 @@ | |
} | |
@Override | |
- public AmazonKinesis getKinesisClient() { | |
+ public KinesisClient getKinesisClient() { | |
return new AmazonKinesisMock( | |
shardedData.stream() | |
.map(testDatas -> transform(testDatas, TestData::convertToRecord)) | |
@@ -176,13 +168,13 @@ | |
} | |
@Override | |
- public AmazonCloudWatch getCloudWatchClient() { | |
- return Mockito.mock(AmazonCloudWatch.class); | |
+ public KinesisAsyncClient getKinesisAsyncClient() { | |
+ return Mockito.mock(KinesisAsyncClient.class); | |
} | |
@Override | |
- public IKinesisProducer createKinesisProducer(KinesisProducerConfiguration config) { | |
- throw new RuntimeException("Not implemented"); | |
+ public CloudWatchClient getCloudWatchClient() { | |
+ return Mockito.mock(CloudWatchClient.class); | |
} | |
} | |
@@ -195,286 +187,199 @@ | |
} | |
@Override | |
- public GetRecordsResult getRecords(GetRecordsRequest getRecordsRequest) { | |
+ public String serviceName() { | |
+ return null; | |
+ } | |
+ | |
+ @Override | |
+ public void close() {} | |
+ | |
+ @Override | |
+ public GetRecordsResponse getRecords(GetRecordsRequest getRecordsRequest) { | |
List<String> shardIteratorParts = | |
- Splitter.on(':').splitToList(getRecordsRequest.getShardIterator()); | |
+ Splitter.on(':').splitToList(getRecordsRequest.shardIterator()); | |
int shardId = parseInt(shardIteratorParts.get(0)); | |
int startingRecord = parseInt(shardIteratorParts.get(1)); | |
List<Record> shardData = shardedData.get(shardId); | |
int toIndex = min(startingRecord + numberOfRecordsPerGet, shardData.size()); | |
int fromIndex = min(startingRecord, toIndex); | |
- return new GetRecordsResult() | |
- .withRecords(shardData.subList(fromIndex, toIndex)) | |
- .withNextShardIterator(String.format("%s:%s", shardId, toIndex)) | |
- .withMillisBehindLatest(0L); | |
+ return GetRecordsResponse.builder() | |
+ .records(shardData.subList(fromIndex, toIndex)) | |
+ .nextShardIterator(String.format("%s:%s", shardId, toIndex)) | |
+ .millisBehindLatest(0L) | |
+ .build(); | |
} | |
@Override | |
- public GetShardIteratorResult getShardIterator(GetShardIteratorRequest getShardIteratorRequest) { | |
- ShardIteratorType shardIteratorType = | |
- ShardIteratorType.fromValue(getShardIteratorRequest.getShardIteratorType()); | |
+ public GetShardIteratorResponse getShardIterator( | |
+ GetShardIteratorRequest getShardIteratorRequest) { | |
+ ShardIteratorType shardIteratorType = getShardIteratorRequest.shardIteratorType(); | |
String shardIterator; | |
if (shardIteratorType == ShardIteratorType.TRIM_HORIZON) { | |
- shardIterator = String.format("%s:%s", getShardIteratorRequest.getShardId(), 0); | |
+ shardIterator = String.format("%s:%s", getShardIteratorRequest.shardId(), 0); | |
} else { | |
throw new RuntimeException("Not implemented"); | |
} | |
- return new GetShardIteratorResult().withShardIterator(shardIterator); | |
+ return GetShardIteratorResponse.builder().shardIterator(shardIterator).build(); | |
} | |
@Override | |
- public DescribeStreamResult describeStream(String streamName, String exclusiveStartShardId) { | |
+ public DescribeStreamResponse describeStream(DescribeStreamRequest describeStreamRequest) { | |
int nextShardId = 0; | |
- if (exclusiveStartShardId != null) { | |
- nextShardId = parseInt(exclusiveStartShardId) + 1; | |
+ if (describeStreamRequest.exclusiveStartShardId() != null) { | |
+ nextShardId = parseInt(describeStreamRequest.exclusiveStartShardId()) + 1; | |
} | |
boolean hasMoreShards = nextShardId + 1 < shardedData.size(); | |
List<Shard> shards = new ArrayList<>(); | |
if (nextShardId < shardedData.size()) { | |
- shards.add(new Shard().withShardId(Integer.toString(nextShardId))); | |
+ shards.add(Shard.builder().shardId(Integer.toString(nextShardId)).build()); | |
} | |
- HttpResponse response = new HttpResponse(null, null); | |
- response.setStatusCode(200); | |
- DescribeStreamResult result = new DescribeStreamResult(); | |
- result.setSdkHttpMetadata(SdkHttpMetadata.from(response)); | |
- result.withStreamDescription( | |
- new StreamDescription() | |
- .withHasMoreShards(hasMoreShards) | |
- .withShards(shards) | |
- .withStreamName(streamName)); | |
- return result; | |
+ DescribeStreamResponse.Builder builder = | |
+ DescribeStreamResponse.builder() | |
+ .streamDescription( | |
+ s -> | |
+ s.hasMoreShards(hasMoreShards) | |
+ .shards(shards) | |
+ .streamName(describeStreamRequest.streamName())); | |
+ builder.sdkHttpResponse(SdkHttpResponse.builder().statusCode(200).build()); | |
+ return builder.build(); | |
} | |
@Override | |
- public void setEndpoint(String endpoint) {} | |
- | |
- @Override | |
- public void setRegion(Region region) {} | |
- | |
- @Override | |
- public AddTagsToStreamResult addTagsToStream(AddTagsToStreamRequest addTagsToStreamRequest) { | |
+ public AddTagsToStreamResponse addTagsToStream(AddTagsToStreamRequest addTagsToStreamRequest) { | |
throw new RuntimeException("Not implemented"); | |
} | |
@Override | |
- public CreateStreamResult createStream(CreateStreamRequest createStreamRequest) { | |
+ public CreateStreamResponse createStream(CreateStreamRequest createStreamRequest) { | |
throw new RuntimeException("Not implemented"); | |
} | |
@Override | |
- public CreateStreamResult createStream(String streamName, Integer shardCount) { | |
- throw new RuntimeException("Not implemented"); | |
- } | |
- | |
- @Override | |
- public DecreaseStreamRetentionPeriodResult decreaseStreamRetentionPeriod( | |
+ public DecreaseStreamRetentionPeriodResponse decreaseStreamRetentionPeriod( | |
DecreaseStreamRetentionPeriodRequest decreaseStreamRetentionPeriodRequest) { | |
throw new RuntimeException("Not implemented"); | |
} | |
@Override | |
- public DeleteStreamResult deleteStream(DeleteStreamRequest deleteStreamRequest) { | |
- throw new RuntimeException("Not implemented"); | |
- } | |
- | |
- @Override | |
- public DeleteStreamResult deleteStream(String streamName) { | |
- throw new RuntimeException("Not implemented"); | |
- } | |
- | |
- @Override | |
- public DeregisterStreamConsumerResult deregisterStreamConsumer( | |
- DeregisterStreamConsumerRequest deregisterStreamConsumerRequest) { | |
- throw new RuntimeException("Not implemented"); | |
- } | |
- | |
- @Override | |
- public DescribeLimitsResult describeLimits(DescribeLimitsRequest describeLimitsRequest) { | |
+ public DeleteStreamResponse deleteStream(DeleteStreamRequest deleteStreamRequest) { | |
throw new RuntimeException("Not implemented"); | |
} | |
@Override | |
- public DescribeStreamResult describeStream(DescribeStreamRequest describeStreamRequest) { | |
+ public DescribeLimitsResponse describeLimits(DescribeLimitsRequest describeLimitsRequest) { | |
throw new RuntimeException("Not implemented"); | |
} | |
@Override | |
- public DescribeStreamResult describeStream(String streamName) { | |
- return describeStream(streamName, null); | |
- } | |
- | |
- @Override | |
- public DescribeStreamResult describeStream( | |
- String streamName, Integer limit, String exclusiveStartShardId) { | |
- throw new RuntimeException("Not implemented"); | |
- } | |
- | |
- @Override | |
- public DescribeStreamConsumerResult describeStreamConsumer( | |
+ public DescribeStreamConsumerResponse describeStreamConsumer( | |
DescribeStreamConsumerRequest describeStreamConsumerRequest) { | |
throw new RuntimeException("Not implemented"); | |
} | |
@Override | |
- public DescribeStreamSummaryResult describeStreamSummary( | |
+ public DescribeStreamSummaryResponse describeStreamSummary( | |
DescribeStreamSummaryRequest describeStreamSummaryRequest) { | |
throw new RuntimeException("Not implemented"); | |
} | |
@Override | |
- public DisableEnhancedMonitoringResult disableEnhancedMonitoring( | |
+ public DisableEnhancedMonitoringResponse disableEnhancedMonitoring( | |
DisableEnhancedMonitoringRequest disableEnhancedMonitoringRequest) { | |
throw new RuntimeException("Not implemented"); | |
} | |
@Override | |
- public EnableEnhancedMonitoringResult enableEnhancedMonitoring( | |
+ public EnableEnhancedMonitoringResponse enableEnhancedMonitoring( | |
EnableEnhancedMonitoringRequest enableEnhancedMonitoringRequest) { | |
throw new RuntimeException("Not implemented"); | |
} | |
@Override | |
- public GetShardIteratorResult getShardIterator( | |
- String streamName, String shardId, String shardIteratorType) { | |
- throw new RuntimeException("Not implemented"); | |
- } | |
- | |
- @Override | |
- public GetShardIteratorResult getShardIterator( | |
- String streamName, String shardId, String shardIteratorType, String startingSequenceNumber) { | |
- throw new RuntimeException("Not implemented"); | |
- } | |
- | |
- @Override | |
- public IncreaseStreamRetentionPeriodResult increaseStreamRetentionPeriod( | |
+ public IncreaseStreamRetentionPeriodResponse increaseStreamRetentionPeriod( | |
IncreaseStreamRetentionPeriodRequest increaseStreamRetentionPeriodRequest) { | |
throw new RuntimeException("Not implemented"); | |
} | |
@Override | |
- public ListShardsResult listShards(ListShardsRequest listShardsRequest) { | |
+ public ListShardsResponse listShards(ListShardsRequest listShardsRequest) { | |
throw new RuntimeException("Not implemented"); | |
} | |
@Override | |
- public ListStreamConsumersResult listStreamConsumers( | |
+ public ListStreamConsumersResponse listStreamConsumers( | |
ListStreamConsumersRequest listStreamConsumersRequest) { | |
throw new RuntimeException("Not implemented"); | |
} | |
@Override | |
- public ListStreamsResult listStreams(ListStreamsRequest listStreamsRequest) { | |
- throw new RuntimeException("Not implemented"); | |
- } | |
- | |
- @Override | |
- public ListStreamsResult listStreams() { | |
+ public ListStreamsResponse listStreams(ListStreamsRequest listStreamsRequest) { | |
throw new RuntimeException("Not implemented"); | |
} | |
@Override | |
- public ListStreamsResult listStreams(String exclusiveStartStreamName) { | |
+ public ListStreamsResponse listStreams() { | |
throw new RuntimeException("Not implemented"); | |
} | |
@Override | |
- public ListStreamsResult listStreams(Integer limit, String exclusiveStartStreamName) { | |
- throw new RuntimeException("Not implemented"); | |
- } | |
- | |
- @Override | |
- public ListTagsForStreamResult listTagsForStream( | |
+ public ListTagsForStreamResponse listTagsForStream( | |
ListTagsForStreamRequest listTagsForStreamRequest) { | |
throw new RuntimeException("Not implemented"); | |
} | |
@Override | |
- public MergeShardsResult mergeShards(MergeShardsRequest mergeShardsRequest) { | |
- throw new RuntimeException("Not implemented"); | |
- } | |
- | |
- @Override | |
- public MergeShardsResult mergeShards( | |
- String streamName, String shardToMerge, String adjacentShardToMerge) { | |
- throw new RuntimeException("Not implemented"); | |
- } | |
- | |
- @Override | |
- public PutRecordResult putRecord(PutRecordRequest putRecordRequest) { | |
+ public MergeShardsResponse mergeShards(MergeShardsRequest mergeShardsRequest) { | |
throw new RuntimeException("Not implemented"); | |
} | |
@Override | |
- public PutRecordResult putRecord(String streamName, ByteBuffer data, String partitionKey) { | |
+ public PutRecordResponse putRecord(PutRecordRequest putRecordRequest) { | |
throw new RuntimeException("Not implemented"); | |
} | |
@Override | |
- public PutRecordResult putRecord( | |
- String streamName, ByteBuffer data, String partitionKey, String sequenceNumberForOrdering) { | |
+ public PutRecordsResponse putRecords(PutRecordsRequest putRecordsRequest) { | |
throw new RuntimeException("Not implemented"); | |
} | |
@Override | |
- public PutRecordsResult putRecords(PutRecordsRequest putRecordsRequest) { | |
- throw new RuntimeException("Not implemented"); | |
- } | |
- | |
- @Override | |
- public RegisterStreamConsumerResult registerStreamConsumer( | |
+ public RegisterStreamConsumerResponse registerStreamConsumer( | |
RegisterStreamConsumerRequest registerStreamConsumerRequest) { | |
throw new RuntimeException("Not implemented"); | |
} | |
@Override | |
- public RemoveTagsFromStreamResult removeTagsFromStream( | |
+ public RemoveTagsFromStreamResponse removeTagsFromStream( | |
RemoveTagsFromStreamRequest removeTagsFromStreamRequest) { | |
throw new RuntimeException("Not implemented"); | |
} | |
@Override | |
- public SplitShardResult splitShard(SplitShardRequest splitShardRequest) { | |
+ public SplitShardResponse splitShard(SplitShardRequest splitShardRequest) { | |
throw new RuntimeException("Not implemented"); | |
} | |
@Override | |
- public SplitShardResult splitShard( | |
- String streamName, String shardToSplit, String newStartingHashKey) { | |
- throw new RuntimeException("Not implemented"); | |
- } | |
- | |
- @Override | |
- public StartStreamEncryptionResult startStreamEncryption( | |
+ public StartStreamEncryptionResponse startStreamEncryption( | |
StartStreamEncryptionRequest startStreamEncryptionRequest) { | |
throw new RuntimeException("Not implemented"); | |
} | |
@Override | |
- public StopStreamEncryptionResult stopStreamEncryption( | |
+ public StopStreamEncryptionResponse stopStreamEncryption( | |
StopStreamEncryptionRequest stopStreamEncryptionRequest) { | |
throw new RuntimeException("Not implemented"); | |
} | |
@Override | |
- public UpdateShardCountResult updateShardCount(UpdateShardCountRequest updateShardCountRequest) { | |
- throw new RuntimeException("Not implemented"); | |
- } | |
- | |
- @Override | |
- public void shutdown() {} | |
- | |
- @Override | |
- public ResponseMetadata getCachedResponseMetadata(AmazonWebServiceRequest request) { | |
- throw new RuntimeException("Not implemented"); | |
- } | |
- | |
- @Override | |
- public AmazonKinesisWaiters waiters() { | |
+ public UpdateShardCountResponse updateShardCount( | |
+ UpdateShardCountRequest updateShardCountRequest) { | |
throw new RuntimeException("Not implemented"); | |
} | |
} | |
diff -u sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/CustomOptionalTest.java | |
--- sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java 2019-10-09 22:46:48.000000000 -0700 | |
+++ sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/CustomOptionalTest.java 2019-10-25 12:08:13.000000000 -0700 | |
@@ -15,7 +15,7 @@ | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
-package org.apache.beam.sdk.io.kinesis; | |
+package org.apache.beam.sdk.io.kinesis2; | |
import com.google.common.testing.EqualsTester; | |
import java.util.NoSuchElementException; | |
diff -u sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/DynamicCheckpointGeneratorTest.java | |
--- sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java 2019-10-24 21:25:10.000000000 -0700 | |
+++ sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/DynamicCheckpointGeneratorTest.java 2019-10-27 12:35:16.000000000 -0700 | |
@@ -15,22 +15,24 @@ | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
-package org.apache.beam.sdk.io.kinesis; | |
+package org.apache.beam.sdk.io.kinesis2; | |
import static org.assertj.core.api.Assertions.assertThat; | |
import static org.mockito.Mockito.when; | |
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; | |
-import com.amazonaws.services.kinesis.model.Shard; | |
import java.util.Set; | |
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets; | |
import org.junit.Test; | |
import org.junit.runner.RunWith; | |
import org.mockito.Mock; | |
-import org.mockito.runners.MockitoJUnitRunner; | |
+import org.powermock.core.classloader.annotations.PrepareForTest; | |
+import org.powermock.modules.junit4.PowerMockRunner; | |
+import software.amazon.awssdk.services.kinesis.model.Shard; | |
+import software.amazon.kinesis.common.InitialPositionInStream; | |
/** * */ | |
-@RunWith(MockitoJUnitRunner.class) | |
+@RunWith(PowerMockRunner.class) | |
+@PrepareForTest(Shard.class) | |
public class DynamicCheckpointGeneratorTest { | |
@Mock private SimplifiedKinesisClient kinesisClient; | |
@@ -39,16 +41,16 @@ | |
@Test | |
public void shouldMapAllShardsToCheckpoints() throws Exception { | |
- when(shard1.getShardId()).thenReturn("shard-01"); | |
- when(shard2.getShardId()).thenReturn("shard-02"); | |
- when(shard3.getShardId()).thenReturn("shard-03"); | |
+ when(shard1.shardId()).thenReturn("shard-01"); | |
+ when(shard2.shardId()).thenReturn("shard-02"); | |
+ when(shard3.shardId()).thenReturn("shard-03"); | |
Set<Shard> shards = Sets.newHashSet(shard1, shard2, shard3); | |
StartingPoint startingPoint = new StartingPoint(InitialPositionInStream.LATEST); | |
when(startingPointShardsFinder.findShardsAtStartingPoint( | |
kinesisClient, "stream", startingPoint)) | |
.thenReturn(shards); | |
DynamicCheckpointGenerator underTest = | |
- new DynamicCheckpointGenerator("stream", startingPoint, startingPointShardsFinder); | |
+ new DynamicCheckpointGenerator("stream", null, startingPoint, startingPointShardsFinder); | |
KinesisReaderCheckpoint checkpoint = underTest.generate(kinesisClient); | |
@@ -57,9 +59,9 @@ | |
@Test | |
public void shouldMapAllValidShardsToCheckpoints() throws Exception { | |
- when(shard1.getShardId()).thenReturn("shard-01"); | |
- when(shard2.getShardId()).thenReturn("shard-02"); | |
- when(shard3.getShardId()).thenReturn("shard-03"); | |
+ when(shard1.shardId()).thenReturn("shard-01"); | |
+ when(shard2.shardId()).thenReturn("shard-02"); | |
+ when(shard3.shardId()).thenReturn("shard-03"); | |
String streamName = "stream"; | |
Set<Shard> shards = Sets.newHashSet(shard1, shard2); | |
StartingPoint startingPoint = new StartingPoint(InitialPositionInStream.LATEST); | |
@@ -68,11 +70,11 @@ | |
.thenReturn(shards); | |
DynamicCheckpointGenerator underTest = | |
- new DynamicCheckpointGenerator(streamName, startingPoint, startingPointShardsFinder); | |
+ new DynamicCheckpointGenerator(streamName, null, startingPoint, startingPointShardsFinder); | |
KinesisReaderCheckpoint checkpoint = underTest.generate(kinesisClient); | |
assertThat(checkpoint) | |
.hasSize(2) | |
- .doesNotContain(new ShardCheckpoint(streamName, shard3.getShardId(), startingPoint)); | |
+ .doesNotContain(new ShardCheckpoint(streamName, shard3.shardId(), null, startingPoint)); | |
} | |
} | |
diff -u sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/KinesisIOIT.java | |
--- sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java 2019-10-09 22:46:48.000000000 -0700 | |
+++ sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/KinesisIOIT.java 2019-10-27 12:35:16.000000000 -0700 | |
@@ -15,16 +15,16 @@ | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
-package org.apache.beam.sdk.io.kinesis; | |
+package org.apache.beam.sdk.io.kinesis2; | |
import com.amazonaws.regions.Regions; | |
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; | |
import java.io.Serializable; | |
import java.nio.charset.StandardCharsets; | |
import java.util.Random; | |
import org.apache.beam.sdk.io.GenerateSequence; | |
import org.apache.beam.sdk.io.common.HashingFn; | |
import org.apache.beam.sdk.io.common.TestRow; | |
+import org.apache.beam.sdk.io.kinesis.KinesisPartitioner; | |
import org.apache.beam.sdk.options.PipelineOptionsFactory; | |
import org.apache.beam.sdk.testing.PAssert; | |
import org.apache.beam.sdk.testing.TestPipeline; | |
@@ -40,6 +40,8 @@ | |
import org.junit.Test; | |
import org.junit.runner.RunWith; | |
import org.junit.runners.JUnit4; | |
+import software.amazon.awssdk.regions.Region; | |
+import software.amazon.kinesis.common.InitialPositionInStream; | |
/** | |
* Integration test, that writes and reads data to and from real Kinesis. You need to provide {@link | |
@@ -79,7 +81,7 @@ | |
.apply("Prepare Kinesis input records", ParDo.of(new ConvertToBytes())) | |
.apply( | |
"Write to Kinesis", | |
- KinesisIO.write() | |
+ org.apache.beam.sdk.io.kinesis.KinesisIO.write() | |
.withStreamName(options.getAwsKinesisStream()) | |
.withPartitioner(new RandomPartitioner()) | |
.withAWSClientsProvider( | |
@@ -99,7 +101,7 @@ | |
.withAWSClientsProvider( | |
options.getAwsAccessKey(), | |
options.getAwsSecretKey(), | |
- Regions.fromName(options.getAwsKinesisRegion())) | |
+ Region.of(options.getAwsKinesisRegion())) | |
.withMaxNumRecords(numberOfRows) | |
// to prevent endless running in case of error | |
.withMaxReadTime(Duration.standardMinutes(10)) | |
diff -u sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/KinesisMockReadTest.java | |
--- sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java 2019-10-09 22:46:48.000000000 -0700 | |
+++ sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/KinesisMockReadTest.java 2019-10-27 12:35:16.000000000 -0700 | |
@@ -15,11 +15,10 @@ | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
-package org.apache.beam.sdk.io.kinesis; | |
+package org.apache.beam.sdk.io.kinesis2; | |
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists.newArrayList; | |
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; | |
import java.util.List; | |
import org.apache.beam.sdk.testing.PAssert; | |
import org.apache.beam.sdk.testing.TestPipeline; | |
@@ -32,6 +31,7 @@ | |
import org.junit.Test; | |
import org.junit.runner.RunWith; | |
import org.junit.runners.JUnit4; | |
+import software.amazon.kinesis.common.InitialPositionInStream; | |
/** Tests {@link AmazonKinesisMock}. */ | |
@RunWith(JUnit4.class) | |
Only in sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/: KinesisMockWriteTest.java | |
Only in sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/: KinesisProducerMock.java | |
diff -u sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/KinesisReaderCheckpointTest.java | |
--- sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java 2019-10-09 22:46:48.000000000 -0700 | |
+++ sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/KinesisReaderCheckpointTest.java 2019-10-26 15:27:02.000000000 -0700 | |
@@ -15,7 +15,7 @@ | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
-package org.apache.beam.sdk.io.kinesis; | |
+package org.apache.beam.sdk.io.kinesis2; | |
import static java.util.Arrays.asList; | |
import static org.assertj.core.api.Assertions.assertThat; | |
diff -u sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/KinesisReaderTest.java | |
--- sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java 2019-10-22 13:43:07.000000000 -0700 | |
+++ sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/KinesisReaderTest.java 2019-10-26 15:28:03.000000000 -0700 | |
@@ -15,7 +15,7 @@ | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
-package org.apache.beam.sdk.io.kinesis; | |
+package org.apache.beam.sdk.io.kinesis2; | |
import static java.util.Arrays.asList; | |
import static org.assertj.core.api.Assertions.assertThat; | |
diff -u sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/KinesisRecordCoderTest.java | |
--- sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java 2019-10-09 22:46:48.000000000 -0700 | |
+++ sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/KinesisRecordCoderTest.java 2019-10-27 12:35:16.000000000 -0700 | |
@@ -15,7 +15,7 @@ | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
-package org.apache.beam.sdk.io.kinesis; | |
+package org.apache.beam.sdk.io.kinesis2; | |
import java.nio.ByteBuffer; | |
import java.nio.charset.StandardCharsets; | |
diff -u sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisServiceMock.java sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/KinesisServiceMock.java | |
--- sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisServiceMock.java 2019-10-09 22:46:48.000000000 -0700 | |
+++ sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/KinesisServiceMock.java 2019-10-25 12:08:13.000000000 -0700 | |
@@ -15,7 +15,7 @@ | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
-package org.apache.beam.sdk.io.kinesis; | |
+package org.apache.beam.sdk.io.kinesis2; | |
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists.newArrayList; | |
diff -u sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/KinesisTestOptions.java | |
--- sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java 2019-10-09 22:46:48.000000000 -0700 | |
+++ sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/KinesisTestOptions.java 2019-10-25 12:08:13.000000000 -0700 | |
@@ -15,7 +15,7 @@ | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
-package org.apache.beam.sdk.io.kinesis; | |
+package org.apache.beam.sdk.io.kinesis2; | |
import org.apache.beam.sdk.options.Default; | |
import org.apache.beam.sdk.options.Description; | |
diff -u sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/RecordFilterTest.java | |
--- sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java 2019-10-09 22:46:48.000000000 -0700 | |
+++ sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/RecordFilterTest.java 2019-10-26 15:29:02.000000000 -0700 | |
@@ -15,7 +15,7 @@ | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
-package org.apache.beam.sdk.io.kinesis; | |
+package org.apache.beam.sdk.io.kinesis2; | |
import static org.mockito.Mockito.when; | |
diff -u sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/ShardCheckpointTest.java | |
--- sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java 2019-10-09 22:46:48.000000000 -0700 | |
+++ sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/ShardCheckpointTest.java 2019-10-27 12:35:17.000000000 -0700 | |
@@ -15,22 +15,20 @@ | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
-package org.apache.beam.sdk.io.kinesis; | |
+package org.apache.beam.sdk.io.kinesis2; | |
-import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream.LATEST; | |
-import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream.TRIM_HORIZON; | |
-import static com.amazonaws.services.kinesis.model.ShardIteratorType.AFTER_SEQUENCE_NUMBER; | |
-import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_SEQUENCE_NUMBER; | |
-import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_TIMESTAMP; | |
import static org.assertj.core.api.Assertions.assertThat; | |
import static org.mockito.Matchers.anyString; | |
import static org.mockito.Matchers.eq; | |
import static org.mockito.Matchers.isNull; | |
import static org.mockito.Mockito.mock; | |
import static org.mockito.Mockito.when; | |
+import static software.amazon.awssdk.services.kinesis.model.ShardIteratorType.AFTER_SEQUENCE_NUMBER; | |
+import static software.amazon.awssdk.services.kinesis.model.ShardIteratorType.AT_SEQUENCE_NUMBER; | |
+import static software.amazon.awssdk.services.kinesis.model.ShardIteratorType.AT_TIMESTAMP; | |
+import static software.amazon.kinesis.common.InitialPositionInStream.LATEST; | |
+import static software.amazon.kinesis.common.InitialPositionInStream.TRIM_HORIZON; | |
-import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; | |
-import com.amazonaws.services.kinesis.model.ShardIteratorType; | |
import java.io.IOException; | |
import org.joda.time.DateTime; | |
import org.joda.time.Instant; | |
@@ -39,6 +37,8 @@ | |
import org.junit.runner.RunWith; | |
import org.mockito.Mock; | |
import org.mockito.runners.MockitoJUnitRunner; | |
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; | |
+import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; | |
/** */ | |
@RunWith(MockitoJUnitRunner.class) | |
@@ -83,12 +83,12 @@ | |
@Test | |
public void testComparisonWithExtendedSequenceNumber() { | |
assertThat( | |
- new ShardCheckpoint("", "", new StartingPoint(LATEST)) | |
+ new ShardCheckpoint("", "", null, new StartingPoint(LATEST)) | |
.isBeforeOrAt(recordWith(new ExtendedSequenceNumber("100", 0L)))) | |
.isTrue(); | |
assertThat( | |
- new ShardCheckpoint("", "", new StartingPoint(TRIM_HORIZON)) | |
+ new ShardCheckpoint("", "", null, new StartingPoint(TRIM_HORIZON)) | |
.isBeforeOrAt(recordWith(new ExtendedSequenceNumber("100", 0L)))) | |
.isTrue(); | |
@@ -147,7 +147,7 @@ | |
private ShardCheckpoint checkpoint( | |
ShardIteratorType iteratorType, String sequenceNumber, Long subSequenceNumber) { | |
return new ShardCheckpoint( | |
- STREAM_NAME, SHARD_ID, iteratorType, sequenceNumber, subSequenceNumber); | |
+ STREAM_NAME, SHARD_ID, null, iteratorType, sequenceNumber, subSequenceNumber); | |
} | |
private KinesisRecord recordWith(Instant approximateArrivalTimestamp) { | |
@@ -157,6 +157,6 @@ | |
} | |
private ShardCheckpoint checkpoint(ShardIteratorType iteratorType, Instant timestamp) { | |
- return new ShardCheckpoint(STREAM_NAME, SHARD_ID, iteratorType, timestamp); | |
+ return new ShardCheckpoint(STREAM_NAME, SHARD_ID, null, iteratorType, timestamp); | |
} | |
} | |
diff -u sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/ShardReadersPoolTest.java | |
--- sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java 2019-10-22 13:43:07.000000000 -0700 | |
+++ sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/ShardReadersPoolTest.java 2019-10-27 12:35:17.000000000 -0700 | |
@@ -15,7 +15,7 @@ | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
-package org.apache.beam.sdk.io.kinesis; | |
+package org.apache.beam.sdk.io.kinesis2; | |
import static java.util.Collections.singletonList; | |
import static org.assertj.core.api.Assertions.assertThat; | |
diff -u sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/ShardRecordsIteratorTest.java | |
--- sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java 2019-10-09 22:46:48.000000000 -0700 | |
+++ sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/ShardRecordsIteratorTest.java 2019-10-27 12:35:16.000000000 -0700 | |
@@ -15,7 +15,7 @@ | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
-package org.apache.beam.sdk.io.kinesis; | |
+package org.apache.beam.sdk.io.kinesis2; | |
import static java.util.Arrays.asList; | |
import static java.util.Collections.singletonList; | |
@@ -24,7 +24,6 @@ | |
import static org.mockito.Matchers.anyListOf; | |
import static org.mockito.Mockito.when; | |
-import com.amazonaws.services.kinesis.model.ExpiredIteratorException; | |
import java.io.IOException; | |
import java.util.Collections; | |
import org.joda.time.Duration; | |
@@ -36,6 +35,7 @@ | |
import org.mockito.invocation.InvocationOnMock; | |
import org.mockito.junit.MockitoJUnitRunner; | |
import org.mockito.stubbing.Answer; | |
+import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException; | |
/** Tests {@link ShardRecordsIterator}. */ | |
@RunWith(MockitoJUnitRunner.Silent.class) | |
diff -u sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/SimplifiedKinesisClientTest.java | |
--- sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java 2019-10-09 22:46:48.000000000 -0700 | |
+++ sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/SimplifiedKinesisClientTest.java 2019-10-27 12:35:17.000000000 -0700 | |
@@ -15,7 +15,7 @@ | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
-package org.apache.beam.sdk.io.kinesis; | |
+package org.apache.beam.sdk.io.kinesis2; | |
import static org.assertj.core.api.Assertions.assertThat; | |
import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown; | |
@@ -25,25 +25,6 @@ | |
import static org.mockito.Mockito.verifyZeroInteractions; | |
import static org.mockito.Mockito.when; | |
-import com.amazonaws.AmazonServiceException; | |
-import com.amazonaws.AmazonServiceException.ErrorType; | |
-import com.amazonaws.services.cloudwatch.AmazonCloudWatch; | |
-import com.amazonaws.services.cloudwatch.model.Datapoint; | |
-import com.amazonaws.services.cloudwatch.model.GetMetricStatisticsRequest; | |
-import com.amazonaws.services.cloudwatch.model.GetMetricStatisticsResult; | |
-import com.amazonaws.services.kinesis.AmazonKinesis; | |
-import com.amazonaws.services.kinesis.model.DescribeStreamResult; | |
-import com.amazonaws.services.kinesis.model.ExpiredIteratorException; | |
-import com.amazonaws.services.kinesis.model.GetRecordsRequest; | |
-import com.amazonaws.services.kinesis.model.GetRecordsResult; | |
-import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; | |
-import com.amazonaws.services.kinesis.model.GetShardIteratorResult; | |
-import com.amazonaws.services.kinesis.model.LimitExceededException; | |
-import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; | |
-import com.amazonaws.services.kinesis.model.Record; | |
-import com.amazonaws.services.kinesis.model.Shard; | |
-import com.amazonaws.services.kinesis.model.ShardIteratorType; | |
-import com.amazonaws.services.kinesis.model.StreamDescription; | |
import java.nio.ByteBuffer; | |
import java.util.ArrayList; | |
import java.util.Arrays; | |
@@ -56,6 +37,27 @@ | |
import org.mockito.Mock; | |
import org.mockito.runners.MockitoJUnitRunner; | |
import org.mockito.stubbing.Answer; | |
+import software.amazon.awssdk.core.SdkBytes; | |
+import software.amazon.awssdk.core.exception.SdkClientException; | |
+import software.amazon.awssdk.core.exception.SdkServiceException; | |
+import software.amazon.awssdk.services.cloudwatch.CloudWatchClient; | |
+import software.amazon.awssdk.services.cloudwatch.model.Datapoint; | |
+import software.amazon.awssdk.services.cloudwatch.model.GetMetricStatisticsRequest; | |
+import software.amazon.awssdk.services.cloudwatch.model.GetMetricStatisticsResponse; | |
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; | |
+import software.amazon.awssdk.services.kinesis.KinesisClient; | |
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; | |
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse; | |
+import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException; | |
+import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; | |
+import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; | |
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; | |
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse; | |
+import software.amazon.awssdk.services.kinesis.model.LimitExceededException; | |
+import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException; | |
+import software.amazon.awssdk.services.kinesis.model.Record; | |
+import software.amazon.awssdk.services.kinesis.model.Shard; | |
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; | |
/** * */ | |
@RunWith(MockitoJUnitRunner.class) | |
@@ -68,19 +70,21 @@ | |
private static final String SHARD_ITERATOR = "iterator"; | |
private static final String SEQUENCE_NUMBER = "abc123"; | |
- @Mock private AmazonKinesis kinesis; | |
- @Mock private AmazonCloudWatch cloudWatch; | |
+ @Mock private KinesisClient kinesis; | |
+ @Mock private KinesisAsyncClient kinesisAsync; | |
+ @Mock private CloudWatchClient cloudWatch; | |
@InjectMocks private SimplifiedKinesisClient underTest; | |
@Test | |
public void shouldReturnIteratorStartingWithSequenceNumber() throws Exception { | |
when(kinesis.getShardIterator( | |
- new GetShardIteratorRequest() | |
- .withStreamName(STREAM) | |
- .withShardId(SHARD_1) | |
- .withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER) | |
- .withStartingSequenceNumber(SEQUENCE_NUMBER))) | |
- .thenReturn(new GetShardIteratorResult().withShardIterator(SHARD_ITERATOR)); | |
+ GetShardIteratorRequest.builder() | |
+ .streamName(STREAM) | |
+ .shardId(SHARD_1) | |
+ .shardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER) | |
+ .startingSequenceNumber(SEQUENCE_NUMBER) | |
+ .build())) | |
+ .thenReturn(GetShardIteratorResponse.builder().shardIterator(SHARD_ITERATOR).build()); | |
String stream = | |
underTest.getShardIterator( | |
@@ -93,12 +97,13 @@ | |
public void shouldReturnIteratorStartingWithTimestamp() throws Exception { | |
Instant timestamp = Instant.now(); | |
when(kinesis.getShardIterator( | |
- new GetShardIteratorRequest() | |
- .withStreamName(STREAM) | |
- .withShardId(SHARD_1) | |
- .withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER) | |
- .withTimestamp(timestamp.toDate()))) | |
- .thenReturn(new GetShardIteratorResult().withShardIterator(SHARD_ITERATOR)); | |
+ GetShardIteratorRequest.builder() | |
+ .streamName(STREAM) | |
+ .shardId(SHARD_1) | |
+ .shardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER) | |
+ .timestamp(TimeUtil.toJava(timestamp)) | |
+ .build())) | |
+ .thenReturn(GetShardIteratorResponse.builder().shardIterator(SHARD_ITERATOR).build()); | |
String stream = | |
underTest.getShardIterator( | |
@@ -110,31 +115,30 @@ | |
@Test | |
public void shouldHandleExpiredIterationExceptionForGetShardIterator() { | |
shouldHandleGetShardIteratorError( | |
- new ExpiredIteratorException(""), ExpiredIteratorException.class); | |
+ ExpiredIteratorException.builder().build(), ExpiredIteratorException.class); | |
} | |
@Test | |
public void shouldHandleLimitExceededExceptionForGetShardIterator() { | |
shouldHandleGetShardIteratorError( | |
- new LimitExceededException(""), TransientKinesisException.class); | |
+ LimitExceededException.builder().build(), TransientKinesisException.class); | |
} | |
@Test | |
public void shouldHandleProvisionedThroughputExceededExceptionForGetShardIterator() { | |
shouldHandleGetShardIteratorError( | |
- new ProvisionedThroughputExceededException(""), TransientKinesisException.class); | |
+ ProvisionedThroughputExceededException.builder().build(), TransientKinesisException.class); | |
} | |
@Test | |
public void shouldHandleServiceErrorForGetShardIterator() { | |
shouldHandleGetShardIteratorError( | |
- newAmazonServiceException(ErrorType.Service), TransientKinesisException.class); | |
+ SdkServiceException.builder().build(), TransientKinesisException.class); | |
} | |
@Test | |
public void shouldHandleClientErrorForGetShardIterator() { | |
- shouldHandleGetShardIteratorError( | |
- newAmazonServiceException(ErrorType.Client), RuntimeException.class); | |
+ shouldHandleGetShardIteratorError(SdkClientException.builder().build(), RuntimeException.class); | |
} | |
@Test | |
@@ -145,10 +149,11 @@ | |
private void shouldHandleGetShardIteratorError( | |
Exception thrownException, Class<? extends Exception> expectedExceptionClass) { | |
GetShardIteratorRequest request = | |
- new GetShardIteratorRequest() | |
- .withStreamName(STREAM) | |
- .withShardId(SHARD_1) | |
- .withShardIteratorType(ShardIteratorType.LATEST); | |
+ GetShardIteratorRequest.builder() | |
+ .streamName(STREAM) | |
+ .shardId(SHARD_1) | |
+ .shardIteratorType(ShardIteratorType.LATEST) | |
+ .build(); | |
when(kinesis.getShardIterator(request)).thenThrow(thrownException); | |
@@ -164,19 +169,24 @@ | |
@Test | |
public void shouldListAllShards() throws Exception { | |
- Shard shard1 = new Shard().withShardId(SHARD_1); | |
- Shard shard2 = new Shard().withShardId(SHARD_2); | |
- Shard shard3 = new Shard().withShardId(SHARD_3); | |
- when(kinesis.describeStream(STREAM, null)) | |
+ Shard shard1 = Shard.builder().shardId(SHARD_1).build(); | |
+ Shard shard2 = Shard.builder().shardId(SHARD_2).build(); | |
+ Shard shard3 = Shard.builder().shardId(SHARD_3).build(); | |
+ when(kinesis.describeStream( | |
+ DescribeStreamRequest.builder().streamName(STREAM).exclusiveStartShardId(null).build())) | |
.thenReturn( | |
- new DescribeStreamResult() | |
- .withStreamDescription( | |
- new StreamDescription().withShards(shard1, shard2).withHasMoreShards(true))); | |
- when(kinesis.describeStream(STREAM, SHARD_2)) | |
+ DescribeStreamResponse.builder() | |
+ .streamDescription(s -> s.shards(shard1, shard2).hasMoreShards(true)) | |
+ .build()); | |
+ when(kinesis.describeStream( | |
+ DescribeStreamRequest.builder() | |
+ .streamName(STREAM) | |
+ .exclusiveStartShardId(SHARD_2) | |
+ .build())) | |
.thenReturn( | |
- new DescribeStreamResult() | |
- .withStreamDescription( | |
- new StreamDescription().withShards(shard3).withHasMoreShards(false))); | |
+ DescribeStreamResponse.builder() | |
+ .streamDescription(s -> s.shards(shard3).hasMoreShards(false)) | |
+ .build()); | |
List<Shard> shards = underTest.listShards(STREAM); | |
@@ -185,30 +195,31 @@ | |
@Test | |
public void shouldHandleExpiredIterationExceptionForShardListing() { | |
- shouldHandleShardListingError(new ExpiredIteratorException(""), ExpiredIteratorException.class); | |
+ shouldHandleShardListingError( | |
+ ExpiredIteratorException.builder().build(), ExpiredIteratorException.class); | |
} | |
@Test | |
public void shouldHandleLimitExceededExceptionForShardListing() { | |
- shouldHandleShardListingError(new LimitExceededException(""), TransientKinesisException.class); | |
+ shouldHandleShardListingError( | |
+ LimitExceededException.builder().build(), TransientKinesisException.class); | |
} | |
@Test | |
public void shouldHandleProvisionedThroughputExceededExceptionForShardListing() { | |
shouldHandleShardListingError( | |
- new ProvisionedThroughputExceededException(""), TransientKinesisException.class); | |
+ ProvisionedThroughputExceededException.builder().build(), TransientKinesisException.class); | |
} | |
@Test | |
public void shouldHandleServiceErrorForShardListing() { | |
shouldHandleShardListingError( | |
- newAmazonServiceException(ErrorType.Service), TransientKinesisException.class); | |
+ SdkServiceException.builder().build(), TransientKinesisException.class); | |
} | |
@Test | |
public void shouldHandleClientErrorForShardListing() { | |
- shouldHandleShardListingError( | |
- newAmazonServiceException(ErrorType.Client), RuntimeException.class); | |
+ shouldHandleShardListingError(SdkClientException.builder().build(), RuntimeException.class); | |
} | |
@Test | |
@@ -218,7 +229,7 @@ | |
private void shouldHandleShardListingError( | |
Exception thrownException, Class<? extends Exception> expectedExceptionClass) { | |
- when(kinesis.describeStream(STREAM, null)).thenThrow(thrownException); | |
+ when(kinesis.describeStream(any(DescribeStreamRequest.class))).thenThrow(thrownException); | |
try { | |
underTest.listShards(STREAM); | |
failBecauseExceptionWasNotThrown(expectedExceptionClass); | |
@@ -236,8 +247,10 @@ | |
Minutes periodTime = Minutes.minutesBetween(countSince, countTo); | |
GetMetricStatisticsRequest metricStatisticsRequest = | |
underTest.createMetricStatisticsRequest(STREAM, countSince, countTo, periodTime); | |
- GetMetricStatisticsResult result = | |
- new GetMetricStatisticsResult().withDatapoints(new Datapoint().withSum(1.0)); | |
+ GetMetricStatisticsResponse result = | |
+ GetMetricStatisticsResponse.builder() | |
+ .datapoints(Datapoint.builder().sum(1.0).build()) | |
+ .build(); | |
when(cloudWatch.getMetricStatistics(metricStatisticsRequest)).thenReturn(result); | |
@@ -253,12 +266,13 @@ | |
Minutes periodTime = Minutes.minutesBetween(countSince, countTo); | |
GetMetricStatisticsRequest metricStatisticsRequest = | |
underTest.createMetricStatisticsRequest(STREAM, countSince, countTo, periodTime); | |
- GetMetricStatisticsResult result = | |
- new GetMetricStatisticsResult() | |
- .withDatapoints( | |
- new Datapoint().withSum(1.0), | |
- new Datapoint().withSum(3.0), | |
- new Datapoint().withSum(2.0)); | |
+ GetMetricStatisticsResponse result = | |
+ GetMetricStatisticsResponse.builder() | |
+ .datapoints( | |
+ Datapoint.builder().sum(1.0).build(), | |
+ Datapoint.builder().sum(3.0).build(), | |
+ Datapoint.builder().sum(2.0).build()) | |
+ .build(); | |
when(cloudWatch.getMetricStatistics(metricStatisticsRequest)).thenReturn(result); | |
@@ -281,25 +295,24 @@ | |
@Test | |
public void shouldHandleLimitExceededExceptionForGetBacklogBytes() { | |
shouldHandleGetBacklogBytesError( | |
- new LimitExceededException(""), TransientKinesisException.class); | |
+ LimitExceededException.builder().build(), TransientKinesisException.class); | |
} | |
@Test | |
public void shouldHandleProvisionedThroughputExceededExceptionForGetBacklogBytes() { | |
shouldHandleGetBacklogBytesError( | |
- new ProvisionedThroughputExceededException(""), TransientKinesisException.class); | |
+ ProvisionedThroughputExceededException.builder().build(), TransientKinesisException.class); | |
} | |
@Test | |
public void shouldHandleServiceErrorForGetBacklogBytes() { | |
shouldHandleGetBacklogBytesError( | |
- newAmazonServiceException(ErrorType.Service), TransientKinesisException.class); | |
+ SdkServiceException.builder().build(), TransientKinesisException.class); | |
} | |
@Test | |
public void shouldHandleClientErrorForGetBacklogBytes() { | |
- shouldHandleGetBacklogBytesError( | |
- newAmazonServiceException(ErrorType.Client), RuntimeException.class); | |
+ shouldHandleGetBacklogBytesError(SdkClientException.builder().build(), RuntimeException.class); | |
} | |
@Test | |
@@ -326,22 +339,19 @@ | |
} | |
} | |
- private AmazonServiceException newAmazonServiceException(ErrorType errorType) { | |
- AmazonServiceException exception = new AmazonServiceException(""); | |
- exception.setErrorType(errorType); | |
- return exception; | |
- } | |
- | |
@Test | |
public void shouldReturnLimitedNumberOfRecords() throws Exception { | |
final Integer limit = 100; | |
doAnswer( | |
- (Answer<GetRecordsResult>) | |
+ (Answer<GetRecordsResponse>) | |
invocation -> { | |
GetRecordsRequest request = (GetRecordsRequest) invocation.getArguments()[0]; | |
- List<Record> records = generateRecords(request.getLimit()); | |
- return new GetRecordsResult().withRecords(records).withMillisBehindLatest(1000L); | |
+ List<Record> records = generateRecords(request.limit()); | |
+ return GetRecordsResponse.builder() | |
+ .records(records) | |
+ .millisBehindLatest(1000L) | |
+ .build(); | |
}) | |
.when(kinesis) | |
.getRecords(any(GetRecordsRequest.class)); | |
@@ -356,10 +366,11 @@ | |
byte[] value = new byte[1024]; | |
Arrays.fill(value, (byte) i); | |
records.add( | |
- new Record() | |
- .withSequenceNumber(String.valueOf(i)) | |
- .withPartitionKey("key") | |
- .withData(ByteBuffer.wrap(value))); | |
+ Record.builder() | |
+ .sequenceNumber(String.valueOf(i)) | |
+ .partitionKey("key") | |
+ .data(SdkBytes.fromByteBuffer(ByteBuffer.wrap(value))) | |
+ .build()); | |
} | |
return records; | |
} | |
diff -u sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/StartingPointShardsFinderTest.java sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/StartingPointShardsFinderTest.java | |
--- sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/StartingPointShardsFinderTest.java 2019-10-09 22:46:48.000000000 -0700 | |
+++ sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/StartingPointShardsFinderTest.java 2019-10-27 12:34:56.000000000 -0700 | |
@@ -15,16 +15,12 @@ | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
-package org.apache.beam.sdk.io.kinesis; | |
+package org.apache.beam.sdk.io.kinesis2; | |
import static org.assertj.core.api.Assertions.assertThat; | |
import static org.mockito.Mockito.mock; | |
import static org.mockito.Mockito.when; | |
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; | |
-import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord; | |
-import com.amazonaws.services.kinesis.model.Shard; | |
-import com.amazonaws.services.kinesis.model.ShardIteratorType; | |
import java.util.Collections; | |
import java.util.List; | |
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; | |
@@ -32,6 +28,10 @@ | |
import org.junit.Test; | |
import org.junit.runner.RunWith; | |
import org.junit.runners.JUnit4; | |
+import software.amazon.awssdk.services.kinesis.model.Shard; | |
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; | |
+import software.amazon.kinesis.common.InitialPositionInStream; | |
+import software.amazon.kinesis.retrieval.KinesisClientRecord; | |
/** Tests StartingPointShardsFinder. */ | |
@RunWith(JUnit4.class) | |
@@ -61,18 +61,30 @@ | |
*/ | |
private final Shard shard00 = createClosedShard("0000"); | |
private final Shard shard01 = createClosedShard("0001"); | |
- private final Shard shard02 = createClosedShard("0002").withParentShardId("0000"); | |
- private final Shard shard03 = createClosedShard("0003").withParentShardId("0000"); | |
- private final Shard shard04 = createClosedShard("0004").withParentShardId("0001"); | |
- private final Shard shard05 = createClosedShard("0005").withParentShardId("0001"); | |
+ private final Shard shard02 = createClosedShard("0002").toBuilder().parentShardId("0000").build(); | |
+ private final Shard shard03 = createClosedShard("0003").toBuilder().parentShardId("0000").build(); | |
+ private final Shard shard04 = createClosedShard("0004").toBuilder().parentShardId("0001").build(); | |
+ private final Shard shard05 = createClosedShard("0005").toBuilder().parentShardId("0001").build(); | |
private final Shard shard06 = | |
- createClosedShard("0006").withParentShardId("0003").withAdjacentParentShardId("0004"); | |
- private final Shard shard07 = createClosedShard("0007").withParentShardId("0006"); | |
- private final Shard shard08 = createClosedShard("0008").withParentShardId("0006"); | |
+ createClosedShard("0006") | |
+ .toBuilder() | |
+ .parentShardId("0003") | |
+ .adjacentParentShardId("0004") | |
+ .build(); | |
+ private final Shard shard07 = createClosedShard("0007").toBuilder().parentShardId("0006").build(); | |
+ private final Shard shard08 = createClosedShard("0008").toBuilder().parentShardId("0006").build(); | |
private final Shard shard09 = | |
- createOpenShard("0009").withParentShardId("0002").withAdjacentParentShardId("0007"); | |
+ createOpenShard("0009") | |
+ .toBuilder() | |
+ .parentShardId("0002") | |
+ .adjacentParentShardId("0007") | |
+ .build(); | |
private final Shard shard10 = | |
- createOpenShard("0010").withParentShardId("0008").withAdjacentParentShardId("0005"); | |
+ createOpenShard("0010") | |
+ .toBuilder() | |
+ .parentShardId("0008") | |
+ .adjacentParentShardId("0005") | |
+ .build(); | |
private final List<Shard> allShards = | |
ImmutableList.of( | |
@@ -191,7 +203,11 @@ | |
// given | |
StartingPoint latestStartingPoint = new StartingPoint(InitialPositionInStream.LATEST); | |
Shard closedShard10 = | |
- createClosedShard("0010").withParentShardId("0008").withAdjacentParentShardId("0005"); | |
+ createClosedShard("0010") | |
+ .toBuilder() | |
+ .parentShardId("0008") | |
+ .adjacentParentShardId("0005") | |
+ .build(); | |
List<Shard> shards = | |
ImmutableList.of( | |
shard00, | |
@@ -213,14 +229,14 @@ | |
} | |
private Shard createClosedShard(String shardId) { | |
- Shard shard = new Shard().withShardId(shardId); | |
+ Shard shard = Shard.builder().shardId(shardId).build(); | |
activeAtPoint(shard, ShardIteratorType.TRIM_HORIZON); | |
expiredAtPoint(shard, ShardIteratorType.LATEST); | |
return shard; | |
} | |
private Shard createOpenShard(String shardId) { | |
- Shard shard = new Shard().withShardId(shardId); | |
+ Shard shard = Shard.builder().shardId(shardId).build(); | |
activeAtPoint(shard, ShardIteratorType.TRIM_HORIZON); | |
activeAtPoint(shard, ShardIteratorType.LATEST); | |
return shard; | |
@@ -237,13 +253,13 @@ | |
private void activeAtTimestamp(Shard shard, Instant startTimestamp) { | |
prepareShard( | |
shard, | |
- "timestampIterator-" + shard.getShardId(), | |
+ "timestampIterator-" + shard.shardId(), | |
ShardIteratorType.AT_TIMESTAMP, | |
startTimestamp); | |
} | |
private void activeAtPoint(Shard shard, ShardIteratorType shardIteratorType) { | |
- prepareShard(shard, shardIteratorType.toString() + shard.getShardId(), shardIteratorType, null); | |
+ prepareShard(shard, shardIteratorType.toString() + shard.shardId(), shardIteratorType, null); | |
} | |
private void prepareShard( | |
@@ -252,28 +268,23 @@ | |
ShardIteratorType shardIteratorType, | |
Instant startTimestamp) { | |
try { | |
- String shardIterator = shardIteratorType + shard.getShardId() + "-current"; | |
+ String shardIterator = shardIteratorType + shard.shardId() + "-current"; | |
if (shardIteratorType == ShardIteratorType.AT_TIMESTAMP) { | |
when(kinesis.getShardIterator( | |
- STREAM_NAME, | |
- shard.getShardId(), | |
- ShardIteratorType.AT_TIMESTAMP, | |
- null, | |
- startTimestamp)) | |
+ STREAM_NAME, shard.shardId(), ShardIteratorType.AT_TIMESTAMP, null, startTimestamp)) | |
.thenReturn(shardIterator); | |
} else { | |
- when(kinesis.getShardIterator( | |
- STREAM_NAME, shard.getShardId(), shardIteratorType, null, null)) | |
+ when(kinesis.getShardIterator(STREAM_NAME, shard.shardId(), shardIteratorType, null, null)) | |
.thenReturn(shardIterator); | |
} | |
GetKinesisRecordsResult result = | |
new GetKinesisRecordsResult( | |
- Collections.<UserRecord>emptyList(), | |
+ Collections.<KinesisClientRecord>emptyList(), | |
nextIterator, | |
0, | |
STREAM_NAME, | |
- shard.getShardId()); | |
- when(kinesis.getRecords(shardIterator, STREAM_NAME, shard.getShardId())).thenReturn(result); | |
+ shard.shardId()); | |
+ when(kinesis.getRecords(shardIterator, STREAM_NAME, shard.shardId())).thenReturn(result); | |
} catch (TransientKinesisException e) { | |
throw new RuntimeException(e); | |
} | |
diff -u sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/WatermarkPolicyTest.java sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/WatermarkPolicyTest.java | |
--- sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/WatermarkPolicyTest.java 2019-10-09 22:46:48.000000000 -0700 | |
+++ sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/WatermarkPolicyTest.java 2019-10-25 12:08:13.000000000 -0700 | |
@@ -15,7 +15,7 @@ | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
-package org.apache.beam.sdk.io.kinesis; | |
+package org.apache.beam.sdk.io.kinesis2; | |
import static org.assertj.core.api.Assertions.assertThat; | |
import static org.mockito.Mockito.mock; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment