Last active
August 29, 2015 14:08
Failed Impala UDAF
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <iostream> | |
#include <math.h> | |
#include <impala_udf/uda-test-harness.h> | |
#include "uda-dist.h" | |
using namespace impala; | |
using namespace impala_udf; | |
using namespace std; | |
bool TestDistribution() { | |
UdaTestHarness2<StringVal, StringVal, DoubleVal, IntVal> test( | |
DistributionInit, DistributionUpdate, DistributionMerge, DistributionSerialize, DistributionFinalize); | |
test.SetIntermediateSize(20); | |
vector<DoubleVal> vals; | |
vector<IntVal> bins; | |
// Test empty input | |
cerr << "--TESTING NULL--" << endl; | |
if (!test.Execute(vals, bins, StringVal::null())) { | |
cerr << "Input empty: " << test.GetErrorMsg() << endl; | |
return false; | |
} | |
// Test values | |
cerr << "--TESTING 10 VALUE SAMPLE--" << endl; | |
for (int i = 0; i < 10; ++i) { | |
vals.push_back(DoubleVal(1)); | |
bins.push_back(IntVal(2)); | |
} | |
if (!test.Execute(vals, bins, StringVal("5.00,5.00"))) { | |
cerr << "Input small: " << test.GetErrorMsg() << endl; | |
return false; | |
} | |
// Test NULL values | |
cerr << "--TESTING NULL VALUE SAMPLE--" << endl; | |
for (int i = 0; i < 100; ++i) { | |
vals.push_back(DoubleVal::null()); | |
bins.push_back(IntVal(2)); | |
} | |
if (!test.Execute(vals, bins, StringVal("5.00,5.00"))) { | |
cerr << "Input small: " << test.GetErrorMsg() << endl; | |
return false; | |
} | |
vals.clear(); | |
bins.clear(); | |
// Test count < bins | |
cerr << "--TESTING COUNT < BINS--" << endl; | |
for (int i = 0; i < 10; ++i) { | |
vals.push_back(DoubleVal(1)); | |
bins.push_back(IntVal(20)); | |
} | |
if (!test.Execute(vals, bins, StringVal::null())) { | |
cerr << "Input small: " << test.GetErrorMsg() << endl; | |
return false; | |
} | |
// Test biger values | |
vals.clear(); | |
bins.clear(); | |
StringVal res_equal = StringVal("1.00,1.00,1.00,1.00,1.00,1.00,1.00,1.00,1.00,1.00"); | |
// Test count < bins | |
cerr << "--TESTING COUNT == BINS--" << endl; | |
for (int i = 0; i < 10; ++i) { | |
vals.push_back(DoubleVal(1)); | |
bins.push_back(IntVal(10)); | |
} | |
if (!test.Execute(vals, bins, res_equal)) { | |
cerr << "Input small: " << test.GetErrorMsg() << endl; | |
return false; | |
} | |
// Test biger values | |
vals.clear(); | |
bins.clear(); | |
StringVal res_big = StringVal("94950.00,84950.00,74950.00,64950.00,54950.00,44950.00,34950.00,24950.00,14950.00,4950.00"); | |
std::cout << "--TESTING 1000 UNSORTED VALUE SAMPLE--" << endl; | |
for (int i = 0; i < 1000; ++i) { | |
vals.push_back(DoubleVal(i)); | |
bins.push_back(IntVal(10)); | |
} | |
if (!test.Execute(vals, bins, res_big)) { | |
cerr << "Input medium: " << test.GetErrorMsg() << endl; | |
return false; | |
} | |
// Test big values | |
vals.clear(); | |
bins.clear(); | |
StringVal res_mil = StringVal("100000.00,100000.00,100000.00,100000.00,100000.00,100000.00,100000.00,100000.00,100000.00,100000.00"); | |
std::cout << "--TESTING 1.000.000 VALUE SAMPLE--" << endl; | |
for (int i = 0; i < 1000000; ++i) { | |
vals.push_back(DoubleVal(1)); | |
bins.push_back(IntVal(10)); | |
} | |
if (!test.Execute(vals, bins, res_mil)) { | |
cerr << "Input million: " << test.GetErrorMsg() << endl; | |
return false; | |
} | |
// Test big values | |
vals.clear(); | |
bins.clear(); | |
StringVal res_100mil = StringVal("10000000.00,10000000.00,10000000.00,10000000.00,10000000.00,10000000.00,10000000.00,10000000.00,10000000.00,10000000.00"); | |
std::cout << "--TESTING 100.000.000 VALUE SAMPLE--" << endl; | |
for (int i = 0; i < 100000000; ++i) { | |
vals.push_back(DoubleVal(1)); | |
bins.push_back(IntVal(10)); | |
} | |
if (!test.Execute(vals, bins, res_100mil)) { | |
cerr << "Input big: " << test.GetErrorMsg() << endl; | |
return false; | |
} | |
return true; | |
} | |
// Test driver entry point. Runs every UDA test, prints a summary to stderr and
// returns a non-zero exit status on failure so scripts/CI can detect it.
int main(int argc, char** argv) {
  bool passed = true;
  passed &= TestDistribution();
  cerr << (passed ? "Tests passed." : "Tests failed.") << endl;
  return passed ? 0 : 1;
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include "uda-dist.h"

#include <assert.h>
#include <malloc.h>

#include <algorithm>
#include <climits>
#include <cstring>
#include <functional>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>

#define DEBUG_ENABLED true
#define ELEMENT_ALLOCATION_SIZE 100000

using namespace impala_udf;
using namespace std;
// --------------------------------------------------------------------------- | |
// State object | |
// --------------------------------------------------------------------------- | |
// Header placed at the start of the UDA's StringVal intermediate buffer.
// The buffer is allocated larger than sizeof(DistributionStruct), so
// `elements` is used as a trailing array that extends past its declared
// single slot. Member order defines the byte layout: do not reorder.
struct DistributionStruct {
  int64_t count;       // values currently stored in `elements`
  int64_t capacity;    // element slots allocated beyond the header
  int64_t bins;        // bin count last recorded by Update/Merge
  double elements[1];  // start of the trailing value array
};
// --------------------------------------------------------------------------- | |
// Convert to StringVal | |
// --------------------------------------------------------------------------- | |
template <typename T> | |
StringVal ToStringVal(FunctionContext* context, const T& val) { | |
stringstream ss; | |
ss << val; | |
string str = ss.str(); | |
StringVal string_val(context, str.size()); | |
memcpy(string_val.ptr, str.c_str(), str.size()); | |
return string_val; | |
} | |
// --------------------------------------------------------------------------- | |
// Debug helpers | |
// --------------------------------------------------------------------------- | |
template <typename T> | |
void debugPrintStr(const char* name, const T& val) { | |
if (DEBUG_ENABLED) { | |
std::cout << "["; | |
std::cout.width(11); | |
std::cout << std::left << name << "] Message: " << val << std::endl; | |
} | |
} | |
// ---------------------------------------------------------------------------- | |
// Function to calculate distribution and convert to string | |
// --------------------------------------------------------------------------- | |
// Sorts elements[0..count) in descending order (in place) and splits the
// sorted sequence into `bins` contiguous groups whose sizes differ by at most
// one. It returns the sum of each group, formatted with two decimals and
// comma-separated, with the group of largest values first (e.g. "5.00,5.00").
// When count is a multiple of bins, every group has exactly count / bins values.
//
// Preconditions: elements != NULL, bins > 0, count >= bins. They are asserted
// in debug builds. In release builds a violation yields "" instead of
// undefined behavior.
std::string distribution (double *elements, const int bins, const int count) {
  assert(count > 0);
  assert(bins > 0);
  assert(count >= bins);
  if (!elements || bins <= 0 || count < bins) {
    return std::string();
  }
  std::sort(elements, elements + count, std::greater<double>());

  std::vector<double> sums(bins, 0.0);
  for (int i = 0; i < count; ++i) {
    // Proportional mapping keeps the bin index in [0, bins) even when count
    // is not a multiple of bins (the former i / (count / bins) overflowed).
    // 64-bit math avoids overflow of i * bins.
    const int64_t bin = static_cast<int64_t>(i) * bins / count;
    sums[bin] += elements[i];
  }

  std::ostringstream out;
  out.precision(2);
  out << std::fixed;
  for (int i = 0; i < bins; ++i) {
    if (i > 0) {
      out << ',';
    }
    out << sums[i];
  }
  return out.str();
}
// --------------------------------------------------------------------------- | |
// Distribution aggregate function. | |
// --------------------------------------------------------------------------- | |
// Init step: allocates the intermediate buffer (header plus room for
// ELEMENT_ALLOCATION_SIZE values), zeroes the header and records the capacity.
// val->len covers only the header; the value area's size is tracked by
// `capacity`. If the allocation fails, the intermediate becomes NULL and an
// error is raised on the context.
void DistributionInit(FunctionContext* context, StringVal* val) {
  val->is_null = false;
  val->len = sizeof(DistributionStruct);
  val->ptr = context->Allocate(val->len + sizeof(double) * ELEMENT_ALLOCATION_SIZE);
  if (val->ptr == NULL) {
    *val = StringVal::null();
    context->SetError("Distribution: failed to allocate intermediate buffer");
    return;
  }
  memset(val->ptr, 0, val->len);
  DistributionStruct* dist = reinterpret_cast<DistributionStruct*>(val->ptr);
  dist->capacity = ELEMENT_ALLOCATION_SIZE;
}
// Update step: appends one non-NULL input value to the intermediate and
// records the requested bin count. NULL inputs and NULL intermediates are
// ignored. The value buffer grows geometrically when full. On growth failure
// an error is raised on the context and the value is dropped.
void DistributionUpdate(FunctionContext* context, const DoubleVal& input,
                        const IntVal& bins, StringVal* val) {
  if (input.is_null || val->is_null || val->ptr == NULL) return;
  DistributionStruct* dist = reinterpret_cast<DistributionStruct*>(val->ptr);
  if (dist->count >= dist->capacity) {
    // Double the capacity (at least one more ELEMENT_ALLOCATION_SIZE chunk) so
    // N updates copy O(N) bytes in total instead of O(N^2 / chunk).
    int64_t new_capacity = dist->capacity * 2;
    if (new_capacity < dist->count + ELEMENT_ALLOCATION_SIZE) {
      new_capacity = dist->count + ELEMENT_ALLOCATION_SIZE;
    }
    // NOTE(review): assumes Reallocate() takes an int byte count -- confirm
    // against udf.h. Cap the request so the size fits in an int.
    const int64_t max_capacity = (static_cast<int64_t>(INT_MAX) - val->len) /
                                 static_cast<int64_t>(sizeof(double));
    if (new_capacity > max_capacity) new_capacity = max_capacity;
    if (new_capacity <= dist->count) {
      context->SetError("Distribution: too many input values to buffer");
      return;
    }
    const int64_t old_size = val->len + dist->capacity * static_cast<int64_t>(sizeof(double));
    const int64_t new_size = val->len + new_capacity * static_cast<int64_t>(sizeof(double));
    debugPrintStr("Reallocating old size:", old_size);
    debugPrintStr("Reallocating expected new size:", new_size);
    uint8_t* new_ptr = context->Reallocate(val->ptr, static_cast<int>(new_size));
    if (new_ptr == NULL) {
      context->SetError("Distribution: failed to grow intermediate buffer");
      return;
    }
    val->ptr = new_ptr;
    // Reallocate may move the buffer: re-derive the struct pointer, otherwise
    // the writes below would go to the old, freed memory.
    dist = reinterpret_cast<DistributionStruct*>(val->ptr);
    dist->capacity = new_capacity;
  }
  dist->elements[dist->count++] = input.val;
  if (!bins.is_null) dist->bins = bins.val;
}
// Serialize step: copies the header plus the `count` stored values into a new
// StringVal allocated through `context`, then frees the original buffer.
// Unused capacity is not copied; the copy's `capacity` is set to `count`.
// A NULL intermediate serializes to NULL. If the copy cannot be allocated,
// an error is raised and NULL is returned.
const StringVal DistributionSerialize(FunctionContext* context, const StringVal& val) {
  if (val.is_null || val.ptr == NULL) return StringVal::null();
  const DistributionStruct* dist = reinterpret_cast<const DistributionStruct*>(val.ptr);
  const int64_t count = dist->count;
  const int64_t used_bytes = val.len + count * static_cast<int64_t>(sizeof(double));
  StringVal result(context, static_cast<int>(used_bytes));
  if (result.is_null || result.ptr == NULL) {
    context->Free(val.ptr);
    context->SetError("Distribution: failed to allocate serialized buffer");
    return StringVal::null();
  }
  memcpy(result.ptr, val.ptr, static_cast<size_t>(used_bytes));
  // The copy holds exactly `count` value slots beyond the header.
  reinterpret_cast<DistributionStruct*>(result.ptr)->capacity = count;
  context->Free(val.ptr);
  return result;
}
// Merge step: appends every value stored in the `src` intermediate to `dst`,
// growing dst's buffer when needed, and copies src's bin count. A NULL or
// empty `src` (or a NULL `dst`) leaves `dst` untouched. On growth failure an
// error is raised on the context and nothing is merged.
void DistributionMerge(FunctionContext* context, const StringVal& src, StringVal* dst) {
  if (src.is_null || src.ptr == NULL) return;
  if (dst->is_null || dst->ptr == NULL) return;
  const DistributionStruct* src_dist = reinterpret_cast<const DistributionStruct*>(src.ptr);
  DistributionStruct* dst_dist = reinterpret_cast<DistributionStruct*>(dst->ptr);
  if (src_dist->count <= 0) return;

  const int64_t needed = dst_dist->count + src_dist->count;
  if (needed > dst_dist->capacity) {
    // Grow at least geometrically so repeated merges copy O(total) bytes.
    int64_t new_capacity = dst_dist->capacity * 2;
    if (new_capacity < needed) new_capacity = needed;
    // NOTE(review): assumes Reallocate() takes an int byte count -- confirm
    // against udf.h. Cap the request so the size fits in an int.
    const int64_t max_capacity = (static_cast<int64_t>(INT_MAX) - dst->len) /
                                 static_cast<int64_t>(sizeof(double));
    if (new_capacity > max_capacity) new_capacity = max_capacity;
    if (new_capacity < needed) {
      context->SetError("Distribution: too many values to merge");
      return;
    }
    const int64_t new_size = dst->len + new_capacity * static_cast<int64_t>(sizeof(double));
    uint8_t* new_ptr = context->Reallocate(dst->ptr, static_cast<int>(new_size));
    if (new_ptr == NULL) {
      context->SetError("Distribution: failed to grow intermediate buffer");
      return;
    }
    dst->ptr = new_ptr;
    // Reallocate may move the buffer: re-derive the struct pointer, otherwise
    // the copy below would go to the old, freed memory.
    dst_dist = reinterpret_cast<DistributionStruct*>(dst->ptr);
    dst_dist->capacity = new_capacity;
  }
  memcpy(dst_dist->elements + dst_dist->count, src_dist->elements,
         static_cast<size_t>(src_dist->count) * sizeof(double));
  dst_dist->count = needed;
  dst_dist->bins = src_dist->bins;
}
// Finalize step: returns the comma-separated per-bin sums produced by
// distribution(), or NULL when there are no values, the bin count is not
// positive (which would divide by zero), or there are fewer values than bins.
// Frees the intermediate buffer in every case where one exists.
StringVal DistributionFinalize(FunctionContext* context, const StringVal& val) {
  if (val.is_null || val.ptr == NULL) return StringVal::null();
  DistributionStruct* dist = reinterpret_cast<DistributionStruct*>(val.ptr);
  StringVal result = StringVal::null();
  if (dist->count > 0 && dist->bins > 0 && dist->count >= dist->bins) {
    const std::string res_s = distribution(dist->elements,
                                           static_cast<int>(dist->bins),
                                           static_cast<int>(dist->count));
    result = ToStringVal(context, res_s);
  }
  context->Free(val.ptr);
  return result;
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#ifndef UDA_DIST_H
#define UDA_DIST_H

#include <impala_udf/udf.h>

// Kept so that any includer relying on unqualified impala_udf names keeps
// compiling; the declarations below spell their types out explicitly.
using namespace impala_udf;

// Distribution UDA: buffers DOUBLE inputs, then reports per-bin sums of the
// values sorted in descending order as a comma-separated string.

// Allocates and initialises the StringVal intermediate.
void DistributionInit(impala_udf::FunctionContext* context,
                      impala_udf::StringVal* val);

// Adds one input value (NULLs are skipped) and records the bin count.
void DistributionUpdate(impala_udf::FunctionContext* context,
                        const impala_udf::DoubleVal& input,
                        const impala_udf::IntVal& bins,
                        impala_udf::StringVal* val);

// Appends the values held in `src` to `dst`.
void DistributionMerge(impala_udf::FunctionContext* context,
                       const impala_udf::StringVal& src,
                       impala_udf::StringVal* dst);

// Copies the intermediate into a new buffer and releases the original.
const impala_udf::StringVal DistributionSerialize(impala_udf::FunctionContext* context,
                                                  const impala_udf::StringVal& val);

// Produces the final per-bin string (or NULL) and releases the intermediate.
impala_udf::StringVal DistributionFinalize(impala_udf::FunctionContext* context,
                                           const impala_udf::StringVal& val);

#endif  // UDA_DIST_H
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment