Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save mostroverkhov/8154bd41202b3850f3548569d85ed6b8 to your computer and use it in GitHub Desktop.
Save mostroverkhov/8154bd41202b3850f3548569d85ed6b8 to your computer and use it in GitHub Desktop.
rsocket-metadata-push-issue
/*
* Copyright 2016 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package io.rsocket.examples.transport.tcp.stream;
import io.rsocket.*;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.DefaultPayload;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import java.time.Duration;
/**
 * Minimal reproduction of a bidirectional {@code metadataPush} exchange over TCP.
 *
 * <p>{@link #main(String[])} starts an RSocket server on {@code localhost:7000}, connects a
 * client to it, and has the client push one payload. The server-side handler answers every push
 * it receives by pushing a payload back to the peer that sent it, and the client-side handler
 * logs whatever it receives at debug level.
 */
public final class StreamingClient {
  static final Logger logger = LoggerFactory.getLogger(StreamingClient.class);

  /**
   * Runs the example: start server, connect client, push once, wait, then shut both down.
   *
   * @param args ignored
   */
  public static void main(String[] args) {
    // Keep the server handle so it can be shut down explicitly once the example finishes.
    Closeable server =
        RSocketFactory.receive()
            .acceptor(new SocketAcceptorImpl())
            .transport(TcpServerTransport.create("localhost", 7000))
            .start()
            .block();
    try {
      RSocket socket =
          RSocketFactory.connect()
              .acceptor(
                  rSocket ->
                      new AbstractRSocket() {
                        @Override
                        public Mono<Void> metadataPush(Payload payload) {
                          // NOTE(review): this reads the payload's data section; the RSocket
                          // METADATA_PUSH frame is specified to carry metadata only - confirm
                          // which section the library version in use actually delivers.
                          return Mono.fromRunnable(
                              () ->
                                  logger.debug(
                                      "Metadata received by client: {}",
                                      payload.getDataUtf8()));
                        }
                      })
              .transport(TcpClientTransport.create("localhost", 7000))
              .start()
              .block();
      try {
        socket
            .metadataPush(DefaultPayload.create("metadata"))
            // Presumably this pause gives the server's answering push time to arrive
            // before the connection is torn down.
            .then(Mono.delay(Duration.ofSeconds(5)))
            .block();
      } finally {
        socket.dispose();
      }
    } finally {
      server.dispose();
    }
  }

  /** Server-side acceptor whose responder echoes each received push back to the requester. */
  private static class SocketAcceptorImpl implements SocketAcceptor {
    /**
     * Builds the responder for a newly accepted connection.
     *
     * @param setupPayload setup frame sent by the connecting peer (not inspected)
     * @param reactiveSocket requester handle used to send frames back to the connecting peer
     * @return a responder that reacts to {@code metadataPush} only
     */
    @Override
    public Mono<RSocket> accept(ConnectionSetupPayload setupPayload, RSocket reactiveSocket) {
      return Mono.just(
          new AbstractRSocket() {
            @Override
            public Mono<Void> metadataPush(Payload payload) {
              // Fire-and-forget: the answering push is subscribed here and its outcome is
              // not propagated to the caller of this handler.
              reactiveSocket
                  .metadataPush(
                      DefaultPayload.create("Metadata sent by server " + payload.getDataUtf8()))
                  .subscribe();
              return Mono.empty();
            }
          });
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment