Created March 12, 2020 09:07
-
-
Save aromanenko-dev/02c4b206bd7c64bec3ee7a4e3588a5b6 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/sdks/java/io/kinesis/build.gradle b/sdks/java/io/kinesis/build.gradle | |
index 2144d10c36..bf477e1dfa 100644 | |
--- a/sdks/java/io/kinesis/build.gradle | |
+++ b/sdks/java/io/kinesis/build.gradle | |
@@ -27,6 +27,7 @@ ext.summary = "Library to read Kinesis streams." | |
test { | |
// Forking every test resolves an issue where KinesisMockReadTest gets stuck forever. | |
forkEvery 1 | |
+ useJUnitPlatform() | |
} | |
dependencies { | |
@@ -42,7 +43,12 @@ dependencies { | |
compile "com.amazonaws:amazon-kinesis-producer:0.13.1" | |
compile "commons-lang:commons-lang:2.6" | |
testCompile project(path: ":sdks:java:io:common", configuration: "testRuntime") | |
- testCompile library.java.junit | |
+// testCompile library.java.junit | |
+ testCompile("org.junit.platform:junit-platform-launcher:1.6.0") | |
+ testCompile("org.junit.platform:junit-platform-runner:1.6.0") | |
+ testCompile("org.junit.jupiter:junit-jupiter-engine:5.6.0") | |
+ testCompile("org.junit.vintage:junit-vintage-engine:5.6.0") | |
+ testCompile("org.mockito:mockito-junit-jupiter:2.23.0") | |
testCompile library.java.mockito_core | |
testCompile library.java.guava_testlib | |
testCompile library.java.hamcrest_core | |
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java | |
index 59ab92ea22..d5c9110758 100644 | |
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java | |
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java | |
@@ -24,6 +24,7 @@ import static org.mockito.Matchers.eq; | |
import static org.mockito.Matchers.same; | |
import static org.mockito.Mockito.atLeastOnce; | |
import static org.mockito.Mockito.doReturn; | |
+import static org.mockito.Mockito.lenient; | |
import static org.mockito.Mockito.timeout; | |
import static org.mockito.Mockito.times; | |
import static org.mockito.Mockito.verify; | |
@@ -37,17 +38,24 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Stopwatch; | |
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; | |
import org.joda.time.Duration; | |
import org.joda.time.Instant; | |
-import org.junit.After; | |
-import org.junit.Before; | |
-import org.junit.Test; | |
+import org.junit.jupiter.api.AfterEach; | |
+import org.junit.jupiter.api.BeforeEach; | |
+import org.junit.jupiter.api.RepeatedTest; | |
+import org.junit.jupiter.api.Test; | |
+import org.junit.jupiter.api.extension.ExtendWith; | |
+import org.junit.platform.runner.JUnitPlatform; | |
import org.junit.runner.RunWith; | |
+import org.mockito.ArgumentCaptor; | |
import org.mockito.Mock; | |
import org.mockito.Mockito; | |
import org.mockito.junit.MockitoJUnitRunner; | |
+import org.mockito.junit.jupiter.MockitoExtension; | |
import org.mockito.stubbing.Answer; | |
/** Tests {@link ShardReadersPool}. */ | |
-@RunWith(MockitoJUnitRunner.Silent.class) | |
+//@RunWith(MockitoJUnitRunner.Silent.class) | |
+@ExtendWith(MockitoExtension.class) | |
+@RunWith(JUnitPlatform.class) | |
public class ShardReadersPoolTest { | |
private static final int TIMEOUT_IN_MILLIS = (int) TimeUnit.SECONDS.toMillis(10); | |
@@ -64,7 +72,7 @@ public class ShardReadersPoolTest { | |
private ShardReadersPool shardReadersPool; | |
private final Instant now = Instant.now(); | |
- @Before | |
+ @BeforeEach | |
public void setUp() throws TransientKinesisException { | |
when(a.getShardId()).thenReturn("shard1"); | |
when(b.getShardId()).thenReturn("shard1"); | |
@@ -73,11 +81,11 @@ public class ShardReadersPoolTest { | |
when(firstCheckpoint.getShardId()).thenReturn("shard1"); | |
when(secondCheckpoint.getShardId()).thenReturn("shard2"); | |
when(firstIterator.getShardId()).thenReturn("shard1"); | |
- when(firstIterator.getCheckpoint()).thenReturn(firstCheckpoint); | |
+ lenient().when(firstIterator.getCheckpoint()).thenReturn(firstCheckpoint); | |
when(secondIterator.getShardId()).thenReturn("shard2"); | |
- when(secondIterator.getCheckpoint()).thenReturn(secondCheckpoint); | |
- when(thirdIterator.getShardId()).thenReturn("shard3"); | |
- when(fourthIterator.getShardId()).thenReturn("shard4"); | |
+ lenient().when(secondIterator.getCheckpoint()).thenReturn(secondCheckpoint); | |
+ lenient().when(thirdIterator.getShardId()).thenReturn("shard3"); | |
+ lenient().when(fourthIterator.getShardId()).thenReturn("shard4"); | |
WatermarkPolicy watermarkPolicy = | |
WatermarkPolicyFactory.withArrivalTimePolicy().createWatermarkPolicy(); | |
@@ -89,14 +97,14 @@ public class ShardReadersPoolTest { | |
new ShardReadersPool( | |
kinesis, checkpoint, watermarkPolicyFactory, rateLimitPolicyFactory, 100)); | |
- when(watermarkPolicyFactory.createWatermarkPolicy()).thenReturn(watermarkPolicy); | |
+ lenient().when(watermarkPolicyFactory.createWatermarkPolicy()).thenReturn(watermarkPolicy); | |
when(rateLimitPolicyFactory.getRateLimitPolicy()).thenReturn(rateLimitPolicy); | |
doReturn(firstIterator).when(shardReadersPool).createShardIterator(kinesis, firstCheckpoint); | |
doReturn(secondIterator).when(shardReadersPool).createShardIterator(kinesis, secondCheckpoint); | |
} | |
- @After | |
+ @AfterEach | |
public void clean() { | |
shardReadersPool.stop(); | |
} | |
@@ -301,7 +309,7 @@ public class ShardReadersPoolTest { | |
verify(secondIterator, times(2)).getShardWatermark(); | |
} | |
- @Test | |
+ @RepeatedTest(10000) | |
public void shouldCallRateLimitPolicy() | |
throws TransientKinesisException, KinesisShardClosedException, InterruptedException { | |
KinesisClientThrottledException e = new KinesisClientThrottledException("", null); | |
@@ -324,10 +332,20 @@ public class ShardReadersPoolTest { | |
} | |
} | |
+ ArgumentCaptor<List<KinesisRecord>> recordsCaptor = ArgumentCaptor.forClass(List.class); | |
+ verify(customRateLimitPolicy, atLeastOnce()).onSuccess(recordsCaptor.capture()); | |
+ List<List<KinesisRecord>> capturedRecords = recordsCaptor.getAllValues(); | |
+ assertThat(capturedRecords).contains( | |
+ ImmutableList.of(a, b), | |
+ singletonList(c), | |
+ singletonList(d), | |
+ Collections.emptyList() | |
+ ); | |
+ | |
verify(customRateLimitPolicy).onThrottle(same(e)); | |
- verify(customRateLimitPolicy).onSuccess(eq(ImmutableList.of(a, b))); | |
- verify(customRateLimitPolicy).onSuccess(eq(singletonList(c))); | |
- verify(customRateLimitPolicy).onSuccess(eq(singletonList(d))); | |
- verify(customRateLimitPolicy, atLeastOnce()).onSuccess(eq(Collections.emptyList())); | |
+// verify(customRateLimitPolicy).onSuccess(eq(ImmutableList.of(a, b))); | |
+// verify(customRateLimitPolicy).onSuccess(eq(singletonList(c))); | |
+// verify(customRateLimitPolicy).onSuccess(eq(singletonList(d))); | |
+// verify(customRateLimitPolicy, atLeastOnce()).onSuccess(eq(Collections.emptyList())); | |
} | |
} |
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.