Skip to content

Instantly share code, notes, and snippets.

@fcicq
Created April 23, 2014 10:20
Show Gist options
  • Save fcicq/11209848 to your computer and use it in GitHub Desktop.
Save fcicq/11209848 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
import pcap
# --- EtherType values that carry PPPoE (RFC 2516) ---
TYPE_DISCOVERY = '\x88\x63'
TYPE_SESSION = '\x88\x64'

# --- PPPoE header: version/type byte and code values ---
PPPOE_VERSION = '\x11'  # version 1, type 1
PADI = '\x09'
PADO = '\x07'
PADR = '\x19'
PADS = '\x65'
PADD = '\x00'  # code placed in the session-stage frames built below

# --- PPPoE discovery tag types ---
TAG_SRV_NAME = '\x01\x01'
TAG_AC_NAME = '\x01\x02'
TAG_HOST_UNIQ = '\x01\x03'
TAG_AC_COOKIE = '\x01\x04'

# --- Pre-built tags (type + 2-byte length + value) sent in our replies ---
srv_name = TAG_SRV_NAME + '\x00\x00'          # empty service name
ac_name = TAG_AC_NAME + '\x00\x02' + 'G8'     # two-byte concentrator name
ac_cookie = (TAG_AC_COOKIE + '\x00\x12' +     # fixed 18-byte cookie value
             '\x52\x53\x50\x45\x00\x14\x78\xda\xe5'
             '\x03\xe6\x6a\xaf\xe7\xe0\x2a\xc9\x01')

# --- Addresses and session ids used in the frames we emit ---
src_mac = '\x00\x02\x3f\xea\x84\xa3'
sessionid0 = '\x00\x00'  # used in the PADO reply
sessionid1 = '\x00\x06'  # used in PADS and in all session frames

# --- PPP protocol numbers and LCP codes ---
LCP = '\xc0\x21'
PAP = '\xc0\x23'
CONF_REQ = '\x01'
CONF_ACK = '\x02'

# Canned LCP Configure-Request: code, identifier 4, length 18, then options.
lcp_request = ''.join((
    CONF_REQ, '\x04', '\x00\x12',
    '\x01\x04\x05\xd4',          # option 1, value 0x05d4 (1492)
    '\x03\x04' + PAP,            # option 3, value = PAP protocol number
    '\x05\x06\x22\x1e\x17\x4f',  # option 5, 4-byte value
))

# --- Mutable module state ---
pc = ''               # placeholder; main() replaces it with the pcap handle
request_flag = False  # set once our own LCP Configure-Request has been sent
#-------------------------------------------------------
# Maps a byte value to its own character when repr() shows it as a single
# visible character, and to '.' otherwise (control bytes, backslash, ...).
FILTER = ''.join([chr(x) if len(repr(chr(x))) == 3 else '.'
                  for x in range(256)])


def dump(src, length=16):
    """Return a classic hex dump of *src* as one multi-line string.

    Each output line holds the 4-digit hex offset, the hex value of up to
    *length* bytes, and the same bytes as text with non-printable or
    non-ASCII values shown as '.'.

    src    -- str or bytes-like data to render (iterating it may yield either
              one-character strings or ints).
    length -- number of bytes shown per line.

    Returns '' for empty input.
    """
    # Normalise to integer byte values so the function works both for
    # Python 2 str and for Python 3 bytes/bytearray/str.
    values = [b if isinstance(b, int) else ord(b) for b in src]
    rows = []
    for offset in range(0, len(values), length):
        chunk = values[offset:offset + length]
        hex_part = ' '.join(['%02x' % v for v in chunk])
        # Only 7-bit values are looked up; everything above is shown as '.'.
        text_part = ''.join([FILTER[v] if v <= 127 else '.' for v in chunk])
        rows.append('%04x %-*s %s\n' % (offset, length * 3, hex_part, text_part))
    return ''.join(rows)
def finduid(packet):
    """Return the complete Host-Uniq tag of a PPPoE discovery frame.

    The tag list starts at offset 20 (14-byte Ethernet header plus 6-byte
    PPPoE header).  Tags are walked one by one (2-byte type, 2-byte
    big-endian length, value) so that a byte pattern inside another tag's
    value is never mistaken for the Host-Uniq tag.

    packet -- the raw Ethernet frame.

    Returns the tag including its 4-byte type/length header, or an empty
    string when the frame is too short, the tag list is malformed, or no
    Host-Uniq tag is present.  The empty result can be joined into a reply
    payload unchanged.
    """
    tags_start = 20
    empty = packet[0:0]
    if len(packet) < tags_start:
        return empty

    # Bound the walk by the PPPoE length field so Ethernet padding after the
    # last tag is not parsed as tags.
    declared = ord(packet[18:19]) * 256 + ord(packet[19:20])
    end = min(len(packet), tags_start + declared)

    pos = tags_start
    while pos + 4 <= end:
        value_len = (ord(packet[pos + 2:pos + 3]) * 256 +
                     ord(packet[pos + 3:pos + 4]))
        tag_end = pos + 4 + value_len
        if tag_end > end:
            break  # truncated tag: stop rather than return a partial value
        if packet[pos:pos + 2] == TAG_HOST_UNIQ:
            return packet[pos:tag_end]
        pos = tag_end
    return empty
#------------------------------------------------
def discovery_init(packet):
    """Answer a PADI frame with a PADO offer and print the frame sent.

    The reply is addressed to the sender of *packet* and carries our
    service-name, AC-name and AC-cookie tags plus whatever finduid()
    extracts from the request.

    packet -- the raw Ethernet frame of the received PADI.
    """
    dst_mac = packet[6:12]  # sender of the request becomes our destination
    host_uniq = finduid(packet)
    payload = ''.join((srv_name, ac_name, host_uniq, ac_cookie))
    # The PPPoE length field is 16-bit big-endian; encode both bytes so a
    # long client-supplied Host-Uniq tag does not overflow a single byte.
    size = len(payload)
    payload_length = chr(size >> 8) + chr(size & 0xff)
    pkt = ''.join((dst_mac, src_mac, TYPE_DISCOVERY, PPPOE_VERSION, PADO,
                   sessionid0, payload_length, payload))
    pc.inject(pkt, len(pkt))
    print(dump(pkt))
def discovery_req(packet):
    """Answer a PADR frame with a PADS confirmation and print the frame sent.

    The reply is addressed to the sender of *packet*, uses the fixed session
    id ``sessionid1`` and carries our service-name tag plus whatever
    finduid() extracts from the request.

    packet -- the raw Ethernet frame of the received PADR.
    """
    dst_mac = packet[6:12]  # sender of the request becomes our destination
    host_uniq = finduid(packet)
    payload = ''.join((srv_name, host_uniq))
    # The PPPoE length field is 16-bit big-endian; encode both bytes so a
    # long client-supplied Host-Uniq tag does not overflow a single byte.
    size = len(payload)
    payload_length = chr(size >> 8) + chr(size & 0xff)
    pkt = ''.join((dst_mac, src_mac, TYPE_DISCOVERY, PPPOE_VERSION, PADS,
                   sessionid1, payload_length, payload))
    pc.inject(pkt, len(pkt))
    print(dump(pkt))
def discovery_err(packet):
    """Fallback for discovery frames whose code has no handler: just report it.

    packet -- the raw Ethernet frame (not inspected).
    """
    print('Error Discovery')
# PPPoE discovery code -> function that answers it.
discovery_action = {
    PADI: discovery_init,
    PADR: discovery_req,
}


def discovery_handler(packet):
    """Dispatch a discovery-stage frame on its PPPoE code byte.

    The code is read from offset 15 of the frame; codes without an entry in
    ``discovery_action`` go to discovery_err().

    packet -- the raw Ethernet frame.
    """
    responder = discovery_action.get(packet[15:16], discovery_err)
    responder(packet)
#---------------------------------------------------
def pap_handler(packet):
    """Handle a PAP frame; currently only reports that one was seen.

    packet -- the raw Ethernet frame (not inspected).
    """
    print('pap')
#-----------------------------------------------------
def _lcp_session_frame(dst_mac, lcp_packet):
    """Wrap an LCP packet in a PPPoE session frame addressed to *dst_mac*.

    The PPPoE length field is computed from the actual PPP payload
    (2-byte protocol number + LCP packet) as a 16-bit big-endian value.
    """
    ppp_payload = LCP + lcp_packet
    size = len(ppp_payload)
    payload_length = chr(size >> 8) + chr(size & 0xff)
    return ''.join((dst_mac, src_mac, TYPE_SESSION, PPPOE_VERSION, PADD,
                    sessionid1, payload_length, ppp_payload))


def lcp_handler(packet):
    """React to an LCP frame received inside a PPPoE session.

    On the first call our own canned Configure-Request is sent (tracked by
    the module-level ``request_flag``).  Independently of that, every
    received Configure-Request is acknowledged by echoing its identifier,
    length and options back with the Configure-Ack code; any other LCP
    frame is only reported.

    packet -- the raw Ethernet frame; the LCP packet starts at offset 22
              (code), followed by identifier (23) and 2-byte length (24-25).
    """
    global request_flag
    dst_mac = packet[6:12]  # sender of the frame becomes our destination

    # Send our own request once.
    if not request_flag:
        pkt = _lcp_session_frame(dst_mac, lcp_request)
        pc.inject(pkt, len(pkt))
        request_flag = True

    # Acknowledge the peer's request.  The full LCP header must be present;
    # a truncated frame is reported like any other unhandled LCP frame
    # instead of crashing on the length read.
    if packet[22:23] == CONF_REQ and len(packet) >= 26:
        lcp_len = ord(packet[24:25]) * 256 + ord(packet[25:26])
        # Identifier, length and options are echoed verbatim; only the code
        # byte changes.
        reply = packet[23:22 + lcp_len]
        pkt = _lcp_session_frame(dst_mac, CONF_ACK + reply)
        pc.inject(pkt, len(pkt))
    else:
        print('other lcp')
def ppp_err(packet):
    """Fallback for session frames whose PPP protocol has no handler.

    packet -- the raw Ethernet frame (not inspected).
    """
    print('PPP ERROR')
# PPP protocol number -> function that handles it.
ppp_action = {
    LCP: lcp_handler,
    PAP: pap_handler,
}


def session_handler(packet):
    """Dispatch a session-stage frame on its PPP protocol number.

    The protocol number is read from offsets 20-21 of the frame; protocols
    without an entry in ``ppp_action`` go to ppp_err().

    packet -- the raw Ethernet frame.
    """
    responder = ppp_action.get(packet[20:22], ppp_err)
    responder(packet)
#----------------------------------------------------
def oe_err(packet):
    """Report a frame whose EtherType is neither PPPoE discovery nor session.

    Prints an error line followed by a hex dump of the frame.  dump() only
    builds the text, so its result has to be printed to become visible.

    packet -- the raw Ethernet frame.
    """
    print('Error Ethernet Frame')
    print(dump(packet))
# EtherType -> function that handles frames of that type.
oe_action = {
    TYPE_DISCOVERY: discovery_handler,
    TYPE_SESSION: session_handler,
}


def main():
    """Open a capture handle and feed every captured frame to its handler.

    The handle is stored in the module-level ``pc`` because the handlers use
    it to inject their replies.  Frames whose EtherType (offsets 12-13) has
    no entry in ``oe_action`` go to oe_err().
    """
    global pc
    pc = pcap.pcap()
    for _ts, frame in pc:
        responder = oe_action.get(frame[12:14], oe_err)
        responder(frame)
    # NOTE: for offline experiments a capture file can be read instead, e.g.
    # pcap.pcap('/tmp/tplink.pcap'), taking frames one by one with .next()[1].
# Script entry point: run the capture loop, then report completion.
if __name__ == '__main__':
    main()
    print('OK\n')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment