Created
January 28, 2015 12:20
-
-
Save enukane/b9231c92329808e25435 to your computer and use it in GitHub Desktop.
embulk-plugin-input-pcapng
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require "csv"

module Embulk
  # Embulk input plugin that reads *.pcapng capture files with the `tshark`
  # command-line tool and emits one record per packet.
  #
  # Record layout: column 0 is the capture file path; the remaining columns
  # are the tshark field names listed under the `schema` config option.
  #
  # Config options read by .transaction:
  #   paths   - directories scanned (non-recursively) for *.pcapng files
  #   threads - number of tasks the files are split across (default 2, >= 1)
  #   done    - files already processed; skipped and carried into the result
  #   schema  - array of hashes with 'name' (passed to `tshark -e`) and 'type'
  #
  # Requires a `tshark` executable on PATH.
  class InputPcapngFiles < InputPlugin
    # input plugin file name must be: embulk/input_<name>.rb
    Plugin.register_input('pcapng', self)

    # Directory entries treated as capture files.
    PCAPNG_FILE = /\A.+\.pcapng\z/

    # Collects the capture files, builds the task and column list, runs the
    # tasks via the given block, and returns the config for the next run.
    #
    # @param config Embulk config source (must respond to #param and #merge)
    # @return config merged with "done" => every file processed by this run
    #   and by earlier runs listed in the incoming "done"
    # @raise [ArgumentError] if `threads` is less than 1
    def self.transaction(config, &control)
      threads = config.param('threads', :integer, default: 2)
      raise ArgumentError, "threads must be >= 1 (got #{threads})" if threads < 1

      done = config.param('done', :array, default: [])
      paths = config.param('paths', :array, default: []).flat_map { |dir| pcapng_files_in(dir) } - done

      task = {
        'paths' => paths,
        'done' => done,
        # This slice size never yields more slices than `threads`; a task
        # whose index has no slice gets nil and processes nothing in #run.
        'paths_per_thread' => paths.each_slice(paths.length / threads + 1).to_a,
      }

      columns = [Column.new(0, "path", :string)]
      config.param('schema', :array, default: []).each_with_index do |c, i|
        columns << Column.new(i + 1, "#{c['name']}", c['type'].to_sym)
      end

      commit_reports = yield(task, columns, threads)
      newly_done = commit_reports.map { |report| report["done"] }.flatten.compact
      # Keep earlier "done" entries as well as this run's files. Returning only
      # this run's files would drop the earlier ones, so a later run fed this
      # config would process them again.
      config.merge({ "done" => (done + newly_done).uniq })
    end

    # Lists the *.pcapng entries directly inside +dir+ as "dir/entry"
    # strings, sorted by entry name. Returns [] when +dir+ does not exist, so
    # one missing directory does not abort the scan of the others.
    # The "#{dir}/#{entry}" form (no path normalization) keeps the strings
    # identical to previously recorded "done" entries.
    def self.pcapng_files_in(dir)
      return [] unless Dir.exist?(dir)
      Dir.entries(dir).sort.select { |entry| entry =~ PCAPNG_FILE }.map { |entry| "#{dir}/#{entry}" }
    end
    private_class_method :pcapng_files_in

    attr_reader :task
    attr_reader :schema
    attr_reader :page_builder

    # Processes this task's slice of files, adding one row per packet.
    #
    # @return [Hash] commit report: { "done" => paths handled by this task }
    def run
      paths = task['paths_per_thread'][@index] || []
      columns = schema[1..-1] # column 0 is the file path, not a tshark field
      fields = columns.map(&:name)

      paths.each do |path|
        each_packet(path, fields) do |packet|
          page_builder.add([path] + columns.map { |c| convert(packet[c.name], c.type) })
        end
      end

      # must call finish they say -- done on every path, including tasks that
      # received no files.
      page_builder.finish
      { "done" => paths }
    end

    private

    # Coerces a raw tshark value to the column's declared type.
    # nil (field absent) becomes 0 / 0.0 through nil.to_i / nil.to_f.
    # NOTE(review): String#to_i parses decimal only, so a hex value such as
    # "0x0800" becomes 0 -- confirm the chosen fields print in decimal.
    def convert(value, type)
      case type
      when :long then value.to_i
      when :double then value.to_f
      else value
      end
    end

    # argv for tshark. It is passed to IO.popen as an Array, so no shell is
    # involved and paths or field names containing spaces or quotes are passed
    # verbatim.
    # NOTE(review): tshark joins multiple occurrences of a field with ',' by
    # default (-E aggregator), which collides with the ',' separator and
    # would shift columns -- confirm the fields are single-occurrence.
    def tshark_command(path, fields)
      field_args = fields.flat_map { |field| ['-e', field] }
      ['tshark', '-E', 'separator=,', *field_args, '-T', 'fields', '-r', path]
    end

    # Runs tshark on +path+ and yields one Hash per output line, mapping each
    # field name to its string value. Values are nil when the line has fewer
    # columns than +fields+. The pipe is closed even if the block raises.
    # Warns on stderr when tshark exits unsuccessfully.
    def each_packet(path, fields)
      IO.popen(tshark_command(path, fields)) do |io|
        io.each_line do |line|
          values = CSV.parse_line(line) || []
          yield Hash[fields.zip(values)]
        end
      end
      warn "tshark did not exit successfully (#{$?}) while reading #{path}" unless $?.success?
    end

    # Returns tshark's complete field output for +path+ as a single String.
    # Not called anywhere in this class.
    def fetch_from_pcap(path, fields)
      IO.popen(tshark_command(path, fields)) { |io| io.read }
    end
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment