Skip to content

Instantly share code, notes, and snippets.

@programmatix
Last active October 17, 2023 14:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save programmatix/8346412e10029a625d6d6d550e5e06eb to your computer and use it in GitHub Desktop.
Save programmatix/8346412e10029a625d6d6d550e5e06eb to your computer and use it in GitHub Desktop.
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
* Copyright 2021-Present Couchbase, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <couchbase/cluster.hxx>
#include <couchbase/transactions.hxx>
#include <core/impl/subdoc/command_bundle.hxx>
#include <asio.hpp>
#include <fmt/chrono.h>
#include <fmt/format.h>
#include <tao/json/to_string.hpp>
#include <algorithm>
#include <cctype>
#include <cerrno>
#include <chrono>
#include <cstdlib>
#include <future>
#include <iterator>
#include <map>
#include <mutex>
#include <random>
#include <string>
#include <system_error>
#include <thread>
#include <vector>
/**
 * Benchmark configuration.
 *
 * Every field has a built-in default that can be overridden through the CB_* environment variable
 * listed in load_from_environment().
 */
struct program_arguments {
    std::string connection_string{ "couchbase://127.0.0.1" };
    std::string username{ "Administrator" };
    std::string password{ "password" };
    std::string bucket_name{ "default" };
    std::string scope_name{ couchbase::scope::default_name };
    std::string collection_name{ couchbase::collection::default_name };
    std::size_t number_of_keys{ 1'000 };
    std::size_t number_of_transactions{ 100 };
    std::size_t number_of_keys_per_transaction{ 100 };
    std::size_t document_body_size{ 1'024 };
    std::chrono::seconds transaction_expiration_time{ 120 };

    /**
     * Copies environment variable `name` into `target` when it is set and non-empty.
     * Otherwise `target` keeps its current (default) value.
     */
    static void read_string_from_environment(const char* name, std::string& target)
    {
        if (const auto* val = std::getenv(name); val != nullptr && val[0] != '\0') {
            target = val;
        }
    }

    /**
     * Parses environment variable `name` as a base-10 unsigned integer.
     *
     * As before, a numeric prefix is sufficient ("10abc" yields 10) and non-numeric text is ignored.
     * Unlike plain strtoul, a negative number ("-1") and a value that does not fit into
     * `unsigned long` are rejected. strtoul would otherwise silently wrap them into huge counts.
     *
     * @param name environment variable to read
     * @param value receives the parsed number on success; untouched otherwise
     * @return true when a valid number was parsed
     */
    static auto read_unsigned_from_environment(const char* name, unsigned long& value) -> bool
    {
        const auto* val = std::getenv(name);
        if (val == nullptr || val[0] == '\0') {
            return false;
        }
        // strtoul skips leading whitespace and then accepts a '-' sign (negating modulo ULONG_MAX + 1),
        // so look past the whitespace ourselves and refuse a minus sign.
        const auto* first = val;
        while (std::isspace(static_cast<unsigned char>(*first)) != 0) {
            ++first;
        }
        if (*first == '-') {
            return false;
        }
        char* end = nullptr;
        errno = 0;
        const auto parsed = std::strtoul(first, &end, 10);
        if (end == first || errno == ERANGE) {
            return false;
        }
        value = parsed;
        return true;
    }

    /**
     * Builds the configuration from the process environment.
     *
     * Recognised variables: CB_CONNECTION_STRING, CB_USERNAME, CB_PASSWORD, CB_BUCKET_NAME,
     * CB_SCOPE_NAME, CB_COLLECTION_NAME (strings) and CB_NUMBER_OF_KEYS, CB_NUMBER_OF_TRANSACTIONS,
     * CB_NUMBER_OF_KEYS_PER_TRANSACTION, CB_DOCUMENT_BODY_SIZE, CB_TRANSACTION_EXPIRATION_TIME
     * (unsigned integers; the last one is in seconds). Unset, empty or invalid values keep the defaults.
     */
    static auto load_from_environment() -> program_arguments
    {
        program_arguments arguments;
        read_string_from_environment("CB_CONNECTION_STRING", arguments.connection_string);
        read_string_from_environment("CB_USERNAME", arguments.username);
        read_string_from_environment("CB_PASSWORD", arguments.password);
        read_string_from_environment("CB_BUCKET_NAME", arguments.bucket_name);
        read_string_from_environment("CB_SCOPE_NAME", arguments.scope_name);
        read_string_from_environment("CB_COLLECTION_NAME", arguments.collection_name);

        unsigned long number{ 0 };
        if (read_unsigned_from_environment("CB_NUMBER_OF_KEYS", number)) {
            arguments.number_of_keys = number;
        }
        if (read_unsigned_from_environment("CB_NUMBER_OF_TRANSACTIONS", number)) {
            arguments.number_of_transactions = number;
        }
        if (read_unsigned_from_environment("CB_NUMBER_OF_KEYS_PER_TRANSACTION", number)) {
            arguments.number_of_keys_per_transaction = number;
        }
        if (read_unsigned_from_environment("CB_DOCUMENT_BODY_SIZE", number)) {
            arguments.document_body_size = number;
        }
        if (read_unsigned_from_environment("CB_TRANSACTION_EXPIRATION_TIME", number)) {
            arguments.transaction_expiration_time = std::chrono::seconds{ number };
        }
        return arguments;
    }
};
/**
 * Returns a string of `length` characters, each drawn uniformly at random from [0-9A-Za-z].
 *
 * The random engine is created once per thread and seeded from std::random_device on first use.
 */
std::string
random_text(std::size_t length)
{
    const std::string symbols{ "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" };
    static thread_local std::mt19937_64 engine{ std::random_device()() };
    std::uniform_int_distribution<std::size_t> pick(0, symbols.size() - 1);

    std::string result;
    result.reserve(length);
    while (result.size() < length) {
        result.push_back(symbols[pick(engine)]);
    }
    return result;
}
/**
 * Builds the JSON body stored for every benchmark document: an object whose "text" member holds
 * `document_body_size` random alphanumeric characters and whose "size" member records that length.
 */
auto
generate_document(std::size_t document_body_size) -> tao::json::value
{
    std::string payload = random_text(document_body_size);
    return {
        { "size", document_body_size },
        { "text", std::move(payload) },
    };
}
/**
 * Benchmarks blocking (synchronous) transactions.
 *
 * Runs `arguments.number_of_transactions` transactions one after another. Each one samples up to
 * `arguments.number_of_keys_per_transaction` distinct IDs from `document_ids` and reads them with the
 * transactional GET.
 *
 * Prints the execution time and the mean latency of one transaction. It then prints a tally of failed
 * transactions and of failed GETs (keyed by error message), followed by the total time spent here.
 * Nothing happens when either `number_of_keys` or `number_of_keys_per_transaction` is zero.
 */
void
run_transactions_sync_workload(const std::vector<std::string>& document_ids,
                               const std::shared_ptr<couchbase::transactions::transactions>& transactions,
                               const couchbase::collection& collection,
                               const program_arguments& arguments)
{
    // The sizes are unsigned, so zero is the only "non-positive" value.
    if (arguments.number_of_keys == 0 || arguments.number_of_keys_per_transaction == 0) {
        return;
    }
    auto start = std::chrono::system_clock::now();
    {
        std::map<std::string, std::size_t> errors;
        std::map<std::string, std::size_t> transactions_errors;
        // Seed once, outside the measured region, instead of constructing std::random_device per transaction.
        std::mt19937_64 generator{ std::random_device()() };
        auto exec_start = std::chrono::system_clock::now();
        for (std::size_t i = 0; i < arguments.number_of_transactions; ++i) {
            auto [err, result] = transactions->run(
              [&collection, &document_ids, &arguments, &errors, &generator](couchbase::transactions::attempt_context& attempt) {
                  std::vector<std::string> selected_keys;
                  std::sample(document_ids.begin(),
                              document_ids.end(),
                              std::back_insert_iterator(selected_keys),
                              arguments.number_of_keys_per_transaction,
                              generator);
                  for (const auto& id : selected_keys) {
                      auto [ctx, res] = attempt.get(collection, id);
                      if (ctx.ec()) {
                          errors[ctx.ec().message()]++;
                      }
                  }
              });
            if (err.ec()) {
                transactions_errors[fmt::format("error={}, cause={}", err.ec().message(), err.cause().message())]++;
            }
        }
        auto exec_end = std::chrono::system_clock::now();
        const auto exec_time = exec_end - exec_start;
        // Transactions run strictly one after another, so wall time divided by their count is the
        // mean latency of a single transaction.
        const double average_latency_ms =
          arguments.number_of_transactions == 0
            ? 0.0
            : std::chrono::duration<double, std::milli>(exec_time).count() / static_cast<double>(arguments.number_of_transactions);
        fmt::print("\rExecuted {} sync transactions with {} GET operations in {}ms ({}us, {}s), average latency: {:.3f}ms per transaction\n",
                   arguments.number_of_transactions,
                   arguments.number_of_keys_per_transaction,
                   std::chrono::duration_cast<std::chrono::milliseconds>(exec_time).count(),
                   std::chrono::duration_cast<std::chrono::microseconds>(exec_time).count(),
                   std::chrono::duration_cast<std::chrono::seconds>(exec_time).count(),
                   average_latency_ms);
        if (transactions_errors.empty()) {
            fmt::print("\tAll transactions completed successfully\n");
        } else {
            fmt::print("\tSome transactions completed with errors:\n");
            for (const auto& [error, hits] : transactions_errors) {
                fmt::print("\t\t{}: {}\n", error, hits);
            }
        }
        if (errors.empty()) {
            fmt::print("\tAll operations completed successfully\n");
        } else {
            fmt::print("\tSome operations completed with errors:\n");
            for (const auto& [error, hits] : errors) {
                fmt::print("\t\t{}: {}\n", error, hits);
            }
        }
    }
    auto end = std::chrono::system_clock::now();
    fmt::print("Total time for bulk execution {}ms ({}us, {}s)\n",
               std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count(),
               std::chrono::duration_cast<std::chrono::microseconds>(end - start).count(),
               std::chrono::duration_cast<std::chrono::seconds>(end - start).count());
}
/**
 * Benchmarks asynchronous transactions.
 *
 * Schedules `arguments.number_of_transactions` transactions without waiting. The logic of each one
 * samples up to `arguments.number_of_keys_per_transaction` distinct IDs from `document_ids` and issues
 * an asynchronous transactional GET per ID. Each transaction's outcome is delivered through its own
 * promise, and the function then waits on those promises in order.
 *
 * Output:
 * - scheduling time;
 * - execution time, from the end of scheduling until the last result was collected;
 * - average wall time per transaction;
 * - tallies of failed transactions and of failed GETs;
 * - total time spent in the function.
 *
 * Because transactions overlap, the per-transaction figure is wall time divided by count (inverse
 * throughput), not the latency of one transaction. Nothing happens when either `number_of_keys` or
 * `number_of_keys_per_transaction` is zero.
 */
void
run_transactions_async_workload(const std::vector<std::string>& document_ids,
                                const std::shared_ptr<couchbase::transactions::transactions>& transactions,
                                const couchbase::collection& collection,
                                const program_arguments& arguments)
{
    // The sizes are unsigned, so zero is the only "non-positive" value.
    if (arguments.number_of_keys == 0 || arguments.number_of_keys_per_transaction == 0) {
        return;
    }
    auto start = std::chrono::system_clock::now();
    {
        // Failed GETs keyed by error message. The completion callbacks are invoked by the SDK while
        // other transactions are still in flight. Which thread runs them is not visible from here,
        // so updates are serialized with a mutex.
        std::map<std::string, std::size_t> errors;
        std::mutex errors_mutex;
        using transaction_promise =
          std::promise<std::pair<couchbase::transaction_error_context, couchbase::transactions::transaction_result>>;
        // One promise per transaction. The vector is never resized after this point, so the references
        // handed to the completion callbacks stay valid.
        std::vector<transaction_promise> results(arguments.number_of_transactions);
        auto schedule_start = std::chrono::system_clock::now();
        for (std::size_t i = 0; i < arguments.number_of_transactions; ++i) {
            transactions->run(
              [&collection, &document_ids, &arguments, &errors, &errors_mutex](couchbase::transactions::async_attempt_context& attempt) {
                  // A per-thread engine avoids re-seeding from std::random_device on every attempt. It also
                  // avoids sharing one engine between logic lambdas that may run concurrently.
                  static thread_local std::mt19937_64 generator{ std::random_device()() };
                  std::vector<std::string> selected_keys;
                  std::sample(document_ids.begin(),
                              document_ids.end(),
                              std::back_insert_iterator(selected_keys),
                              arguments.number_of_keys_per_transaction,
                              generator);
                  for (const auto& id : selected_keys) {
                      attempt.get(collection, id, [&errors, &errors_mutex](auto ctx, auto /* result */) {
                          if (ctx.ec()) {
                              std::scoped_lock lock(errors_mutex);
                              errors[ctx.ec().message()]++;
                          }
                      });
                  }
              },
              [&promise = results[i]](auto err, auto result) { promise.set_value({ err, result }); });
        }
        auto schedule_end = std::chrono::system_clock::now();
        fmt::print("\rScheduled {} transactions with {} GET operations in {}ms ({}us, {}s)\n",
                   arguments.number_of_transactions,
                   arguments.number_of_keys_per_transaction,
                   std::chrono::duration_cast<std::chrono::milliseconds>(schedule_end - schedule_start).count(),
                   std::chrono::duration_cast<std::chrono::microseconds>(schedule_end - schedule_start).count(),
                   std::chrono::duration_cast<std::chrono::seconds>(schedule_end - schedule_start).count());

        std::map<std::string, std::size_t> transactions_errors;
        auto exec_start = std::chrono::system_clock::now();
        for (auto& promise : results) {
            auto [err, result] = promise.get_future().get();
            if (err.ec()) {
                transactions_errors[fmt::format("error={}, cause={}", err.ec().message(), err.cause().message())]++;
            }
        }
        auto exec_end = std::chrono::system_clock::now();
        const auto exec_time = exec_end - exec_start;
        const double average_ms_per_transaction =
          arguments.number_of_transactions == 0
            ? 0.0
            : std::chrono::duration<double, std::milli>(exec_time).count() / static_cast<double>(arguments.number_of_transactions);
        fmt::print("\rExecuted {} transactions with {} GET operations in {}ms ({}us, {}s), average time per transaction: {:.3f}ms\n",
                   arguments.number_of_transactions,
                   arguments.number_of_keys_per_transaction,
                   std::chrono::duration_cast<std::chrono::milliseconds>(exec_time).count(),
                   std::chrono::duration_cast<std::chrono::microseconds>(exec_time).count(),
                   std::chrono::duration_cast<std::chrono::seconds>(exec_time).count(),
                   average_ms_per_transaction);
        if (transactions_errors.empty()) {
            fmt::print("\tAll transactions completed successfully\n");
        } else {
            fmt::print("\tSome transactions completed with errors:\n");
            for (const auto& [error, hits] : transactions_errors) {
                fmt::print("\t\t{}: {}\n", error, hits);
            }
        }
        // Every transaction has completed at this point, so no callback can still be touching `errors`.
        if (errors.empty()) {
            fmt::print("\tAll operations completed successfully\n");
        } else {
            fmt::print("\tSome operations completed with errors:\n");
            for (const auto& [error, hits] : errors) {
                fmt::print("\t\t{}: {}\n", error, hits);
            }
        }
    }
    auto end = std::chrono::system_clock::now();
    fmt::print("Total time for bulk execution {}ms ({}us, {}s)\n",
               std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count(),
               std::chrono::duration_cast<std::chrono::microseconds>(end - start).count(),
               std::chrono::duration_cast<std::chrono::seconds>(end - start).count());
}
/**
 * Benchmarks plain (non-transactional) KV GETs as a baseline for the transaction workloads.
 *
 * Runs `arguments.number_of_transactions` batches. Each batch samples up to
 * `arguments.number_of_keys_per_transaction` distinct IDs from `document_ids` and fetches them one at
 * a time, waiting for each result before issuing the next GET.
 *
 * Prints the execution time, the mean latency of a single GET, a tally of failed GETs (keyed by error
 * message) and the total time spent here. Nothing happens when either `number_of_keys` or
 * `number_of_keys_per_transaction` is zero.
 */
void
run_kv_get_workload(const std::vector<std::string>& document_ids,
                    const couchbase::collection& collection,
                    const program_arguments& arguments)
{
    // The sizes are unsigned, so zero is the only "non-positive" value.
    if (arguments.number_of_keys == 0 || arguments.number_of_keys_per_transaction == 0) {
        return;
    }
    auto start = std::chrono::system_clock::now();
    {
        std::map<std::string, std::size_t> errors;
        // Seed once, outside the measured region, instead of constructing std::random_device per batch.
        std::mt19937_64 generator{ std::random_device()() };
        // Actual number of GETs issued; std::sample returns fewer keys when document_ids is smaller
        // than the requested batch size.
        std::size_t operations{ 0 };
        auto exec_start = std::chrono::system_clock::now();
        for (std::size_t i = 0; i < arguments.number_of_transactions; ++i) {
            std::vector<std::string> selected_keys;
            std::sample(document_ids.begin(),
                        document_ids.end(),
                        std::back_insert_iterator(selected_keys),
                        arguments.number_of_keys_per_transaction,
                        generator);
            for (const auto& id : selected_keys) {
                auto future = collection.get(id);
                auto [ctx, result] = future.get();
                ++operations;
                if (ctx.ec()) {
                    errors[ctx.ec().message()]++;
                }
            }
        }
        auto exec_end = std::chrono::system_clock::now();
        const auto exec_time = exec_end - exec_start;
        // GETs are issued strictly one after another, so wall time / number of GETs is the mean latency.
        const double average_latency_ms =
          operations == 0 ? 0.0 : std::chrono::duration<double, std::milli>(exec_time).count() / static_cast<double>(operations);
        fmt::print("\rExecuted {} batches with {} KV GET operations in {}ms ({}us, {}s), average latency: {:.3f}ms\n",
                   arguments.number_of_transactions,
                   arguments.number_of_keys_per_transaction,
                   std::chrono::duration_cast<std::chrono::milliseconds>(exec_time).count(),
                   std::chrono::duration_cast<std::chrono::microseconds>(exec_time).count(),
                   std::chrono::duration_cast<std::chrono::seconds>(exec_time).count(),
                   average_latency_ms);
        if (errors.empty()) {
            fmt::print("\tAll operations completed successfully\n");
        } else {
            fmt::print("\tSome operations completed with errors:\n");
            for (const auto& [error, hits] : errors) {
                fmt::print("\t\t{}: {}\n", error, hits);
            }
        }
    }
    auto end = std::chrono::system_clock::now();
    fmt::print("Total time for bulk execution {}ms ({}us, {}s)\n",
               std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count(),
               std::chrono::duration_cast<std::chrono::microseconds>(end - start).count(),
               std::chrono::duration_cast<std::chrono::seconds>(end - start).count());
}
/**
 * Benchmarks KV lookup_in with the set of xattr paths a transactional GET reads.
 *
 * The lookup_in_specs object is deliberately rebuilt for every operation. This is the baseline that
 * run_kv_lookup_in_workload_optimised is compared against.
 *
 * Runs `arguments.number_of_transactions` batches. Each batch samples up to
 * `arguments.number_of_keys_per_transaction` distinct IDs from `document_ids` and issues one blocking
 * lookup_in per ID.
 *
 * Prints the execution time, the mean latency of one lookup_in, a tally of failed operations and the
 * total time spent here. Nothing happens when either `number_of_keys` or
 * `number_of_keys_per_transaction` is zero.
 */
void
run_kv_lookup_in_workload(const std::vector<std::string>& document_ids, const couchbase::collection& collection, const program_arguments& arguments)
{
    // The sizes are unsigned, so zero is the only "non-positive" value.
    if (arguments.number_of_keys == 0 || arguments.number_of_keys_per_transaction == 0) {
        return;
    }
    auto start = std::chrono::system_clock::now();
    {
        std::map<std::string, std::size_t> errors;
        // Seed once, outside the measured region, instead of constructing std::random_device per batch.
        std::mt19937_64 generator{ std::random_device()() };
        std::size_t operations{ 0 };
        // Fixed transaction id used to build the "txn.idN.<uuid>" xattr paths.
        const std::string uuid = "ca08c84a-1e29-4964-a84e-b5981ffe8a13";
        auto exec_start = std::chrono::system_clock::now();
        for (std::size_t i = 0; i < arguments.number_of_transactions; ++i) {
            std::vector<std::string> selected_keys;
            std::sample(document_ids.begin(),
                        document_ids.end(),
                        std::back_insert_iterator(selected_keys),
                        arguments.number_of_keys_per_transaction,
                        generator);
            for (const auto& id : selected_keys) {
                auto specs = couchbase::lookup_in_specs{
                    couchbase::lookup_in_specs::get("txn.id1." + uuid).xattr(),
                    couchbase::lookup_in_specs::get("txn.id2." + uuid).xattr(),
                    couchbase::lookup_in_specs::get("txn.id3." + uuid).xattr(),
                    couchbase::lookup_in_specs::get("txn.id4." + uuid).xattr(),
                    couchbase::lookup_in_specs::get("txn.op.stgd").xattr(),
                    couchbase::lookup_in_specs::get("txn.atr.bkt").xattr(),
                    couchbase::lookup_in_specs::get("txn.atr.scp").xattr(),
                    couchbase::lookup_in_specs::get("txn.atr.coll").xattr(),
                    couchbase::lookup_in_specs::get("txn.restore").xattr(),
                    couchbase::lookup_in_specs::get("txn.op.type").xattr(),
                    couchbase::lookup_in_specs::get(couchbase::subdoc::lookup_in_macro::document).xattr(),
                    couchbase::lookup_in_specs::get("txn.op.crc32").xattr(),
                    couchbase::lookup_in_specs::get("txn.fc").xattr(),
                    couchbase::lookup_in_specs::get(""),
                };
                auto future = collection.lookup_in(id, specs);
                auto [ctx, result] = future.get();
                ++operations;
                if (ctx.ec()) {
                    errors[ctx.ec().message()]++;
                }
            }
        }
        auto exec_end = std::chrono::system_clock::now();
        const auto exec_time = exec_end - exec_start;
        // Operations are issued strictly one after another, so wall time / count is the mean latency.
        const double average_latency_ms =
          operations == 0 ? 0.0 : std::chrono::duration<double, std::milli>(exec_time).count() / static_cast<double>(operations);
        fmt::print("\rExecuted {} batches with {} KV lookup_in operations in {}ms ({}us, {}s), average latency: {:.3f}ms\n",
                   arguments.number_of_transactions,
                   arguments.number_of_keys_per_transaction,
                   std::chrono::duration_cast<std::chrono::milliseconds>(exec_time).count(),
                   std::chrono::duration_cast<std::chrono::microseconds>(exec_time).count(),
                   std::chrono::duration_cast<std::chrono::seconds>(exec_time).count(),
                   average_latency_ms);
        if (errors.empty()) {
            fmt::print("\tAll operations completed successfully\n");
        } else {
            fmt::print("\tSome operations completed with errors:\n");
            for (const auto& [error, hits] : errors) {
                fmt::print("\t\t{}: {}\n", error, hits);
            }
        }
    }
    auto end = std::chrono::system_clock::now();
    fmt::print("Total time for bulk execution {}ms ({}us, {}s)\n",
               std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count(),
               std::chrono::duration_cast<std::chrono::microseconds>(end - start).count(),
               std::chrono::duration_cast<std::chrono::seconds>(end - start).count());
}
/**
 * Same workload as run_kv_lookup_in_workload, but the lookup_in specs are encoded once up front.
 *
 * The specs are encoded into a shared command_bundle, which is then reused for every lookup_in call.
 *
 * Runs `arguments.number_of_transactions` batches. Each batch samples up to
 * `arguments.number_of_keys_per_transaction` distinct IDs from `document_ids` and issues one blocking
 * lookup_in per ID.
 *
 * Prints the execution time, the mean latency of one lookup_in, a tally of failed operations and the
 * total time spent here. Nothing happens when either `number_of_keys` or
 * `number_of_keys_per_transaction` is zero.
 */
void
run_kv_lookup_in_workload_optimised(const std::vector<std::string>& document_ids,
                                    const couchbase::collection& collection,
                                    const program_arguments& arguments)
{
    // The sizes are unsigned, so zero is the only "non-positive" value.
    if (arguments.number_of_keys == 0 || arguments.number_of_keys_per_transaction == 0) {
        return;
    }
    auto start = std::chrono::system_clock::now();
    {
        std::map<std::string, std::size_t> errors;
        // Fixed transaction id used to build the "txn.idN.<uuid>" xattr paths.
        const std::string uuid = "ca08c84a-1e29-4964-a84e-b5981ffe8a13";
        std::vector<couchbase::subdoc::get> specs_list{
            couchbase::lookup_in_specs::get("txn.id1." + uuid).xattr(),
            couchbase::lookup_in_specs::get("txn.id2." + uuid).xattr(),
            couchbase::lookup_in_specs::get("txn.id3." + uuid).xattr(),
            couchbase::lookup_in_specs::get("txn.id4." + uuid).xattr(),
            couchbase::lookup_in_specs::get("txn.op.stgd").xattr(),
            couchbase::lookup_in_specs::get("txn.atr.bkt").xattr(),
            couchbase::lookup_in_specs::get("txn.atr.scp").xattr(),
            couchbase::lookup_in_specs::get("txn.atr.coll").xattr(),
            couchbase::lookup_in_specs::get("txn.restore").xattr(),
            couchbase::lookup_in_specs::get("txn.op.type").xattr(),
            couchbase::lookup_in_specs::get(couchbase::subdoc::lookup_in_macro::document).xattr(),
            couchbase::lookup_in_specs::get("txn.op.crc32").xattr(),
            couchbase::lookup_in_specs::get("txn.fc").xattr(),
            couchbase::lookup_in_specs::get(""),
        };
        // Encode every spec once into a single bundle. specs_list is not used after this loop,
        // so encoding straight from its elements (no per-element copy) is safe.
        couchbase::core::impl::subdoc::command_bundle bundle;
        for (auto& operation : specs_list) {
            operation.encodePublic(bundle);
        }
        auto specs =
          couchbase::lookup_in_specs(std::make_shared<couchbase::core::impl::subdoc::command_bundle>(std::move(bundle)));

        // Seed once, outside the measured region, instead of constructing std::random_device per batch.
        std::mt19937_64 generator{ std::random_device()() };
        std::size_t operations{ 0 };
        auto exec_start = std::chrono::system_clock::now();
        for (std::size_t i = 0; i < arguments.number_of_transactions; ++i) {
            std::vector<std::string> selected_keys;
            std::sample(document_ids.begin(),
                        document_ids.end(),
                        std::back_insert_iterator(selected_keys),
                        arguments.number_of_keys_per_transaction,
                        generator);
            for (const auto& id : selected_keys) {
                auto future = collection.lookup_in(id, specs);
                auto [ctx, result] = future.get();
                ++operations;
                if (ctx.ec()) {
                    errors[ctx.ec().message()]++;
                }
            }
        }
        auto exec_end = std::chrono::system_clock::now();
        const auto exec_time = exec_end - exec_start;
        // Operations are issued strictly one after another, so wall time / count is the mean latency.
        const double average_latency_ms =
          operations == 0 ? 0.0 : std::chrono::duration<double, std::milli>(exec_time).count() / static_cast<double>(operations);
        fmt::print("\rExecuted {} batches with {} KV optimised lookup_in operations in {}ms ({}us, {}s), average latency: {:.3f}ms\n",
                   arguments.number_of_transactions,
                   arguments.number_of_keys_per_transaction,
                   std::chrono::duration_cast<std::chrono::milliseconds>(exec_time).count(),
                   std::chrono::duration_cast<std::chrono::microseconds>(exec_time).count(),
                   std::chrono::duration_cast<std::chrono::seconds>(exec_time).count(),
                   average_latency_ms);
        if (errors.empty()) {
            fmt::print("\tAll operations completed successfully\n");
        } else {
            fmt::print("\tSome operations completed with errors:\n");
            for (const auto& [error, hits] : errors) {
                fmt::print("\t\t{}: {}\n", error, hits);
            }
        }
    }
    auto end = std::chrono::system_clock::now();
    fmt::print("Total time for bulk execution {}ms ({}us, {}s)\n",
               std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count(),
               std::chrono::duration_cast<std::chrono::microseconds>(end - start).count(),
               std::chrono::duration_cast<std::chrono::seconds>(end - start).count());
}
int
main()
{
auto arguments = program_arguments::load_from_environment();
fmt::print("CB_CONNECTION_STRING={}\n", arguments.connection_string);
fmt::print("CB_USERNAME={}\n", arguments.username);
fmt::print("CB_PASSWORD={}\n", arguments.password);
fmt::print("CB_BUCKET_NAME={}\n", arguments.bucket_name);
fmt::print("CB_SCOPE_NAME={}\n", arguments.scope_name);
fmt::print("CB_COLLECTION_NAME={}\n", arguments.collection_name);
fmt::print("CB_NUMBER_OF_KEYS={}\n", arguments.number_of_keys);
fmt::print("CB_NUMBER_OF_TRANSACTIONS={}\n", arguments.number_of_transactions);
fmt::print("CB_NUMBER_OF_KEYS_PER_TRANSACTION={}\n", arguments.number_of_keys_per_transaction);
fmt::print("CB_DOCUMENT_BODY_SIZE={}\n", arguments.document_body_size);
fmt::print("CB_TRANSACTION_EXPIRATION_TIME={}\n", arguments.transaction_expiration_time.count());
asio::io_context io;
auto guard = asio::make_work_guard(io);
std::thread io_thread([&io]() { io.run(); });
auto options = couchbase::cluster_options(arguments.username, arguments.password);
options.apply_profile("wan_development");
options.transactions().expiration_time(arguments.transaction_expiration_time);
auto [cluster, ec] = couchbase::cluster::connect(io, arguments.connection_string, options).get();
if (ec) {
fmt::print("Unable to connect to cluster at \"{}\", error: {}\n", arguments.connection_string, ec.message());
} else {
auto transactions = cluster.transactions();
auto collection = cluster.bucket(arguments.bucket_name).scope(arguments.scope_name).collection(arguments.collection_name);
const std::string document_id_prefix{ "tx_mix" };
std::vector<std::string> document_ids;
auto document_body = generate_document(arguments.document_body_size);
document_ids.reserve(arguments.number_of_keys);
for (std::size_t i = 0; i < arguments.number_of_keys; ++i) {
auto document_id = fmt::format("{}_{:06d}", document_id_prefix, i);
document_ids.emplace_back(document_id);
collection.upsert(document_id, document_body);
}
fmt::print(
"Using {} IDs in interval [\"{}\"...\"{}\"]\n", document_ids.size(), document_ids[0], document_ids[document_ids.size() - 1]);
run_kv_get_workload(document_ids, collection, arguments);
run_kv_lookup_in_workload(document_ids, collection, arguments);
run_kv_lookup_in_workload_optimised(document_ids, collection, arguments);
// run_transactions_async_workload(document_ids, transactions, collection, arguments);
//run_transactions_sync_workload(document_ids, transactions, collection, arguments);
}
cluster.close();
guard.reset();
io_thread.join();
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment