Skip to content

Instantly share code, notes, and snippets.

@mcchae
Last active August 29, 2015 14:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mcchae/74e9ab6b1e8232dfcf7f to your computer and use it in GitHub Desktop.
Save mcchae/74e9ab6b1e8232dfcf7f to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
#coding=utf8
"""
====================================
:mod: pcap 파일을 읽어 IPv4의 Layer3 주소를 IPv6 주소로 변환
@참고 : http://comments.gmane.org/gmane.network.tcpreplay.user/1086
====================================
.. moduleauthor:: 채문창 <mcchae@future.co.kr>
.. note:: GNU License
설명
=====
pcap 파일을 읽어 IPv4의 Layer3 주소를 IPv6 주소로 변환
"""
__author__ = "MoonChang Chae <mcchae@gmail.com>"
__date__ = "2014/12/16"
__version__ = "1.14.1216"
__version_info__ = (1, 14, 1216)
__license__ = "GNU's NDA"
##########################################################################################
from scapy.all import *
##########################################################################################
def ipnumber(ip):
ip=ip.rstrip().split('.')
ipn=0
while ip:
ipn=(ipn<<8)+int(ip.pop(0))
return ipn
##########################################################################################
def TruncateV6Data(pcapname):
pcap = rdpcap(pcapname)
for i in range (0, len(pcap)):
if (len(pcap[i]) > 1500):
if (pcap[i].haslayer(TCP)):
pcap[i][Raw].load = pcap[i][Raw].load[0:1426]
del(pcap[i][TCP].chksum)
if (pcap[i].haslayer(UDP)):
pcap[i][Raw].load = pcap[i][Raw].load[0:1438]
del(pcap[i][UDP].chksum)
del(pcap[i][UDP].len)
del(pcap[i][IPv6].plen)
wrpcap(pcapname,pcap)
##########################################################################################
def convertV4toV6(pcapname, outpcap, v6src=None, v6dst=None):
pcap = rdpcap(pcapname)
newpcap = []
bailing = 0
# for i in range (0, len(pcap)):
cnt = 0
for pkt in pcap:
cnt += 1
try:
#if it is IPv6 bail to save time
if(pkt.haslayer(IPv6)):
bailing = 1
break
#before we even start check for frag, if fragged bail... for now.
try:
if (pkt[IP].flags == 4L or pkt[IP].frag != 0L):
print "Do not support Fragment at this time, packet " + str(i) + " failed"
bailing = 1
break
except Exception,e:
continue
src = ipnumber(pkt[IP].src)
dst = ipnumber(pkt[IP].dst)
if not v6src:
src = hex(src)
src = src.rstrip().split('0x')
src = src.pop(1)
v6src = src[0:4] + ':' + src[4:8]
v6src = '2011::1:' +v6src
if not v6dst:
dst = hex(dst)
dst = dst.rstrip().split('0x')
dst = dst.pop(1)
v6dst = dst[0:4] + ':' + dst[4:8]
v6dst = '2011::1:' +v6dst
ethersrc = pkt[Ether].src
ethersrc = ethersrc.rstrip().split(':')
etherdst = pkt[Ether].dst
etherdst = etherdst.rstrip().split(':')
# replace the header
#print v6src + " " + v6dst + "packet is: " + str(i)
if pkt.haslayer(IP):
if pkt.haslayer(TCP):
del(pkt[TCP].chksum)
if pkt.haslayer(UDP):
del(pkt[UDP].chksum)
del(pkt[IP].chksum)
del(pkt[IP].len)
if pkt.haslayer(Padding):
del(pkt[Padding])
#the IPv6 header length is 40.
if len(pkt[IP].payload) < 6:
pcapload = ''.join(['0' for num in xrange(0,6-len(pkt[IP].payload))])
pkt = pkt/Padding(load=pcapload)
newpcap.append(Ether(src=pkt[Ether].src,dst=pkt[Ether].dst,type=0x86dd)/
IPv6(nh=pkt[IP].proto,src=v6src, dst=v6dst)/pkt[IP].payload)
newpcap[-1].time = pkt.time
if (bailing != 1):
wrpcap(outpcap,newpcap)
except Exception, e:
print "[%d]%s:<%s>" % (cnt, e, pkt)
raise
##########################################################################################
if __name__=='__main__':
convertV4toV6('/tmp/foo.pcap', '/tmp/bar.pcap')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment