Skip to content

Instantly share code, notes, and snippets.

@enukane
Created January 28, 2015 12:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save enukane/b9231c92329808e25435 to your computer and use it in GitHub Desktop.
Save enukane/b9231c92329808e25435 to your computer and use it in GitHub Desktop.
embulk-plugin-input-pcapng
require "csv"
module Embulk
  # Embulk input plugin that reads packets from pcapng capture files via
  # tshark and emits one record per packet.
  #
  # Config keys read by this plugin:
  #   paths   - array of directories (or capture files). Every regular file
  #             named "*.pcapng" inside a listed directory is read; a listed
  #             regular file is read as-is.
  #   done    - array of file paths already processed. They are skipped, and
  #             the list (plus the files read in this run) is returned in the
  #             merged config under "done".
  #   threads - number of parallel tasks (default 2, must be >= 1).
  #   schema  - array of hashes with 'name' and 'type'. Each name is passed to
  #             tshark as a field (-e NAME) and becomes a column.
  #
  # Each record is [path, field1, field2, ...]. Column 0 ("path") holds the
  # capture file the packet came from.
  class InputPcapngFiles < InputPlugin
    # input plugin file name must be: embulk/input_<name>.rb
    Plugin.register_input('pcapng', self)

    # Directory entries treated as capture files.
    PCAPNG_ENTRY = /\A.+\.pcapng\z/

    def self.transaction(config, &control)
      threads = config.param('threads', :integer, default: 2)
      # Without this check, a non-positive value surfaces later as
      # ZeroDivisionError or an invalid each_slice size.
      raise ArgumentError, "threads must be >= 1, got #{threads}" if threads < 1

      done_before = config.param('done', :array, default: [])
      paths = config.param('paths', :array, default: []).flat_map { |path| capture_files_in(path) }
      paths -= done_before

      # Ceiling division spreads the files evenly over the threads. The size
      # is at least 1 so each_slice accepts it when there are no files.
      slice_size = [(paths.length + threads - 1) / threads, 1].max
      task = {
        'paths' => paths,
        'done' => done_before,
        'paths_per_thread' => paths.each_slice(slice_size).to_a,
      }

      columns = [Column.new(0, "path", :string)]
      config.param('schema', :array, default: []).each_with_index do |c, i|
        columns << Column.new(i + 1, c['name'].to_s, c['type'].to_sym)
      end

      commit_reports = yield(task, columns, threads)
      done_now = commit_reports.map { |report| report["done"] }.flatten.compact
      # Accumulate, so files finished in earlier runs stay skipped next time.
      config.merge({ "done" => (done_before + done_now).uniq })
    end

    # Expands one configured `paths` entry into capture file paths.
    # - A regular file is returned as-is.
    # - A directory yields its regular "*.pcapng" entries in sorted order.
    # - Anything else (e.g. a missing path) yields an empty array.
    def self.capture_files_in(path)
      return [path] if File.file?(path)
      return [] unless Dir.exist?(path)

      entries = Dir.entries(path).sort.select { |entry| entry =~ PCAPNG_ENTRY }
      # Paths are joined with a literal "/" rather than File.join, so they
      # stay byte-identical to paths recorded in existing `done` lists.
      entries.map { |entry| path + "/" + entry }.select { |file| File.file?(file) }
    end
    private_class_method :capture_files_in

    attr_reader :task
    attr_reader :schema
    attr_reader :page_builder

    # Reads every capture file assigned to this task (slot @index of
    # task['paths_per_thread']) and adds one row per packet to the page
    # builder.
    #
    # Returns a commit report {"done" => paths} listing the files read, or {}
    # when this task was assigned no files.
    def run
      paths = task['paths_per_thread'][@index]
      if paths.nil? || paths.empty?
        # finish is still required even when nothing was added.
        page_builder.finish
        return {}
      end

      field_columns = schema[1..-1] # column 0 is the synthetic "path" column
      field_names = field_columns.map(&:name)
      paths.each do |path|
        each_packet(path, field_names) do |packet|
          page_builder.add([path] + field_columns.map { |c| coerce(packet[c.name], c.type) })
        end
      end
      page_builder.finish # must call finish they say
      { "done" => paths }
    end

    private

    # Converts a raw tshark field value to the column's type.
    # :long uses to_i and :double uses to_f, so nil or non-numeric text
    # becomes 0. Other types are passed through unchanged.
    def coerce(value, type)
      case type
      when :long then value.to_i
      when :double then value.to_f
      else value
      end
    end

    # Builds the tshark "-e FIELD" arguments, one pair per field, as argv
    # elements. No shell quoting is needed because the argv form of popen is
    # used.
    def build_options(fields)
      fields.flat_map { |field| ["-e", field] }
    end

    # argv for a tshark run that prints the given fields of every packet in
    # +path+, one CSV line per packet. Values are double-quoted (-E quote=d)
    # because tshark joins repeated occurrences of a field with "," by
    # default. Unquoted, those commas would be indistinguishable from the
    # column separator.
    def tshark_command(path, fields)
      ["tshark", "-E", "separator=,", "-E", "quote=d", *build_options(fields), "-T", "fields", "-r", path]
    end

    # Runs tshark on +path+ and yields one Hash (field name => value) per
    # output line. Values missing at the end of a line map to nil, and empty
    # values are nil (as they were with unquoted output).
    # NOTE(review): tshark's exit status is not checked, as before. A
    # truncated capture makes tshark fail after emitting its packets.
    def each_packet(path, fields)
      # The argv form bypasses the shell, so paths and field names are passed
      # literally. The block form closes the pipe even if the consumer raises.
      IO.popen(tshark_command(path, fields)) do |io|
        io.each_line do |line|
          values = (CSV.parse_line(line) || []).map { |v| v == "" ? nil : v }
          yield Hash[fields.zip(values)]
        end
      end
    end

    # Returns the complete tshark output for +path+ (same command as
    # each_packet) as a single String. Not called within this class.
    def fetch_from_pcap(path, fields)
      IO.popen(tshark_command(path, fields)) { |io| io.read }
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment