Skip to content

Instantly share code, notes, and snippets.

@jarutis
Last active August 29, 2015 14:08
Show Gist options
  • Save jarutis/7bb4e93a137607607a42 to your computer and use it in GitHub Desktop.
Save jarutis/7bb4e93a137607607a42 to your computer and use it in GitHub Desktop.
Failed Impala UDAF
#include <iostream>
#include <math.h>
#include <impala_udf/uda-test-harness.h>
#include "uda-dist.h"
using namespace impala;
using namespace impala_udf;
using namespace std;
// Runs the Distribution UDA through Impala's UdaTestHarness2 against a series
// of inputs (empty, NULL values, fewer values than bins, exactly as many values
// as bins, and progressively larger inputs). Prints the harness error for the
// first failing case and returns false; returns true if every case matches.
bool TestDistribution() {
  UdaTestHarness2<StringVal, StringVal, DoubleVal, IntVal> test(
      DistributionInit, DistributionUpdate, DistributionMerge, DistributionSerialize,
      DistributionFinalize);
  // NOTE(review): 20 bytes is not sizeof(DistributionStruct), and
  // DistributionInit allocates its own buffer anyway; confirm against the
  // harness whether this call is needed at all (and with which size).
  test.SetIntermediateSize(20);
  vector<DoubleVal> vals;
  vector<IntVal> bins;

  // Empty input -> NULL result.
  cerr << "--TESTING NULL--" << endl;
  if (!test.Execute(vals, bins, StringVal::null())) {
    cerr << "Input empty: " << test.GetErrorMsg() << endl;
    return false;
  }

  // Ten 1.0 values into 2 bins -> "5.00,5.00".
  cerr << "--TESTING 10 VALUE SAMPLE--" << endl;
  for (int i = 0; i < 10; ++i) {
    vals.push_back(DoubleVal(1));
    bins.push_back(IntVal(2));
  }
  if (!test.Execute(vals, bins, StringVal("5.00,5.00"))) {
    cerr << "Input small: " << test.GetErrorMsg() << endl;
    return false;
  }

  // NULL values must be skipped. 'vals' is intentionally NOT cleared here: it
  // still holds the ten 1.0 values from the previous case, so the expected
  // result is unchanged.
  cerr << "--TESTING NULL VALUE SAMPLE--" << endl;
  for (int i = 0; i < 100; ++i) {
    vals.push_back(DoubleVal::null());
    bins.push_back(IntVal(2));
  }
  if (!test.Execute(vals, bins, StringVal("5.00,5.00"))) {
    cerr << "Input small: " << test.GetErrorMsg() << endl;
    return false;
  }

  // count < bins -> NULL result.
  vals.clear();
  bins.clear();
  cerr << "--TESTING COUNT < BINS--" << endl;
  for (int i = 0; i < 10; ++i) {
    vals.push_back(DoubleVal(1));
    bins.push_back(IntVal(20));
  }
  if (!test.Execute(vals, bins, StringVal::null())) {
    cerr << "Input small: " << test.GetErrorMsg() << endl;
    return false;
  }

  // count == bins -> every bin holds exactly one value.
  vals.clear();
  bins.clear();
  StringVal res_equal = StringVal("1.00,1.00,1.00,1.00,1.00,1.00,1.00,1.00,1.00,1.00");
  cerr << "--TESTING COUNT == BINS--" << endl;
  for (int i = 0; i < 10; ++i) {
    vals.push_back(DoubleVal(1));
    bins.push_back(IntVal(10));
  }
  if (!test.Execute(vals, bins, res_equal)) {
    cerr << "Input small: " << test.GetErrorMsg() << endl;
    return false;
  }

  // 0..999 fed in ascending order into 10 bins; the UDA sorts descending, so
  // the first bin sums 999..900 = 94950 and the last sums 99..0 = 4950.
  vals.clear();
  bins.clear();
  StringVal res_big = StringVal("94950.00,84950.00,74950.00,64950.00,54950.00,44950.00,34950.00,24950.00,14950.00,4950.00");
  std::cout << "--TESTING 1000 UNSORTED VALUE SAMPLE--" << endl;
  for (int i = 0; i < 1000; ++i) {
    vals.push_back(DoubleVal(i));
    bins.push_back(IntVal(10));
  }
  if (!test.Execute(vals, bins, res_big)) {
    cerr << "Input medium: " << test.GetErrorMsg() << endl;
    return false;
  }

  // One million 1.0 values into 10 bins. reserve() avoids vector growth
  // temporarily holding two copies of the input.
  vals.clear();
  bins.clear();
  vals.reserve(1000000);
  bins.reserve(1000000);
  StringVal res_mil = StringVal("100000.00,100000.00,100000.00,100000.00,100000.00,100000.00,100000.00,100000.00,100000.00,100000.00");
  std::cout << "--TESTING 1.000.000 VALUE SAMPLE--" << endl;
  for (int i = 0; i < 1000000; ++i) {
    vals.push_back(DoubleVal(1));
    bins.push_back(IntVal(10));
  }
  if (!test.Execute(vals, bins, res_mil)) {
    cerr << "Input million: " << test.GetErrorMsg() << endl;
    return false;
  }

  // 100 million 1.0 values into 10 bins. The inputs alone take roughly
  // 100M * (sizeof(DoubleVal) + sizeof(IntVal)) bytes, so reserve up front
  // instead of letting vector growth briefly double that peak.
  vals.clear();
  bins.clear();
  vals.reserve(100000000);
  bins.reserve(100000000);
  StringVal res_100mil = StringVal("10000000.00,10000000.00,10000000.00,10000000.00,10000000.00,10000000.00,10000000.00,10000000.00,10000000.00,10000000.00");
  std::cout << "--TESTING 100.000.000 VALUE SAMPLE--" << endl;
  for (int i = 0; i < 100000000; ++i) {
    vals.push_back(DoubleVal(1));
    bins.push_back(IntVal(10));
  }
  if (!test.Execute(vals, bins, res_100mil)) {
    cerr << "Input big: " << test.GetErrorMsg() << endl;
    return false;
  }
  return true;
}
// Test driver entry point: runs all UDA tests, reports the overall outcome on
// stderr, and exits non-zero on failure so scripts/CI can detect it.
int main(int argc, char** argv) {
  bool passed = true;
  passed &= TestDistribution();
  std::cerr << (passed ? "Tests passed." : "Tests failed.") << std::endl;
  return passed ? 0 : 1;
}
#include "uda-dist.h"

#include <assert.h>
#include <malloc.h>

#include <algorithm>
#include <climits>
#include <cstring>
#include <functional>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>
// Compile-time switch for debugPrintStr(): when true, debug messages are
// written to stdout.
#define DEBUG_ENABLED true
// Number of double slots DistributionInit reserves after the struct header;
// also referenced when DistributionUpdate grows the buffer.
#define ELEMENT_ALLOCATION_SIZE 100000
using namespace impala_udf;
using namespace std;
// ---------------------------------------------------------------------------
// State object
// ---------------------------------------------------------------------------
// Intermediate aggregation state stored at the start of the StringVal buffer.
// The buffer is allocated as sizeof(DistributionStruct) plus
// 'capacity' * sizeof(double) bytes, so 'elements' is indexed past its
// declared size of 1 (trailing-array idiom). Field order is also the
// serialized layout exchanged between Serialize and Merge, so do not reorder.
struct DistributionStruct {
// Number of values currently stored in 'elements'.
int64_t count;
// Number of extra double slots the buffer was sized for beyond the header.
int64_t capacity;
// Number of output bins requested, as recorded by Update/Merge.
int64_t bins;
// Collected input values (unsorted until Finalize).
double elements[1];
};
// ---------------------------------------------------------------------------
// Convert to StringVal
// ---------------------------------------------------------------------------
// Formats 'val' with operator<< and copies the resulting text into a StringVal
// whose buffer is allocated from 'context'.
template <typename T>
StringVal ToStringVal(FunctionContext* context, const T& val) {
  std::ostringstream out;
  out << val;
  const std::string text = out.str();
  StringVal converted(context, text.size());
  memcpy(converted.ptr, text.data(), text.size());
  return converted;
}
// ---------------------------------------------------------------------------
// Debug helpers
// ---------------------------------------------------------------------------
// Debug helper: when DEBUG_ENABLED is true, prints
// "[<name left-aligned in 11 columns>] Message: <val>" to stdout and flushes.
template <typename T>
void debugPrintStr(const char* name, const T& val) {
  if (!DEBUG_ENABLED) {
    return;
  }
  std::ostream& os = std::cout;
  os << "[";
  os.width(11);
  os << std::left << name << "] Message: " << val << std::endl;
}
// ----------------------------------------------------------------------------
// Function to calculate distribution and convert to string
// ---------------------------------------------------------------------------
// Sorts 'elements' in descending order (in place) and splits the sorted values
// into 'bins' consecutive groups. Returns the sum of each group, formatted with
// two decimals and joined by ",".
//
// When 'count' is a multiple of 'bins', every group holds count / bins values.
// Otherwise the group sizes differ by at most one. The old count / bins
// indexing wrote past the end of the result array in that case.
//
// Preconditions: count > 0 and 0 < bins <= count.
std::string distribution (double *elements, const int bins, const int count) {
  assert(count > 0);
  assert(bins > 0);
  assert(count >= bins);
  std::sort(elements, elements + count, std::greater<double>());
  // std::vector rather than a variable-length array, which is not standard C++.
  std::vector<double> sums(bins, 0.0);
  for (int i = 0; i < count; ++i) {
    // This index is always in [0, bins). It equals i / (count / bins) whenever
    // count is divisible by bins. 64-bit arithmetic keeps i * bins from
    // overflowing.
    const int64_t bin = static_cast<int64_t>(i) * bins / count;
    sums[bin] += elements[i];
  }
  std::ostringstream out;
  out << std::fixed;
  out.precision(2);
  for (int i = 0; i < bins; ++i) {
    if (i > 0) {
      out << ",";
    }
    out << sums[i];
  }
  return out.str();
}
// ---------------------------------------------------------------------------
// Distribution aggregate function.
// ---------------------------------------------------------------------------
// Initializes the StringVal intermediate. The buffer holds a zeroed
// DistributionStruct header followed by room for ELEMENT_ALLOCATION_SIZE
// doubles. 'len' covers only the header; the element area is tracked by
// 'capacity'. If allocation fails, the intermediate is set to NULL and an
// error is reported on the context.
void DistributionInit(FunctionContext* context, StringVal* val) {
  val->is_null = false;
  val->len = sizeof(DistributionStruct);
  val->ptr = context->Allocate(val->len + sizeof(double) * ELEMENT_ALLOCATION_SIZE);
  if (val->ptr == NULL) {
    // Never memset or write the header through a NULL buffer.
    context->SetError("DistributionInit: failed to allocate the intermediate buffer");
    *val = StringVal::null();
    return;
  }
  memset(val->ptr, 0, val->len);
  // Initialise the accumulator; count and bins stay zero from the memset.
  DistributionStruct* dist = reinterpret_cast<DistributionStruct*>(val->ptr);
  dist->capacity = ELEMENT_ALLOCATION_SIZE;
}
// Appends one non-NULL input value to the intermediate buffer and records the
// requested number of bins. The buffer is grown when it is full.
//
// Buffer layout: 'val->len' bytes of DistributionStruct header, followed by
// room for 'capacity' doubles. FunctionContext::Reallocate takes an int byte
// size, which caps how large the buffer can grow.
void DistributionUpdate(FunctionContext* context, const DoubleVal& input,
    const IntVal& bins, StringVal* val) {
  if (input.is_null || val->is_null) return;
  DistributionStruct* dist = reinterpret_cast<DistributionStruct*>(val->ptr);
  if (dist->count >= dist->capacity) {
    // Grow geometrically. A fixed +ELEMENT_ALLOCATION_SIZE step made the total
    // number of bytes copied by Reallocate quadratic in the row count.
    const int64_t max_capacity =
        (static_cast<int64_t>(INT_MAX) - val->len) / static_cast<int64_t>(sizeof(double));
    int64_t new_capacity =
        dist->capacity + std::max<int64_t>(dist->capacity, ELEMENT_ALLOCATION_SIZE);
    if (new_capacity > max_capacity) new_capacity = max_capacity;
    if (new_capacity <= dist->capacity) {
      context->SetError("DistributionUpdate: too many values in one group");
      return;
    }
    const int new_size =
        val->len + static_cast<int>(new_capacity * static_cast<int64_t>(sizeof(double)));
    debugPrintStr("Reallocating expected new size:", new_size);
    uint8_t* new_ptr = context->Reallocate(val->ptr, new_size);
    if (new_ptr == NULL) {
      context->SetError("DistributionUpdate: failed to grow the intermediate buffer");
      return;
    }
    val->ptr = new_ptr;
    // Reallocate may move the buffer. Re-derive 'dist', or the writes below
    // would land in freed memory.
    dist = reinterpret_cast<DistributionStruct*>(val->ptr);
    dist->capacity = new_capacity;
  }
  dist->elements[dist->count++] = input.val;
  // A NULL bin count carries no information; keep the last non-NULL one.
  if (!bins.is_null) dist->bins = bins.val;
}
// Copies the intermediate into a freshly allocated StringVal (the value handed
// to Merge) and frees the original buffer.
//
// Only the header and the 'count' populated element slots are copied; spare
// capacity is dropped, and the copy's 'capacity' is set to 'count' to match.
// Returns NULL if the input is NULL or the copy cannot be allocated.
const StringVal DistributionSerialize(FunctionContext* context, const StringVal& val) {
  if (val.is_null) return StringVal::null();
  const DistributionStruct* dist = reinterpret_cast<const DistributionStruct*>(val.ptr);
  // Update and Merge keep count <= capacity, so this never reads past the
  // end of the source buffer.
  const int size =
      val.len + static_cast<int>(dist->count * static_cast<int64_t>(sizeof(double)));
  StringVal result(context, size);
  if (result.ptr == NULL) {
    context->SetError("DistributionSerialize: failed to allocate the serialized buffer");
    context->Free(val.ptr);
    return StringVal::null();
  }
  memcpy(result.ptr, val.ptr, size);
  reinterpret_cast<DistributionStruct*>(result.ptr)->capacity = dist->count;
  context->Free(val.ptr);
  return result;
}
// Appends every value held by 'src' (a serialized intermediate) to 'dst' and
// copies src's bin count. dst's buffer is grown if needed. Nothing happens if
// either side is NULL or 'src' holds no values.
void DistributionMerge(FunctionContext* context, const StringVal& src, StringVal* dst) {
  if (src.is_null || dst->is_null) return;
  const DistributionStruct* src_dist = reinterpret_cast<const DistributionStruct*>(src.ptr);
  DistributionStruct* dst_dist = reinterpret_cast<DistributionStruct*>(dst->ptr);
  if (src_dist->count <= 0) return;
  const int64_t needed = dst_dist->count + src_dist->count;
  if (needed > dst_dist->capacity) {
    // Reallocate takes an int byte size.
    const int64_t max_capacity =
        (static_cast<int64_t>(INT_MAX) - dst->len) / static_cast<int64_t>(sizeof(double));
    // At least double the capacity so a long series of merges stays
    // amortized linear instead of reallocating on every call.
    int64_t new_capacity = std::max<int64_t>(needed, 2 * dst_dist->capacity);
    if (new_capacity > max_capacity) new_capacity = max_capacity;
    if (new_capacity < needed) {
      context->SetError("DistributionMerge: too many values in one group");
      return;
    }
    const int new_size =
        dst->len + static_cast<int>(new_capacity * static_cast<int64_t>(sizeof(double)));
    uint8_t* new_ptr = context->Reallocate(dst->ptr, new_size);
    if (new_ptr == NULL) {
      context->SetError("DistributionMerge: failed to grow the intermediate buffer");
      return;
    }
    dst->ptr = new_ptr;
    // Reallocate may move the buffer. Re-derive 'dst_dist', or the copy below
    // would write to freed memory.
    dst_dist = reinterpret_cast<DistributionStruct*>(dst->ptr);
    dst_dist->capacity = new_capacity;
  }
  std::copy(src_dist->elements, src_dist->elements + src_dist->count,
            dst_dist->elements + dst_dist->count);
  dst_dist->count = needed;
  dst_dist->bins = src_dist->bins;
}
// Produces the final result: the per-bin sums of the collected values (see
// distribution()) as a comma-separated string. The result is NULL when there
// are no values, fewer values than bins, or a non-positive bin count. The
// intermediate buffer is always freed.
StringVal DistributionFinalize(FunctionContext* context, const StringVal& val) {
  if (val.is_null) return StringVal::null();
  DistributionStruct* dist = reinterpret_cast<DistributionStruct*>(val.ptr);
  StringVal result = StringVal::null();
  // bins <= 0 would divide by zero inside distribution().
  if (dist->bins > 0 && dist->count > 0 && dist->count >= dist->bins) {
    const std::string res_s = distribution(dist->elements, dist->bins, dist->count);
    result = ToStringVal(context, res_s);
  }
  context->Free(val.ptr);
  return result;
}
#ifndef UDA_DIST_H
#define UDA_DIST_H
#include <impala_udf/udf.h>
// NOTE(review): a using-directive in a header leaks impala_udf into every
// includer; kept because the declarations below use the unqualified names.
using namespace impala_udf;
// Entry points of the "Distribution" aggregate. It collects DOUBLE values,
// sorts them in descending order, splits them into 'bins' groups and returns
// each group's sum as a comma-separated string with two decimals.
// The intermediate state is a StringVal buffer.

// Allocates an empty intermediate buffer.
void DistributionInit(FunctionContext* context, StringVal* val);
// Appends one non-NULL input value and records the requested bin count.
void DistributionUpdate(FunctionContext* context, const DoubleVal& input, const IntVal& bins, StringVal* val);
// Appends all values held by 'src' to 'dst'.
void DistributionMerge(FunctionContext* context, const StringVal& src, StringVal* dst);
// Copies the intermediate into a new StringVal and frees the original buffer.
const StringVal DistributionSerialize(FunctionContext* context, const StringVal& val);
// Returns the bin sums (NULL if there are no values or fewer values than
// bins) and frees the intermediate buffer.
StringVal DistributionFinalize(FunctionContext* context, const StringVal& val);
#endif
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment