Skip to content

Instantly share code, notes, and snippets.

@anilgursel
Last active October 17, 2019 16:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anilgursel/301efbac56f1a6755956da6f63d8a0df to your computer and use it in GitHub Desktop.
Save anilgursel/301efbac56f1a6755956da6f63d8a0df to your computer and use it in GitHub Desktop.
Dummy code for understanding `doOnDiscard` behavior
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpMethod;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.HttpProtocol;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.server.HttpServer;
import reactor.netty.resources.ConnectionProvider;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
/**
 * Experiment for observing which {@code doOnDiscard} hooks run when a Reactor Netty
 * HTTP response pipeline is cut short by a timeout.
 *
 * <p>A local server answers {@code GET /hello} with {@code "Hello World!"}. The client wraps
 * the response body stream in {@link SomeClass}, passes it through {@link #slowMethod} and
 * applies a timeout; discard hooks for {@link InputStream}, {@link ByteBuf} and
 * {@link SomeClass} are registered both inside and outside {@code responseSingle}.
 */
public class ResourceReleaseTest {

    /** Message printed when closing a discarded {@link InputStream} fails. */
    private static final String STREAM_CLOSE_FAILURE =
            "Not able to close InputStream. This may lead to direct memory leaks";

    /** Server shared by all tests; bound in {@link #setup()}, disposed in {@link #tearDown()}. */
    private static DisposableServer mockServer;

    /** Binds the mock server on localhost; the port is read back via {@code mockServer.port()}. */
    @BeforeClass
    public static void setup() {
        mockServer = HttpServer.create()
                .host("localhost")
                .route(routes -> routes
                        .get("/hello", (request, response) ->
                                response.addHeader(HttpHeaderNames.CONTENT_TYPE, "text/plain")
                                        .sendString(Mono.just("Hello World!"))
                        )
                )
                .bindNow();
    }

    /** Disposes the mock server so the bound port does not outlive the test class. */
    @AfterClass
    public static void tearDown() {
        // setup() may have failed before assigning the field.
        if (mockServer != null) {
            mockServer.disposeNow();
        }
    }

    /**
     * Requests {@code /hello} and converts the body to a string, with discard hooks attached
     * at two levels of the pipeline.
     *
     * <p>NOTE(review): the timeout below is 500 ms while {@link #slowMethod} delays its element
     * by 1000 ms, so the timeout fires first and {@code block()} is expected to throw before the
     * assertion is reached. Presumably this is deliberate, to provoke discarding - confirm.
     */
    @Test
    public void testReleasingResources() {
        final String result =
                HttpClient.create(ConnectionProvider.fixed("dummy", 10))
                        .protocol(HttpProtocol.HTTP11)
                        .keepAlive(true)
                        .request(HttpMethod.GET)
                        .uri("http://localhost:" + mockServer.port() + "/hello")
                        .responseSingle((response, bytes) -> bytes
                                .asInputStream()
                                .map(SomeClass::new)
                                .flatMap(this::slowMethod)
                                // Hooks registered inside the response handler.
                                .doOnDiscard(InputStream.class, ResourceReleaseTest::closeQuietly)
                                .doOnDiscard(ByteBuf.class, ByteBuf::release)
                                .doOnDiscard(SomeClass.class, SomeClass::close)
                        )
                        .timeout(Duration.ofMillis(500))
                        .map(this::convertToString)
                        // Same hooks registered again downstream of the timeout.
                        .doOnDiscard(InputStream.class, ResourceReleaseTest::closeQuietly)
                        .doOnDiscard(ByteBuf.class, ByteBuf::release)
                        .doOnDiscard(SomeClass.class, SomeClass::close)
                        .block();
        Assert.assertEquals("Hello World!", result);
    }

    /**
     * Re-emits the given object after a fixed one-second delay.
     *
     * @param sc the value to emit
     * @return a {@link Mono} that emits {@code sc} after 1000 ms
     */
    public Mono<SomeClass> slowMethod(final SomeClass sc) {
        return Mono.just(sc).delayElement(Duration.ofMillis(1_000));
    }

    /** Thin holder around an {@link InputStream}, used as a custom discardable type. */
    public class SomeClass {

        private final InputStream is;

        /**
         * @param is the stream this instance wraps and closes
         */
        public SomeClass(final InputStream is) {
            this.is = is;
        }

        /** Closes the wrapped stream; any failure is reported on stdout and not rethrown. */
        public void close() {
            try {
                is.close();
            } catch (Exception e) {
                // Swallow: closing is best-effort here.
                System.out.println("Ouch!!");
            }
        }
    }

    /**
     * Closes a discarded stream, printing a warning instead of throwing when the close fails.
     *
     * @param is the stream handed to the discard hook
     */
    private static void closeQuietly(final InputStream is) {
        try {
            is.close();
        } catch (IOException e) {
            System.out.println(STREAM_CLOSE_FAILURE);
        }
    }

    /**
     * Reads the wrapped stream as UTF-8 text and closes it.
     *
     * <p>Lines are concatenated without their line terminators, because
     * {@link BufferedReader#readLine()} strips them.
     *
     * @param sc holder of the stream to read
     * @return the concatenated lines of the stream
     * @throws UncheckedIOException if reading or closing the stream fails
     */
    private String convertToString(final SomeClass sc) {
        final StringBuilder text = new StringBuilder();
        try (BufferedReader reader =
                     new BufferedReader(new InputStreamReader(sc.is, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                text.append(line);
            }
        } catch (IOException e) {
            // Surface the failure instead of returning a silently truncated body.
            throw new UncheckedIOException("Failed to read response body", e);
        }
        return text.toString();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment