# Gist fcicq/11209848 -- created April 23, 2014 10:20.
# GitHub page text: "Save fcicq/11209848 to your computer and use it in GitHub Desktop."
# GitHub notice: this file contains bidirectional Unicode text that may be
# interpreted or compiled differently than what appears below. To review, open
# the file in an editor that reveals hidden Unicode characters.
#!/usr/bin/env python
import struct

import pcap
# EtherType values used to tell discovery-stage frames from session-stage ones.
TYPE_DISCOVERY = '\x88\x63'
TYPE_SESSION = '\x88\x64'

# PPPoE code byte values.
PADI = '\x09'
PADO = '\x07'
PADR = '\x19'
PADS = '\x65'
PADD = '\x00'  # code byte placed in the session-stage frames built below

# Discovery tag type identifiers.
TAG_SRV_NAME = '\x01\x01'
TAG_AC_NAME = '\x01\x02'
TAG_HOST_UNIQ = '\x01\x03'
TAG_AC_COOKIE = '\x01\x04'

# Byte written right after the EtherType in every frame this script sends.
PPPOE_VERSION = '\x11'

# Pre-built tags: tag type, 2-byte length, then the value bytes.
srv_name = TAG_SRV_NAME + '\x00\x00'
ac_name = TAG_AC_NAME + '\x00\x02' + '\x47\x38'
ac_cookie = (TAG_AC_COOKIE + '\x00\x12' +
             '\x52\x53\x50\x45\x00\x14\x78\xda\xe5'
             '\x03\xe6\x6a\xaf\xe7\xe0\x2a\xc9\x01')

# Source MAC address written into every injected frame.
src_mac = '\x00\x02\x3f\xea\x84\xa3'

# Session ids: sessionid0 goes into the PADO reply, sessionid1 into the PADS
# reply and the session-stage frames.
sessionid0 = '\x00\x00'
sessionid1 = '\x00\x06'

# PPP protocol numbers dispatched on by session_handler.
LCP = '\xc0\x21'
PAP = '\xc0\x23'

# Canned LCP Configure-Request sent once by lcp_handler.
# NOTE(review): option meanings below follow RFC 1661 numbering -- confirm.
lcp_request = ('\x01\x04\x00\x12'          # code, identifier, length (0x0012)
               '\x01\x04\x05\xd4'          # option 1 (MRU), value 0x05d4
               '\x03\x04\xc0\x23'          # option 3 (auth protocol), PAP
               '\x05\x06\x22\x1e\x17\x4f')  # option 5 (magic number)

# LCP code bytes.
CONF_REQ = '\x01'
CONF_ACK = '\x02'

# Placeholder; main() rebinds this to the live pcap handle.
pc = ''
# Set to True by lcp_handler once the canned Configure-Request has been sent.
request_flag = False
#------------------------------------------------------- | |
# Lookup table indexed by character code: the character itself when its repr()
# is just the quoted character (three characters long), otherwise '.'.
FILTER = ''.join(chr(x) if len(repr(chr(x))) == 3 else '.' for x in range(256))


def dump(src, length=16):
    """Return a hex dump of *src* as a single string.

    Every output line covers ``length`` items of ``src`` and consists of the
    hexadecimal offset, the items as two-digit hex values (padded to
    ``length * 3`` columns) and a text rendering in which codes above 127 and
    characters rejected by ``FILTER`` are shown as '.'.

    ``src`` may be a text/byte string whose items are one-character strings,
    or a byte sequence whose items are already integers.
    """
    lines = []
    for offset in range(0, len(src), length):
        chunk = src[offset:offset + length]
        # Items are either 1-character strings or ints depending on the
        # sequence type; normalise to integer codes once per chunk.
        codes = [c if isinstance(c, int) else ord(c) for c in chunk]
        hex_part = ' '.join('%02x' % code for code in codes)
        text_part = ''.join(FILTER[code] if code <= 127 else '.'
                            for code in codes)
        lines.append('%04x %-*s %s\n' % (offset, length * 3, hex_part, text_part))
    return ''.join(lines)
def finduid(packet):
    """Return the complete Host-Uniq tag of a discovery frame, or ''.

    The tag list is taken to start at offset 20 of ``packet``. Each tag is a
    2-byte type, a 2-byte big-endian length and that many value bytes. The
    list is walked tag by tag so that the Host-Uniq type bytes are only
    recognised at a tag boundary, never inside another tag's value.

    Returns the tag exactly as received (type, length and value) so it can be
    echoed back in a reply, or an empty string when the frame carries no
    Host-Uniq tag or the tag list is truncated.
    """
    tags = packet[20:]
    pos = 0
    while pos + 4 <= len(tags):
        tag_type = tags[pos:pos + 2]
        if tag_type == '\x00\x00':
            # End-Of-List tag (also what zero padding after the tags looks like).
            break
        (tag_len,) = struct.unpack('>H', tags[pos + 2:pos + 4])
        tag_end = pos + 4 + tag_len
        if tag_end > len(tags):
            # Declared length runs past the captured data: malformed frame.
            break
        if tag_type == TAG_HOST_UNIQ:
            return tags[pos:tag_end]
        pos = tag_end
    return ''
#------------------------------------------------ | |
def discovery_init(packet):
    """Reply to a PADI frame with a PADO frame and print a dump of the reply.

    The reply is addressed to bytes 6-11 of ``packet`` (the sender's MAC) and
    carries the service-name, AC-name, echoed Host-Uniq and AC-cookie tags.
    """
    dst_mac = packet[6:12]
    host_uniq = finduid(packet)
    payload = ''.join((srv_name, ac_name, host_uniq, ac_cookie))
    # The length field is 16 bits wide; packing it as such keeps the frame
    # valid (and avoids chr() raising) when the echoed Host-Uniq pushes the
    # payload past 255 bytes.
    payload_length = struct.pack('>H', len(payload))
    pkt = ''.join((dst_mac, src_mac, TYPE_DISCOVERY, PPPOE_VERSION, PADO,
                   sessionid0, payload_length, payload))
    pc.inject(pkt, len(pkt))
    print(dump(pkt))
def discovery_req(packet):
    """Reply to a PADR frame with a PADS frame and print a dump of the reply.

    The reply is addressed to bytes 6-11 of ``packet`` (the sender's MAC),
    uses the fixed session id ``sessionid1`` and carries the service-name tag
    plus the echoed Host-Uniq tag.
    """
    dst_mac = packet[6:12]
    host_uniq = finduid(packet)
    payload = ''.join((srv_name, host_uniq))
    # 16-bit big-endian length; chr(len(payload)) would raise once the echoed
    # Host-Uniq makes the payload longer than 255 bytes.
    payload_length = struct.pack('>H', len(payload))
    pkt = ''.join((dst_mac, src_mac, TYPE_DISCOVERY, PPPOE_VERSION, PADS,
                   sessionid1, payload_length, payload))
    pc.inject(pkt, len(pkt))
    print(dump(pkt))
def discovery_err(packet):
    """Fallback for discovery codes without a handler: report and ignore."""
    message = 'Error Discovery'
    print(message)
# Discovery code byte -> function that answers that kind of frame.
discovery_action = {
    PADI: discovery_init,
    PADR: discovery_req,
}


def discovery_handler(packet):
    """Dispatch a discovery-stage frame on the code byte at offset 15.

    Codes missing from ``discovery_action`` are passed to ``discovery_err``.
    """
    handler = discovery_action.get(packet[15:16], discovery_err)
    handler(packet)
#--------------------------------------------------- | |
def pap_handler(packet):
    """Handle a PAP frame; currently this only prints a marker."""
    marker = 'pap'
    print(marker)
#----------------------------------------------------- | |
def lcp_handler(packet):
    """Handle an LCP frame received in the session stage.

    On the first call the canned ``lcp_request`` is sent to the peer and
    ``request_flag`` is set so it is not sent again. Independently of that,
    every Configure-Request received from the peer is answered with a
    Configure-Ack that echoes its identifier, length and options. Any other
    LCP code (or a request too short to contain a length field) is reported
    as 'other lcp'.
    """
    global request_flag
    dst_mac = packet[6:12]

    # send my request
    if not request_flag:
        payload_length = struct.pack('>H', len(LCP) + len(lcp_request))
        pkt = ''.join((dst_mac, src_mac, TYPE_SESSION, PPPOE_VERSION, PADD,
                       sessionid1, payload_length, LCP, lcp_request))
        pc.inject(pkt, len(pkt))
        request_flag = True

    # send ack
    # LCP header layout here: code at 22, identifier at 23, length at 24-25.
    if packet[22:23] == CONF_REQ and len(packet) >= 26:
        (lcp_len,) = struct.unpack('>H', packet[24:26])
        # Everything after the code byte is echoed unchanged.
        reply = packet[23:22 + lcp_len]
        ack = CONF_ACK + reply
        # The length must describe THIS frame (protocol field + ack), not the
        # canned request sent above; it is also computed on every call so
        # later requests no longer hit an unbound local.
        payload_length = struct.pack('>H', len(LCP) + len(ack))
        pkt = ''.join((dst_mac, src_mac, TYPE_SESSION, PPPOE_VERSION, PADD,
                       sessionid1, payload_length, LCP, ack))
        pc.inject(pkt, len(pkt))
    else:
        print('other lcp')
def ppp_err(packet):
    """Fallback for PPP protocols without a handler: report and ignore."""
    message = 'PPP ERROR'
    print(message)
# PPP protocol number -> function that handles frames of that protocol.
ppp_action = {
    LCP: lcp_handler,
    PAP: pap_handler,
}


def session_handler(packet):
    """Dispatch a session-stage frame on the protocol field at offset 20-21.

    Protocols missing from ``ppp_action`` are passed to ``ppp_err``.
    """
    handler = ppp_action.get(packet[20:22], ppp_err)
    handler(packet)
#---------------------------------------------------- | |
def oe_err(packet):
    """Report a frame whose EtherType has no handler, with a hex dump of it."""
    print('Error Ethernet Frame')
    # dump() only builds the text; it has to be printed to be of any use.
    print(dump(packet))
# EtherType -> handler for that stage; consulted by main() for every frame.
oe_action = {}
oe_action[TYPE_DISCOVERY] = discovery_handler
oe_action[TYPE_SESSION] = session_handler
def main():
    """Open a capture handle and feed every captured frame to its handler.

    The handle is stored in the module-level ``pc`` so the handlers can
    inject replies through it. Frames are dispatched on the EtherType at
    offset 12-13; unknown types go to ``oe_err``.
    """
    global pc
    pc = pcap.pcap()
    for _timestamp, frame in pc:
        handler = oe_action.get(frame[12:14], oe_err)
        handler(frame)
    # Left over from debugging against a saved capture:
    #file = pcap.pcap('/tmp/tplink.pcap')
    #packet = file.next()[1]
    #packet = file.next()[1]
    #packet = file.next()[1]
# Script entry point: run the capture loop, then report completion.
if __name__ == '__main__':
    main()
    print('OK\n')
# GitHub page footer: "Sign up for free to join this conversation on GitHub.
# Already have an account? Sign in to comment"