Last active
November 20, 2017 15:14
-
-
Save codeout/b777f6e1ebb45c908dc96df92a1aa9e2 to your computer and use it in GitHub Desktop.
Download MRT archives from the Route Views Project and load them into PostgreSQL
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'open-uri' | |
require 'pg' | |
require 'uri' | |
# Fetches MRT dumps published at archive.routeviews.org, converts them to text
# with the external `bgpdump` command and INSERTs the routes into PostgreSQL.
module RouteViews
  # Directory, relative to this script, that archives are saved under.
  DIR = 'mrt'.freeze
  BASE_URL = 'http://archive.routeviews.org'.freeze

  # Local collector name => path prefix under BASE_URL. An empty prefix means
  # the collector's bgpdata/ directory sits at the archive root.
  SERVERS = {
    'oregon2'   => '',
    'oregon3'   => 'route-views3',
    'oregon4'   => 'route-views4',
    'eqix'      => 'route-views.eqix',
    'isc'       => 'route-views.isc',
    'kixp'      => 'route-views.kixp',
    'jinx'      => 'route-views.jinx',
    'linx'      => 'route-views.linx',
    'napafrica' => 'route-views.napafrica',
    'nwax'      => 'route-views.nwax',
    'telxatl'   => 'route-views.telxatl',
    'wide'      => 'route-views.wide',
    'sydney'    => 'route-views.sydney',
    'saopaulo'  => 'route-views.saopaulo',
    'sg'        => 'route-views.sg',
    'perth'     => 'route-views.perth',
    'sfmix'     => 'route-views.sfmix',
    'soxrs'     => 'route-views.soxrs',
  }.freeze

  class << self
    # Downloads the +type+ archive for +time+ from every collector in SERVERS
    # into mrt/<collector>/ next to this script.
    #
    # @param type [String] 'update' or 'rib'
    # @param time [String] archive timestamp, YYYYmmdd.HHMM
    # @raise [ArgumentError] if +time+ is not exactly YYYYmmdd.HHMM
    def download(type, time)
      # \A and \z anchor the whole string; ^/$ would accept "...\nanything".
      match = /\A(\d{4})(\d{2})\d{2}\.\d{4}\z/.match(time)
      raise ArgumentError, 'invalid time format' unless match

      # The archive URL uses a monthly directory named YYYY.mm.
      date = "#{match[1]}.#{match[2]}"
      mkdir
      SERVERS.each do |name, code|
        Archive.new(name, code, type, date, time).download
      end
    end

    # Runs the schema SQL stored after this script's __END__ marker.
    #
    # @param db [String] PostgreSQL database name
    def migrate(db)
      conn = PG::Connection.open(dbname: db)
      conn.exec DATA.read
    ensure
      conn.close if conn
    end

    # Converts every downloaded +type+ archive with `bgpdump` and INSERTs the
    # resulting routes into +db+.
    #
    # @param type [String] 'update' or 'rib'
    # @param db [String] PostgreSQL database name
    def load(type, db)
      conn = PG::Connection.open(dbname: db)
      SERVERS.each_key do |name|
        Dir[File.join(__dir__, DIR, name, "#{type}*.bz2")].each do |file|
          load_file(conn, name, type, file)
        end
      end
    ensure
      conn.close if conn
    end

    private

    # Streams one archive through bgpdump and inserts each record. A statement
    # that fails is reported on stderr and skipped, so one bad record does not
    # abort the whole load.
    def load_file(conn, name, type, file)
      print " Loading #{file} ... "
      num = 0
      # The array form executes bgpdump directly, without a shell, so the
      # path is never shell-interpreted.
      IO.popen(['bgpdump', file]) do |io|
        # Records are read as chunks separated by a blank line.
        while (record = io.gets("\n\n"))
          Bgp.new(name, type, record).sql.each do |statement|
            begin
              conn.exec statement
              num += 1
            rescue StandardError => e
              $stderr.puts "SQL failed: #{statement} (#{e.message})"
            end
          end
        end
      end
      puts "done #{num} entries"
    end

    # Creates the archive root and one subdirectory per collector.
    def mkdir
      root = File.join(__dir__, DIR)
      [root, *SERVERS.keys.map { |name| File.join(root, name) }].each do |dir|
        # File.exists? was removed in Ruby 3.2; Dir.exist? works everywhere.
        Dir.mkdir(dir) unless Dir.exist?(dir)
      end
    end
  end

  # A single archive file for one collector on archive.routeviews.org.
  class Archive
    # @param name [String] SERVERS key; also the local subdirectory name
    # @param code [String] collector path prefix under BASE_URL
    # @param type [String] 'update' or 'rib'
    # @param date [String] monthly directory, YYYY.mm
    # @param time [String] YYYYmmdd.HHMM timestamp embedded in the file name
    def initialize(name, code, type, date, time)
      @name = name
      @code = code
      @type = type
      @date = date
      @time = time
    end

    # Saves the archive to mrt/<name>/. HTTP errors are printed rather than
    # raised, so the caller's loop moves on to the next collector.
    def download
      print " Downloading #{url} ... "
      # URI#open (OpenURI::OpenRead) works on every Ruby; Kernel#open stopped
      # accepting URIs in Ruby 3.0.
      url.open('rb') do |remote|
        IO.copy_stream(remote, path)
      end
      puts 'done'
    rescue OpenURI::HTTPError => e
      puts e
    end

    private

    # Archive file name for this type/time; nil for an unknown type.
    def file
      case @type
      when 'update'
        "updates.#{@time}.bz2"
      when 'rib'
        "rib.#{@time}.bz2"
      end
    end

    # Local destination path.
    def path
      File.join(__dir__, DIR, @name, file)
    end

    # Remote location: BASE_URL/<code>/bgpdata/<YYYY.mm>/<TYPE>S/<file>.
    def url
      @url ||= URI.join(BASE_URL, "#{@code}/", 'bgpdata/', "#{@date}/", "#{@type.upcase}S/", file)
    end
  end

  # Turns one bgpdump text record into SQL INSERT statements.
  class Bgp
    # Header columns that are copied onto every withdrawal row.
    WITHDRAW_COLUMNS = %i[ix time neighbor_addr neighbor_as local_addr local_as].freeze

    # @param ix [String] collector name stored in the `ix` column
    # @param type [String] 'update' or 'rib'; selects the target table
    # @param str [String] one bgpdump record (left unmodified)
    def initialize(ix, type, str)
      @type = type
      @str = str
      @columns = { ix: quote(ix) }
      @nlris = []
      @withdrawals = []
    end

    # @return [Array<String>, nil] INSERT statements for this record, or nil
    #   when the type is neither 'update' nor 'rib'
    def sql
      parse
      case @type
      when 'update'
        sql_for_updates
      when 'rib'
        sql_for_ribs
      end
    end

    private

    # Renders +value+ as a SQL string literal, doubling embedded single quotes
    # so archive contents cannot break out of the literal.
    def quote(value)
      "'#{value.to_s.gsub("'", "''")}'"
    end

    # First capture group of +pattern+ in +text+, or nil.
    def field(text, pattern)
      m = pattern.match(text)
      m && m[1]
    end

    # All whitespace-separated prefixes from every line matching +pattern+.
    def prefixes(text, pattern)
      text.scan(pattern).flat_map { |groups| groups.first.split }
    end

    # Extracts column values from the record into @columns (insertion order
    # becomes SQL column order) and the announced/withdrawn prefix lists.
    def parse
      # Drop UNKNOWN_ATTR lines, then fold indented continuation lines (such as
      # the prefix list under ANNOUNCE) onto the preceding line. Non-destructive
      # so the caller's (possibly frozen) string is untouched.
      text = @str.gsub(/^ UNKNOWN_ATTR.*/, '').gsub(/\n /, ' ')

      if (v = field(text, /^TIME: (.+)/))
        @columns[:time] = "#{quote(v)}::TIMESTAMP"
      end
      if (m = text.match(/^FROM: (\S+) AS(\d+)/))
        @columns[:neighbor_addr] = quote(m[1])
        @columns[:neighbor_as] = m[2]
      end
      if (m = text.match(/^TO: (\S+) AS(\d+)/))
        @columns[:local_addr] = quote(m[1])
        @columns[:local_as] = m[2]
      end
      if (v = field(text, /^ORIGIN: (\S+)/))
        @columns[:origin] = quote(v)
      end
      if (v = field(text, /^ASPATH: (.+)/))
        @columns[:aspath] = quote(v)
      end
      # NOTE: TABLE_DUMP_V2 might have multiple nexthops. Pick one.
      if (v = field(text, /^NEXT_HOP: (.+)/))
        @columns[:nexthop] = quote(v)
      end
      if (v = field(text, /^MULTI_EXIT_DISC: (\d+)/))
        @columns[:med] = v
      end
      if (v = field(text, /^COMMUNITY: (.+)/))
        @columns[:community] = quote(v)
      end
      @columns[:atomic_aggregate] = 'TRUE' if text =~ /^ATOMIC_AGGREGATE/
      if (v = field(text, /^AGGREGATOR: (.+)/))
        @columns[:aggregator] = quote(v)
      end
      # A record may hold more than one ANNOUNCE/WITHDRAW line (presumably one
      # per address family -- confirm against bgpdump output); collect all.
      @nlris = prefixes(text, /^ANNOUNCE (.+)/)
      @withdrawals = prefixes(text, /^WITHDRAW (.+)/)
      if (v = field(text, /^ORIGINATED: (.+)/))
        @columns[:originated] = "#{quote(v)}::TIMESTAMP"
      end
      if (v = field(text, /^PREFIX: (.+)/))
        @columns[:prefix] = "#{quote(v)}::CIDR"
      end
    end

    # One `updates` row per announced prefix (with all parsed attributes) plus
    # one per withdrawn prefix (header columns only, withdraw = TRUE).
    def sql_for_updates
      announced = @nlris.map do |prefix|
        insert('updates', @columns.merge(prefix: "#{quote(prefix)}::CIDR"))
      end
      # Only columns actually parsed are listed, so a missing TO:/FROM: line
      # cannot leave an empty value in the VALUES list.
      header = @columns.select { |key, _| WITHDRAW_COLUMNS.include?(key) }
      withdrawn = @withdrawals.map do |prefix|
        insert('updates', header.merge(prefix: "#{quote(prefix)}::CIDR", withdraw: 'TRUE'))
      end
      announced + withdrawn
    end

    # One `rib` row per record; chunks without a PREFIX line are not route
    # entries and produce no statement.
    def sql_for_ribs
      return [] unless @columns.key?(:prefix)

      [insert('rib', @columns)]
    end

    # Builds an INSERT from an ordered column => SQL-literal hash.
    def insert(table, columns)
      "INSERT INTO #{table} (#{columns.keys.join(', ')}) values (#{columns.values.join(', ')})"
    end
  end
end
usage = <<EOS | |
Usage: #$0 command | |
Commands: | |
update download YYYYmmdd.HHMM Download MRT UPDATE archive | |
update load DB_NAME INSERT routes into database | |
rib download YYYYmmdd.HHMM Download MRT UPDATE archive | |
rib load DB_NAME INSERT routes into database | |
migrate DB_NAME Migrate database | |
EOS | |
if !%w(update rib).product(%w(download load)).include?(ARGV[0..1]) and ARGV[0] != 'migrate' | |
abort usage | |
end | |
case ARGV.size | |
when 3 | |
case ARGV[1] | |
when 'download' | |
RouteViews.download ARGV[0], ARGV[2] | |
when 'load' | |
RouteViews.load ARGV[0], ARGV[2] | |
end | |
when 2 | |
RouteViews.migrate ARGV[1] if ARGV[0] == 'migrate' | |
end | |
__END__ | |
-- One row per prefix announced or withdrawn in a BGP UPDATE message.
-- AS numbers are bigint: 4-byte ASNs (RFC 6793) go up to 4294967295, beyond
-- the 2147483647 maximum of PostgreSQL's integer type (med is bigint likewise).
CREATE TABLE updates (
  id serial,
  ix text,
  time timestamp,
  withdraw boolean,
  local_addr text,
  local_as bigint,
  neighbor_addr text,
  neighbor_as bigint,
  prefix cidr,
  origin text,
  aspath text,
  nexthop text,
  med bigint,
  community text,
  atomic_aggregate boolean,
  aggregator text
);
CREATE INDEX index_updates_on_ix ON updates USING btree(ix);
CREATE INDEX index_updates_on_time ON updates USING btree(time);
CREATE INDEX index_updates_on_withdraw ON updates USING btree(withdraw);
CREATE INDEX index_updates_on_neighbor_addr ON updates USING btree(neighbor_addr);
CREATE INDEX index_updates_on_prefix ON updates USING btree(prefix);
-- One row per RIB entry (prefix as seen from one neighbor).
-- neighbor_as is bigint so 4-byte ASNs above 2147483647 fit.
CREATE TABLE rib (
  id serial,
  ix text,
  time timestamp,
  neighbor_addr text,
  neighbor_as bigint,
  prefix cidr,
  origin text,
  aspath text,
  nexthop text,
  med bigint,
  community text,
  atomic_aggregate boolean,
  aggregator text,
  originated timestamp
);
CREATE INDEX index_rib_on_ix ON rib USING btree(ix);
CREATE INDEX index_rib_on_time ON rib USING btree(time);
CREATE INDEX index_rib_on_neighbor_addr ON rib USING btree(neighbor_addr);
CREATE INDEX index_rib_on_prefix ON rib USING btree(prefix);
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
How to use