Skip to content

Instantly share code, notes, and snippets.

@ikhoon
Created February 5, 2021 07:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ikhoon/cfbf9cb3fe7a7f6e651ffde02f8707c2 to your computer and use it in GitHub Desktop.
Save ikhoon/cfbf9cb3fe7a7f6e651ffde02f8707c2 to your computer and use it in GitHub Desktop.
package example.armeria;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.reactivestreams.Publisher;
import com.linecorp.armeria.client.WebClient;
import com.linecorp.armeria.common.HttpData;
import com.linecorp.armeria.common.HttpMethod;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.RequestHeaders;
import com.linecorp.armeria.common.stream.HttpDecoder;
import com.linecorp.armeria.common.stream.HttpDecoderInput;
import com.linecorp.armeria.common.stream.HttpDecoderOutput;
import com.linecorp.armeria.common.stream.StreamMessage;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.server.annotation.Post;
import com.linecorp.armeria.server.annotation.ProducesOctetStream;
import com.linecorp.armeria.testing.junit5.server.ServerExtension;
import io.netty.buffer.ByteBuf;
import reactor.core.publisher.Flux;
/**
 * Bidirectional-streaming example built on Armeria.
 *
 * <p>The embedded server exposes {@code POST /bidi}, which re-frames the streamed request body into
 * fixed-length UTF-8 messages and answers each one with {@code "Hello, " + message}. The test streams
 * {@code "Armeria"} payloads to that endpoint and prints every decoded reply.
 */
class BidiStream {

    @RegisterExtension
    static final ServerExtension server = new ServerExtension() {
        @Override
        protected void configure(ServerBuilder sb) throws Exception {
            sb.annotatedService(new Object() {
                /**
                 * Decodes the request body into 7-byte messages and maps each one to a greeting.
                 *
                 * @param req the incoming streamed request
                 * @return a publisher emitting one {@code "Hello, <message>"} payload per decoded message
                 */
                @ProducesOctetStream
                @Post("/bidi")
                public Publisher<HttpData> bidi(HttpRequest req) {
                    // 7 is the UTF-8 size of the "Armeria" payload the test client sends.
                    final StreamMessage<String> decode = req.decode(new FixedLengthDecoder(7));
                    return Flux.from(decode)
                               .map(message -> HttpData.ofUtf8("Hello, " + message));
                }
            });
        }
    };

    /**
     * Streams {@code "Armeria"} payloads to {@code /bidi} and prints each decoded reply.
     *
     * @throws InterruptedException if the waiting thread is interrupted while sleeping
     */
    @Test
    void bidiTest() throws InterruptedException {
        final WebClient client = WebClient.of(server.httpUri());
        // One "Armeria" payload per second, 100 payloads in total.
        final Flux<HttpData> source = Flux.range(1, 100)
                                          .delayElements(Duration.ofSeconds(1))
                                          .map(i -> HttpData.ofUtf8("Armeria"));
        final HttpRequest request = HttpRequest.of(RequestHeaders.of(HttpMethod.POST, "/bidi"), source);
        final HttpResponse response = client.execute(request);
        // 14 is the UTF-8 size of "Hello, " (7 bytes) plus "Armeria" (7 bytes).
        Flux.from(response.decode(new FixedLengthDecoder(14)))
            .subscribe(System.out::println);
        // Keep the test thread alive while replies are printed asynchronously.
        // NOTE(review): the source needs ~100 seconds to complete, so this 20-second window can only
        // observe the first part of the stream; request/response timeouts may end it even sooner —
        // confirm the intended duration.
        Thread.sleep(20000);
    }

    /**
     * Splits the incoming byte stream into messages of exactly {@code length} bytes and emits each
     * message as a UTF-8 {@link String}. Bytes that do not yet make up a whole message are left in
     * the input until a later {@code process} call.
     */
    private static final class FixedLengthDecoder implements HttpDecoder<String> {
        private final int length;

        /**
         * @param length the size of one message in bytes
         * @throws IllegalArgumentException if {@code length} is not positive; a non-positive length
         *                                  would make the decoding loop never terminate
         */
        private FixedLengthDecoder(int length) {
            if (length <= 0) {
                throw new IllegalArgumentException("length: " + length + " (expected: > 0)");
            }
            this.length = length;
        }

        /**
         * Emits every complete {@code length}-byte message currently readable from {@code in}.
         *
         * @param in  the accumulated, not yet consumed input bytes
         * @param out the sink receiving one decoded string per complete message
         */
        @Override
        public void process(HttpDecoderInput in, HttpDecoderOutput<String> out) {
            int remaining = in.readableBytes();
            while (remaining >= length) {
                final ByteBuf buf = in.readBytes(length);
                try {
                    out.add(buf.toString(StandardCharsets.UTF_8));
                } finally {
                    // Release even if decoding or emitting fails, and keep no reference afterwards:
                    // a released buffer must not be touched again.
                    buf.release();
                }
                remaining -= length;
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment