Last active
October 17, 2023 14:29
-
-
Save programmatix/8346412e10029a625d6d6d550e5e06eb to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 * Copyright 2021-Present Couchbase, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <couchbase/cluster.hxx>
#include <couchbase/transactions.hxx>

#include <core/impl/subdoc/command_bundle.hxx>

#include <asio.hpp>
#include <fmt/chrono.h>
#include <fmt/format.h>
#include <tao/json/to_string.hpp>

#include <algorithm>
#include <charconv>
#include <chrono>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <future>
#include <iterator>
#include <map>
#include <memory>
#include <mutex>
#include <random>
#include <string>
#include <system_error>
#include <thread>
#include <utility>
#include <vector>
struct program_arguments { | |
std::string connection_string{ "couchbase://127.0.0.1" }; | |
std::string username{ "Administrator" }; | |
std::string password{ "password" }; | |
std::string bucket_name{ "default" }; | |
std::string scope_name{ couchbase::scope::default_name }; | |
std::string collection_name{ couchbase::collection::default_name }; | |
std::size_t number_of_keys{ 1'000 }; | |
std::size_t number_of_transactions{ 100 }; | |
std::size_t number_of_keys_per_transaction{ 100 }; | |
std::size_t document_body_size{ 1'024 }; | |
std::chrono::seconds transaction_expiration_time{ 120 }; | |
static auto load_from_environment() -> program_arguments | |
{ | |
program_arguments arguments; | |
if (const auto* val = getenv("CB_CONNECTION_STRING"); val != nullptr && val[0] != '\0') { | |
arguments.connection_string = val; | |
} | |
if (const auto* val = getenv("CB_USERNAME"); val != nullptr && val[0] != '\0') { | |
arguments.username = val; | |
} | |
if (const auto* val = getenv("CB_PASSWORD"); val != nullptr && val[0] != '\0') { | |
arguments.password = val; | |
} | |
if (const auto* val = getenv("CB_BUCKET_NAME"); val != nullptr && val[0] != '\0') { | |
arguments.bucket_name = val; | |
} | |
if (const auto* val = getenv("CB_SCOPE_NAME"); val != nullptr && val[0] != '\0') { | |
arguments.scope_name = val; | |
} | |
if (const auto* val = getenv("CB_COLLECTION_NAME"); val != nullptr && val[0] != '\0') { | |
arguments.collection_name = val; | |
} | |
if (const auto* val = getenv("CB_NUMBER_OF_KEYS"); val != nullptr && val[0] != '\0') { | |
char* end = nullptr; | |
auto int_val = std::strtoul(val, &end, 10); | |
if (end != val) { | |
arguments.number_of_keys = int_val; | |
} | |
} | |
if (const auto* val = getenv("CB_NUMBER_OF_TRANSACTIONS"); val != nullptr && val[0] != '\0') { | |
char* end = nullptr; | |
auto int_val = std::strtoul(val, &end, 10); | |
if (end != val) { | |
arguments.number_of_transactions = int_val; | |
} | |
} | |
if (const auto* val = getenv("CB_NUMBER_OF_KEYS_PER_TRANSACTION"); val != nullptr && val[0] != '\0') { | |
char* end = nullptr; | |
auto int_val = std::strtoul(val, &end, 10); | |
if (end != val) { | |
arguments.number_of_keys_per_transaction = int_val; | |
} | |
} | |
if (const auto* val = getenv("CB_DOCUMENT_BODY_SIZE"); val != nullptr && val[0] != '\0') { | |
char* end = nullptr; | |
auto int_val = std::strtoul(val, &end, 10); | |
if (end != val) { | |
arguments.document_body_size = int_val; | |
} | |
} | |
if (const auto* val = getenv("CB_TRANSACTION_EXPIRATION_TIME"); val != nullptr && val[0] != '\0') { | |
char* end = nullptr; | |
auto int_val = std::strtoul(val, &end, 10); | |
if (end != val) { | |
arguments.transaction_expiration_time = std::chrono::seconds{ int_val }; | |
} | |
} | |
return arguments; | |
} | |
}; | |
/**
 * Produces `length` characters, each drawn uniformly from the 62 ASCII digits and letters.
 * The random engine is seeded once per thread from std::random_device.
 */
std::string
random_text(std::size_t length)
{
    static thread_local std::mt19937_64 engine{ std::random_device()() };
    const std::string charset = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
    std::uniform_int_distribution<std::size_t> pick(0, charset.size() - 1);

    std::string result(length, '-');
    for (char& slot : result) {
        slot = charset[pick(engine)];
    }
    return result;
}
auto | |
generate_document(std::size_t document_body_size) -> tao::json::value | |
{ | |
return { | |
{ "size", document_body_size }, | |
{ "text", random_text(document_body_size) }, | |
}; | |
} | |
void | |
run_transactions_sync_workload(const std::vector<std::string> document_ids, | |
const std::shared_ptr<couchbase::transactions::transactions>& transactions, | |
const couchbase::collection& collection, | |
const program_arguments& arguments) | |
{ | |
if (arguments.number_of_keys <= 0 || arguments.number_of_keys_per_transaction <= 0) { | |
return; | |
} | |
auto start = std::chrono::system_clock::now(); | |
{ | |
std::map<std::string, std::size_t> errors; | |
// std::vector<transaction_promise> results; | |
// results.resize(arguments.number_of_transactions); | |
auto exec_start = std::chrono::system_clock::now(); | |
for (std::size_t i = 0; i < arguments.number_of_transactions; ++i) { | |
auto result = | |
transactions->run([&collection, &document_ids, &arguments, &errors](couchbase::transactions::attempt_context& attempt) { | |
std::vector<std::string> selected_keys; | |
std::sample(document_ids.begin(), | |
document_ids.end(), | |
std::back_insert_iterator(selected_keys), | |
arguments.number_of_keys_per_transaction, | |
std::mt19937_64{ std::random_device()() }); | |
for (const auto& id : selected_keys) { | |
auto get_result = attempt.get(collection, id); | |
} | |
}); | |
} | |
auto exec_end = std::chrono::system_clock::now(); | |
fmt::print("\rExecuted {} sync transactions with {} GET operations in {}ms ({}us, {}s), average latency: {}ms\n", | |
arguments.number_of_transactions, | |
arguments.number_of_keys_per_transaction, | |
std::chrono::duration_cast<std::chrono::milliseconds>(exec_end - exec_start).count(), | |
std::chrono::duration_cast<std::chrono::microseconds>(exec_end - exec_start).count(), | |
std::chrono::duration_cast<std::chrono::seconds>(exec_end - exec_start).count(), | |
std::chrono::duration_cast<std::chrono::milliseconds>(exec_end - exec_start).count() / | |
arguments.number_of_keys_per_transaction); | |
} | |
} | |
void | |
run_transactions_async_workload(const std::vector<std::string> document_ids, | |
const std::shared_ptr<couchbase::transactions::transactions>& transactions, | |
const couchbase::collection& collection, | |
const program_arguments& arguments) | |
{ | |
if (arguments.number_of_keys <= 0 || arguments.number_of_keys_per_transaction <= 0) { | |
return; | |
} | |
auto start = std::chrono::system_clock::now(); | |
{ | |
std::map<std::string, std::size_t> errors; | |
using transaction_promise = | |
std::promise<std::pair<couchbase::transaction_error_context, couchbase::transactions::transaction_result>>; | |
std::vector<transaction_promise> results; | |
results.resize(arguments.number_of_transactions); | |
auto schedule_start = std::chrono::system_clock::now(); | |
for (std::size_t i = 0; i < arguments.number_of_transactions; ++i) { | |
transactions->run( | |
[&collection, &document_ids, &arguments, &errors](couchbase::transactions::async_attempt_context& attempt) { | |
std::vector<std::string> selected_keys; | |
std::sample(document_ids.begin(), | |
document_ids.end(), | |
std::back_insert_iterator(selected_keys), | |
arguments.number_of_keys_per_transaction, | |
std::mt19937_64{ std::random_device()() }); | |
for (const auto& id : selected_keys) { | |
attempt.get(collection, id, [&attempt, &collection, id, &arguments, &errors](auto ctx, auto res) { | |
//fmt::print("did transactions get\n"); | |
if (ctx.ec()) { | |
errors[ctx.ec().message()]++; | |
} | |
}); | |
} | |
}, | |
[&promise = results[i]](auto err, auto result) { | |
promise.set_value({ err, result }); | |
}); | |
} | |
auto schedule_end = std::chrono::system_clock::now(); | |
fmt::print("\rScheduled {} transactions with {} GET operations in {}ms ({}us, {}s)\n", | |
arguments.number_of_transactions, | |
arguments.number_of_keys_per_transaction, | |
std::chrono::duration_cast<std::chrono::milliseconds>(schedule_end - schedule_start).count(), | |
std::chrono::duration_cast<std::chrono::microseconds>(schedule_end - schedule_start).count(), | |
std::chrono::duration_cast<std::chrono::seconds>(schedule_end - schedule_start).count()); | |
std::map<std::string, std::size_t> transactions_errors; | |
auto exec_start = std::chrono::system_clock::now(); | |
for (auto& promise : results) { | |
auto [err, result] = promise.get_future().get(); | |
if (err.ec()) { | |
transactions_errors[fmt::format("error={}, cause={}", err.ec().message(), err.cause().message())]++; | |
} | |
} | |
auto exec_end = std::chrono::system_clock::now(); | |
fmt::print("\rExecuted {} transactions with {} GET operations in {}ms ({}us, {}s), average latency: {}ms\n", | |
arguments.number_of_transactions, | |
arguments.number_of_keys_per_transaction, | |
std::chrono::duration_cast<std::chrono::milliseconds>(exec_end - exec_start).count(), | |
std::chrono::duration_cast<std::chrono::microseconds>(exec_end - exec_start).count(), | |
std::chrono::duration_cast<std::chrono::seconds>(exec_end - exec_start).count(), | |
std::chrono::duration_cast<std::chrono::milliseconds>(exec_end - exec_start).count() / | |
arguments.number_of_keys_per_transaction); | |
if (transactions_errors.empty()) { | |
fmt::print("\tAll transactions completed successfully\n"); | |
} else { | |
fmt::print("\tSome transactions completed with errors:\n"); | |
for (auto [error, hits] : transactions_errors) { | |
fmt::print("\t\t{}: {}\n", error, hits); | |
} | |
} | |
if (errors.empty()) { | |
fmt::print("\tAll operations completed successfully\n"); | |
} else { | |
fmt::print("\tSome operations completed with errors:\n"); | |
for (auto [error, hits] : errors) { | |
fmt::print("\t\t{}: {}\n", error, hits); | |
} | |
} | |
} | |
auto end = std::chrono::system_clock::now(); | |
fmt::print("Total time for bulk execution {}ms ({}us, {}s)\n", | |
std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count(), | |
std::chrono::duration_cast<std::chrono::microseconds>(end - start).count(), | |
std::chrono::duration_cast<std::chrono::seconds>(end - start).count()); | |
} | |
void | |
run_kv_get_workload(const std::vector<std::string> document_ids, | |
const couchbase::collection& collection, | |
const program_arguments& arguments) | |
{ | |
if (arguments.number_of_keys <= 0 || arguments.number_of_keys_per_transaction <= 0) { | |
return; | |
} | |
auto start = std::chrono::system_clock::now(); | |
{ | |
std::map<std::string, std::size_t> errors; | |
auto exec_start = std::chrono::system_clock::now(); | |
for (std::size_t i = 0; i < arguments.number_of_transactions; ++i) { | |
std::vector<std::string> selected_keys; | |
std::sample(document_ids.begin(), | |
document_ids.end(), | |
std::back_insert_iterator(selected_keys), | |
arguments.number_of_keys_per_transaction, | |
std::mt19937_64{ std::random_device()() }); | |
for (const auto& id : selected_keys) { | |
auto future = collection.get(id); | |
auto [ctx, result] = future.get(); | |
if (ctx.ec()) { | |
errors[ctx.ec().message()]++; | |
} | |
} | |
} | |
auto exec_end = std::chrono::system_clock::now(); | |
fmt::print("\rExecuted {} batches with {} KV GET operations in {}ms ({}us, {}s), average latency: {}ms\n", | |
arguments.number_of_transactions, | |
arguments.number_of_keys_per_transaction, | |
std::chrono::duration_cast<std::chrono::milliseconds>(exec_end - exec_start).count(), | |
std::chrono::duration_cast<std::chrono::microseconds>(exec_end - exec_start).count(), | |
std::chrono::duration_cast<std::chrono::seconds>(exec_end - exec_start).count(), | |
std::chrono::duration_cast<std::chrono::milliseconds>(exec_end - exec_start).count() / | |
arguments.number_of_keys_per_transaction); | |
if (errors.empty()) { | |
fmt::print("\tAll operations completed successfully\n"); | |
} else { | |
fmt::print("\tSome operations completed with errors:\n"); | |
for (auto [error, hits] : errors) { | |
fmt::print("\t\t{}: {}\n", error, hits); | |
} | |
} | |
} | |
auto end = std::chrono::system_clock::now(); | |
fmt::print("Total time for bulk execution {}ms ({}us, {}s)\n", | |
std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count(), | |
std::chrono::duration_cast<std::chrono::microseconds>(end - start).count(), | |
std::chrono::duration_cast<std::chrono::seconds>(end - start).count()); | |
} | |
void | |
run_kv_lookup_in_workload(const std::vector<std::string> document_ids, const couchbase::collection& collection, const program_arguments& arguments) | |
{ | |
if (arguments.number_of_keys <= 0 || arguments.number_of_keys_per_transaction <= 0) { | |
return; | |
} | |
auto start = std::chrono::system_clock::now(); | |
{ | |
std::map<std::string, std::size_t> errors; | |
auto exec_start = std::chrono::system_clock::now(); | |
std::string uuid = "ca08c84a-1e29-4964-a84e-b5981ffe8a13"; | |
for (std::size_t i = 0; i < arguments.number_of_transactions; ++i) { | |
std::vector<std::string> selected_keys; | |
std::sample(document_ids.begin(), | |
document_ids.end(), | |
std::back_insert_iterator(selected_keys), | |
arguments.number_of_keys_per_transaction, | |
std::mt19937_64{ std::random_device()() }); | |
for (const auto& id : selected_keys) { | |
auto specs = couchbase::lookup_in_specs{ | |
couchbase::lookup_in_specs::get("txn.id1." + uuid).xattr(), | |
couchbase::lookup_in_specs::get("txn.id2." + uuid).xattr(), | |
couchbase::lookup_in_specs::get("txn.id3." + uuid).xattr(), | |
couchbase::lookup_in_specs::get("txn.id4." + uuid).xattr(), | |
couchbase::lookup_in_specs::get("txn.op.stgd").xattr(), | |
couchbase::lookup_in_specs::get("txn.atr.bkt").xattr(), | |
couchbase::lookup_in_specs::get("txn.atr.scp").xattr(), | |
couchbase::lookup_in_specs::get("txn.atr.coll").xattr(), | |
couchbase::lookup_in_specs::get("txn.restore").xattr(), | |
couchbase::lookup_in_specs::get("txn.op.type").xattr(), | |
couchbase::lookup_in_specs::get(couchbase::subdoc::lookup_in_macro::document).xattr(), | |
couchbase::lookup_in_specs::get("txn.op.crc32").xattr(), | |
couchbase::lookup_in_specs::get("txn.fc").xattr(), | |
couchbase::lookup_in_specs::get(""), | |
}; | |
auto future = collection.lookup_in(id, specs); | |
auto [ctx, result] = future.get(); | |
if (ctx.ec()) { | |
errors[ctx.ec().message()]++; | |
} | |
} | |
} | |
auto exec_end = std::chrono::system_clock::now(); | |
fmt::print("\rExecuted {} batches with {} KV lookup_in operations in {}ms ({}us, {}s), average latency: {}ms\n", | |
arguments.number_of_transactions, | |
arguments.number_of_keys_per_transaction, | |
std::chrono::duration_cast<std::chrono::milliseconds>(exec_end - exec_start).count(), | |
std::chrono::duration_cast<std::chrono::microseconds>(exec_end - exec_start).count(), | |
std::chrono::duration_cast<std::chrono::seconds>(exec_end - exec_start).count(), | |
std::chrono::duration_cast<std::chrono::milliseconds>(exec_end - exec_start).count() / | |
arguments.number_of_keys_per_transaction); | |
if (errors.empty()) { | |
fmt::print("\tAll operations completed successfully\n"); | |
} else { | |
fmt::print("\tSome operations completed with errors:\n"); | |
for (auto [error, hits] : errors) { | |
fmt::print("\t\t{}: {}\n", error, hits); | |
} | |
} | |
} | |
} | |
void | |
run_kv_lookup_in_workload_optimised(const std::vector<std::string> document_ids, | |
const couchbase::collection& collection, | |
const program_arguments& arguments) | |
{ | |
if (arguments.number_of_keys <= 0 || arguments.number_of_keys_per_transaction <= 0) { | |
return; | |
} | |
auto start = std::chrono::system_clock::now(); | |
{ | |
std::map<std::string, std::size_t> errors; | |
std::string uuid = "ca08c84a-1e29-4964-a84e-b5981ffe8a13"; | |
std::vector<couchbase::subdoc::get> specsList = std::vector{ | |
couchbase::lookup_in_specs::get("txn.id1." + uuid).xattr(), | |
couchbase::lookup_in_specs::get("txn.id2." + uuid).xattr(), | |
couchbase::lookup_in_specs::get("txn.id3." + uuid).xattr(), | |
couchbase::lookup_in_specs::get("txn.id4." + uuid).xattr(), | |
couchbase::lookup_in_specs::get("txn.op.stgd").xattr(), | |
couchbase::lookup_in_specs::get("txn.atr.bkt").xattr(), | |
couchbase::lookup_in_specs::get("txn.atr.scp").xattr(), | |
couchbase::lookup_in_specs::get("txn.atr.coll").xattr(), | |
couchbase::lookup_in_specs::get("txn.restore").xattr(), | |
couchbase::lookup_in_specs::get("txn.op.type").xattr(), | |
couchbase::lookup_in_specs::get(couchbase::subdoc::lookup_in_macro::document).xattr(), | |
couchbase::lookup_in_specs::get("txn.op.crc32").xattr(), | |
couchbase::lookup_in_specs::get("txn.fc").xattr(), | |
couchbase::lookup_in_specs::get(""), | |
}; | |
couchbase::core::impl::subdoc::command_bundle cb; | |
size_t vecSize = specsList.size(); | |
for (unsigned int i = 0; i < vecSize; i++) { | |
auto operation = specsList[i]; | |
operation.encodePublic(cb); | |
} | |
auto ptr = std::make_shared<couchbase::core::impl::subdoc::command_bundle>(cb); | |
auto specs = couchbase::lookup_in_specs(ptr); | |
auto exec_start = std::chrono::system_clock::now(); | |
for (std::size_t i = 0; i < arguments.number_of_transactions; ++i) { | |
std::vector<std::string> selected_keys; | |
std::sample(document_ids.begin(), | |
document_ids.end(), | |
std::back_insert_iterator(selected_keys), | |
arguments.number_of_keys_per_transaction, | |
std::mt19937_64{ std::random_device()() }); | |
for (const auto& id : selected_keys) { | |
auto future = collection.lookup_in(id, specs); | |
auto [ctx, result] = future.get(); | |
if (ctx.ec()) { | |
errors[ctx.ec().message()]++; | |
} | |
} | |
} | |
auto exec_end = std::chrono::system_clock::now(); | |
fmt::print("\rExecuted {} batches with {} KV optimised lookup_in operations in {}ms ({}us, {}s), average latency: {}ms\n", | |
arguments.number_of_transactions, | |
arguments.number_of_keys_per_transaction, | |
std::chrono::duration_cast<std::chrono::milliseconds>(exec_end - exec_start).count(), | |
std::chrono::duration_cast<std::chrono::microseconds>(exec_end - exec_start).count(), | |
std::chrono::duration_cast<std::chrono::seconds>(exec_end - exec_start).count(), | |
std::chrono::duration_cast<std::chrono::milliseconds>(exec_end - exec_start).count() / | |
arguments.number_of_keys_per_transaction); | |
if (errors.empty()) { | |
fmt::print("\tAll operations completed successfully\n"); | |
} else { | |
fmt::print("\tSome operations completed with errors:\n"); | |
for (auto [error, hits] : errors) { | |
fmt::print("\t\t{}: {}\n", error, hits); | |
} | |
} | |
} | |
} | |
int | |
main() | |
{ | |
auto arguments = program_arguments::load_from_environment(); | |
fmt::print("CB_CONNECTION_STRING={}\n", arguments.connection_string); | |
fmt::print("CB_USERNAME={}\n", arguments.username); | |
fmt::print("CB_PASSWORD={}\n", arguments.password); | |
fmt::print("CB_BUCKET_NAME={}\n", arguments.bucket_name); | |
fmt::print("CB_SCOPE_NAME={}\n", arguments.scope_name); | |
fmt::print("CB_COLLECTION_NAME={}\n", arguments.collection_name); | |
fmt::print("CB_NUMBER_OF_KEYS={}\n", arguments.number_of_keys); | |
fmt::print("CB_NUMBER_OF_TRANSACTIONS={}\n", arguments.number_of_transactions); | |
fmt::print("CB_NUMBER_OF_KEYS_PER_TRANSACTION={}\n", arguments.number_of_keys_per_transaction); | |
fmt::print("CB_DOCUMENT_BODY_SIZE={}\n", arguments.document_body_size); | |
fmt::print("CB_TRANSACTION_EXPIRATION_TIME={}\n", arguments.transaction_expiration_time.count()); | |
asio::io_context io; | |
auto guard = asio::make_work_guard(io); | |
std::thread io_thread([&io]() { io.run(); }); | |
auto options = couchbase::cluster_options(arguments.username, arguments.password); | |
options.apply_profile("wan_development"); | |
options.transactions().expiration_time(arguments.transaction_expiration_time); | |
auto [cluster, ec] = couchbase::cluster::connect(io, arguments.connection_string, options).get(); | |
if (ec) { | |
fmt::print("Unable to connect to cluster at \"{}\", error: {}\n", arguments.connection_string, ec.message()); | |
} else { | |
auto transactions = cluster.transactions(); | |
auto collection = cluster.bucket(arguments.bucket_name).scope(arguments.scope_name).collection(arguments.collection_name); | |
const std::string document_id_prefix{ "tx_mix" }; | |
std::vector<std::string> document_ids; | |
auto document_body = generate_document(arguments.document_body_size); | |
document_ids.reserve(arguments.number_of_keys); | |
for (std::size_t i = 0; i < arguments.number_of_keys; ++i) { | |
auto document_id = fmt::format("{}_{:06d}", document_id_prefix, i); | |
document_ids.emplace_back(document_id); | |
collection.upsert(document_id, document_body); | |
} | |
fmt::print( | |
"Using {} IDs in interval [\"{}\"...\"{}\"]\n", document_ids.size(), document_ids[0], document_ids[document_ids.size() - 1]); | |
run_kv_get_workload(document_ids, collection, arguments); | |
run_kv_lookup_in_workload(document_ids, collection, arguments); | |
run_kv_lookup_in_workload_optimised(document_ids, collection, arguments); | |
// run_transactions_async_workload(document_ids, transactions, collection, arguments); | |
//run_transactions_sync_workload(document_ids, transactions, collection, arguments); | |
} | |
cluster.close(); | |
guard.reset(); | |
io_thread.join(); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment