Skip to content

Instantly share code, notes, and snippets.

@Lazin
Created September 6, 2016 14:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save Lazin/39859734b582ee92cd3ea68ce300e522 to your computer and use it in GitHub Desktop.
Save Lazin/39859734b582ee92cd3ea68ce300e522 to your computer and use it in GitHub Desktop.
Zstandard test (block compression)
#include "storage_engine/compression.h"
#include "perftest_tools.h"
#define ZSTD_STATIC_LINKING_ONLY
#include <zstd.h>
#include <iostream>
#include <cstdlib>
#include <algorithm>
#include <zlib.h>
#include <cstring>
using namespace Akumuli;
//! Generate time-series from random walk
struct RandomWalk {
std::random_device randdev;
std::mt19937 generator;
std::normal_distribution<double> distribution;
size_t N;
std::vector<double> values;
RandomWalk(double start, double mean, double stddev, size_t N)
: generator(randdev())
, distribution(mean, stddev)
, N(N)
{
values.resize(N, start);
}
double generate(aku_ParamId id) {
values.at(id) += distribution(generator);
return values.at(id);
}
void add_anomaly(aku_ParamId id, double value) {
values.at(id) += value;
}
};
int main(int argc, char** argv) {
const u64 TEST_SIZE = 100000;
UncompressedChunk header;
std::cout << "Testing timestamp sequence" << std::endl;
int c = 100;
std::vector<aku_ParamId> ids;
RandomWalk rwalk(10.0, 0.0, 0.01, 1);
for (u64 ts = 0; ts < TEST_SIZE; ts++) {
int k = rand() % 2;
if (k) {
c++;
} else if (c > 0) {
c--;
}
header.timestamps.push_back((ts + c) << 8);
header.values.push_back(rwalk.generate(0));
}
ByteVector out;
out.resize(TEST_SIZE*24);
//out.resize(4096);
const size_t UNCOMPRESSED_SIZE = header.timestamps.size()*8
+ header.values.size()*8;
const size_t nruns = 1000;
size_t total_bytes = 0;
std::vector<double> timings(nruns, .0);
for (size_t k = 0; k < nruns; k++) {
PerfTimer tm;
Akumuli::StorageEngine::DataBlockWriter writer(42, out.data(), static_cast<int>(out.size()));
for (size_t i = 0; i < header.timestamps.size(); i++) {
writer.put(header.timestamps[i], header.values[i]);
}
size_t outsize = writer.commit();
timings.at(k) = tm.elapsed();
total_bytes += outsize;
}
std::cout << "\nAkumuli" << std::endl;
std::cout << "Completed at " << std::accumulate(timings.begin(), timings.end(), .0, std::plus<double>()) << std::endl;
std::cout << "Fastest run: " << std::accumulate(timings.begin(), timings.end(), 1E10, [](double a, double b) {
return std::min(a, b);
}) << std::endl;
std::cout << "Total bytes: " << total_bytes << std::endl;
std::cout << "Compression: " << (double(UNCOMPRESSED_SIZE)/double(total_bytes/nruns)) << std::endl;
std::cout << "Bytes/point: " << (double(total_bytes/nruns)/TEST_SIZE) << std::endl;
// Test zstd compression
total_bytes = 0;
for (size_t k = 0; k < nruns; k++) {
PerfTimer tm;
auto ctx = ZSTD_createCCtx();
ZSTD_compressBegin(ctx, 0);
u64 tss[128];
size_t strsize = 0;
for (size_t ix = 0; ix < header.timestamps.size(); ix++) {
tss[ix % 64] = header.timestamps[ix];
tss[ix % 64 + 64] = *reinterpret_cast<double const*>(&header.values[ix]);
if (ix % 64 == 0 && ix) {
size_t hint = ZSTD_compressBlock(ctx, out.data() + strsize, out.size() - strsize, tss, 128*8);
if (ZSTD_isError(hint)) {
std::cout << ZSTD_getErrorName(hint) << std::endl;
abort();
}
strsize += hint;
}
}
timings.at(k) = tm.elapsed();
total_bytes += strsize;
}
std::cout << "\nZstandard" << std::endl;
std::cout << "Completed at " << std::accumulate(timings.begin(), timings.end(), .0, std::plus<double>()) << std::endl;
std::cout << "Fastest run: " << std::accumulate(timings.begin(), timings.end(), 1E10, [](double a, double b) {
return std::min(a, b);
}) << std::endl;
std::cout << "Total bytes: " << total_bytes << std::endl;
std::cout << "Compression: " << (double(UNCOMPRESSED_SIZE)/double(total_bytes/nruns)) << std::endl;
std::cout << "Bytes/point: " << (double(total_bytes/nruns)/TEST_SIZE) << std::endl;
}
@Cyan4973
Copy link

  • ZSTD_createCCtx() is like a malloc(), so it should be followed by a corresponding ZSTD_freeCCtx() to avoid memory leaks.
  • ZSTD_CCtx* can re-used, which is an important efficiency trick when dealing with a lot of small data to compress
    • ZSTD_createCCtx() only once, before the loop
    • Do the loop, init each time with ZSTD_compressBegin()
    • after the loop, ZSTD_freeCCtx()

It should help speed (and memory fragmentation).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment