Migration script for migrating from a very old InfluxDB installation to a recent version (e.g. 0.7.0 to 1.8.3)
use strict;

use InfluxDB;
use Data::Dumper;
use REST::Client;

# Script for migrating data from an outdated InfluxDB to one of the
# latest versions.
# Developed for migrating from an InfluxDB 0.7.0 to a 1.8.3 version in one go.
# ------------------------------------------------------------------------
# Series to migrate (source series to target measurement with tags)
my $mapping = {
    "puffer" => "collector,topic=home/puffer",
    "pv"     => "collector,topic=home/current/pv",
    "strom"  => "collector,topic=home/current/meter",
};
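
# For illustration: with the "puffer" mapping above, a source row with a
# column "temp" (a hypothetical field name; the actual fields are taken from
# the source columns) and value 42.5 at Unix time 1609459200 is written as
# the line-protocol entry "measurement,tag field=value timestamp":
#
#   collector,topic=home/puffer temp=42.5 1609459200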
my $source_series = shift || die "Source series has to be provided as first argument";
my $target_series = $mapping->{$source_series} || die "No mapping for source series $source_series found";

# Use this timestamp as start if provided
my $start_ts = shift;

# Source connection parameters.
# Optionally add "username" and "password" keys here to enable authentication.
my $source = {
    host     => "172.16.128.139",
    port     => 8086,
    database => "haus",
};

# Target connection parameters
my $target = {
    host     => "localhost",
    port     => 8086,
    database => "home",
};

my %auth = ();
%auth = (
    username => $source->{username},
    password => $source->{password},
) if $source->{username};
# For reading we use this (outdated) Perl module, which is good enough up to
# InfluxDB 0.9
my $old_ix = InfluxDB->new(
    host     => $source->{host},
    port     => $source->{port},
    database => $source->{database},
    %auth,
);
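
# For reference: queries against the old 0.x API return an array of result
# sets of the shape sketched below (field names here are illustrative), which
# extract_idx() at the end of this script unpacks. The column order may vary,
# hence the index map:
#
#   [ { name    => "puffer",
#       columns => [ "time", "sequence_number", "temp", ... ],
#       points  => [ [ 1609459200, 1, 42.5, ... ], ... ] } ]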
# Writing happens via the line protocol, directly with a REST client
my $target_client = REST::Client->new(host => sprintf("http://%s:%d", $target->{host}, $target->{port}));

# Batch size: 4 days per query / write request
my $TIME_WINDOW = 3600 * 24 * 4;

my $data;

# Find the oldest entry in the source series to start iterating from,
# with a minute of slack
my $lower_ts = $start_ts || get_oldest_ts($old_ix, $source_series) - 60;
my $upper_ts = $lower_ts + $TIME_WINDOW;
my $now = time;

# Enable autoflush so progress is printed immediately
$| = 1;

print "Migrating $source_series to $target_series:\n";
my $total = 0;
do {
    print scalar(localtime($lower_ts)), " [$lower_ts] : ";
    my $nr = 0;

    # Select with time windows to collect a batch of metrics
    $data = $old_ix->query(
        q => sprintf("select * from %s where time > %ds and time < %ds", $source_series, $lower_ts, $upper_ts),
        time_precision => "s");
    if ($old_ix->status()->{code} != 200) {
        die $old_ix->errstr();
    }

    my $post_data = "";
    for my $rs (@$data) {
        # print Dumper($rs);
        my $points = $rs->{points};
        if (!@$points) {
            print "--";
        }
        my ($idx_map, $metrics) = &extract_idx($rs->{columns});
        # print Dumper($idx_map, $metrics);
        for my $row (@$points) {
            my $time = $row->[$idx_map->{"time"}];
            my @values = ();
            for my $m (sort @$metrics) {
                my $v = $row->[$idx_map->{$m}];
                # Default missing values to 0
                $v = "0" unless $v;
                push @values, sprintf("%s=%s", $m, $v);
            }
            my $influx_line = sprintf("%s %s %d", $target_series, join(",", @values), $time);
            $nr++;
            # Accumulate lines to send later with one batch request
            $post_data .= $influx_line . "\n";
        }
        # Write all lines found in the time window to the target database.
        # A successful write is answered with "204 No Content".
        my $result = $target_client->POST(sprintf("/write?db=%s&precision=s", $target->{database}), $post_data);
        die "Error posting: ", $result->responseCode(), " ", $result->responseContent() unless $result->responseCode() == 204;
        # Reset the buffer so lines are not sent twice if the query
        # returns more than one result set
        $post_data = "";
    }
    print "$nr\n";
    $total += $nr;

    # Next time window: since the query uses "time > $lower_ts", stepping back
    # by one second continues seamlessly where "time < $upper_ts" left off
    $lower_ts = $upper_ts - 1;
    $upper_ts = $lower_ts + $TIME_WINDOW;
} while $lower_ts < $now; # until we reach the current date

print "Total entries: $total\n";
# Calculate a map from column name to column index, plus the list of metric
# columns (i.e. everything except "time" and "sequence_number")
sub extract_idx {
    my $cols = shift;
    my $idx_map = {};
    my $metrics = [];
    for (my $i = 0; $i <= $#{$cols}; $i++) {
        my $c = $cols->[$i];
        $idx_map->{$c} = $i;
        push @$metrics, $c unless ($c eq "time") or ($c eq "sequence_number");
    }
    return $idx_map, $metrics;
}
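
# For illustration, with hypothetical columns as returned by the old API:
#
#   extract_idx([ "time", "sequence_number", "temp", "power" ])
#
# yields the index map { time => 0, sequence_number => 1, temp => 2, power => 3 }
# and the metric list [ "temp", "power" ].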
# Determine the timestamp of the oldest entry in a series ("order asc" is
# the old 0.x InfluxQL syntax for ascending order)
sub get_oldest_ts {
    my $ix = shift;
    my $series = shift;
    my $data = $ix->query(q => "select * from $series order asc limit 1", time_precision => "s");
    die $ix->errstr() if $ix->status->{code} != 200;
    my ($idx_map) = extract_idx($data->[0]->{columns});
    my $ts = $data->[0]->{points}->[0]->[$idx_map->{time}];
    # print Dumper($data);
    # print $ts;
    return $ts;
}
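
# Example invocation (assuming the script is saved as influxdb-migrate.pl,
# a hypothetical filename):
#
#   perl influxdb-migrate.pl puffer
#   perl influxdb-migrate.pl pv 1388530800    # resume from a given Unix timestamp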