Skip to content

Instantly share code, notes, and snippets.

@HeikoBornholdt
Last active March 30, 2022 21:53
Show Gist options
  • Save HeikoBornholdt/db947385bdc1677172554f26bb1a58fa to your computer and use it in GitHub Desktop.
Save HeikoBornholdt/db947385bdc1677172554f26bb1a58fa to your computer and use it in GitHub Desktop.
package org.drasyl.node;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.timeout.WriteTimeoutException;
import io.netty.handler.timeout.WriteTimeoutHandler;
import org.drasyl.channel.DrasylChannel;
import org.drasyl.handler.arq.stopandwait.ByteToStopAndWaitArqDataCodec;
import org.drasyl.handler.arq.stopandwait.StopAndWaitArqCodec;
import org.drasyl.handler.arq.stopandwait.StopAndWaitArqHandler;
import org.drasyl.node.channel.DrasylNodeChannelInitializer;
import org.drasyl.node.event.Event;
/**
* {@link DrasylNode} implementation that utilizes the <a href="https://en.wikipedia.org/wiki/Stop-and-wait_ARQ">Stop-and-wait
* ARQ</a> protocol. In this special case the semantics of the {@link
* java.util.concurrent.CompletionStage} returned by {@link DrasylNode#send(String, Object)} change
* as follows: The contained future will now only complete if message arrival has been confirmed by
* the recipient.
* <p>
* Be aware: Stop-and-wait-ARQ protocol introduces some state on both peers, as it uses an
* alternating sequence no. This means, that when one peer is restarted, there is a chance that the
* "next expected sequence no" is out-of-sync. The most effective (and least performant) solution is
* to send a NO-OP message first, every time you want to send a message.
*/
public class NodeWithAutomaticRepeatRequest extends DrasylNode {
public NodeWithAutomaticRepeatRequest() throws DrasylException {
final DrasylConfig config = DrasylConfig.of();
bootstrap.childHandler(new DrasylNodeChannelInitializer(config, this) {
@Override
protected void firstStage(final DrasylChannel ch) {
super.firstStage(ch);
final ChannelPipeline p = ch.pipeline();
p.addLast(new StopAndWaitArqCodec());
p.addLast(new StopAndWaitArqHandler(100)); // re-send message every 100ms
p.addLast(new ByteToStopAndWaitArqDataCodec());
p.addLast(new WriteTimeoutHandler(10)); // give up after 10 seconds
p.addLast(new ChannelInboundHandlerAdapter() {
@Override
public void exceptionCaught(final ChannelHandlerContext ctx,
final Throwable cause) {
if (cause instanceof WriteTimeoutException) {
System.out.println("message arrival was not confirmed by recipient");
}
else {
// other exception -> passthrough
ctx.fireExceptionCaught(cause);
}
}
});
}
});
}
@Override
public void onEvent(final Event event) {
System.out.println("NodeWithAutomaticRepeatRequest.onEvent");
System.out.println("event = " + event);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment