Skip to content

Instantly share code, notes, and snippets.

@rhuss
Last active January 5, 2021 20:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rhuss/13aa065b4195e9f110b80d40a762b9f2 to your computer and use it in GitHub Desktop.
Save rhuss/13aa065b4195e9f110b80d40a762b9f2 to your computer and use it in GitHub Desktop.
Migration script for migration from a very old InfluxDB installation to a recent version (e.g. 0.7.0 to 1.8.3)
use InfluxDB;
use Data::Dumper;
use REST::Client;
use strict;
# Script for migrating data from an outdated InfluxDB to one of the
# latsest version.
# Developed for migrating from an InfluxDB 0.7.0 to a 1.8.3 version in one go
# ------------------------------------------------------------------------
# Series to migrate (source to target with tags)
my $mapping = {
"puffer" => "collector,topic=home/puffer",
"pv" => "collector,topic=home/current/pv",
"strom" => "collector,topic=home/current/meter",
};
my $source_series = shift || die "Source series has to be provided as first argument";
my $target_series = $mapping->{$source_series} || die "No mapping for source series $source_series found";
# Use this timestamp as start if provided
my $start_ts = shift;
# Source connections parameters
my $source = {
host => "172.16.128.139",
port => 8086,
database => "haus",
};
# Target connections parameters
my $target = {
host => "localhost",
port => 8086,
database => "home",
};
my %auth = ();
%auth = (
username => $source->{username},
password => $source->{password},
) if $source->{username};
# For reading we use this (outdated) Perl module, could enough up to
# InfluxDB 0.9
my $old_ix = InfluxDB->new(
host => $source->{host},
port => $source->{port},
database => $source->{database},
%auth,
);
# Writing with line protocol directly with a rest client
my $target_client = REST::Client->new(host => sprintf("http://%s:%d", $target->{host}, $target->{port}));
# 4 days
my $TIME_WINDOW = 3600 * 24 * 4;
my $data;
# Find oldest entry in the source series for starting to iterate over
my $lower_ts = $start_ts || get_oldest_ts($old_ix, $source_series) - 60;
my $upper_ts = $lower_ts + $TIME_WINDOW;
my $now = time;
$| = 1;
print "Migrating $source_series to $target_series:\n";
my $total = 0;
do {
print scalar(localtime($lower_ts))," [$lower_ts] : ";
my $nr = 0;
# Select with time windows to collect a batch of metrics
$data=$old_ix->query(
q => sprintf("select * from %s where time > %ds and time < %ds",$source_series, $lower_ts, $upper_ts), time_precision => "s");
if ($old_ix->status()->{code} != 200) {
die $old_ix->errstr();
}
my $post_data = "";
for my $rs (@$data) {
# print Dumper($rs);
my $points = $rs->{points};
if (!@$points) {
print "--";
};
my ($idx_map,$metrics) = &extract_idx($rs->{columns});
# print Dumper($idx_map, $metrics);
for my $row (@$points) {
my $time = $row->[$idx_map->{"time"}];
my @values = ();
for my $m (sort @$metrics) {
my $v = $row->[$idx_map->{$m}];
$v = "0" unless $v;
push @values,sprintf("%s=%s", $m, $v);
}
my $influx_line = sprintf("%s %s %d", $target_series, join(",", @values), $time);
$nr++;
# Accumulate lines to send later with one batch request
$post_data .= $influx_line . "\n";
}
# Update all lines found in the time window in the target database
my $result = $target_client->POST(sprintf("/write?db=%s&precision=s",$target->{database}),$post_data);
die "Error posting: ", $result->responseCode(), " ", $result->responseContent() unless $result->responseCode() == 204;
}
print "$nr\n";
$total += $nr;
# Next time window
$lower_ts = $upper_ts - 1;
$upper_ts = $lower_ts + $TIME_WINDOW;
} while $lower_ts < $now; # until we reach the current date.
print "Total entries: $total\n";
sub extract_idx {
my $cols = shift;
my $idx_map = {};
my $metrics = [];
for (my $i = 0; $i <= $#{$cols}; $i++) {
my $c = $cols->[$i];
$idx_map->{$c} = $i;
push @$metrics, $c unless ($c eq "time") or ($c eq "sequence_number");
}
return $idx_map, $metrics;
}
sub get_oldest_ts {
my $ix = shift;
my $series = shift;
my $data = $ix->query(q => "select * from $series order asc limit 1", time_precision => "s");
die $ix->errstr() if $ix->status->{code} != 200;
my ($idx_map) = extract_idx($data->[0]->{columns});
my $ts = $data->[0]->{points}->[0]->[$idx_map->{time}];
# print Dumper($data);
# print $ts;
return $ts;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment