Skip to content

Instantly share code, notes, and snippets.

@jarutis
Created November 10, 2014 06:09
Show Gist options
  • Save jarutis/37ecd995a6875dd13236 to your computer and use it in GitHub Desktop.
Save jarutis/37ecd995a6875dd13236 to your computer and use it in GitHub Desktop.
First successful impala UDA!
#include <iostream>
#include <math.h>
#include <impala_udf/uda-test-harness.h>
#include "uda-dist.h"
using namespace impala;
using namespace impala_udf;
using namespace std;
// Drives the Distribution UDA (Init/Update/Merge/Serialize/Finalize) through
// the UDA test harness over a series of (value, bins) inputs and compares the
// finalized string with the expected one for each scenario.
// Returns true only if every scenario passes; on the first mismatch the
// harness error message is written to cerr and false is returned.
bool TestDistribution() {
// NOTE(review): the template arguments are presumably
// <result, intermediate, input1, input2> -- confirm against uda-test-harness.h.
UdaTestHarness2<StringVal, StringVal, DoubleVal, IntVal> test(
DistributionInit, DistributionUpdate, DistributionMerge, DistributionSerialize, DistributionFinalize);
// Parallel input columns: vals[i] is the sample, bins[i] the requested bin count.
vector<DoubleVal> vals;
vector<IntVal> bins;
// Scenario: no input rows at all; a NULL result is expected.
cerr << "--TESTING NULL--" << endl;
if (!test.Execute(vals, bins, StringVal::null())) {
cerr << "Input empty: " << test.GetErrorMsg() << endl;
return false;
}
// Scenario: ten samples of 1.0 split into 2 bins; each bin sums five 1.0s.
cerr << "--TESTING 10 VALUE SAMPLE--" << endl;
for (int i = 0; i < 10; ++i) {
vals.push_back(DoubleVal(1));
bins.push_back(IntVal(2));
}
if (!test.Execute(vals, bins, StringVal("5.00,5.00"))) {
cerr << "Input small: " << test.GetErrorMsg() << endl;
return false;
}
// Scenario: NULL samples. vals/bins are NOT cleared here, so 100 NULLs are
// appended to the ten 1.0 samples above; the expected output is unchanged,
// i.e. NULL inputs must not contribute to the result.
cerr << "--TESTING NULL VALUE SAMPLE--" << endl;
for (int i = 0; i < 100; ++i) {
vals.push_back(DoubleVal::null());
bins.push_back(IntVal(2));
}
if (!test.Execute(vals, bins, StringVal("5.00,5.00"))) {
cerr << "Input small: " << test.GetErrorMsg() << endl;
return false;
}
vals.clear();
bins.clear();
// Scenario: samples far beyond the histogram range. The expected output
// (5 * 5000.00 per bin) corresponds to every sample being clamped to the
// last histogram bucket, (HISTOGRAM_SIZE - 1) / PRECISION = 5000.
cerr << "--TESTING LARGE VALUE SAMPLE--" << endl;
for (int i = 0; i < 10; ++i) {
vals.push_back(DoubleVal(3859658153.650001));
bins.push_back(IntVal(2));
}
if (!test.Execute(vals, bins, StringVal("25000.00,25000.00"))) {
cerr << "Input small: " << test.GetErrorMsg() << endl;
return false;
}
vals.clear();
bins.clear();
// Scenario: fewer samples (10) than requested bins (20); NULL is expected.
cerr << "--TESTING COUNT < BINS--" << endl;
for (int i = 0; i < 10; ++i) {
vals.push_back(DoubleVal(1));
bins.push_back(IntVal(20));
}
if (!test.Execute(vals, bins, StringVal::null())) {
cerr << "Input small: " << test.GetErrorMsg() << endl;
return false;
}
// Reset the inputs for the next scenario.
vals.clear();
bins.clear();
StringVal res_equal = StringVal("1.00,1.00,1.00,1.00,1.00,1.00,1.00,1.00,1.00,1.00");
// Scenario: exactly as many samples as bins (10 / 10); one sample per bin.
cerr << "--TESTING COUNT == BINS--" << endl;
for (int i = 0; i < 10; ++i) {
vals.push_back(DoubleVal(1));
bins.push_back(IntVal(10));
}
if (!test.Execute(vals, bins, res_equal)) {
cerr << "Input small: " << test.GetErrorMsg() << endl;
return false;
}
vals.clear();
bins.clear();
StringVal res_not_int = StringVal("9.00,8.00,7.00,6.00,5.00,4.00");
// Scenario: sample count (10) is not a multiple of the bin count (6).
// Samples are 0..9; the expected output lists the six largest, one per bin,
// in descending order, so the four smallest samples are dropped.
cerr << "--TESTING COUNT / BINS not integer--" << endl;
for (int i = 0; i < 10; ++i) {
vals.push_back(DoubleVal(i));
bins.push_back(IntVal(6));
}
if (!test.Execute(vals, bins, res_not_int)) {
cerr << "Input small: " << test.GetErrorMsg() << endl;
return false;
}
// Scenario: 1000 distinct samples (0..999, pushed in ascending order despite
// the banner text) in 10 bins. Each expected entry is the sum of 100
// consecutive values, largest group first (900..999 sums to 94950).
// Note: this banner and the two below go to stdout, the earlier ones to stderr.
vals.clear();
bins.clear();
StringVal res_big = StringVal("94950.00,84950.00,74950.00,64950.00,54950.00,44950.00,34950.00,24950.00,14950.00,4950.00");
std::cout << "--TESTING 1000 UNSORTED VALUE SAMPLE--" << endl;
for (int i = 0; i < 1000; ++i) {
vals.push_back(DoubleVal(i));
bins.push_back(IntVal(10));
}
if (!test.Execute(vals, bins, res_big)) {
cerr << "Input medium: " << test.GetErrorMsg() << endl;
return false;
}
// Scenario: one million samples of 1.0 in 10 bins; 100000 samples per bin.
vals.clear();
bins.clear();
StringVal res_mil = StringVal("100000.00,100000.00,100000.00,100000.00,100000.00,100000.00,100000.00,100000.00,100000.00,100000.00");
std::cout << "--TESTING 1.000.000 VALUE SAMPLE--" << endl;
for (int i = 0; i < 1000000; ++i) {
vals.push_back(DoubleVal(1));
bins.push_back(IntVal(10));
}
if (!test.Execute(vals, bins, res_mil)) {
cerr << "Input million: " << test.GetErrorMsg() << endl;
return false;
}
// Scenario: one hundred million samples of 1.0 in 10 bins; ten million per
// bin. This builds two 100,000,000-element vectors, so it needs a lot of memory.
vals.clear();
bins.clear();
StringVal res_100mil = StringVal("10000000.00,10000000.00,10000000.00,10000000.00,10000000.00,10000000.00,10000000.00,10000000.00,10000000.00,10000000.00");
std::cout << "--TESTING 100.000.000 VALUE SAMPLE--" << endl;
for (int i = 0; i < 100000000; ++i) {
vals.push_back(DoubleVal(1));
bins.push_back(IntVal(10));
}
if (!test.Execute(vals, bins, res_100mil)) {
cerr << "Input big: " << test.GetErrorMsg() << endl;
return false;
}
return true;
}
// Test driver entry point: runs the Distribution UDA tests, reports the
// outcome on stderr and propagates it through the process exit status
// (0 on success, 1 on failure) so scripts and CI can detect a failing run.
int main(int argc, char** argv) {
  (void)argc;  // command-line arguments are not used
  (void)argv;
  const bool passed = TestDistribution();
  cerr << (passed ? "Tests passed." : "Tests failed.") << endl;
  return passed ? 0 : 1;
}
#include "uda-dist.h"
#include <assert.h>
#include <sstream>
#include <vector>
#include <string>
#include <algorithm>
#include <iostream>
#include <malloc.h>
#include <stdio.h>
#define DEBUG_ENABLED true
#define HISTOGRAM_SIZE 100001
#define PRECISION 20
using namespace impala_udf;
using namespace std;
// ---------------------------------------------------------------------------
// State object
// ---------------------------------------------------------------------------
// Intermediate aggregation state. It is stored as raw bytes inside a StringVal
// (see DistributionInit) and copied with memcpy in DistributionSerialize, so it
// must remain a plain struct without pointers or non-trivial members.
struct DistributionStruct {
// Requested number of output bins; 0 until an Update or Merge sets it.
int bins;
// Fixed-size histogram of the inputs: elements[k] counts the inputs whose
// value * PRECISION truncates to k; the last slot also collects every input
// above the histogram range (see DistributionUpdate).
int64_t elements[HISTOGRAM_SIZE];
};
// ----------------------------------------------------------------------------
// Function to calculate distribution and convert to vector in final step
// ---------------------------------------------------------------------------
string distribution (const int64_t *histogram, const int bins) {
assert(bins > 0);
int64_t total_count = 0;
for(int i = 0; i < HISTOGRAM_SIZE; i++) {total_count += histogram[i];}
double result[bins];
memset(result, 0, sizeof result);
int64_t bin_size = total_count / bins;
int64_t count = 0;
int64_t result_index = 0;
for (int i = HISTOGRAM_SIZE - 1; i >= 0; i-- ) {
count + histogram[i];
if (count + histogram[i] < bin_size) {
result[result_index] += (double) i / PRECISION * histogram[i];
count += histogram[i];
} else {
result[result_index++] += (double) i / PRECISION * (bin_size - count);
count += histogram[i] - bin_size;
while (count >= bin_size) {
result[result_index++] = (double) i / PRECISION * bin_size;
count -= bin_size;
}
if (result_index >= bins) break; // remove overflow if total_count / bins not int
result[result_index] = (double) i / PRECISION * count;
}
}
// convert to string
stringstream ss;
ss.precision(2);
ss << fixed;
for (int i = 0; i < bins; i++) {
ss << result[i];
if(i < (bins - 1)) {
ss << ",";
}
}
return ss.str();
}
// ---------------------------------------------------------------------------
// Distribution aggregate function.
// ---------------------------------------------------------------------------
// Initializes the StringVal intermediate to a zeroed DistributionStruct.
//
// context: function context used to allocate the state buffer.
// val:     intermediate value to initialize.
//
// If the allocation fails, `val` is set to NULL (no buffer, length 0) instead
// of zeroing through a NULL pointer. The state is roughly 800 KB, so hitting a
// memory limit here is realistic.
void DistributionInit(FunctionContext* context, StringVal* val) {
  val->len = sizeof(DistributionStruct);
  val->ptr = context->Allocate(val->len);
  if (val->ptr == NULL) {
    val->is_null = true;
    val->len = 0;
    return;
  }
  val->is_null = false;
  memset(val->ptr, 0, val->len);
}
// Adds one input value to the histogram held in the intermediate state.
//
// context: function context (unused).
// input:   sample value; NULL, negative and NaN inputs are ignored.
// bins:    requested number of output bins; remembered in the state unless NULL.
// val:     intermediate state; nothing is done if it holds no buffer.
//
// The value is scaled by PRECISION and truncated to select the bucket; values
// beyond the histogram range (including +infinity) land in the last bucket.
void DistributionUpdate(FunctionContext* context, const DoubleVal& input,
                        const IntVal& bins, StringVal* val) {
  (void)context;
  if (val->is_null || val->ptr == NULL) return;
  // `!(x >= 0)` is true for negative values and for NaN; NaN must be rejected
  // here because it would otherwise end up as an arbitrary bucket index.
  if (input.is_null || !(input.val >= 0)) return;

  DistributionStruct* dist = reinterpret_cast<DistributionStruct*>(val->ptr);

  // Clamp while still in floating point: converting a double that does not fit
  // into int64_t is undefined behaviour.
  const int64_t last_bucket = HISTOGRAM_SIZE - 1;
  const double scaled = input.val * PRECISION;
  const int64_t bucket = scaled >= static_cast<double>(last_bucket)
      ? last_bucket
      : static_cast<int64_t>(scaled);
  dist->elements[bucket]++;

  // A NULL bin count carries no usable value; keep what the state already has.
  if (!bins.is_null) dist->bins = bins.val;
}
// Serializes the intermediate state: copies the raw DistributionStruct bytes
// into a freshly created StringVal and releases the original buffer.
//
// context: function context used to allocate the copy and free the original.
// val:     intermediate state produced by Init/Update/Merge.
//
// Returns the copied state, or NULL if there is no state to copy or the copy
// could not be allocated.
const StringVal DistributionSerialize(FunctionContext* context, const StringVal& val) {
  if (val.is_null || val.ptr == NULL) return StringVal::null();

  StringVal result(context, val.len);
  if (result.is_null || result.ptr == NULL) {
    // Allocation of the copy failed; never memcpy into a missing buffer.
    result = StringVal::null();
  } else {
    memcpy(result.ptr, val.ptr, val.len);
  }
  context->Free(val.ptr);
  return result;
}
// Folds the partial state `src` into `dst`: bucket counts are added up and a
// positive bin count from `src` replaces the one in `dst`. A NULL `src` leaves
// `dst` untouched.
void DistributionMerge(FunctionContext* context, const StringVal& src, StringVal* dst) {
  if (!src.is_null) {
    const DistributionStruct* incoming =
        reinterpret_cast<const DistributionStruct*>(src.ptr);
    DistributionStruct* accumulated =
        reinterpret_cast<DistributionStruct*>(dst->ptr);

    // Element-wise sum of the two histograms.
    const int64_t* from = incoming->elements;
    int64_t* to = accumulated->elements;
    int64_t* const stop = to + HISTOGRAM_SIZE;
    while (to != stop) {
      *to += *from;
      ++to;
      ++from;
    }

    // Only a positive bin count is taken over from the partial state.
    const int incoming_bins = incoming->bins;
    if (incoming_bins > 0) {
      accumulated->bins = incoming_bins;
    }
  }
}
// Produces the final result from the intermediate state and releases it.
//
// context: function context used to allocate the result and free the state.
// val:     intermediate state produced by Init/Update/Merge.
//
// Returns the comma separated bin sums computed by distribution(), or NULL when
// there is no state, the bin count is not positive, there are fewer counted
// elements than bins, or the result string could not be allocated.
StringVal DistributionFinalize(FunctionContext* context, const StringVal& val) {
  if (val.is_null || val.ptr == NULL) return StringVal::null();
  const DistributionStruct* dist = reinterpret_cast<const DistributionStruct*>(val.ptr);

  // 64-bit counter: bucket counts are int64_t and their sum can exceed INT_MAX.
  // NOTE(review): bucket 0 is excluded from this count, as in the original
  // code, so inputs that fall into bucket 0 alone never yield a result --
  // confirm that this is intended.
  int64_t count = 0;
  for (int i = 1; i < HISTOGRAM_SIZE; i++) {
    count += dist->elements[i];
  }

  StringVal result = StringVal::null();
  // bins > 0 protects distribution() from a zero or negative bin count;
  // together with count >= bins it also implies count > 0.
  if (dist->bins > 0 && count >= dist->bins) {
    const std::string str = distribution(dist->elements, dist->bins);
    StringVal string_val(context, str.size());
    if (!string_val.is_null && string_val.ptr != NULL) {
      memcpy(string_val.ptr, str.data(), str.size());
      result = string_val;
    }
  }
  context->Free(val.ptr);
  return result;
}
// Declarations of the Distribution aggregate function (UDA): the inputs are
// collected in a fixed-size histogram and the final result is a comma separated
// list of per-bin value sums.
#ifndef UDA_DIST_H
#define UDA_DIST_H
#include <impala_udf/udf.h>
using namespace impala_udf;
// Allocates the intermediate state and zeroes it.
void DistributionInit(FunctionContext* context, StringVal* val);
// Adds one input value to the histogram in `val` and records the requested
// bin count; NULL and negative inputs are ignored.
void DistributionUpdate(FunctionContext* context, const DoubleVal& input, const IntVal& bins, StringVal* val);
// Adds the bucket counts of `src` to `dst`; a positive bin count in `src`
// overrides the one in `dst`.
void DistributionMerge(FunctionContext* context, const StringVal& src, StringVal* dst);
// Copies the intermediate state into a new StringVal and frees the original buffer.
const StringVal DistributionSerialize(FunctionContext* context, const StringVal& val);
// Computes the result string from the intermediate state (NULL when there are
// no counted elements or fewer than the bin count) and frees the state buffer.
StringVal DistributionFinalize(FunctionContext* context, const StringVal& val);
#endif
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment