Last active
December 4, 2021 16:15
-
-
Save Doridian/3dbad472604f1d661528ff980c400abe to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json
import re

from scapy.all import *
# Restrict scapy's next-layer guessing to the only stack this script inspects:
# Ethernet -> IPv4 (EtherType 0x800) -> unfragmented TCP (IP protocol 0x06)
# -> raw payload when either port is 6379.
# NOTE(review): presumably done so scapy does not try to dissect unrelated
# protocols while reading a large capture -- confirm against scapy docs.
Ether.payload_guess = [({"type": 0x800}, IP)]
IP.payload_guess = [({"frag": 0, "proto": 0x06}, TCP)]
TCP.payload_guess = [({"dport": 6379}, Raw), ({"sport": 6379}, Raw)]

# Controls how a connection that is not yet tracked is treated when its first
# payload packet shows up: True creates the Link directly in the READY state,
# False creates it in the WAIT state.
INSTANT_READY = True
def sniffOpen(path="big.pcap"):
    """Open a capture file for packet-by-packet reading.

    Args:
        path: Location of the pcap file. Defaults to ``"big.pcap"`` so
            existing zero-argument callers keep their behaviour.

    Returns:
        A scapy ``PcapReader`` for the file; iterating it yields packets.
        The caller is responsible for closing it.
    """
    return PcapReader(path)
# Bit masks applied to the TCP flags field of each packet.
# Plain ints are used instead of long(): Python 2 promotes ints automatically
# and the long builtin does not exist on Python 3.
FINFLAG = 1 << 0
SYNFLAG = 1 << 1
RSTFLAG = 1 << 2
# Either flag is treated as "this connection is going away".
ANYRSTFLAG = RSTFLAG | FINFLAG
# Next value handed out as Link.index; incremented by every Link created.
lastLinkIndex = 0
# linkID string -> Link object for every connection currently tracked.
knownLinks = dict()
# IDs of the tracked links that have reached the READY state.
knownLinkIDs = list()

# Lifecycle states stored in Link.state.
LinkStateWAIT, LinkStateREADY, LinkStateIGNORE = 0, 1, 2
# Commands that are excluded from aggregation; used as a membership set
# (the True values carry no meaning of their own).
subscribeCommands = dict.fromkeys(
    ('SUBSCRIBE', 'UNSUBSCRIBE', 'PSUBSCRIBE', 'PUNSUBSCRIBE', 'MONITOR'),
    True,
)

# Command name (plus the catch-all 'ANY') -> CmdAggregate with its statistics.
commandAggregate = dict()
# Lower-case hexadecimal UUID in 8-4-4-4-12 form; compiled once instead of
# being looked up for every key segment.
_UUID_PATTERN = re.compile(
    r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}')


def filterHash(raw):
    """Normalise a colon-separated key so that similar keys aggregate together.

    Each ``:``-separated segment is rewritten independently:

    * a non-empty segment made only of ASCII digits becomes ``{ID}``;
    * otherwise every lower-case UUID inside the segment becomes ``{UUID}``.

    Two four-segment key families get extra masking afterwards:
    ``chat:{ID}:authkey:<x>`` has its last segment replaced by ``{RAND}``,
    and ``xbl:rating:<a>:<b>`` becomes ``xbl:rating:{GID}:{LANG}``.

    Args:
        raw: The key as a string.

    Returns:
        The normalised key, with the same number of segments as ``raw``.
    """
    parts = []
    for segment in raw.split(':'):
        # Explicit ASCII range check: str.isdigit() would also accept
        # non-ASCII digit characters.
        if segment and all('0' <= ch <= '9' for ch in segment):
            parts.append('{ID}')
        else:
            parts.append(_UUID_PATTERN.sub('{UUID}', segment))

    if len(parts) == 4:
        if parts[0] == 'chat' and parts[1] == '{ID}' and parts[2] == 'authkey':
            parts[3] = '{RAND}'
        elif parts[0] == 'xbl' and parts[1] == 'rating':
            parts[2] = '{GID}'
            parts[3] = '{LANG}'
    return ':'.join(parts)
class HashAggregate(object):
    """Running statistics for one normalised key under one command.

    Attributes are read directly (and serialised via ``__dict__``):
    ``count``, ``avgLatency``, ``errors`` and ``successes``.
    """

    def __init__(self):
        self.count = 0
        self.avgLatency = 0
        self.errors = 0
        self.successes = 0

    def addCount(self, latency, success):
        """Record one observation.

        Args:
            latency: Latency of the observed request; converted to float.
            success: Truthy when the request succeeded, falsy for an error.
        """
        if success:
            self.successes += 1
        else:
            self.errors += 1
        self.count += 1
        # Incremental mean. The previous form multiplied by
        # (count - 1) / count, which is integer division on Python 2 and
        # therefore zeroed the average on every sample after the first.
        self.avgLatency += (float(latency) - self.avgLatency) / self.count
class CmdAggregate(object):
    """Running statistics for one command name, with a per-key breakdown.

    Attributes are read directly (and serialised via ``__dict__``):
    ``count``, ``avgLatency``, ``errors``, ``successes`` and ``hashes``
    (a dict that callers fill with per-key aggregate objects).
    """

    def __init__(self):
        self.count = 0
        self.avgLatency = 0
        self.errors = 0
        self.successes = 0
        self.hashes = {}

    def addCount(self, latency, success):
        """Record one observation.

        Args:
            latency: Latency of the observed request; converted to float.
            success: Truthy when the request succeeded, falsy for an error.
        """
        if success:
            self.successes += 1
        else:
            self.errors += 1
        self.count += 1
        # Incremental mean. The previous form multiplied by
        # (count - 1) / count, which is integer division on Python 2 and
        # therefore zeroed the average on every sample after the first.
        self.avgLatency += (float(latency) - self.avgLatency) / self.count
def sendAggregate(cmd, res, latency):
    """Fold one command/reply pair into the global ``commandAggregate``.

    The observation is recorded twice: under the upper-cased command name and
    under the catch-all name ``'ANY'``. Within each, it is also recorded under
    the normalised key produced by ``filterHash``.

    The pair is silently skipped when ``cmd`` is not a sequence of at least
    two strings, when the command is listed in ``subscribeCommands``, when the
    key starts with a character outside the allowed set, or when the
    normalised key is empty.

    Args:
        cmd: Decoded command; expected to be a list whose first element is
            the command name and whose second element is the key.
        res: Decoded reply. An ``Exception`` instance marks an error reply.
        latency: Time between command and reply.
    """
    # A decoded message may be a scalar or contain nulls; those cannot be
    # attributed to a command/key pair, so skip them instead of crashing.
    if not isinstance(cmd, (list, tuple)) or len(cmd) < 2:
        return
    if not isinstance(cmd[0], str) or not isinstance(cmd[1], str):
        return

    cmdName = cmd[0].upper()
    if cmdName in subscribeCommands:
        return

    # NOTE(review): re.match is anchored at the start, so only the FIRST
    # character of the key is checked against the allowed set. re.search may
    # have been intended; behaviour is kept as-is pending confirmation.
    if re.match(r'[^A-Za-z0-9+ \.,;:_\-]', cmd[1]):
        return

    cmdHash = filterHash(cmd[1])
    if not cmdHash:
        return

    # Error replies are decoded into Exception *instances*. The previous
    # `res is Exception` compared against the class itself and was never
    # true, so no error was ever counted.
    isOK = not isinstance(res, Exception)

    for aggregateName in (cmdName, 'ANY'):
        if aggregateName not in commandAggregate:
            commandAggregate[aggregateName] = CmdAggregate()
        cmdAggregate = commandAggregate[aggregateName]
        cmdAggregate.addCount(latency, isOK)

        if cmdHash not in cmdAggregate.hashes:
            cmdAggregate.hashes[cmdHash] = HashAggregate()
        cmdAggregate.hashes[cmdHash].addCount(latency, isOK)
class REDPDecoder(object):
    """Decoder for the line-based, type-prefixed Redis wire format.

    ``data`` holds the not-yet-consumed input and shrinks as values are
    decoded, so after a successful ``decode()`` it contains whatever followed
    the decoded value. ``oData`` keeps the input exactly as it was passed in.
    """

    def __init__(self, data):
        self.data = data
        self.oData = data

    def decode(self):
        """Decode and return the next top-level value from ``data``.

        Returns:
            ``str`` for ``+`` lines, an ``Exception`` instance (returned, not
            raised) for ``-`` lines, ``int`` for ``:`` lines, ``str`` or
            ``None`` for ``$`` strings, and ``list`` or ``None`` for ``*``
            arrays.

        Raises:
            Exception: On an empty or truncated stream, an unknown type
                prefix, or a ``$`` string not followed by CRLF. ``ValueError``
                (a subclass) is raised for a missing CRLF or a malformed
                integer. ``data`` may be partially consumed when this happens.
        """
        return self._decode(None, 0)

    def _decode(self, results, maxlen):
        """Decode values until one (``results is None``) or ``maxlen`` are read.

        Args:
            results: ``None`` to return the first decoded value directly, or a
                list that collects decoded array elements.
            maxlen: Number of elements to collect when ``results`` is a list.
        """
        while self.data:
            indicator = self.data[0]
            if indicator == '+':
                res = self._untilNewline()
            elif indicator == '-':
                res = Exception(self._untilNewline())
            elif indicator == ':':
                res = self._readInt()
            elif indicator == '$':
                res = self._readBulk()
            elif indicator == '*':
                arrayLen = self._readInt()
                if arrayLen < 0:
                    res = None
                elif arrayLen == 0:
                    # Must not recurse: the loop would consume one element
                    # before noticing the (zero) target length was reached.
                    res = []
                else:
                    res = self._decode([], arrayLen)
            else:
                # Drop the offending line before reporting it.
                self._untilNewline()
                raise Exception("Unknown indicator " + indicator)

            if results is None:
                return res
            results.append(res)
            if len(results) >= maxlen:
                return results
        raise Exception("Decoder error: Reached end of stream")

    def _readBulk(self):
        """Read a ``$<len>`` string body; a negative length yields ``None``."""
        strlen = self._readInt()
        if strlen < 0:
            return None
        # A zero length naturally slices to the empty string.
        res = self.data[:strlen]
        if self.data[strlen:strlen + 2] != '\r\n':
            raise Exception("$ string does not end in \r\n. Skipping")
        self.data = self.data[strlen + 2:]
        return res

    def _untilNewline(self):
        """Consume one CRLF-terminated line; return it without its first char."""
        end = self.data.index('\r\n')
        line = self.data[1:end]
        self.data = self.data[end + 2:]
        return line

    def _readInt(self):
        """Consume one line and parse everything after its prefix as base 10."""
        return int(self._untilNewline(), 10)
class Link(object):
    """Per-connection state for one tracked TCP stream.

    ``lastSide`` is the direction of the most recent payload and is expected
    to be a bool: truthy means the data was sent towards the server port (a
    command), falsy means it came from the server (a reply).
    """

    def __init__(self, state, lastSide):
        """Create a link and assign it the next global index.

        Args:
            state: One of the ``LinkState*`` constants.
            lastSide: Direction of the packet that created the link.
        """
        global lastLinkIndex
        self.index = lastLinkIndex
        lastLinkIndex += 1

        self.state = state
        self.lastSide = lastSide
        # Timestamps use -1 as "not seen yet".
        self.lastSideSwap = -1
        self.lastPacketTime = -1
        self.buffer = ''
        self.lastCommand = None
        self.lastReply = None
        self.lastCommandTime = 0
        self.lastReplyTime = 0
        self.isSubscriber = False
        self.firstReplyTime = -1

    def setIgnore(self):
        """Mark the link as ignored and drop everything buffered for it."""
        self.state = LinkStateIGNORE
        self.buffer = None
        self.lastCommand = None
        self.lastReply = None

    def decodeBufferSafe(self):
        """Try to decode one message; report a completed command/reply pair.

        Decoding failures (typically an incomplete message) are deliberately
        swallowed so the caller can retry once more data has been appended.

        Returns:
            True when a message was decoded, False otherwise.
        """
        try:
            self.decodeBuffer()
        except Exception:
            return False

        # A reply was just decoded and there is a pending command: the pair
        # is complete, so record it and reset for the next exchange.
        if not self.lastSide and self.lastCommand is not None:
            sendAggregate(self.lastCommand, self.lastReply,
                          self.lastReplyTime - self.lastCommandTime)
            self.lastCommand = None
            self.lastReply = None
        return True

    def decodeBuffer(self):
        """Decode one message from ``buffer`` and store it as command or reply.

        Returns:
            The decoded message.

        Raises:
            Exception: When the buffer is empty or missing, when the link is
                flagged as a subscriber and the data is a reply, or when the
                decoder rejects the buffered data. On a decoder failure the
                buffer is left untouched.
        """
        if self.isSubscriber and not self.lastSide:
            self.buffer = ''
            raise Exception("No reading subscriber connections")

        # Also covers buffer=None (set by setIgnore), which previously only
        # failed through an incidental TypeError from len(None).
        if not self.buffer:
            raise Exception("Empty buffer")

        dec = REDPDecoder(self.buffer)
        res = dec.decode()
        # Only commit the consumed input once decoding has succeeded.
        self.buffer = dec.data

        if self.lastSide:
            self.lastCommand = res
            self.lastCommandTime = self.lastSideSwap
        else:
            self.lastReply = res
            self.lastReplyTime = self.lastPacketTime
            # Subscriber detection is a timing heuristic: once replies are
            # still being decoded more than 5 time units after the first
            # reply, the link is flagged and later replies are discarded.
            if self.firstReplyTime < 0:
                self.firstReplyTime = self.lastPacketTime
            elif self.lastPacketTime - self.firstReplyTime > 5:
                self.isSubscriber = True
        return res
# Main pass: walk the capture once, reassemble each direction of every
# connection on port 6379 into its Link buffer and decode as data arrives.
captureReader = sniffOpen()
try:
    for packet in captureReader:
        # Frames without an IP/TCP layer (ARP, IPv6, UDP, ...) would make the
        # layer lookups below raise, so skip them up front.
        if "IP" not in packet or "TCP" not in packet:
            continue

        ipPacket = packet["IP"]
        srcip = ipPacket.src
        dstip = ipPacket.dst

        tcpPacket = packet["TCP"]
        srcport = tcpPacket.sport
        dstport = tcpPacket.dport

        if srcport != 6379 and dstport != 6379:
            continue

        # incoming == True: client -> server (a command).
        incoming = dstport == 6379

        # The ID is always written client-first so that both directions of a
        # connection map to the same Link.
        if incoming:
            linkID = "%s_%s_%d_%d" % (srcip, dstip, srcport, dstport)
        else:
            linkID = "%s_%s_%d_%d" % (dstip, srcip, dstport, srcport)

        tcpFlags = tcpPacket.flags

        if tcpFlags & SYNFLAG != 0:
            # SYN and SYN-ACK both carry the SYN bit; only register the ID
            # once so teardown (which removes a single entry) does not leave
            # stale duplicates behind.
            previousLink = knownLinks.get(linkID)
            if previousLink is None or previousLink.state != LinkStateREADY:
                knownLinkIDs.append(linkID)
            knownLinks[linkID] = Link(LinkStateREADY, incoming)
            knownLinks[linkID].lastSideSwap = packet.time

        if tcpFlags & ANYRSTFLAG != 0:
            if linkID in knownLinks:
                thisLink = knownLinks[linkID]
                if thisLink.state == LinkStateREADY:
                    # Flush whatever is still buffered before forgetting it.
                    thisLink.decodeBufferSafe()
                    knownLinkIDs.remove(linkID)
                del knownLinks[linkID]

        if "Raw" not in packet:
            continue

        payload = packet["Raw"].load
        if not payload:
            continue

        if linkID not in knownLinks:
            if INSTANT_READY:
                knownLinks[linkID] = Link(LinkStateREADY, incoming)
                knownLinkIDs.append(linkID)
            else:
                knownLinks[linkID] = Link(LinkStateWAIT, incoming)
                continue

        thisLink = knownLinks[linkID]

        if thisLink.state == LinkStateWAIT:
            # A waiting link becomes usable at the first change of direction.
            if thisLink.lastSide != incoming:
                thisLink.state = LinkStateREADY
                thisLink.lastSide = incoming
                knownLinkIDs.append(linkID)
            else:
                continue

        if thisLink.state != LinkStateREADY:
            continue

        thisLink.lastPacketTime = packet.time
        if thisLink.lastSideSwap < 0:
            thisLink.lastSideSwap = packet.time

        if incoming != thisLink.lastSide:
            # Direction changed: give the old buffer a last decode attempt,
            # then start a fresh buffer for the new direction.
            thisLink.decodeBufferSafe()
            thisLink.buffer = payload
            thisLink.lastSide = incoming
            thisLink.lastSideSwap = packet.time
            thisLink.decodeBufferSafe()
        else:
            thisLink.buffer += payload
            thisLink.decodeBufferSafe()
finally:
    # Release the capture file handle even if processing fails midway.
    captureReader.close()
class MyEncoder(json.JSONEncoder):
    """JSON encoder that serialises plain objects as their attribute dict."""

    def default(self, o):
        """Return ``o.__dict__`` for objects json cannot encode natively.

        Raises:
            TypeError: For objects without a ``__dict__`` (raised by the base
                class, as the json encoder contract expects) instead of
                leaking an AttributeError.
        """
        try:
            return o.__dict__
        except AttributeError:
            return json.JSONEncoder.default(self, o)
# Emit every collected aggregate as one JSON document on standard output.
print(
    json.dumps(
        commandAggregate,
        ensure_ascii=False,
        cls=MyEncoder,
    )
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment