Skip to content

Instantly share code, notes, and snippets.

@enukane
Created January 28, 2015 11:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save enukane/a0a8dbd26c13d38272f9 to your computer and use it in GitHub Desktop.
Save enukane/a0a8dbd26c13d38272f9 to your computer and use it in GitHub Desktop.
pcapng input
require "csv"
module Embulk
class InputExample < InputPlugin
# input plugin file name must be: embulk/input_<name>.rb
Plugin.register_input('example', self)
# Builds the task definition and the column list for this input, then hands
# both to the caller's block.
#
# Values read from +config+:
# * 'threads' (integer, default 2)  - number of tasks to split the work into;
#   expected to be a positive integer.
# * 'paths'   (array, default [])   - directories that are scanned (not
#   recursively) for entries whose name ends in ".pcapng". Directories that
#   do not exist contribute no files.
# * 'schema'  (array, default [])   - hashes with 'name' and 'type' keys; each
#   one becomes a column placed after the leading "path" string column.
#
# The task hash holds 'paths' (every capture file found) and 'each_paths'
# (exactly +threads+ arrays of paths, one per task index, possibly empty).
#
# Yields (task, columns, threads) and returns an empty Hash.
def self.transaction(config, &control)
  p ">> start transaction"
  p "called transaction"
  threads = config.param('threads', :integer, default: 2)

  paths = config.param('paths', :array, default: []).flat_map do |path|
    # `next` (not `return`) so a missing directory only skips this entry
    # instead of leaving the whole method before the yield below.
    next [] unless Dir.exist?(path)

    Dir.entries(path).sort
       .select { |entry| entry.match(/^.+\.pcapng$/) }
       .map { |entry| "#{path}/#{entry}" }
  end

  # Ceiling division keeps the number of slices at or below `threads`;
  # each_slice rejects a size of 0, hence the lower bound of 1.
  slice_size = [(paths.length + threads - 1) / threads, 1].max
  slices = paths.each_slice(slice_size).to_a

  task = {
    'paths' => paths,
    # Pad with empty arrays so that every task index has a slice to read.
    'each_paths' => Array.new(threads) { |i| slices[i] || [] },
  }
  p task['each_paths']

  schema = config.param('schema', :array, default: [])
  columns = [Column.new(0, "path", :string)]
  schema.each_with_index do |c, i|
    columns << Column.new(i + 1, c['name'].to_s, c['type'].to_sym)
  end

  yield(task, columns, threads)
  p "<< end transaction"
  {}
end
# Logs that construction happened, then forwards all four arguments
# unchanged to the superclass initializer.
def initialize(task, schema, index, page_builder)
  p "called initialize"
  super(task, schema, index, page_builder)
end
# Read-only accessors for the @task, @schema and @page_builder instance variables.
attr_reader :task, :schema, :page_builder
# Processes the capture files assigned to this task index.
#
# For every path in task['each_paths'][@index], each packet hash produced by
# each_packet is turned into a row: the path first, then one value per
# schema column after the first, with :long columns converted via to_i.
# Rows are added to the page builder, which is finished once all paths have
# been handled.
#
# Returns an empty Hash as the commit report.
def run
  p ">> start run #{@index}"
  puts "Example input thread #{@index}..."

  # An index with no slice assigned (nil) is treated as "nothing to do" so
  # the page builder still gets finished instead of raising on nil.
  paths = task['each_paths'][@index] || []
  p paths

  # The first schema column is the path itself; the rest are packet fields.
  data_columns = schema[1..-1]
  field_names = data_columns.map { |column| column.name }

  paths.each do |path|
    p "doing path #{path}"
    each_packet(path, field_names) do |hash|
      values = data_columns.map do |column|
        value = hash[column.name]
        column.type == :long ? value.to_i : value
      end
      @page_builder.add([path] + values)
    end
    p "done path #{path}"
  end

  @page_builder.finish # must call finish they say
  p "<< end run #{@index}"
  {}
end
private
# Renders the given field names as a run of single-quoted "-e" options,
# each followed by one space; returns "" when there are no fields.
def build_options(fields)
  fields.map { |name| "-e '#{name}' " }.join
end
# Runs tshark on one capture file and yields its output line by line.
#
# tshark is started with "-E separator=,", one "-e" option per entry of
# +fields+, "-T fields" and "-r <path>"; every line it prints is parsed as
# one CSV row.
#
# path   - capture file passed to tshark via "-r".
# fields - array of field names passed via "-e".
#
# Yields a Hash of field name => parsed value for each output line. A field
# with no value on that line maps to nil.
# Returns nil.
def each_packet(path, fields, &block)
  # Argument-vector form of popen: no shell is involved, so a path or field
  # name containing spaces, quotes or metacharacters is passed verbatim.
  command = ["tshark", "-E", "separator=,"]
  fields.each { |field| command.push("-e", field) }
  command.push("-T", "fields", "-r", path)

  # Block form closes the pipe even when the consumer block raises.
  IO.popen(command) do |io|
    io.each_line do |line|
      values = CSV.parse_line(line) || []
      # zip pads missing trailing values with nil (and ignores surplus ones)
      # where transpose would raise IndexError on a length mismatch.
      yield(Hash[fields.zip(values)])
    end
  end
  nil
end
# Runs tshark on one capture file and returns everything it prints.
#
# tshark is started with "-E separator=,", one "-e" option per entry of
# +fields+, "-T fields" and "-r <path>".
#
# path   - capture file passed to tshark via "-r".
# fields - array of field names passed via "-e".
#
# Returns the complete output of the command as a String.
def fetch_from_pcap(path, fields)
  # Argument-vector form of popen: no shell is involved, so a path or field
  # name containing spaces, quotes or metacharacters is passed verbatim.
  command = ["tshark", "-E", "separator=,"]
  fields.each { |field| command.push("-e", field) }
  command.push("-T", "fields", "-r", path)

  # Block form closes the pipe even if reading raises.
  IO.popen(command) { |io| io.read }
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment