Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
An example based on Kudu sample C++ client app, showing a bug with RLE column encoding
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <ctime>
#include <iostream>
#include <sstream>
#include <unistd.h>
#include <kudu/client/callbacks.h>
#include "kudu/client/client.h"
#include "kudu/client/row_result.h"
#include "kudu/client/stubs.h"
#include "kudu/client/value.h"
#include "kudu/common/partial_row.h"
#include "kudu/client/shared_ptr.h"
using kudu::client::KuduClient;
using kudu::client::KuduClientBuilder;
using kudu::client::KuduColumnSchema;
using kudu::client::KuduColumnStorageAttributes;
using kudu::client::KuduError;
using kudu::client::KuduInsert;
using kudu::client::KuduPredicate;
using kudu::client::KuduRowResult;
using kudu::client::KuduScanner;
using kudu::client::KuduSchema;
using kudu::client::KuduSchemaBuilder;
using kudu::client::KuduSession;
using kudu::client::KuduStatusFunctionCallback;
using kudu::client::KuduTable;
using kudu::client::KuduTableAlterer;
using kudu::client::KuduTableCreator;
using kudu::client::KuduValue;
using kudu::client::sp::shared_ptr;
using kudu::KuduPartialRow;
using kudu::MonoDelta;
using kudu::Status;
using std::string;
using std::stringstream;
using std::vector;
static Status CreateClient(const string& addr,
shared_ptr<KuduClient>* client) {
return KuduClientBuilder()
.add_master_server_addr(addr)
.default_admin_operation_timeout(MonoDelta::FromSeconds(20))
.Build(client);
}
static KuduSchema CreateSchema() {
KuduSchema schema;
KuduSchemaBuilder b;
b.AddColumn("key")->Type(KuduColumnSchema::INT32)->Encoding(KuduColumnStorageAttributes::EncodingType::RLE)->NotNull()->PrimaryKey();
b.AddColumn("int_val")->Type(KuduColumnSchema::INT32)->NotNull();
KUDU_CHECK_OK(b.Build(&schema));
return schema;
}
static Status DoesTableExist(const shared_ptr<KuduClient>& client,
const string& table_name,
bool *exists) {
shared_ptr<KuduTable> table;
Status s = client->OpenTable(table_name, &table);
if (s.ok()) {
*exists = true;
} else if (s.IsNotFound()) {
*exists = false;
s = Status::OK();
}
return s;
}
static Status CreateTable(const shared_ptr<KuduClient>& client,
const string& table_name,
const KuduSchema& schema,
int num_tablets) {
// Create the table.
KuduTableCreator* table_creator = client->NewTableCreator();
Status s = table_creator->table_name(table_name)
.schema(&schema)
.num_replicas(1)
.Create();
delete table_creator;
return s;
}
static void StatusCB(void* unused, const Status& status) {
KUDU_LOG(INFO) << "Asynchronous flush finished with status: "
<< status.ToString();
}
static Status InsertRows(const shared_ptr<KuduTable>& table, int num_rows) {
shared_ptr<KuduSession> session = table->client()->NewSession();
KUDU_RETURN_NOT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
session->SetTimeoutMillis(5000);
for (int i = 0; i < num_rows; i++) {
KuduInsert* insert = table->NewInsert();
KuduPartialRow* row = insert->mutable_row();
KUDU_CHECK_OK(row->SetInt32("key", 0-i));
KUDU_CHECK_OK(row->SetInt32("int_val", i * 2));
KUDU_CHECK_OK(session->Apply(insert));
}
Status s = session->Flush();
if (s.ok()) {
return s;
}
// Test asynchronous flush.
KuduStatusFunctionCallback<void*> status_cb(&StatusCB, NULL);
session->FlushAsync(&status_cb);
// Look at the session's errors.
vector<KuduError*> errors;
bool overflow;
session->GetPendingErrors(&errors, &overflow);
s = overflow ? Status::IOError("Overflowed pending errors in session") :
errors.front()->status();
while (!errors.empty()) {
delete errors.back();
errors.pop_back();
}
KUDU_RETURN_NOT_OK(s);
// Close the session.
return session->Close();
}
static Status ScanRows(const shared_ptr<KuduTable>& table) {
KuduScanner scanner(table.get());
// This will give a different result
// KuduPredicate* p = table->NewComparisonPredicate(
// "key", KuduPredicate::EQUAL, KuduValue::FromInt(-1));
// KUDU_RETURN_NOT_OK(scanner.AddConjunctPredicate(p));
KUDU_RETURN_NOT_OK(scanner.Open());
vector<KuduRowResult> results;
int count = 0;
while (scanner.HasMoreRows()) {
KUDU_RETURN_NOT_OK(scanner.NextBatch(&results));
for (vector<KuduRowResult>::iterator iter = results.begin();
iter != results.end();
iter++) {
const KuduRowResult &result = *iter;
int32_t val;
KUDU_RETURN_NOT_OK(result.GetInt32("key", &val));
if (val == -1) {
++count;
}
}
results.clear();
}
KUDU_LOG(INFO) << "Rows with key equal to -1: " << count;
return Status::OK();
}
static void LogCb(void* unused,
kudu::client::KuduLogSeverity severity,
const char* filename,
int line_number,
const struct ::tm* time,
const char* message,
size_t message_len) {
KUDU_LOG(INFO) << "Received log message from Kudu client library";
KUDU_LOG(INFO) << " Severity: " << severity;
KUDU_LOG(INFO) << " Filename: " << filename;
KUDU_LOG(INFO) << " Line number: " << line_number;
char time_buf[32];
// Example: Tue Mar 24 11:46:43 2015.
KUDU_CHECK(strftime(time_buf, sizeof(time_buf), "%a %b %d %T %Y", time));
KUDU_LOG(INFO) << " Time: " << time_buf;
KUDU_LOG(INFO) << " Message: " << string(message, message_len);
}
int main(int argc, char* argv[]) {
KUDU_LOG(INFO) << "Running with Kudu client version: " <<
kudu::client::GetShortVersionString();
KUDU_LOG(INFO) << "Long version info: " <<
kudu::client::GetAllVersionInfo();
kudu::client::KuduLoggingFunctionCallback<void*> log_cb(&LogCb, NULL);
kudu::client::InstallLoggingCallback(&log_cb);
if (argc != 2) {
KUDU_LOG(FATAL) << "usage: " << argv[0] << " <master host>";
}
const string master_host = argv[1];
const string kTableName = "test_table";
// Enable verbose debugging for the client library.
kudu::client::SetVerboseLogLevel(0);
// Create and connect a client.
shared_ptr<KuduClient> client;
KUDU_CHECK_OK(CreateClient(master_host, &client));
KUDU_LOG(INFO) << "Created a client connection";
// Disable the verbose logging.
kudu::client::SetVerboseLogLevel(0);
// Create a schema.
KuduSchema schema(CreateSchema());
KUDU_LOG(INFO) << "Created a schema";
// Create a table with that schema.
bool exists;
KUDU_CHECK_OK(DoesTableExist(client, kTableName, &exists));
if (exists) {
client->DeleteTable(kTableName);
KUDU_LOG(INFO) << "Deleting old table before creating new one";
}
KUDU_CHECK_OK(CreateTable(client, kTableName, schema, 10));
KUDU_LOG(INFO) << "Created a table";
// Insert some rows into the table.
shared_ptr<KuduTable> table;
KUDU_CHECK_OK(client->OpenTable(kTableName, &table));
KUDU_CHECK_OK(InsertRows(table, 100000));
KUDU_LOG(INFO) << "Inserted some rows into a table";
// Scan some rows.
KUDU_CHECK_OK(ScanRows(table));
KUDU_LOG(INFO) << "Scanned some rows out of a table";
KUDU_LOG(INFO) << "Waiting for compaction";
sleep(180);
KUDU_CHECK_OK(ScanRows(table));
KUDU_LOG(INFO) << "Scanned some rows out of a table";
// Done!
KUDU_LOG(INFO) << "Done";
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment