Skip to content

Instantly share code, notes, and snippets.

@ytti
Last active March 15, 2018 12:39
Show Gist options
  • Save ytti/df3eaaec54fc423ae1685d336d08b452 to your computer and use it in GitHub Desktop.
Save ytti/df3eaaec54fc423ae1685d336d08b452 to your computer and use it in GitHub Desktop.
Script offering a unified API for pcap and pcapng files, and for popping N bytes off the start of each captured packet (useful for removing e.g. ERSPAN monitoring headers)
#!/usr/bin/env ruby
require "logger"
require "stringio"
# Reads classic pcap and pcapng capture files through one block-based API
# (#parse_file) and can re-emit a capture with the first N bytes of every
# packet removed (#pop).
#
# Only files written in the host's native byte order are supported; every
# header is decoded with native-endian String#unpack templates.
class PCAPop
  class Error < StandardError; end
  class InvalidFormat < Error; end
  class ByteOrderError < Error; end

  # First 32-bit word of the file (read in native byte order) => format.
  FORMAT = {
    0xa1b2c3d4 => :pcap,
    0xd4c3b2a1 => :pcap_swapped,
    0x0a0d0d0a => :pcapng,
  }.freeze

  # String#unpack templates for the fixed-size part of each header.
  HEADER = {
    pcap: "LSSlLLL",
    pcap_packet: "LLLL",
    pcapng_section: "LSSq",
    pcapng_interface_description: "SSL",
    pcapng_enchanced_packet: "LLLLL",
    pcapng_interface_statistics: "LLL",
  }.freeze

  # pcapng block type code => symbolic name. The name selects the
  # parse_block_<name> method, so the spelling is part of the dispatch.
  BLOCK_TYPE = {
    0x0 => :reserved,
    0x1 => :interface_description,
    0x2 => :obsolete_packet,
    0x3 => :simple_packet,
    0x4 => :name_resolution,
    0x5 => :interface_statistics,
    0x6 => :enchanced_packet,
    0x7 => :irig_timestamp,
    0x8 => :arinc429_in_afdx,
    0xbad => :custom,
    0x40000bad => :custom,
    0x0a0d0d0a => :section,
  }.freeze

  # Opens +filename+ and detects its format.
  #
  # Raises InvalidFormat when the magic number is unknown or the file is too
  # short to hold one, and ByteOrderError for byte-swapped pcap files.
  def initialize(filename)
    @timestamp_resolution = nil
    @read = String.new
    @log = Logger.new(STDERR)
    @log.formatter = proc do |sev, _datetime, _progname, msg|
      "%5s -- %s\n" % [sev, msg]
    end
    # File.open instead of Kernel#open so the name is only ever treated as a
    # path; "rb" keeps the capture bytes untranslated.
    @file = File.open(filename, "rb")
    begin
      @format = detect_format(@file)
    rescue StandardError
      @file.close
      raise
    end
  end

  # Closes the underlying capture file. Safe to call more than once.
  def close
    @file.close unless @file.closed?
  end

  # Parses +file+ as +format+ and calls the block once per header/packet/block
  # object, in file order.
  def parse_file(file=@file, format=@format, &block)
    # Rewind our own handle so the capture can be parsed more than once.
    file.rewind if file.equal?(@file)
    send("parse_file_#{format}", file, &block)
  end

  # Writes the capture to +out+ with the first +pop_bytes+ bytes removed from
  # every packet; all other records are copied through unchanged.
  #
  # The amount removed is capped at each packet's captured length, and the
  # rewritten length fields never drop below zero.
  # Raises ArgumentError when +pop_bytes+ is negative.
  def pop(pop_bytes, out = $stdout)
    raise ArgumentError, "bytes to pop must not be negative" if pop_bytes < 0
    out.binmode if out.respond_to?(:binmode)
    parse_file do |p|
      if [:enchanced_packet, :pcap_packet].include? p.block.name
        drop = [pop_bytes, p.packet.bytesize].min
        p.length_capture -= drop
        p.length_original = [p.length_original - pop_bytes, 0].max
        p.packet = p.packet[drop..-1]
        out.print p.serialize
      else
        out.print p.block.bytes_read
      end
    end
  end

  # Reads the leading magic number of +file+, rewinds it and returns the
  # matching FORMAT symbol.
  def detect_format(file)
    magic = read(file, 4, "L").first
    file.pos = 0
    format = FORMAT[magic]
    raise InvalidFormat, "unknown magic '0x#{magic.to_s(16)}', unable to detect format" unless format
    @log.debug "detected format '#{format}'"
    raise ByteOrderError, "non-native byte order not implemented, FIXME" if format == :pcap_swapped
    format
  end

  # Reads exactly +bytes+ bytes from +file+, appends them to the raw-bytes
  # accumulator (@read) and returns them, unpacked with +unpack_string+ when
  # one is given.
  #
  # Raises InvalidFormat when fewer than +bytes+ bytes are available.
  def read(file, bytes, unpack_string=nil)
    ## FIXME: implement non-native byte order support
    @log.debug "pos: #{file.pos}/#{file.size}, read: #{bytes}"
    data = file.read(bytes)
    if data.nil? || data.bytesize < bytes
      got = data ? data.bytesize : 0
      raise InvalidFormat, "unexpected end of data: wanted #{bytes} bytes, got #{got}"
    end
    @read << data
    unpack_string ? data.unpack(unpack_string) : data
  end

  # Parses a classic pcap file: yields one PcapHeader, then one PcapPacket per
  # record until end of file.
  def parse_file_pcap(file, &block)
    @read = String.new
    magic_number, version_major, version_minor, thiszone, sigfigs, snaplen, network = read(file, 24, HEADER[:pcap])
    basic = PcapBlock.new(name: :pcap_header, bytes_read: @read.dup)
    @log.debug "version: #{version_major}.#{version_minor}"
    @log.debug "thiszone: #{thiszone} - GMT to local correction"
    @log.debug "sigfigs: #{sigfigs} - accuracy of timestamps"
    @log.debug "snaplen: #{snaplen} - max length of captured packets, in octects"
    @log.debug "network: #{network} - data link type"
    if version_major != 2 || version_minor != 4
      @log.warn "tested only on pcap version 2.4, file version is #{version_major}.#{version_minor}"
    end
    block.call PcapHeader.new(
      magic_number: magic_number,
      version_major: version_major,
      version_minor: version_minor,
      this_zone: thiszone,
      sigfigs: sigfigs,
      snaplen: snaplen,
      network: network,
      block: basic
    )
    block.call parse_pcap_packet(file) until file.eof?
  end

  # Reads one pcap record (16-byte record header plus captured bytes) and
  # returns it as a PcapPacket.
  def parse_pcap_packet(file)
    @read = String.new
    ts_sec, ts_usec, length_capture, length_original = read(file, 16, HEADER[:pcap_packet])
    @log.debug "ts_sec: #{ts_sec}"
    @log.debug "ts_usec: #{ts_usec}"
    @log.debug "length_capture: #{length_capture}"
    @log.debug "length_original: #{length_original}"
    timestamp = parse_timestamp(ts_sec, ts_usec, -1)
    packet = read(file, length_capture)
    basic = PcapBlock.new(name: :pcap_packet, bytes_read: @read.dup)
    PcapPacket.new(
      ts_sec: ts_sec,
      ts_usec: ts_usec,
      length_capture: length_capture,
      length_original: length_original,
      timestamp: timestamp,
      packet: packet,
      block: basic
    )
  end

  # Parses a pcapng file block by block. Block types with a matching
  # parse_block_<name> method yield a typed object; everything else yields an
  # UnsupportedBlock. In both cases #block holds the raw Block.
  def parse_file_pcapng(file, &block)
    until file.eof?
      basic = read_block(file)
      method_name = "parse_block_#{basic.name}"
      data = if respond_to?(method_name)
        send(method_name, StringIO.new(basic.body), basic.length)
      else
        @log.warn "block not implemented, FIXME"
        UnsupportedBlock.new
      end
      data.block = basic
      block.call(data)
    end
  end

  # Parses a section header block body. +length+ is the block total length.
  # Raises ByteOrderError unless the byte-order magic reads as 0x1a2b3c4d.
  def parse_block_section(body, length)
    header = HEADER[:pcapng_section]
    byte_order_magic, version_major, version_minor, section_length = read(body, 16, header)
    raise ByteOrderError, "non-native byte order not implemented, FIXME" unless byte_order_magic == 0x1a2b3c4d
    @log.debug "byte_order_magic 0x#{byte_order_magic.to_s(16)}"
    @log.debug "version: #{version_major}.#{version_minor}"
    @log.debug "section_length: #{section_length}"
    if version_major != 1 || version_minor != 0
      @log.warn "tested only on pcapng version 1.0, file version is #{version_major}.#{version_minor}"
    end
    # 28 = type (4) + length (4) + fixed fields (16) + trailing length (4)
    opts = parse_options(body, length-28)
    Section.new(
      byte_order_magic: byte_order_magic,
      version_major: version_major,
      version_minor: version_minor,
      section_length: section_length,
      header: header,
      options: opts
    )
  end

  # Parses an interface description block body and remembers its timestamp
  # resolution for the packets that follow.
  # NOTE(review): only the most recent interface's resolution is kept, not one
  # per interface id.
  def parse_block_interface_description(body, length)
    header = HEADER[:pcapng_interface_description]
    link_type, reserved, snap_len = read(body, 8, header)
    @log.debug "link_type: #{link_type}"
    @log.debug "reserved: #{reserved}"
    @log.debug "snap_len: #{snap_len}"
    # 20 = type (4) + length (4) + fixed fields (8) + trailing length (4)
    opts = parse_options(body, length-20)
    @timestamp_resolution = opts.timestamp_resolution
    InterfaceDescription.new(
      link_type: link_type,
      reserved: reserved,
      snap_len: snap_len,
      header: header,
      options: opts
    )
  end

  # Parses an enhanced packet block body: fixed fields, captured bytes,
  # alignment padding, then options.
  def parse_block_enchanced_packet(body, length)
    header = HEADER[:pcapng_enchanced_packet]
    interface_id, timestamp_high, timestamp_low, length_capture, length_original = read(body, 20, header)
    @log.debug "interface_id: #{interface_id}"
    @log.debug "timestamp_high: #{timestamp_high}"
    @log.debug "timestamp_low: #{timestamp_low}"
    @log.debug "length_capture: #{length_capture}"
    @log.debug "length_original: #{length_original}"
    timestamp = parse_timestamp(timestamp_high, timestamp_low, @timestamp_resolution)
    packet = read(body, length_capture)
    pad_size, pad = read_pad(body, length_capture)
    # 32 = type (4) + length (4) + fixed fields (20) + trailing length (4)
    offset = 32+pad_size
    opts = parse_options(body, length-(offset+length_capture))
    EnchancedPacket.new(
      interface_id: interface_id,
      timestamp_high: timestamp_high,
      timestamp_low: timestamp_low,
      length_capture: length_capture,
      length_original: length_original,
      header: header,
      options: opts,
      packet: packet,
      pad: pad,
      timestamp: timestamp
    )
  end

  # Parses an interface statistics block body.
  def parse_block_interface_statistics(body, length)
    header = HEADER[:pcapng_interface_statistics]
    interface_id, timestamp_high, timestamp_low = read(body, 12, header)
    @log.debug "interface_id: #{interface_id}"
    @log.debug "timestamp_high: #{timestamp_high}"
    @log.debug "timestamp_low: #{timestamp_low}"
    timestamp = parse_timestamp(timestamp_high, timestamp_low, 6) # why isn't precision what interface description tells us?'
    # 24 = type (4) + length (4) + fixed fields (12) + trailing length (4)
    opts = parse_options(body, length-24)
    InterfaceStatistics.new(
      interface_id: interface_id,
      timestamp_high: timestamp_high,
      timestamp_low: timestamp_low,
      header: header,
      options: opts,
      timestamp: timestamp
    )
  end

  # Converts a raw timestamp into a Time.
  #
  # resolution == -1 (pcap): +high+ is whole seconds, +low+ is microseconds.
  # otherwise (pcapng): +high+/+low+ are the two halves of a 64-bit tick count
  # and +resolution+ is the if_tsresol byte (nil falls back to 6).
  def parse_timestamp(high, low, resolution)
    timestamp = if resolution == -1 #pcap
      # Time.at(sec, usec) keeps the microseconds as a fraction of a second;
      # joining the two numbers as "sec.usec" would turn 5us into 0.5s.
      Time.at(high, low)
    else #pcapng
      ticks = (high << 32) | low
      Time.at(Rational(ticks, ticks_per_second(resolution)))
    end
    @log.debug "timestamp: #{timestamp}"
    timestamp
  end

  # Reads one whole pcapng block (type, length, body, trailing length) and
  # returns it as a Block whose #bytes_read holds the raw bytes.
  #
  # Raises InvalidFormat when the declared length cannot even hold the
  # 12 bytes of type and length fields.
  def read_block(file)
    @read = String.new
    type = read(file, 4, "L").first
    length = read(file, 4, "L").first
    raise InvalidFormat, "block length #{length} is shorter than the 12 byte minimum" if length < 12
    body = read(file, length-12)
    # read_pad returns [size, bytes]
    _pad_size, pad = read_pad(file, length)
    length_end = read(file, 4, "L").first
    type_name = BLOCK_TYPE[type]
    @log.error "unknown block type" unless type_name
    @log.debug "block type: #{type}, name: #{type_name}, length: #{length}, length_end: #{length_end}"
    @log.error "mismatch length in start and end of block" unless length == length_end
    Block.new(
      type: type,
      length: length,
      header: "LL",
      pad: pad,
      body: body,
      name: type_name,
      read: @read.dup
    )
  end

  # Reads up to +options_length+ bytes of options from +file+ and returns an
  # Options holding both the decoded {code => value} hash and the raw bytes.
  #
  # Reading stops at an end-of-options entry (code 0) or when the byte budget
  # is used up, whichever comes first, so lists without a terminator work.
  def parse_options(file, options_length)
    opts = {}
    @read = String.new
    if options_length > 0
      @log.debug "block has #{options_length} bytes of options"
      remaining = options_length
      while remaining >= 4
        option_code, option_length = read(file, 4, "SS")
        option_value = read(file, option_length)
        opts[option_code] = option_value
        pad_size, _pad = read_pad(file, option_length)
        remaining -= 4 + option_length + pad_size
        @log.debug "option_code: #{option_code}, option_length: #{option_length}B, option_value: '#{option_value}'"
        break if option_code == 0
      end
    else
      @log.debug "block has no options"
    end
    Options.new(bytes_read: @read.dup, hash: opts)
  end

  # Reads the bytes that pad +length+ up to the next multiple of four.
  # Returns [pad_size, pad_bytes]; pad_bytes is "" when no padding is needed.
  def read_pad(file, length)
    pad = -length % 4
    if pad.zero?
      @log.debug "no padding"
      [0, ""]
    else
      @log.debug "reading padding for #{pad}"
      [pad, read(file, pad)]
    end
  end

  private

  # Number of timestamp ticks per second for an if_tsresol byte: with the top
  # bit clear the value is a power of ten, with it set the low seven bits are
  # a power of two. nil is treated as 6 (microseconds).
  def ticks_per_second(resolution)
    resolution ||= 6
    (resolution & 0x80).zero? ? 10**resolution : 2**(resolution & 0x7f)
  end

  # Raw bytes and name of one pcap record (file header or packet).
  class PcapBlock
    attr_accessor :name, :bytes_read

    def initialize opts
      @name = opts[:name]
      @bytes_read = opts[:bytes_read]
    end
  end

  # Decoded pcap file header.
  class PcapHeader
    attr_accessor :magic_number, :version_major, :version_minor, :this_zone, :sigfigs, :snaplen, :network, :block

    def initialize opts
      @magic_number = opts[:magic_number]
      @version_major = opts[:version_major]
      @version_minor = opts[:version_minor]
      @this_zone = opts[:this_zone]
      @sigfigs = opts[:sigfigs]
      @snaplen = opts[:snaplen]
      @network = opts[:network]
      @block = opts[:block]
    end
  end

  # Decoded pcap packet record.
  class PcapPacket
    attr_accessor :ts_sec, :ts_usec, :length_capture, :length_original, :block, :timestamp, :packet

    def initialize opts
      @ts_sec = opts[:ts_sec]
      @ts_usec = opts[:ts_usec]
      @length_capture = opts[:length_capture]
      @length_original = opts[:length_original]
      @block = opts[:block]
      @timestamp = opts[:timestamp]
      @packet = opts[:packet]
    end

    # Returns the record as bytes: 16-byte record header followed by #packet.
    def serialize
      [ts_sec, ts_usec, length_capture, length_original].pack("LLLL") + packet
    end
  end

  # Generic pcapng block: type, declared length, body and the raw bytes read.
  class Block
    attr_accessor :type, :length, :pad, :body, :name, :bytes_read, :options, :header

    def initialize opts
      @type = opts[:type]
      @length = opts[:length]
      @pad = opts[:pad]
      @body = opts[:body]
      @bytes_read = opts[:read]
      @options = opts[:options]
      @header = opts[:header]
      @name = opts[:name]
    end
  end

  # Decoded pcapng section header block.
  class Section
    attr_accessor :byte_order_magic, :version_major, :version_minor, :section_length, :header, :options, :block

    def initialize opts
      @byte_order_magic = opts[:byte_order_magic]
      @version_major = opts[:version_major]
      @version_minor = opts[:version_minor]
      @section_length = opts[:section_length]
      @header = opts[:header]
      @options = opts[:options]
    end
  end

  # Decoded pcapng interface description block.
  class InterfaceDescription
    attr_accessor :link_type, :reserved, :snap_len, :header, :options, :block

    def initialize opts
      @link_type = opts[:link_type]
      @reserved = opts[:reserved]
      @snap_len = opts[:snap_len]
      @header = opts[:header]
      @options = opts[:options]
    end
  end

  # Decoded pcapng interface statistics block.
  class InterfaceStatistics
    attr_accessor :interface_id, :timestamp_high, :timestamp_low, :header, :timestamp, :options, :block

    # The accessor used to be declared under this misspelled name; keep it
    # working for existing callers.
    alias_method :timesteamp_high, :timestamp_high
    alias_method :timesteamp_high=, :timestamp_high=

    def initialize opts
      @interface_id = opts[:interface_id]
      @timestamp_high = opts[:timestamp_high]
      @timestamp_low = opts[:timestamp_low]
      @timestamp = opts[:timestamp]
      @header = opts[:header]
      @options = opts[:options]
    end
  end

  # Decoded pcapng enhanced packet block.
  class EnchancedPacket
    attr_accessor :interface_id, :timestamp_high, :timestamp_low, :length_capture, :length_original, :header, :options, :packet, :pad, :timestamp, :block

    def initialize opts
      @interface_id = opts[:interface_id]
      @timestamp_high = opts[:timestamp_high]
      @timestamp_low = opts[:timestamp_low]
      @length_capture = opts[:length_capture]
      @length_original = opts[:length_original]
      @header = opts[:header]
      @options = opts[:options]
      @packet = opts[:packet]
      @pad = opts[:pad]
      @timestamp = opts[:timestamp]
    end

    # Returns the block as bytes, with the packet re-padded to a multiple of
    # four and both length fields recomputed. Does not modify #packet.
    def serialize
      padding = "\0" * (-packet.bytesize % 4)
      option_bytes = options ? options.bytes_read : ""
      # 28 bytes of fixed fields in front of the packet, 4 bytes of trailing length
      blk_length = 28 + packet.bytesize + padding.bytesize + option_bytes.bytesize + 4
      blk = [block.type, blk_length, interface_id, timestamp_high, timestamp_low,
             length_capture, length_original].pack("LLLLLLL")
      blk << packet << padding << option_bytes
      blk << [blk_length].pack("L")
    end
  end

  # Placeholder yielded for block types that have no parser.
  class UnsupportedBlock
    attr_accessor :block

    def initialize opts={}
      @block = opts[:block]
    end
  end

  # Options of one pcapng block: {code => raw value} plus the raw bytes.
  class Options
    attr_accessor :hash, :bytes_read

    def initialize opts
      @hash = opts[:hash]
      @bytes_read = opts[:bytes_read]
    end

    # First byte of option 9, or 6 when the option is absent or empty.
    def timestamp_resolution
      value = @hash && @hash[9]
      (value && value.bytes.first) || 6
    end
  end
end
# Command-line entry point: pcapop <filename> <bytes_to_pop>
# Writes the rewritten capture to stdout, so all diagnostics go to stderr.
# Exit status: 42 when arguments are missing, 1 when processing fails.
begin
  if __FILE__ == $0
    unless ARGV[1]
      # Usage must not go to stdout: that stream is normally redirected into
      # the output capture file.
      $stderr.puts "two arguments required"
      $stderr.puts "pcapop filename bytes_to_pop"
      $stderr.puts
      $stderr.puts "pcapop moi.pcap 38 > popped.pcap"
      exit 42
    end
    pcap = PCAPop.new(ARGV[0])
    # Integer() rejects a non-numeric count instead of silently treating it as 0.
    pcap.pop(Integer(ARGV[1], 10))
    #pcap.parse_file do |packet|
    # print packet.block.bytes_read
    #end
  end
rescue => exception
  warn exception.message
  # Report the failure to the caller instead of exiting with status 0.
  exit 1
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment