Created
January 28, 2015 11:52
-
-
Save enukane/a0a8dbd26c13d38272f9 to your computer and use it in GitHub Desktop.
pcapng input
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require "csv" | |
module Embulk
  # Embulk input plugin that reads *.pcapng capture files with `tshark` and
  # emits one record per packet.
  #
  # Each record starts with a "path" column (the capture file the packet came
  # from), followed by one column per entry of the `schema` config; each
  # entry's 'name' is passed to tshark as an `-e <field>` argument.
  class InputExample < InputPlugin
    # input plugin file name must be: embulk/input_<name>.rb
    Plugin.register_input('example', self)

    # Capture file names to pick up. \A/\z anchor the whole entry name
    # (^/$ would only anchor a line within it).
    PCAPNG_ENTRY = /\A.+\.pcapng\z/

    # Builds the task and output schema, then hands control to Embulk.
    #
    # Config keys read:
    #   threads - number of tasks to run (Integer, default 2, must be >= 1)
    #   paths   - directories scanned (non-recursively) for *.pcapng files
    #   schema  - array of {'name' => tshark field, 'type' => column type name}
    #
    # @return [Hash] an empty config diff
    # @raise [ArgumentError] if threads is less than 1
    def self.transaction(config, &control)
      p ">> start transaction"
      p "called transaction"

      threads = config.param('threads', :integer, default: 2)
      raise ArgumentError, "threads must be at least 1, got #{threads}" if threads < 1

      paths = pcapng_files(config.param('paths', :array, default: []))
      task = {
        'paths' => paths,
        # Exactly one (possibly empty) slice per task index, so #run always
        # finds an entry for its @index.
        'each_paths' => split_evenly(paths, threads),
      }
      p task['each_paths']

      schema = config.param('schema', :array, default: [])
      columns = [Column.new(0, "path", :string)]
      schema.each_with_index do |c, i|
        columns << Column.new(i + 1, "#{c['name']}", c['type'].to_sym)
      end

      yield(task, columns, threads)
      p "<< end transaction"
      {}
    end

    # Lists the *.pcapng entries directly inside each directory, sorted by
    # name within each directory. Directories that do not exist are skipped.
    def self.pcapng_files(dirs)
      dirs.flat_map do |dir|
        # `next`, not `return`: a `return` inside this block would leave the
        # method and drop every remaining directory.
        next [] unless Dir.exist?(dir)
        Dir.entries(dir).sort.grep(PCAPNG_ENTRY).map { |entry| File.join(dir, entry) }
      end
    end
    private_class_method :pcapng_files

    # Splits items into exactly `count` contiguous, order-preserving slices;
    # trailing slices are empty when there are fewer items than slices.
    def self.split_evenly(items, count)
      per_slice = [(items.length + count - 1) / count, 1].max
      Array.new(count) { |i| items[i * per_slice, per_slice] || [] }
    end
    private_class_method :split_evenly

    def initialize(task, schema, index, page_builder)
      p "called initialize"
      super
    end

    attr_reader :task
    attr_reader :schema
    attr_reader :page_builder

    # Reads every capture file assigned to this task index and adds one row
    # per packet: [path, field values...]. Values of columns typed :long are
    # converted with to_i; all other values are passed through as parsed.
    #
    # @return [Hash] an empty commit report
    def run
      p ">> start run #{@index}"
      puts "Example input thread #{@index}..."
      # Guard against task layouts that have fewer slices than task indexes.
      paths = task['each_paths'][@index] || []
      p paths

      # Every column after "path" corresponds to one tshark field.
      field_columns = schema[1..-1]
      field_names = field_columns.map(&:name)

      paths.each do |path|
        p "doing path #{path}"
        each_packet(path, field_names) do |hash|
          values = field_columns.map do |c|
            v = hash[c.name]
            c.type == :long ? v.to_i : v
          end
          page_builder.add([path] + values)
        end
        p "done path #{path}"
      end

      page_builder.finish # must call finish they say
      p "<< end run #{@index}"
      {}
    end

    private

    # Returns the `-e <field>` tshark arguments for the given fields as a
    # flat argv array.
    def build_options(fields)
      fields.flat_map { |field| ['-e', field] }
    end

    # Builds the tshark argv that prints the given fields, one packet per
    # line, for the capture at `path`. Using an argv array instead of a shell
    # string keeps file and field names from being interpreted by a shell.
    # quote=d wraps each field in double quotes because tshark joins repeated
    # occurrences of a field with ',' by default, which would otherwise be
    # indistinguishable from the ',' separator.
    def tshark_command(path, fields)
      ['tshark', '-E', 'separator=,', '-E', 'quote=d',
       *build_options(fields), '-T', 'fields', '-r', path]
    end

    # Runs tshark on `path` and yields one Hash per packet mapping each field
    # name to its value (nil when a row has fewer columns than fields).
    # The block form of IO.popen closes the pipe even if the caller raises.
    def each_packet(path, fields)
      IO.popen(tshark_command(path, fields)) do |io|
        CSV.new(io).each { |row| yield Hash[fields.zip(row)] }
      end
      warn "tshark did not exit cleanly for #{path}: #{$?}" unless $?.success?
    end

    # Runs tshark on `path` and returns its raw comma-separated output.
    # NOTE(review): not called anywhere in this file.
    def fetch_from_pcap(path, fields)
      IO.popen(tshark_command(path, fields), &:read)
    end
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment