test programs for npppd
Last active
April 23, 2020 03:42
-
-
Save yasuoka/0609087b82dda64290b341d02acee196 to your computer and use it in GitHub Desktop.
npppd test
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# | |
# | |
# | |
# | |
# | |
# | |
from scapy.all import * | |
from socket import AF_INET, SOCK_DGRAM, socket | |
import random | |
import select | |
import monotonic | |
import sys | |
def usage():
    """Print the command-line synopsis for this script to standard output."""
    synopsis = "usage: python %s [-v {0|1|2}][-n num_of_sessions][remote_addr]"
    print(synopsis % sys.argv[0])
def main():
    """Run the tear-up/tear-down test against an L2TP peer.

    Command line: ``[-v {0|1|2}] [-n num_of_sessions] [remote_addr]``.

    * ``-v`` selects the sender variant: 0 leaves the LAC defaults, 1 sets
      ``variant_send_before_recv``, 2 additionally sets ``ignore_peer_winsz``.
    * ``-n`` is passed to ``test_cisco`` as the number of calls.
    * ``remote_addr`` defaults to ``127.0.0.1``.

    Exits with status 1 (after printing a message) on invalid arguments.
    """
    # Imported locally: the file's import block does not import getopt
    # explicitly, so relying on the scapy star-import to provide it is fragile.
    import getopt

    variant = 0
    ncalls = None
    try:
        opts, args = getopt.getopt(sys.argv[1:], "v:n:")
    except getopt.GetoptError as err:
        # Unknown option or missing option argument.
        print(err)
        usage()
        sys.exit(1)
    for ch, optarg in opts:
        if ch == '-v':
            try:
                variant = int(optarg)
            except ValueError:
                variant = None
            if variant not in (0, 1, 2):
                print("-v variant must be 0, 1, or 2")
                sys.exit(1)
        elif ch == '-n':
            try:
                ncalls = int(optarg)
            except ValueError:
                usage()
                sys.exit(1)
        else:
            usage()
            sys.exit(1)

    if len(args) == 0:
        addr = "127.0.0.1"
    elif len(args) == 1:
        addr = args[0]
    else:
        usage()
        sys.exit(1)

    sock = socket(AF_INET, SOCK_DGRAM, 0)
    try:
        sock.bind(("0.0.0.0", 1701))
        lac = LAC(sock, addr)
        if variant == 1:
            lac.variant_send_before_recv = True
        elif variant == 2:
            lac.variant_send_before_recv = True
            lac.ignore_peer_winsz = True

        test = test_cisco(lac, ncalls=ncalls)
        lac.SCCRQ()
        while True:
            # The 0.2s timeout doubles as the tick for resend/periodic below.
            rrdys, wrdys, erdys = select.select([sock], [], [], 0.2)
            for rrdy in rrdys:
                if rrdy is sock:
                    resp, peer = sock.recvfrom(4096)
                    l2tp = L2TP(resp)
                    if lac.tunnid != l2tp.tunnel_id:
                        # might be a packet for a previous tunnel
                        continue
                    if l2tp.ns != lac.rcv_nxt:
                        # this might indicate packet loss
                        # ZLB handling?
                        continue
                    lac.recv(l2tp)
            # Something is still unacknowledged: let the LAC decide whether
            # its retransmission interval has elapsed.
            if seqcmp(lac.snd_nxt, lac.snd_una) > 0:
                lac.resend()
            result = test.periodic()
            if not result:
                break
        test.show_result()
    finally:
        # Release UDP port 1701 even when the test aborts with an exception.
        sock.close()
class test_cisco:
    """Scenario driver: bring up N calls on one tunnel, then tear them down.

    ``periodic()`` is meant to be polled from the main loop; it advances the
    scenario one phase at a time based on the state of the given ``lac``
    object and returns False once the scenario has finished.
    """

    def __init__(self, lac, ncalls):
        """Remember the LAC to drive and the number of calls to create.

        ``ncalls`` falls back to 10 when it is None or 0.
        """
        self.lac = lac
        self.tearup_calls = False     # ICRQs have been queued
        self.teardown = False         # CDNs have been queued
        self.teardowndone = False     # StopCCN has been queued
        self.start_time = monotonic.monotonic()
        self.end_time = 0
        self.num_calls = ncalls if ncalls else 10

    def periodic(self):
        """Advance the scenario; return True to keep running, False when done."""
        #
        # Phase 1: tearup num_calls calls
        #
        if self.lac.established and not self.tearup_calls:
            for i in range(self.num_calls):
                call = self.lac.newCall()
                self.lac.ICRQ(call)
            self.tearup_calls = True
        #
        # Phase 2: wait the calls being established, then start teardown
        #
        if self.tearup_calls and not self.lac.txwin and not self.teardown:
            # all() evaluates the predicate for every call, including the
            # single-call case that a two-argument fold never inspects.
            if all(call.established for call in self.lac.calls.values()):
                for call in self.lac.calls.values():
                    self.lac.CDN(call)
                self.teardown = True
        #
        # Phase 3: wait completions of teardown
        #
        if not self.lac.txwin and self.teardown and not self.teardowndone:
            # Proceed once no call is marked established any more.
            if not any(call.established for call in self.lac.calls.values()):
                self.lac.StopCCN()
                self.teardowndone = True
        #
        # Phase 4: wait the control is closed
        #
        if self.teardowndone and not self.lac.txwin:
            self.end_time = monotonic.monotonic()
            return False
        return True

    def show_result(self):
        """Print elapsed time and the LAC's packet counters."""
        print(
            "********\n"
            "Tearup/Teardown %d calls takes %.2fsec\n"
            "Received %d packets\n"
            "Send %d packets (Resend %d)"
            % (self.num_calls, self.end_time - self.start_time,
               self.lac.recv_pkts, self.lac.send_pkts, self.lac.resend_pkts))
def toshort(str, idx=0):
    """Decode the two bytes at ``str[idx:idx + 2]`` as a big-endian integer."""
    high = ord(str[idx:idx + 1])
    low = ord(str[idx + 1:idx + 2])
    return (high << 8) | low
def tolong(str, idx=0):
    """Decode the four bytes at ``str[idx:idx + 4]`` as a big-endian integer.

    Each byte is read through a one-element slice so the function works on
    both text-like and bytes-like inputs, the same way ``toshort`` does.
    """
    value = 0
    for off in range(idx, idx + 4):
        # Shift in one byte per iteration, most significant byte first.
        value = (value << 8) | ord(str[off:off + 1])
    return value
def long(val):
    """Encode the low 32 bits of ``val`` as four big-endian characters."""
    return "".join(chr((val >> shift) & 0xff) for shift in (24, 16, 8, 0))
def short(val):
    """Encode the low 16 bits of ``val`` as two big-endian characters."""
    high, low = divmod(val & 0xffff, 0x100)
    return chr(high) + chr(low)
def seqincl(val):
    """Return the sequence number following ``val`` in 16-bit arithmetic.

    Sequence numbers occupy the full 0..0xffff range (``seqcmp`` compares
    them modulo 0x10000), so the successor of 0xffff is 0.
    """
    return (val + 1) % 0x10000
def seqcmp(a, b):
    """Compare two 16-bit sequence numbers with wrap-around.

    Returns a positive value when ``a`` is ahead of ``b``, a negative value
    when it is behind, and 0 when they are equal.
    """
    delta = a - b
    if delta > 0x7fff:
        # More than half the space ahead: treat as wrapped and behind.
        return delta - 0x10000
    if delta < -0x7fff:
        # More than half the space behind: treat as wrapped and ahead.
        return delta + 0x10000
    return delta
# Attribute Types
# Numeric AVP attribute-type codes, passed as ``atype`` to AVP() and used as
# keys of the dict returned by ParseAVPs().
# NOTE(review): the values appear to follow the L2TP (RFC 2661) assignments;
# confirm against the RFC before adding new ones.
MessageType = 0
ResultCode = 1
ProtocolVersion = 2
FramingCapabilities = 3
BearerCapabilities = 4
TieBreaker = 5
FirmwareRevision = 6
HostName = 7
VendorName = 8
AssignedTunnelId = 9
ReceiveWindowSize = 10
AssignedSessionId = 14
CallSerialNumber = 15
BearerType = 18
FramingType = 19
TxConnectSpeed = 24
RxConnectSpeed = 38
# Message Types
# Values carried in the MessageType AVP of a control message.  The LAC class
# defines methods with some of the same names (SCCRQ, SCCCN, StopCCN, ICRQ,
# ICCN, CDN); inside those methods the bare name still refers to these
# module-level integers.
SCCRQ = 1
SCCRP = 2
SCCCN = 3
StopCCN = 4
HELLO = 6
ICRQ = 10
ICRP = 11
ICCN = 12
CDN = 14
def AVP(atype, avalue, vendor=0, mandatory=True, hidden=False):
    """Serialize one Attribute-Value Pair.

    Layout: a 16-bit word holding the flag bits (0x8000 mandatory,
    0x4000 hidden) and the 10-bit total length, followed by the 16-bit
    vendor ID, the 16-bit attribute type and the raw value.

    :param atype: attribute type code
    :param avalue: already-encoded attribute value
    :param vendor: vendor ID (0 by default)
    :param mandatory: set the mandatory flag bit
    :param hidden: set the hidden flag bit
    :raises ValueError: if the AVP would exceed the 10-bit length field
    """
    flag = 0
    if mandatory:
        flag |= 0x80
    if hidden:
        flag |= 0x40
    length = len(avalue) + 6  # 6-octet AVP header + value
    if length > 0x3ff:
        raise ValueError("AVP too long: %d octets" % length)
    # The length's upper two bits share the first octet with the flags, so
    # the first two octets are emitted as one 16-bit word.
    return short((flag << 8) | length) + short(vendor) + short(atype) + avalue
def ParseAVPs(msg, ret=None):
    """Parse a sequence of concatenated AVPs into a dict keyed by attribute type.

    Each entry is a dict with the keys ``mandatory``, ``hidden``, ``length``
    (total AVP length including the 6-octet header), ``vendor`` and ``value``
    (the raw value octets).

    :param msg: the encoded AVPs; ``None`` or an empty value yields no entries
    :param ret: optional dict to add the parsed AVPs to; a fresh dict is
        created per call when omitted, so results never leak between messages
    :returns: the dict of parsed AVPs (``ret`` itself when it was supplied)

    Parsing stops at the first AVP whose length field is smaller than the
    header or runs past the end of ``msg``.
    """
    if ret is None:
        ret = {}
    total = len(msg) if msg is not None else 0
    offset = 0
    while total - offset >= 6:
        flags = ord(msg[offset:offset + 1])
        # The length is the low 10 bits of the first 16-bit word.
        length = toshort(msg, offset) & 0x03ff
        if length < 6 or offset + length > total:
            # Malformed or truncated AVP: stop rather than loop forever or
            # store a short value.
            break
        ret[toshort(msg, offset + 4)] = {
            'mandatory': bool(flags & 0x80),
            'hidden': bool(flags & 0x40),
            'length': length,
            'vendor': toshort(msg, offset + 2),
            'value': msg[offset + 6:offset + length],
        }
        offset += length
    return ret
class Call:
    """State of one call (session) on the tunnel."""

    def __init__(self, seq):
        """Create a call from its serial number ``seq``.

        The local session ID is the low 16 bits of ``seq``; the peer's
        session ID starts at 0 and the call starts out not established.
        """
        self.established = False
        self.peer_sessid = 0
        self.call_seq = seq
        self.sessid = seq & 0xffff
def msgstr(msg):
    """Return a one-line description of a control message for logging.

    The result is ``"<type> [Ns=<ns>,Nr=<nr>]"`` where ``<type>`` is the
    message-type name taken from the MessageType AVP, ``Unknown(<code>)`` for
    a code with no name here, or ``ZLB`` when the message has no payload.
    """
    body = bytes(msg.payload)
    if len(body) == 0:
        label = "ZLB"
    else:
        # Built only on this path so the payload-less case touches no
        # message-type constants.
        names = {
            SCCRQ: "SCCRQ",
            SCCRP: "SCCRP",
            SCCCN: "SCCCN",
            StopCCN: "StopCCN",
            HELLO: "HELLO",
            ICRQ: "ICRQ",
            ICRP: "ICRP",
            ICCN: "ICCN",
            CDN: "CDN",
        }
        code = toshort(ParseAVPs(body)[MessageType]['value'])
        if code in names:
            label = names[code]
        else:
            label = "Unknown(%s)" % code
    return "%s [Ns=%d,Nr=%d]" % (label, msg.ns, msg.nr)
class LAC:
    """Client side of one L2TP control connection, used to exercise a peer.

    Keeps the send/receive sequence state, a queue of unacknowledged
    control messages (``txwin``) and the table of calls, and provides one
    method per control message it can send.

    Two switches alter the sender's behaviour for testing:

    * ``variant_send_before_recv`` -- in ``recv()``, flush queued messages
      before the incoming message has been processed (i.e. before
      ``rcv_nxt`` is advanced).
    * ``ignore_peer_winsz`` -- in ``send()``, transmit immediately even when
      the message lies outside the peer's receive window.
    """

    def __init__(self, sock, addr, port=1701):
        """Set up state for a tunnel to ``addr``:``port`` over ``sock``."""
        self.sock = sock
        self.addr = addr
        self.port = port
        # 0 is what the header carries while the peer's ID is still unknown
        # (see peer_tunnid below), so never pick it as our own tunnel ID.
        self.tunnid = random.randint(1, 0xffff)
        self.callseq = 1
        self.peer_tunnid = 0
        self.snd_nxt = 0        # Ns of the next new message
        self.snd_una = 0        # oldest Ns not yet acknowledged by the peer
        self.snd_last = 0       # Ns of the most recently transmitted message
        self.rcv_nxt = 0        # next Ns expected from the peer
        self.calls = dict()     # local session ID -> Call
        self.established = False
        self.rxwin = []
        self.txwin = []         # queued messages not yet acknowledged
        self.recvwinsiz = 1     # peer's receive window, updated from SCCRP
        # Time of the last transmission; 0.0 means nothing was sent yet, so
        # resend() is safe to call before the first send().
        self.lasttx = 0.0
        self.send_pkts = 0
        self.recv_pkts = 0
        self.resend_pkts = 0
        self.variant_send_before_recv = False
        self.ignore_peer_winsz = False

    def send(self, msg, call = None):
        """Queue and, window permitting, transmit a control message.

        ``msg`` is the encoded AVP payload, or ``None`` to send a ZLB
        (zero-length body) message.  A ZLB neither consumes a sequence
        number nor enters ``txwin``.  ``call`` selects the peer session ID
        placed in the header (0 when omitted).
        """
        sessid = call.peer_sessid if call else 0
        zlb = False
        if msg is None:
            # send ZLB
            msg = L2TP(hdr=0xc80, version=2, tunnel_id=self.peer_tunnid,
                       ns=self.snd_nxt, nr=self.rcv_nxt, len=12)
            zlb = True
        else:
            msg = L2TP(hdr=0xc80, version=2, tunnel_id=self.peer_tunnid,
                       session_id = sessid, ns=self.snd_nxt, nr=self.rcv_nxt,
                       len=12 + len(msg))/msg
            self.snd_nxt = seqincl(self.snd_nxt)
        dif = seqcmp(msg.ns, self.snd_una)
        if dif < self.recvwinsiz or self.ignore_peer_winsz:
            print(" Send " + msgstr(msg))
            self.sock.sendto(bytes(msg), (self.addr, self.port))
            self.snd_last = msg.ns
            self.lasttx = monotonic.monotonic()
            self.send_pkts = self.send_pkts + 1
        if not zlb:
            # Kept until acknowledged; resend0() transmits it later if the
            # window was closed just now.
            self.txwin.append(msg)

    def resend(self):
        """Retransmit the queued window if nothing was sent for 2 seconds."""
        currtime = monotonic.monotonic()
        if currtime - self.lasttx < 2.0:
            return
        self.resend0(True)

    def resend0(self, resend=False):
        """Transmit queued messages that fit in the peer's receive window.

        With ``resend`` false only messages newer than ``snd_last`` (never
        transmitted so far) are sent; with ``resend`` true every message in
        the window is sent again and counted in ``resend_pkts``.
        """
        currtime = monotonic.monotonic()
        for msg in self.txwin:
            dif = seqcmp(msg.ns, self.snd_una)
            if dif >= self.recvwinsiz:
                break
            if not resend and seqcmp(msg.ns, self.snd_last) <= 0:
                continue
            # Refresh the acknowledgement number before (re)transmitting.
            msg.nr = self.rcv_nxt
            print(" Send " + msgstr(msg) + (" resend" if resend else ""))
            self.sock.sendto(bytes(msg), (self.addr, self.port))
            self.snd_last = msg.ns
            self.lasttx = currtime
            self.send_pkts = self.send_pkts + 1
            if resend:
                self.resend_pkts = self.resend_pkts + 1

    def SCCRQ(self):
        """Send the SCCRQ message that opens the control connection."""
        self.send(AVP(MessageType, short(SCCRQ)) +
                  AVP(ProtocolVersion, short(0x0100)) +
                  AVP(FirmwareRevision, short(0x4400)) +
                  AVP(HostName, "cisco") +
                  AVP(VendorName, "Cisco Systems, Inc.") +
                  AVP(FramingCapabilities, long(0x0)) +
                  AVP(BearerCapabilities, long(0x0)) +
                  AVP(AssignedTunnelId, short(self.tunnid)) +
                  AVP(ReceiveWindowSize, short(20050)))

    def SCCCN(self):
        """Send SCCCN."""
        self.send(AVP(MessageType, short(SCCCN)))

    def StopCCN(self):
        """Send StopCCN for this tunnel."""
        self.send(AVP(MessageType, short(StopCCN)) +
                  AVP(AssignedTunnelId, short(self.tunnid)) +
                  AVP(ResultCode, "\x00\x01\x00\x00OK Test done"))

    def newCall(self):
        """Allocate the next call serial number and register a new Call."""
        call = Call(self.callseq)
        self.callseq = self.callseq + 1
        self.calls[call.sessid] = call
        return call

    def ICRQ(self, call):
        """Send ICRQ for ``call`` (the header session ID is 0)."""
        self.send(
            AVP(MessageType, short(ICRQ)) +
            AVP(CallSerialNumber, long(call.call_seq)) +
            AVP(AssignedSessionId, short(call.sessid)))

    def ICCN(self, call):
        """Send ICCN for ``call``."""
        self.send(
            AVP(MessageType, short(ICCN)) +
            AVP(TxConnectSpeed, long(1000000000)) +
            AVP(RxConnectSpeed, long(1000000000)) +
            AVP(FramingType, long(0x1)), call=call)

    def CDN(self, call):
        """Send CDN for ``call`` and mark it as no longer established."""
        self.send(
            AVP(MessageType, short(CDN)) +
            AVP(ResultCode, "\x00\x03\x00\x00OK Test done") +
            AVP(AssignedSessionId, short(call.sessid)), call)
        call.established = False

    def recv(self, msg):
        """Process one in-sequence control message received from the peer.

        Applies the peer's acknowledgement (Nr) to the transmit queue, then
        handles SCCRP, ICRP, HELLO and CDN; other message types only advance
        ``rcv_nxt``.
        """
        print(" Recv " + msgstr(msg))
        self.recv_pkts = self.recv_pkts + 1
        if seqcmp(self.snd_una, msg.nr) < 0:
            self.snd_una = msg.nr
            # cleanup tx windows by updated Nr: keep only the messages the
            # peer has not acknowledged yet.  Filtering handles an ACK that
            # covers the whole queue as well as one that covers none of it.
            self.txwin = [queued for queued in self.txwin
                          if seqcmp(queued.ns, self.snd_una) >= 0]
        # send here (without updating rcv_nxt)
        if self.variant_send_before_recv:
            self.resend0()
        if len(bytes(msg.payload)) == 0:
            # ZLB: a pure acknowledgement, nothing more to process.
            if not self.variant_send_before_recv:
                self.resend0()
            return
        if self.rcv_nxt == msg.ns:
            self.rcv_nxt = seqincl(msg.ns)
        avps = ParseAVPs(bytes(msg.payload))
        msgtype = toshort(avps[MessageType]['value'])
        if msgtype == SCCRP:
            self.peer_tunnid = toshort(avps[AssignedTunnelId]['value'])
            winsz = avps.get(ReceiveWindowSize)
            # RFC 2661 4.4.3: without a Receive Window Size AVP the peer's
            # window must be assumed to be 4.
            self.recvwinsiz = toshort(winsz['value']) if winsz else 4
            self.established = True
            self.SCCCN()
            print("Control #%d established peer=%d" % (
                self.tunnid, self.peer_tunnid))
        elif msgtype == ICRP:
            call = self.calls.get(msg.session_id)
            if call is not None:
                call.peer_sessid = toshort(avps[AssignedSessionId]['value'])
                call.established = True
                self.ICCN(call)
                print("Call #%d established peer=%d" % (
                    call.sessid, call.peer_sessid))
        elif msgtype == HELLO:
            self.send(None)
        elif msgtype == CDN:
            self.send(None)
            # The AVP carries the sender's session ID, i.e. our peer_sessid.
            sessid = toshort(avps[AssignedSessionId]['value'])
            for call in self.calls.values():
                if call.peer_sessid == sessid:
                    call.established = False
                    break
        if not self.variant_send_before_recv:
            self.resend0()
# Run the test only when executed as a script, so the helpers in this file
# can be imported without opening a socket.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment