Skip to content

Instantly share code, notes, and snippets.

@codeout
Last active November 20, 2017 15:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save codeout/b777f6e1ebb45c908dc96df92a1aa9e2 to your computer and use it in GitHub Desktop.
Save codeout/b777f6e1ebb45c908dc96df92a1aa9e2 to your computer and use it in GitHub Desktop.
Download MRT archive from Route Views Project to load into PostgreSQL
require 'open-uri'
require 'pg'
require 'uri'
module RouteViews
DIR = 'mrt'
BASE_URL = 'http://archive.routeviews.org'
SERVERS = {
'oregon2' => '',
'oregon3' => 'route-views3',
'oregon4' => 'route-views4',
'eqix' => 'route-views.eqix',
'isc' => 'route-views.isc',
'kixp' => 'route-views.kixp',
'jinx' => 'route-views.jinx',
'linx' => 'route-views.linx',
'napafrica' => 'route-views.napafrica',
'nwax' => 'route-views.nwax',
'telxatl' => 'route-views.telxatl',
'wide' => 'route-views.wide',
'sydney' => 'route-views.sydney',
'saopaulo' => 'route-views.saopaulo',
'sg' => 'route-views.sg',
'perth' => 'route-views.perth',
'sfmix' => 'route-views.sfmix',
'soxrs' => 'route-views.soxrs',
}
class << self
def download(type, time)
unless time =~ /^(\d{4})(\d{2})\d{2}\.\d{4}$/
raise ArgumentError, 'invalid time format'
end
date = "#$1.#$2"
mkdir
SERVERS.each do |name, code|
archive = Archive.new(name, code, type, date, time)
archive.download
end
end
def migrate(db)
conn = PG::Connection.open(dbname: db)
conn.exec DATA.read
end
def load(type, db)
conn = PG::Connection.open(dbname: db)
SERVERS.each do |name, _|
Dir["#{File.join(__dir__, DIR)}\/#{name}\/#{type}*.bz2"].each do |f|
print " Loading #{f} ... "
num = 0
IO.popen("bgpdump #{f}") do |io|
while str = io.gets("\n\n")
bgp = Bgp.new(name, type, str)
bgp.sql.each do |s|
begin
conn.exec s
num += 1
rescue
$stderr.puts "SQL failed: #{s}"
end
end
end
end
puts "done #{num} entries"
end
end
end
private
def mkdir
dir = File.join(__dir__, DIR)
Dir.mkdir dir unless File.exists?(dir)
SERVERS.each do |name, _|
dir = File.join(__dir__, DIR, name)
Dir.mkdir dir unless File.exists?(dir)
end
end
end
class Archive
def initialize(name, code, type, date, time)
@name = name
@code = code
@type = type
@date = date
@time = time
end
def download()
print " Downloading #{url} ... "
open(url, 'rb') do |f|
File.write path, f.read, mode: 'wb'
end
puts 'done'
rescue OpenURI::HTTPError
puts $!
end
private
def file()
case @type
when 'update'
"updates.#{@time}.bz2"
when 'rib'
"rib.#{@time}.bz2"
end
end
def path()
File.join(__dir__, DIR, @name, file)
end
def url()
URI.join(BASE_URL, "#{@code}/", 'bgpdata/', "#{@date}/", "#{@type.upcase}S/", file)
end
end
class Bgp
def initialize(ix, type, str)
@type = type
@str = str
@columns = {ix: %('#{ix}')}
@nlris = []
@withdrawals = []
end
def sql
parse
case @type
when 'update'
sql_for_updates
when 'rib'
sql_for_ribs
end
end
private
def parse
@str.gsub!(/^ UNKNOWN_ATTR.*/, '')
@str.gsub!(/\n /, ' ')
if @str =~ /^TIME: (.+)/
@columns[:time] = %('#$1'::TIMESTAMP)
end
if @str =~ /^FROM: (\S+) AS(\d+)/
@columns[:neighbor_addr] = %('#$1')
@columns[:neighbor_as] = $2
end
if @str =~ /^TO: (\S+) AS(\d+)/
@columns[:local_addr] = %('#$1')
@columns[:local_as] = $2
end
if @str =~ /^ORIGIN: (\S+)/
@columns[:origin] = %('#$1')
end
if @str =~ /^ASPATH: (.+)/
@columns[:aspath] = %('#$1')
end
if @str =~ /^NEXT_HOP: (.+)/ # NOTE: TABLE_DUMP_V2 might have multiple nexthops. Pick one.
@columns[:nexthop] = %('#$1')
end
if @str =~ /^MULTI_EXIT_DISC: (\d+)/
@columns[:med] = $1
end
if @str =~ /^COMMUNITY: (.+)/
@columns[:community] = %('#$1')
end
if @str =~ /^ATOMIC_AGGREGATE/
@columns[:atomic_aggregate] = %(TRUE)
end
if @str =~ /^AGGREGATOR: (.+)/
@columns[:aggregator] = %('#$1')
end
if @str =~ /^ANNOUNCE (.+)/
@nlris = $1.strip.split(' ')
end
if @str =~ /^WITHDRAW (.+)/
@withdrawals = $1.strip.split(' ')
end
if @str =~ /^ORIGINATED: (.+)/
@columns[:originated] = %('#$1'::TIMESTAMP)
end
if @str =~ /^PREFIX: (.+)/
@columns[:prefix] = %('#$1'::CIDR)
end
end
def sql_for_updates
@nlris.map {|prefix|
"INSERT INTO updates (#{@columns.keys.join(', ')}, prefix) \
values (#{@columns.values.join(', ')}, '#{prefix}'::CIDR)"
} + @withdrawals.map {|prefix|
"INSERT INTO updates (ix, time, neighbor_addr, neighbor_as, local_addr, local_as, prefix, withdraw) \
values (#{@columns[:ix]}, #{@columns[:time]}, #{@columns[:neighbor_addr]}, #{@columns[:neighbor_as]}, \
#{@columns[:local_addr]}, #{@columns[:local_as]}, '#{prefix}'::CIDR, TRUE)"
}
end
def sql_for_ribs
["INSERT INTO rib (#{@columns.keys.join(', ')}) values (#{@columns.values.join(', ')})"]
end
end
end
usage = <<EOS
Usage: #$0 command
Commands:
update download YYYYmmdd.HHMM Download MRT UPDATE archive
update load DB_NAME INSERT routes into database
rib download YYYYmmdd.HHMM Download MRT UPDATE archive
rib load DB_NAME INSERT routes into database
migrate DB_NAME Migrate database
EOS
if !%w(update rib).product(%w(download load)).include?(ARGV[0..1]) and ARGV[0] != 'migrate'
abort usage
end
case ARGV.size
when 3
case ARGV[1]
when 'download'
RouteViews.download ARGV[0], ARGV[2]
when 'load'
RouteViews.load ARGV[0], ARGV[2]
end
when 2
RouteViews.migrate ARGV[1] if ARGV[0] == 'migrate'
end
__END__
CREATE TABLE updates (
id serial,
ix text,
time timestamp,
withdraw boolean,
local_addr text,
local_as integer,
neighbor_addr text,
neighbor_as integer,
prefix cidr,
origin text,
aspath text,
nexthop text,
med bigint,
community text,
atomic_aggregate boolean,
aggregator text
);
CREATE INDEX index_updates_on_ix ON updates USING btree(ix);
CREATE INDEX index_updates_on_time ON updates USING btree(time);
CREATE INDEX index_updates_on_withdraw ON updates USING btree(withdraw);
CREATE INDEX index_updates_on_neighbor_addr ON updates USING btree(neighbor_addr);
CREATE INDEX index_updates_on_prefix ON updates USING btree(prefix);
CREATE TABLE rib (
id serial,
ix text,
time timestamp,
neighbor_addr text,
neighbor_as integer,
prefix cidr,
origin text,
aspath text,
nexthop text,
med bigint,
community text,
atomic_aggregate boolean,
aggregator text,
originated timestamp
);
CREATE INDEX index_rib_on_ix ON rib USING btree(ix);
CREATE INDEX index_rib_on_time ON rib USING btree(time);
CREATE INDEX index_rib_on_neighbor_addr ON rib USING btree(neighbor_addr);
CREATE INDEX index_rib_on_prefix ON rib USING btree(prefix);
@codeout
Copy link
Author

codeout commented Nov 20, 2017

How to use

createdb -E UTF8 -T template0 route_leak
ruby route_views.rb migrate route_leak

for i in 0300 0315 0330 0345; \
  ruby route_views.rb update download 20170825.$i
ruby route_views.rb update load route_leak

ruby route_views.rb rib download 20170825.0200
ruby route_views.rb rib load route_leak

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment