Skip to content

Instantly share code, notes, and snippets.

@aromanenko-dev
Created March 12, 2020 09:07
Show Gist options
  • Save aromanenko-dev/02c4b206bd7c64bec3ee7a4e3588a5b6 to your computer and use it in GitHub Desktop.
Save aromanenko-dev/02c4b206bd7c64bec3ee7a4e3588a5b6 to your computer and use it in GitHub Desktop.
diff --git a/sdks/java/io/kinesis/build.gradle b/sdks/java/io/kinesis/build.gradle
index 2144d10c36..bf477e1dfa 100644
--- a/sdks/java/io/kinesis/build.gradle
+++ b/sdks/java/io/kinesis/build.gradle
@@ -27,6 +27,7 @@ ext.summary = "Library to read Kinesis streams."
test {
// Forking every test resolves an issue where KinesisMockReadTest gets stuck forever.
forkEvery 1
+ useJUnitPlatform()
}
dependencies {
@@ -42,7 +43,12 @@ dependencies {
compile "com.amazonaws:amazon-kinesis-producer:0.13.1"
compile "commons-lang:commons-lang:2.6"
testCompile project(path: ":sdks:java:io:common", configuration: "testRuntime")
- testCompile library.java.junit
+// testCompile library.java.junit
+ testCompile("org.junit.platform:junit-platform-launcher:1.6.0")
+ testCompile("org.junit.platform:junit-platform-runner:1.6.0")
+ testCompile("org.junit.jupiter:junit-jupiter-engine:5.6.0")
+ testCompile("org.junit.vintage:junit-vintage-engine:5.6.0")
+ testCompile("org.mockito:mockito-junit-jupiter:2.23.0")
testCompile library.java.mockito_core
testCompile library.java.guava_testlib
testCompile library.java.hamcrest_core
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java
index 59ab92ea22..d5c9110758 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java
@@ -24,6 +24,7 @@ import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -37,17 +38,24 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Stopwatch;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.joda.time.Duration;
import org.joda.time.Instant;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.platform.runner.JUnitPlatform;
import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.stubbing.Answer;
/** Tests {@link ShardReadersPool}. */
-@RunWith(MockitoJUnitRunner.Silent.class)
+//@RunWith(MockitoJUnitRunner.Silent.class)
+@ExtendWith(MockitoExtension.class)
+@RunWith(JUnitPlatform.class)
public class ShardReadersPoolTest {
private static final int TIMEOUT_IN_MILLIS = (int) TimeUnit.SECONDS.toMillis(10);
@@ -64,7 +72,7 @@ public class ShardReadersPoolTest {
private ShardReadersPool shardReadersPool;
private final Instant now = Instant.now();
- @Before
+ @BeforeEach
public void setUp() throws TransientKinesisException {
when(a.getShardId()).thenReturn("shard1");
when(b.getShardId()).thenReturn("shard1");
@@ -73,11 +81,11 @@ public class ShardReadersPoolTest {
when(firstCheckpoint.getShardId()).thenReturn("shard1");
when(secondCheckpoint.getShardId()).thenReturn("shard2");
when(firstIterator.getShardId()).thenReturn("shard1");
- when(firstIterator.getCheckpoint()).thenReturn(firstCheckpoint);
+ lenient().when(firstIterator.getCheckpoint()).thenReturn(firstCheckpoint);
when(secondIterator.getShardId()).thenReturn("shard2");
- when(secondIterator.getCheckpoint()).thenReturn(secondCheckpoint);
- when(thirdIterator.getShardId()).thenReturn("shard3");
- when(fourthIterator.getShardId()).thenReturn("shard4");
+ lenient().when(secondIterator.getCheckpoint()).thenReturn(secondCheckpoint);
+ lenient().when(thirdIterator.getShardId()).thenReturn("shard3");
+ lenient().when(fourthIterator.getShardId()).thenReturn("shard4");
WatermarkPolicy watermarkPolicy =
WatermarkPolicyFactory.withArrivalTimePolicy().createWatermarkPolicy();
@@ -89,14 +97,14 @@ public class ShardReadersPoolTest {
new ShardReadersPool(
kinesis, checkpoint, watermarkPolicyFactory, rateLimitPolicyFactory, 100));
- when(watermarkPolicyFactory.createWatermarkPolicy()).thenReturn(watermarkPolicy);
+ lenient().when(watermarkPolicyFactory.createWatermarkPolicy()).thenReturn(watermarkPolicy);
when(rateLimitPolicyFactory.getRateLimitPolicy()).thenReturn(rateLimitPolicy);
doReturn(firstIterator).when(shardReadersPool).createShardIterator(kinesis, firstCheckpoint);
doReturn(secondIterator).when(shardReadersPool).createShardIterator(kinesis, secondCheckpoint);
}
- @After
+ @AfterEach
public void clean() {
shardReadersPool.stop();
}
@@ -301,7 +309,7 @@ public class ShardReadersPoolTest {
verify(secondIterator, times(2)).getShardWatermark();
}
- @Test
+ @RepeatedTest(10000)
public void shouldCallRateLimitPolicy()
throws TransientKinesisException, KinesisShardClosedException, InterruptedException {
KinesisClientThrottledException e = new KinesisClientThrottledException("", null);
@@ -324,10 +332,20 @@ public class ShardReadersPoolTest {
}
}
+ ArgumentCaptor<List<KinesisRecord>> recordsCaptor = ArgumentCaptor.forClass(List.class);
+ verify(customRateLimitPolicy, atLeastOnce()).onSuccess(recordsCaptor.capture());
+ List<List<KinesisRecord>> capturedRecords = recordsCaptor.getAllValues();
+ assertThat(capturedRecords).contains(
+ ImmutableList.of(a, b),
+ singletonList(c),
+ singletonList(d),
+ Collections.emptyList()
+ );
+
verify(customRateLimitPolicy).onThrottle(same(e));
- verify(customRateLimitPolicy).onSuccess(eq(ImmutableList.of(a, b)));
- verify(customRateLimitPolicy).onSuccess(eq(singletonList(c)));
- verify(customRateLimitPolicy).onSuccess(eq(singletonList(d)));
- verify(customRateLimitPolicy, atLeastOnce()).onSuccess(eq(Collections.emptyList()));
+// verify(customRateLimitPolicy).onSuccess(eq(ImmutableList.of(a, b)));
+// verify(customRateLimitPolicy).onSuccess(eq(singletonList(c)));
+// verify(customRateLimitPolicy).onSuccess(eq(singletonList(d)));
+// verify(customRateLimitPolicy, atLeastOnce()).onSuccess(eq(Collections.emptyList()));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment