Created
November 10, 2014 06:09
-
-
Save jarutis/37ecd995a6875dd13236 to your computer and use it in GitHub Desktop.
First successful impala UDA!
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <iostream> | |
#include <math.h> | |
#include <impala_udf/uda-test-harness.h> | |
#include "uda-dist.h" | |
using namespace impala; | |
using namespace impala_udf; | |
using namespace std; | |
bool TestDistribution() { | |
UdaTestHarness2<StringVal, StringVal, DoubleVal, IntVal> test( | |
DistributionInit, DistributionUpdate, DistributionMerge, DistributionSerialize, DistributionFinalize); | |
vector<DoubleVal> vals; | |
vector<IntVal> bins; | |
// Test empty input | |
cerr << "--TESTING NULL--" << endl; | |
if (!test.Execute(vals, bins, StringVal::null())) { | |
cerr << "Input empty: " << test.GetErrorMsg() << endl; | |
return false; | |
} | |
// Test values | |
cerr << "--TESTING 10 VALUE SAMPLE--" << endl; | |
for (int i = 0; i < 10; ++i) { | |
vals.push_back(DoubleVal(1)); | |
bins.push_back(IntVal(2)); | |
} | |
if (!test.Execute(vals, bins, StringVal("5.00,5.00"))) { | |
cerr << "Input small: " << test.GetErrorMsg() << endl; | |
return false; | |
} | |
// Test NULL values | |
cerr << "--TESTING NULL VALUE SAMPLE--" << endl; | |
for (int i = 0; i < 100; ++i) { | |
vals.push_back(DoubleVal::null()); | |
bins.push_back(IntVal(2)); | |
} | |
if (!test.Execute(vals, bins, StringVal("5.00,5.00"))) { | |
cerr << "Input small: " << test.GetErrorMsg() << endl; | |
return false; | |
} | |
vals.clear(); | |
bins.clear(); | |
// Test large values, > histogram size | |
cerr << "--TESTING LARGE VALUE SAMPLE--" << endl; | |
for (int i = 0; i < 10; ++i) { | |
vals.push_back(DoubleVal(3859658153.650001)); | |
bins.push_back(IntVal(2)); | |
} | |
if (!test.Execute(vals, bins, StringVal("25000.00,25000.00"))) { | |
cerr << "Input small: " << test.GetErrorMsg() << endl; | |
return false; | |
} | |
vals.clear(); | |
bins.clear(); | |
// Test count < bins | |
cerr << "--TESTING COUNT < BINS--" << endl; | |
for (int i = 0; i < 10; ++i) { | |
vals.push_back(DoubleVal(1)); | |
bins.push_back(IntVal(20)); | |
} | |
if (!test.Execute(vals, bins, StringVal::null())) { | |
cerr << "Input small: " << test.GetErrorMsg() << endl; | |
return false; | |
} | |
// Test biger values | |
vals.clear(); | |
bins.clear(); | |
StringVal res_equal = StringVal("1.00,1.00,1.00,1.00,1.00,1.00,1.00,1.00,1.00,1.00"); | |
// Test count < bins | |
cerr << "--TESTING COUNT == BINS--" << endl; | |
for (int i = 0; i < 10; ++i) { | |
vals.push_back(DoubleVal(1)); | |
bins.push_back(IntVal(10)); | |
} | |
if (!test.Execute(vals, bins, res_equal)) { | |
cerr << "Input small: " << test.GetErrorMsg() << endl; | |
return false; | |
} | |
vals.clear(); | |
bins.clear(); | |
StringVal res_not_int = StringVal("9.00,8.00,7.00,6.00,5.00,4.00"); | |
// Test count < bins | |
cerr << "--TESTING COUNT / BINS not integer--" << endl; | |
for (int i = 0; i < 10; ++i) { | |
vals.push_back(DoubleVal(i)); | |
bins.push_back(IntVal(6)); | |
} | |
if (!test.Execute(vals, bins, res_not_int)) { | |
cerr << "Input small: " << test.GetErrorMsg() << endl; | |
return false; | |
} | |
// Test biger values | |
vals.clear(); | |
bins.clear(); | |
StringVal res_big = StringVal("94950.00,84950.00,74950.00,64950.00,54950.00,44950.00,34950.00,24950.00,14950.00,4950.00"); | |
std::cout << "--TESTING 1000 UNSORTED VALUE SAMPLE--" << endl; | |
for (int i = 0; i < 1000; ++i) { | |
vals.push_back(DoubleVal(i)); | |
bins.push_back(IntVal(10)); | |
} | |
if (!test.Execute(vals, bins, res_big)) { | |
cerr << "Input medium: " << test.GetErrorMsg() << endl; | |
return false; | |
} | |
// Test big values | |
vals.clear(); | |
bins.clear(); | |
StringVal res_mil = StringVal("100000.00,100000.00,100000.00,100000.00,100000.00,100000.00,100000.00,100000.00,100000.00,100000.00"); | |
std::cout << "--TESTING 1.000.000 VALUE SAMPLE--" << endl; | |
for (int i = 0; i < 1000000; ++i) { | |
vals.push_back(DoubleVal(1)); | |
bins.push_back(IntVal(10)); | |
} | |
if (!test.Execute(vals, bins, res_mil)) { | |
cerr << "Input million: " << test.GetErrorMsg() << endl; | |
return false; | |
} | |
// Test big values | |
vals.clear(); | |
bins.clear(); | |
StringVal res_100mil = StringVal("10000000.00,10000000.00,10000000.00,10000000.00,10000000.00,10000000.00,10000000.00,10000000.00,10000000.00,10000000.00"); | |
std::cout << "--TESTING 100.000.000 VALUE SAMPLE--" << endl; | |
for (int i = 0; i < 100000000; ++i) { | |
vals.push_back(DoubleVal(1)); | |
bins.push_back(IntVal(10)); | |
} | |
if (!test.Execute(vals, bins, res_100mil)) { | |
cerr << "Input big: " << test.GetErrorMsg() << endl; | |
return false; | |
} | |
return true; | |
} | |
// Test driver entry point. Runs the distribution UDA tests, prints a summary
// to stderr and reports the outcome through the process exit status
// (0 = all tests passed, 1 = at least one test failed) so that scripts and
// CI can detect failures.
int main(int argc, char** argv) {
  bool passed = true;
  passed &= TestDistribution();
  cerr << (passed ? "Tests passed." : "Tests failed.") << endl;
  return passed ? 0 : 1;
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include "uda-dist.h" | |
#include <assert.h> | |
#include <sstream> | |
#include <vector> | |
#include <string> | |
#include <algorithm> | |
#include <iostream> | |
#include <malloc.h> | |
#include <stdio.h> | |
// Not referenced anywhere in the visible source.
#define DEBUG_ENABLED true | |
// Number of buckets in the histogram kept in DistributionStruct::elements.
// Bucket indexes larger than HISTOGRAM_SIZE - 1 are clamped to the last bucket.
#define HISTOGRAM_SIZE 100001 | |
// Scale factor between input values and bucket indexes: an input value v is
// counted in bucket (int64_t)(v * PRECISION), and bucket i is converted back
// to the value i / PRECISION when the result is computed.
#define PRECISION 20 | |
using namespace impala_udf; | |
using namespace std; | |
// --------------------------------------------------------------------------- | |
// State object | |
// --------------------------------------------------------------------------- | |
// Intermediate aggregation state. It is stored as the raw bytes of a
// StringVal buffer (allocated with sizeof(DistributionStruct), zero-filled,
// and copied with memcpy when serialized), so its layout must not change.
struct DistributionStruct { | |
// Number of result bins requested by the caller; 0 until a row or a merged
// state provides a value.
int bins; | |
// elements[i] is the number of input values that fell into bucket i.
int64_t elements[HISTOGRAM_SIZE]; | |
}; | |
// ---------------------------------------------------------------------------- | |
// Function to calculate distribution and convert to vector in final step | |
// --------------------------------------------------------------------------- | |
string distribution (const int64_t *histogram, const int bins) { | |
assert(bins > 0); | |
int64_t total_count = 0; | |
for(int i = 0; i < HISTOGRAM_SIZE; i++) {total_count += histogram[i];} | |
double result[bins]; | |
memset(result, 0, sizeof result); | |
int64_t bin_size = total_count / bins; | |
int64_t count = 0; | |
int64_t result_index = 0; | |
for (int i = HISTOGRAM_SIZE - 1; i >= 0; i-- ) { | |
count + histogram[i]; | |
if (count + histogram[i] < bin_size) { | |
result[result_index] += (double) i / PRECISION * histogram[i]; | |
count += histogram[i]; | |
} else { | |
result[result_index++] += (double) i / PRECISION * (bin_size - count); | |
count += histogram[i] - bin_size; | |
while (count >= bin_size) { | |
result[result_index++] = (double) i / PRECISION * bin_size; | |
count -= bin_size; | |
} | |
if (result_index >= bins) break; // remove overflow if total_count / bins not int | |
result[result_index] = (double) i / PRECISION * count; | |
} | |
} | |
// convert to string | |
stringstream ss; | |
ss.precision(2); | |
ss << fixed; | |
for (int i = 0; i < bins; i++) { | |
ss << result[i]; | |
if(i < (bins - 1)) { | |
ss << ","; | |
} | |
} | |
return ss.str(); | |
} | |
// --------------------------------------------------------------------------- | |
// Distribution aggregate function. | |
// --------------------------------------------------------------------------- | |
// Initializes the StringVal intermediate to a zero-filled DistributionStruct.
//
// context: function context used to allocate the state buffer.
// val:     intermediate state to initialize. On success it owns a buffer of
//          sizeof(DistributionStruct) zero bytes; if the allocation fails it
//          is set to NULL (is_null = true, len = 0) instead of being written
//          through a null pointer.
void DistributionInit(FunctionContext* context, StringVal* val) {
  val->ptr = context->Allocate(sizeof(DistributionStruct));
  if (val->ptr == NULL) {
    val->is_null = true;
    val->len = 0;
    return;
  }
  val->is_null = false;
  val->len = sizeof(DistributionStruct);
  memset(val->ptr, 0, val->len);
}
// Adds one input row to the intermediate state.
//
// context: function context (unused).
// input:   value to count. NULL, negative and NaN values are ignored.
// bins:    requested number of result bins; remembered in the state when it
//          is non-NULL and positive.
// val:     intermediate state holding a DistributionStruct.
void DistributionUpdate(FunctionContext* context, const DoubleVal& input,
                        const IntVal& bins, StringVal* val) {
  if (input.is_null || input.val < 0) return;
  if (val->is_null || val->ptr == NULL) return;  // state allocation failed

  // Clamp while still in floating point: converting NaN, infinity or a value
  // beyond the int64_t range to an integer is undefined behavior and would
  // otherwise yield an out-of-range bucket index.
  const double scaled = input.val * PRECISION;
  if (!(scaled >= 0)) return;  // NaN
  const int64_t last_bucket = HISTOGRAM_SIZE - 1;
  const int64_t bucket = scaled >= static_cast<double>(last_bucket)
                             ? last_bucket
                             : static_cast<int64_t>(scaled);

  DistributionStruct* dist = reinterpret_cast<DistributionStruct*>(val->ptr);
  dist->elements[bucket]++;
  // bins.val is unspecified when bins is NULL, so only accept real values.
  if (!bins.is_null && bins.val > 0) {
    dist->bins = bins.val;
  }
}
// Serializes the intermediate state by copying its raw bytes into a
// StringVal created through the (context, length) constructor, then frees
// the original state buffer.
//
// context: function context used for the result buffer and for Free().
// val:     intermediate state to serialize; its buffer is freed here.
// Returns the copied state, or NULL when there is no state or the result
// buffer could not be obtained.
const StringVal DistributionSerialize(FunctionContext* context, const StringVal& val) {
  if (val.is_null || val.ptr == NULL) return StringVal::null();

  StringVal result(context, val.len);
  if (!result.is_null && result.ptr != NULL) {
    memcpy(result.ptr, val.ptr, val.len);
  } else {
    result = StringVal::null();
  }
  // The state buffer is released whether or not the copy succeeded.
  context->Free(val.ptr);
  return result;
}
// Folds the partial state `src` into `dst`.
//
// Every bucket count of `src` is added to the matching bucket of `dst`, and
// the bin count of `src` replaces the one in `dst` when it is positive.
// A NULL `src` leaves `dst` untouched.
void DistributionMerge(FunctionContext* context, const StringVal& src, StringVal* dst) {
  if (!src.is_null) {
    const DistributionStruct* from = reinterpret_cast<const DistributionStruct*>(src.ptr);
    DistributionStruct* into = reinterpret_cast<DistributionStruct*>(dst->ptr);

    const int64_t* in = from->elements;
    int64_t* out = into->elements;
    for (int64_t* const end = out + HISTOGRAM_SIZE; out != end; ++out, ++in) {
      *out += *in;
    }

    if (from->bins > 0) {
      into->bins = from->bins;
    }
  }
}
// Produces the final result from the intermediate state and frees the state.
//
// context: function context used for the result buffer and for Free().
// val:     intermediate state holding a DistributionStruct; freed here.
// Returns the text produced by distribution(), or NULL when there is no
// state, no positive bin count was recorded, or fewer values were counted
// than bins were requested.
StringVal DistributionFinalize(FunctionContext* context, const StringVal& val) {
  if (val.is_null || val.ptr == NULL) return StringVal::null();
  const DistributionStruct* dist = reinterpret_cast<const DistributionStruct*>(val.ptr);

  // 64-bit accumulator: bucket counts are int64_t and their sum can exceed
  // the range of int for large inputs.
  // NOTE(review): bucket 0 is excluded from this count, so input made only
  // of values below 1 / PRECISION yields NULL - confirm this is intended.
  int64_t count = 0;
  for (int i = 1; i < HISTOGRAM_SIZE; i++) {
    count += dist->elements[i];
  }

  StringVal result = StringVal::null();
  // bins > 0 together with count >= bins also implies count > 0.
  if (dist->bins > 0 && count >= dist->bins) {
    const std::string str = distribution(dist->elements, dist->bins);
    StringVal string_val(context, str.size());
    if (!string_val.is_null && string_val.ptr != NULL) {
      memcpy(string_val.ptr, str.data(), str.size());
      result = string_val;
    }
  }
  context->Free(val.ptr);
  return result;
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Declarations of the functions that make up the distribution aggregate
// (init / update / merge / serialize / finalize).
#ifndef UDA_DIST_H | |
#define UDA_DIST_H | |
#include <impala_udf/udf.h> | |
using namespace impala_udf; | |
// Sets up the intermediate state `val` as a zero-filled state buffer.
void DistributionInit(FunctionContext* context, StringVal* val); | |
// Counts one `input` value in the state and records the requested `bins`.
// NULL and negative inputs are ignored.
void DistributionUpdate(FunctionContext* context, const DoubleVal& input, const IntVal& bins, StringVal* val); | |
// Adds the bucket counts of `src` to `dst`; a NULL `src` is ignored.
void DistributionMerge(FunctionContext* context, const StringVal& src, StringVal* dst); | |
// Returns a copy of the state bytes and frees the buffer of `val`.
const StringVal DistributionSerialize(FunctionContext* context, const StringVal& val); | |
// Returns the comma-separated result text, or NULL when too few values were
// counted; frees the buffer of `val`.
StringVal DistributionFinalize(FunctionContext* context, const StringVal& val); | |
#endif |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment