Last active
October 17, 2019 16:44
-
-
Save anilgursel/301efbac56f1a6755956da6f63d8a0df to your computer and use it in GitHub Desktop.
Dummy code for understanding `doOnDiscard` behavior
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpMethod;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.HttpProtocol;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.server.HttpServer;
import reactor.netty.resources.ConnectionProvider;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
public class ResourceReleaseTest { | |
private static DisposableServer mockServer; | |
@BeforeClass | |
public static void setup() { | |
mockServer = HttpServer.create() | |
.host("localhost") | |
.route(routes -> routes | |
.get("/hello", (request, response) -> | |
response.addHeader(HttpHeaderNames.CONTENT_TYPE, "text/plain") | |
.sendString(Mono.just("Hello World!")) | |
) | |
) | |
.bindNow(); | |
} | |
@Test | |
public void testReleasingResources() throws InterruptedException { | |
final String result = | |
HttpClient.create(ConnectionProvider.fixed("dummy", 10)) | |
.protocol(HttpProtocol.HTTP11) | |
.keepAlive(true) | |
.request(HttpMethod.GET) | |
.uri("http://localhost:" + mockServer.port() + "/hello") | |
.responseSingle((response, bytes) -> bytes | |
.asInputStream() | |
.map(is -> new SomeClass(is)) | |
.flatMap(sc -> slowMethod(sc)) | |
.doOnDiscard(InputStream.class, is -> { | |
try { | |
is.close(); | |
} catch (IOException e) { | |
System.out.println("Not able to close InputStream. This may lead to direct memory leaks"); | |
} | |
}) | |
.doOnDiscard(ByteBuf.class, ByteBuf::release) | |
.doOnDiscard(SomeClass.class, SomeClass::close) | |
) | |
.timeout(Duration.ofMillis(500)) | |
.map(someClass -> convertToString(someClass)) | |
.doOnDiscard(InputStream.class, is -> { | |
try { | |
is.close(); | |
} catch (IOException e) { | |
System.out.println("Not able to close InputStream. This may lead to direct memory leaks"); | |
} | |
}) | |
.doOnDiscard(ByteBuf.class, ByteBuf::release) | |
.doOnDiscard(SomeClass.class, SomeClass::close) | |
.block(); | |
Assert.assertEquals("Hello World!", result); | |
} | |
public Mono<SomeClass> slowMethod(final SomeClass sc) { | |
return Mono.just(sc).delayElement(Duration.ofMillis(1_000)); | |
} | |
public class SomeClass { | |
private final InputStream is; | |
public SomeClass(final InputStream is) { | |
this.is = is; | |
} | |
public void close() { | |
try { | |
is.close(); | |
} catch(Exception e) { | |
// Swallow | |
System.out.println("Ouch!!"); | |
} | |
} | |
} | |
private String convertToString(final SomeClass sc) { | |
final StringBuilder stringBuilder = new StringBuilder(); | |
String line = null; | |
try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(sc.is, StandardCharsets.UTF_8))) { | |
while ((line = bufferedReader.readLine()) != null) { | |
stringBuilder.append(line); | |
} | |
} catch (IOException e) { | |
e.printStackTrace(); | |
} | |
return stringBuilder.toString(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment