Skip to content

Instantly share code, notes, and snippets.

@wkgcass
Created December 3, 2019 17:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save wkgcass/9fb70d8c4f5312ef7727ec7542f982fa to your computer and use it in GitHub Desktop.
Save wkgcass/9fb70d8c4f5312ef7727ec7542f982fa to your computer and use it in GitHub Desktop.
Fix using kcp with rsock (java impl from kcp-netty)
// base code: Copyright (c) 2017 Zheng Sun, from project kcp-netty
// added part: Copyright (c) 2019 wkgcass
public int input(ByteBuf data) {
long oldSndUna = sndUna;
long maxack = 0;
boolean flag = false;
if (log.isDebugEnabled()) {
log.debug("{} [RI] {} bytes", this, data.readableBytes());
}
if (data == null || data.readableBytes() < IKCP_OVERHEAD) {
return -1;
}
while (true) {
int conv, len, wnd;
long ts, sn, una;
byte cmd;
short frg;
Segment seg;
if (data.readableBytes() < IKCP_OVERHEAD) {
break;
}
conv = data.readIntLE();
if (conv != this.conv && !(this.conv == 0 && autoSetConv)) {
// ==================START
// NOTE: this code slice is not part of the standard kcp impl
// NOTE: there's a udp-to-tcp wrapper impl called rsock
// NOTE: and it might add extra data into the udp packet
// NOTE: we should remove those packets
// NOTE: the packet looks like: (BE)
// f268b10bd0083eed1700d8095cfb738a4008070000004fec855e0000000000
// f268b10bd0083eed170089aac51cc4d6d8a510000000a5e9895e0000000000
// f268b10bd0083eed1700b6bc82180c30f1290101000019e719370000000000
// f268b10bd0083eed1700b6bc82180c30f129f200000083e517370000000000
// f268b10bd0083eed1700b6bc82180c30f129e400000045e318370000000000
// f268b10bd0083eed1700b6bc82180c30f129d9000000b6db1a370000000000
// so the first 10 bytes are the same, and last 5 bytes are 0, total 31 bytes
Logger.error(LogType.INVALID_EXTERNAL_DATA, "invalid kcp conv, try to recover");
// first 4 bytes: (conv is LE, so make this also LE)
if (conv != ((0xf2) | (0x68 << 8) | (0xb1 << 16) | (0x0b << 24))) {
Logger.error(LogType.INVALID_EXTERNAL_DATA, "conv = " + conv + " != 0xf268b10b");
return -4;
}
// length
if (data.readableBytes() < (31 - 4/*4 bytes already read*/)) {
Logger.error(LogType.INVALID_EXTERNAL_DATA, "readable bytes = " + data.readableBytes() + " < " + (31 - 4));
return -4;
}
// 6 bytes after first 4:
{
byte a = data.readByte();
byte b = data.readByte();
byte c = data.readByte();
byte d = data.readByte();
byte e = data.readByte();
byte f = data.readByte();
if (a != ((byte) 0xd0) || b != ((byte) 0x08) || c != ((byte) 0x3e) || d != ((byte) 0xed) || e != ((byte) 0x17) || f != ((byte) 0x00)) {
Logger.error(LogType.INVALID_EXTERNAL_DATA, "6 bytes are " + a + "," + b + "," + c + "," + d + "," + e + "," + f + ", not 0xd0083eed1700");
return -4;
}
}
// last 5 bytes
{
data.skipBytes(31 - 4 - 6 - 5);
byte a = data.readByte();
byte b = data.readByte();
byte c = data.readByte();
byte d = data.readByte();
byte e = data.readByte();
if (a != 0 || b != 0 || c != 0 || d != 0 || e != 0) {
Logger.error(LogType.INVALID_EXTERNAL_DATA, "last 5 bytes are " + a + "," + b + "," + c + "," + d + "," + e + " not all 0");
return -4;
}
}
// return -4;
Logger.info(LogType.INVALID_EXTERNAL_DATA, "recognized rsock magic packet");
continue;
// ==================END
}
cmd = data.readByte();
frg = data.readUnsignedByte();
wnd = data.readUnsignedShortLE();
ts = data.readUnsignedIntLE();
sn = data.readUnsignedIntLE();
una = data.readUnsignedIntLE();
len = data.readIntLE();
if (data.readableBytes() < len || len < 0) {
return -2;
}
if (cmd != IKCP_CMD_PUSH && cmd != IKCP_CMD_ACK && cmd != IKCP_CMD_WASK && cmd != IKCP_CMD_WINS) {
return -3;
}
if (this.conv == 0 && autoSetConv) { // automatically set conv
this.conv = conv;
}
this.rmtWnd = wnd;
parseUna(una);
shrinkBuf();
boolean readed = false;
long uintCurrent = long2Uint(current);
switch (cmd) {
case IKCP_CMD_ACK: {
int rtt = itimediff(uintCurrent, ts);
if (rtt >= 0) {
updateAck(rtt);
}
parseAck(sn);
shrinkBuf();
if (!flag) {
flag = true;
maxack = sn;
} else {
if (itimediff(sn, maxack) > 0) {
maxack = sn;
}
}
if (log.isDebugEnabled()) {
log.debug("{} input ack: sn={}, rtt={}, rto={}", this, sn, rtt, rxRto);
}
break;
}
case IKCP_CMD_PUSH: {
if (itimediff(sn, rcvNxt + rcvWnd) < 0) {
ackPush(sn, ts);
if (itimediff(sn, rcvNxt) >= 0) {
if (len > 0) {
seg = Segment.createSegment(data.readRetainedSlice(len));
readed = true;
} else {
seg = Segment.createSegment(byteBufAllocator, 0);
}
seg.conv = conv;
seg.cmd = cmd;
seg.frg = frg;
seg.wnd = wnd;
seg.ts = ts;
seg.sn = sn;
seg.una = una;
parseData(seg);
}
}
if (log.isDebugEnabled()) {
log.debug("{} input push: sn={}, una={}, ts={}", this, sn, una, ts);
}
break;
}
case IKCP_CMD_WASK: {
// ready to send back IKCP_CMD_WINS in ikcp_flush
// tell remote my window size
probe |= IKCP_ASK_TELL;
if (log.isDebugEnabled()) {
log.debug("{} input ask", this);
}
break;
}
case IKCP_CMD_WINS: {
// do nothing
if (log.isDebugEnabled()) {
log.debug("{} input tell: {}", this, wnd);
}
break;
}
default:
return -3;
}
if (!readed) {
data.skipBytes(len);
}
}
if (flag) {
parseFastack(maxack);
}
if (itimediff(sndUna, oldSndUna) > 0) {
if (cwnd < rmtWnd) {
int mss = this.mss;
if (cwnd < ssthresh) {
cwnd++;
incr += mss;
} else {
if (incr < mss) {
incr = mss;
}
incr += (mss * mss) / incr + (mss / 16);
if ((cwnd + 1) * mss <= incr) {
cwnd++;
}
}
if (cwnd > rmtWnd) {
cwnd = rmtWnd;
incr = rmtWnd * mss;
}
}
}
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment