Skip to content

Instantly share code, notes, and snippets.

@Doridian
Last active December 4, 2021 16:15
Show Gist options
  • Save Doridian/3dbad472604f1d661528ff980c400abe to your computer and use it in GitHub Desktop.
Save Doridian/3dbad472604f1d661528ff980c400abe to your computer and use it in GitHub Desktop.
from scapy.all import *
import json
Ether.payload_guess = [({"type": 0x800}, IP)]
IP.payload_guess = [({"frag": 0, "proto": 0x06}, TCP)]
TCP.payload_guess = [({"dport": 6379}, Raw), ({"sport": 6379}, Raw)]
INSTANT_READY = True
def sniffOpen():
return PcapReader("big.pcap")
SYNFLAG = long(1 << 1)
RSTFLAG = long(1 << 2)
FINFLAG = long(1 << 0)
ANYRSTFLAG = RSTFLAG | FINFLAG
lastLinkIndex = 0
knownLinks = {}
knownLinkIDs = []
LinkStateWAIT = 0
LinkStateREADY = 1
LinkStateIGNORE = 2
subscribeCommands = {
'SUBSCRIBE': True,
'UNSUBSCRIBE': True,
'PSUBSCRIBE': True,
'PUNSUBSCRIBE': True,
'MONITOR': True
}
commandAggregate = {}
def filterHash(raw):
data = raw.split(':')
res = []
for i in data:
num = False
if len(i) > 0:
num = True
for c in i:
if c < '0' or c > '9':
num = False
break
if num:
# Numerical ID
i = '{ID}'
else:
# UUID
i = re.sub(r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}', '{UUID}', i)
res.append(i)
if len(res) == 4:
if res[0] == 'chat' and res[1] == '{ID}' and res[2] == 'authkey':
res[3] = '{RAND}'
elif res[0] == 'xbl' and res[1] == 'rating':
res[2] = '{GID}'
res[3] = '{LANG}'
return ':'.join(res)
class HashAggregate():
def __init__(self):
self.count = 0
self.avgLatency = 0
self.errors = 0
self.successes = 0
def addCount(self, latency, success):
if success:
self.successes += 1
else:
self.errors += 1
self.count += 1
if self.count > 1:
self.avgLatency *= (self.count - 1) / self.count
self.avgLatency += latency / self.count
class CmdAggregate():
def __init__(self):
self.count = 0
self.avgLatency = 0
self.errors = 0
self.successes = 0
self.hashes = {}
def addCount(self, latency, success):
if success:
self.successes += 1
else:
self.errors += 1
self.count += 1
if self.count > 1:
self.avgLatency *= (self.count - 1) / self.count
self.avgLatency += latency / self.count
def sendAggregate(cmd, res, latency):
if len(cmd) < 2:
return
cmdName = cmd[0].upper()
if cmdName in subscribeCommands:
return
if re.match(r'[^A-Za-z0-9+ \.,;:_\-]', cmd[1]):
return
cmdHash = filterHash(cmd[1])
if not cmdHash:
return
isOK = not (res is Exception)
def _sendAggregate(cmdName):
global commandAggregate
if not cmdName in commandAggregate:
commandAggregate[cmdName] = CmdAggregate()
cmdAggregate = commandAggregate[cmdName]
cmdAggregate.addCount(latency, isOK)
if not cmdHash in cmdAggregate.hashes:
cmdAggregate.hashes[cmdHash] = HashAggregate()
cmdAggregate.hashes[cmdHash].addCount(latency, isOK)
_sendAggregate(cmdName)
_sendAggregate('ANY')
class REDPDecoder():
def __init__(self, data):
self.data = data
self.oData = data
def decode(self):
return self._decode(None, 0)
def _decode(self, results, maxlen):
while len(self.data) > 0:
indicator = self.data[0]
if indicator == '+':
res = self._untilNewline()
elif indicator == '-':
res = Exception(self._untilNewline())
elif indicator == ':':
res = self._readInt()
elif indicator == '$':
strlen = self._readInt()
if strlen < 0:
res = None
else:
if strlen == 0:
res = ''
else:
res = self.data[:strlen]
if self.data[strlen: strlen + 2] != '\r\n':
raise Exception("$ string does not end in \r\n. Skipping")
self.data = self.data[strlen + 2:]
elif indicator == '*':
arrayLen = self._readInt()
if arrayLen < 0:
res = None
else:
if arrayLen == 0:
res = []
else:
res = self._decode([], arrayLen)
else:
self._untilNewline()
raise Exception("Unknown indicator " + indicator)
if results == None:
return res
results.append(res)
if len(results) >= maxlen:
return results
raise Exception("Decoder error: Reached end of stream")
def _untilNewline(self):
i = self.data.index('\r\n')
res = self.data[1:i]
self.data = self.data[i + 2:]
return res
def _readInt(self):
return int(self._untilNewline(), 10)
class Link:
def __init__(self, state, lastSide):
global lastLinkIndex
self.index = lastLinkIndex
lastLinkIndex += 1
self.state = state
self.lastSide = lastSide
self.lastSideSwap = -1
self.lastPacketTime = -1
self.buffer = ''
self.lastCommand = None
self.lastReply = None
self.lastCommandTime = 0
self.lastReplyTime = 0
self.isSubscriber = False
self.firstReplyTime = -1
def setIgnore(self):
self.state = LinkStateIGNORE
self.buffer = None
self.lastCommand = None
self.lastReply = None
def decodeBufferSafe(self):
try:
res = self.decodeBuffer()
except Exception as e:
return False
if self.lastSide == False and self.lastCommand != None:
sendAggregate(self.lastCommand, self.lastReply, self.lastReplyTime - self.lastCommandTime)
self.lastCommand = None
self.lastReply = None
return True
def decodeBuffer(self):
if self.isSubscriber and self.lastSide == False:
self.buffer = ''
raise Exception("No reading subscriber connections")
if len(self.buffer) <= 0:
raise Exception("Empty buffer")
dec = REDPDecoder(self.buffer)
res = dec.decode()
self.buffer = dec.data
if self.lastSide == True:
self.lastCommand = res
#self.isSubscriber = (res[1].upper() in subscribeCommands)
self.lastCommandTime = self.lastSideSwap
else:
self.lastReply = res
self.lastReplyTime = self.lastPacketTime
if self.firstReplyTime < 0:
self.firstReplyTime = self.lastPacketTime
elif self.lastPacketTime - self.firstReplyTime > 5:
self.isSubscriber = True
return res
for packet in sniffOpen():
ipPacket = packet["IP"]
srcip = ipPacket.src
dstip = ipPacket.dst
tcpPacket = packet["TCP"]
srcport = tcpPacket.sport
dstport = tcpPacket.dport
if srcport != 6379 and dstport != 6379:
continue
incoming = dstport == 6379
if incoming:
linkID = "%s_%s_%d_%d" % (srcip, dstip, srcport, dstport)
else:
linkID = "%s_%s_%d_%d" % (dstip, srcip, dstport, srcport)
tcpFlags = tcpPacket.flags
if tcpFlags & SYNFLAG != 0:
knownLinks[linkID] = Link(LinkStateREADY, incoming)
knownLinks[linkID].lastSideSwap = packet.time
knownLinkIDs.append(linkID)
if tcpFlags & ANYRSTFLAG != 0:
if linkID in knownLinks:
thisLink = knownLinks[linkID]
if thisLink.state == LinkStateREADY:
thisLink.decodeBufferSafe()
knownLinkIDs.remove(linkID)
del knownLinks[linkID]
if not "Raw" in packet:
continue
payload = packet["Raw"].load
if not payload or len(payload) < 1:
continue
if linkID not in knownLinks:
if INSTANT_READY:
knownLinks[linkID] = Link(LinkStateREADY, incoming)
knownLinkIDs.append(linkID)
else:
knownLinks[linkID] = Link(LinkStateWAIT, incoming)
continue
thisLink = knownLinks[linkID]
if thisLink.state == LinkStateWAIT:
if thisLink.lastSide != incoming:
thisLink.state = LinkStateREADY
thisLink.lastSide = incoming
knownLinkIDs.append(linkID)
else:
continue
if thisLink.state != LinkStateREADY:
continue
thisLink.lastPacketTime = packet.time
if thisLink.lastSideSwap < 0:
thisLink.lastSideSwap = packet.time
if incoming != thisLink.lastSide:
thisLink.decodeBufferSafe()
thisLink.buffer = payload
thisLink.lastSide = incoming
thisLink.lastSideSwap = packet.time
thisLink.decodeBufferSafe()
else:
thisLink.buffer += payload
thisLink.decodeBufferSafe()
class MyEncoder(json.JSONEncoder):
def default(self, o):
return o.__dict__
print(json.dumps(commandAggregate, cls=MyEncoder, ensure_ascii=False))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment