Skip to content

Instantly share code, notes, and snippets.

@sdir
Created January 30, 2018 06:53
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sdir/6e44c14f6fd29149689b2f9fe37ba6b9 to your computer and use it in GitHub Desktop.
Save sdir/6e44c14f6fd29149689b2f9fe37ba6b9 to your computer and use it in GitHub Desktop.
IGMP test v1 v2 v3 report and leave
#!env python
# pip install scapy
# send() 用于发送三层数据 sendp()发送二层数据
# 源码https://github.com/secdev/scapy/blob/master/scapy/sendrecv.py#L239
# 参数 inter发送数据后sleep时间,当count不为None 时loop无效,count为执行
# 次数。count为None,loop大于0无限循环,小于0相当于count,等于0,发送一次。
# p.show() p.show2()
from scapy.all import *
from scapy.contrib.igmp import *
from scapy.contrib.igmpv3 import *
DEF_GROUPADDR = "224.0.0.22"
VLAN = ""
#igmp v1 v2 report leave
def sendigmp(type, srcaddr, groupaddr=DEF_GROUPADDR):
if VLAN :
a = Ether()/Dot1Q(vlan=VLAN)
else:
a = Ether()
b = IP(src=srcaddr, dst=groupaddr)
c = IGMP(type=type, mrcode=0, gaddr=groupaddr)
p = a/b/c
p.show2()
sendp(p, inter=2, loop=1) #pkg delay loop_count
def igmpv3_report(type, srcaddr, gsrcaddrs=None, loop_count=-1):
if VLAN :
a = Ether()/Dot1Q(vlan=VLAN)
else:
a = Ether()
b = IP(src=srcaddr)
c = IGMPv3()
d = IGMPv3mr()
f = IGMPv3gr()
f.rtype = type
f.maddr = DEF_GROUPADDR
if gsrcaddrs :
f.srcaddrs = gsrcaddrs
d.records = [f,] #可以使多个组记录
p = a/b/c/d
p.show2()
sendp(p, inter=2, loop=-loop_count)
def send_arp():
a = ARP()
a.hwsrc = '02:ff:ff:ff:ff:ff'
a.psrc = '192.168.180.124'
a.hwdst = '00:00:00:00:00:00'
a.pdst = '192.168.180.131'
sendp(Ether(dst='ff:ff:ff:ff:ff:ff')/a)
def send_v1_report(ip):
sendigmp(0x12, ip)
def send_v2_report(ip):
sendigmp(0x16, ip)
def send_v2_leave(ip):
sendigmp(0x17, ip)
def send_v3_is_in(ip, gr=None):
igmpv3_report(1, ip, gr)
def send_v3_is_ex(ip, gr=None):
igmpv3_report(2, ip, gr)
def send_v3_to_in(ip, gr=None):#gr=None 表示离开
igmpv3_report(3, ip, gr)
def send_v3_to_ex(ip, gr=None):#gr=None 表示加入
igmpv3_report(4, ip, gr)
def send_v3_allow(ip, gr):
igmpv3_report(5, ip, gr)
def send_v3_block(ip, gr):
igmpv3_report(6, ip, gr)
TN2 = "192.168.180.143"
TN1 = "192.168.1.18"
#340 测试
# send_v3_is_in(TN2, ["10.10.10.10","10.10.10.11"])
# send_v3_allow(TN1, ["10.10.10.10"])
#356 测试
# send_v3_is_in(TN2, ["10.10.10.10","10.10.10.11"])
# send_v3_block(TN1, ["10.10.10.10"])
# send_v3_is_in(TN2, ["10.10.10.10"])
#390 测试
#1.
# send_v3_is_in(TN1)#查不到组记录
#2.
# send_v3_is_in(TN2, ["10.10.10.10"])
# send_v3_is_in(TN1)
#3.
# send_v3_is_in(TN2, ["10.10.10.10","10.10.10.11"])
# send_v3_is_in(TN1, ["10.10.10.10"])
#409 测试
# send_v3_is_ex(TN1)
# send_v3_is_in(TN2, ["10.10.10.10"])
# send_v3_to_in(TN1,["10.10.10.10"])
# send_v3_is_ex(TN1, ["10.10.10.10","10.10.10.11"])
# send_v3_is_in(TN2,["10.10.10.10"])
# send_v3_is_ex(TN1, ["10.10.10.10"])
# send_v3_to_ex(TN1,["10.10.10.10","10.10.10.11"])
# send_v3_is_ex(TN2,["10.10.10.10","10.10.10.11"])
# send_v3_allow(TN2,["10.10.10.12"])
# send_v3_allow(TN1,["10.10.10.10"])
# send_v3_is_ex(TN2)
# send_v3_allow(TN2,["10.10.10.10"])
# send_v3_block(TN1,["10.10.10.10"])
# send_v3_is_in(TN2,["10.10.10.10"])#有回应
#
# send_v1_report(TN2)
# send_v2_leave(TN1)
# send_v3_to_in(TN1)
# send_v3_to_ex(TN1,["10.10.10.10"])
#
# send_v2_report(TN2)
# send_v3_to_ex(TN1,["10.10.10.10"])
#
send_v1_report(TN2)
# send_v2_report(TN1)
#
# send_v3_to_ex(TN2)
# send_v2_leave(TN1)
#
# send_v3_is_ex(TN2,["10.10.10.10","10.10.10.11"])
# send_v3_allow(TN2,["10.10.10.12"])
# send_v3_is_in(TN1,["10.10.10.10"])
#643
# send_v3_is_ex(TN2,["10.10.10.11"])
# send_v3_allow(TN2,["10.10.10.10"])
# send_v3_is_ex(TN1,["10.10.10.11","10.10.10.12"])
#689
# send_v3_is_ex(TN2,["10.10.10.11"])
# send_v3_allow(TN2,["10.10.10.10"])
# send_v3_to_in(TN1,["10.10.10.10"])
#741
# send_v3_is_ex(TN2,["10.10.10.10"])
# send_v3_allow(TN2,["10.10.10.12"])
# send_v3_to_ex(TN1,["10.10.10.10","10.10.10.11"])
# send_v3_is_ex(TN1,["10.10.10.10"])
# send_v3_is_ex(TN1)
# send_v3_allow(TN2,["10.10.10.10"])
# send_v3_to_in(TN1,["10.10.10.10"])
# send_v1_report(TN1)
send_v3_to_ex(TN1,["10.10.10.10"])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment