Skip to content

Instantly share code, notes, and snippets.

@t2ym
Last active June 9, 2021 06:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save t2ym/a185c7daafbd4f938321a7e4018f90a3 to your computer and use it in GitHub Desktop.
Save t2ym/a185c7daafbd4f938321a7e4018f90a3 to your computer and use it in GitHub Desktop.
Prototype accessor for the memberOf attributes of LDAP users, intended for use by the nicexprs.h middleware
/*
* Copyright (c) 2021, Tetsuya Mori <t2y3141592@gmail.com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include <stdio.h>
#include <ldap.h>
#include <inttypes.h>
#include <stdlib.h>
#include <signal.h>
#include <iostream>
#include <sstream>
#include <list>
#include <memory>
#include <chrono>
#include <thread>
#include <future>
#define HOST_CONNECT_LOG 1
#define ASYNC_LDAP_ACCESS 1
#if ASYNC_LDAP_ACCESS
#include <boost/asio.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
// boost custom error_code based on the boilerplate
// at https://www.boost.org/doc/libs/1_76_0/libs/outcome/doc/html/motivation/plug_error_code2.html
#include <boost/system/error_code.hpp>
// Coarse success/failure tag used to select the LDAP flavour of make_error_code();
// the concrete LDAP result code is passed separately as an int.
enum class LdapErrc { // actually a bool
Success = 0,
Error = 1 // individual LDAP result codes are not enumerated here
};
// Register LdapErrc with boost::system as an error-code enum.
namespace boost {
namespace system {
template <> struct is_error_code_enum<LdapErrc> : std::true_type {};
}
}
namespace detail {
// boost::system error category whose values are raw LDAP result codes.
class LdapErrc_category : public boost::system::error_category {
public:
  // Name under which this category is reported.
  const char *name() const noexcept override final {
    return "LdapError";
  }
  // Text for an LDAP result code as provided by ldap_err2string(),
  // or a fixed fallback when libldap returns no string.
  std::string message(int err) const override final {
    if (const char *text = ldap_err2string(err)) {
      return text;
    }
    return "unknown LDAP error";
  }
  // There is no mapping onto generic conditions: the code is wrapped unchanged in this category.
  boost::system::error_condition default_error_condition(int err) const noexcept override final {
    return boost::system::error_condition(err, *this);
  }
};
}
#define THIS_MODULE_API_DECL extern inline
// Accessor for the LDAP error category; the instance is a function-local static.
THIS_MODULE_API_DECL const detail::LdapErrc_category &LdapErrc_category() {
  static detail::LdapErrc_category category_instance;
  return category_instance;
}
// Builds an error_code in the LDAP category that carries the raw LDAP result code.
// `err` only selects this overload; its value is not stored.
inline boost::system::error_code make_error_code(LdapErrc err, int ldap_err) {
  (void)err;
  return boost::system::error_code(ldap_err, LdapErrc_category());
}
class async_adaptor {
public:
enum { SOCKET_NOT_ASSIGNED = -1 };
typedef std::function<void (const boost::system::error_code &err)> async_cb;
async_adaptor(boost::asio::io_context &context)
: socket_fd(SOCKET_NOT_ASSIGNED), context(context), async_socket(context), canary_socket(context), async_resolver(context) {}
~async_adaptor() {
assign(SOCKET_NOT_ASSIGNED);
}
void assign(int fd) {
if (socket_fd >= 0) {
async_socket.cancel();
async_socket.release();
}
if ((socket_fd = fd) >= 0) {
async_socket.assign(boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 0).protocol(), socket_fd);
}
else {
socket_fd = SOCKET_NOT_ASSIGNED;
}
}
int socket_fd;
boost::asio::io_context &context;
boost::asio::ip::tcp::socket async_socket;
boost::asio::ip::tcp::socket canary_socket; // check tcp port connectivity before connection to mitigate side-effects of blocking SSL_connect()
boost::asio::ip::tcp::resolver async_resolver;
};
#endif // ASYNC_LDAP_ACCESS
// Exception carrying an LDAP result code together with its textual description.
class ldap_exception: public std::exception {
public:
  // Throws ldap_exception(err) unless err is LDAP_SUCCESS.
  static void throw_on_error(int err) { if (err != LDAP_SUCCESS) throw ldap_exception(err); }
  using std::exception::exception;
  // Builds the message from ldap_err2string(); falls back to "unknown error <code>".
  ldap_exception(int err): err(err) {
    char *err_msg = ldap_err2string(err);
    if (err_msg) {
      message = err_msg;
    }
    else {
      message = "unknown error ";
      // append the decimal code; `message += err` would append a single char with that value
      message += std::to_string(err);
    }
  }
  virtual const char *what() const noexcept override { return message.c_str(); }
  int err;             // LDAP result code
  std::string message; // text returned by what()
};
// Abstract description of one LDAP search: the base DN, filter and scope to query,
// plus the hooks through which the search loop hands back attribute values.
class ldap_query_spec {
public:
  ldap_query_spec(std::string user_dn, std::string search_filter, int scope = LDAP_SCOPE_BASE)
    : user_dn(user_dn),
      search_filter(search_filter),
      scope(scope) {}

  virtual ~ldap_query_spec() = default;

  // Attribute list passed to the search request.
  virtual char **attributes() = 0;
  // Receives one value (`value`, `len` bytes) of attribute `attr` from entry `msg`.
  virtual void store(LDAP *ldap, LDAPMessage *msg, const char *attr, char *value, std::size_t len) = 0;
  // Discards any result collected so far.
  virtual void clear_result() = 0;
  // Tells whether values of `attr` should be passed to store().
  virtual bool is_target_attribute(const char *attr) = 0;

  std::string user_dn;
  std::string search_filter;
  int scope;
  // a `result` member must be defined in derived classes
};
template <class query_spec>
class ldap_search {
public:
// Special return values of ldap_result() handled alongside the message types.
enum ldap_result_err {
  _LDAP_RES_TIMEOUT = 0,
  _LDAP_RES_ERROR = -1
};
// Integer booleans handed to libldap calls.
enum ldap_boolean_int {
  TRUE = 1 /* not comparable */,
  FALSE = 0
};
// What to do with a request after a given LDAP error.
enum action_on_ldap_exception {
  ACTION_RETHROW_EXCEPTION,
  ACTION_FAILOVER_TO_NEXT_HOST,
  ACTION_RETURN_NULL_RESULT
};
// Per-host bookkeeping for one entry of the configured LDAP host list.
// Initialised positionally, so the member order must not change.
struct ldap_host {
std::string host_name; // used to build the ldaps:// URL and for name resolution
std::chrono::steady_clock::time_point last_successful_access; // updated after a search succeeds
std::chrono::steady_clock::time_point last_non_critical_error; // ACTION_RETURN_NULL_RESULT
std::chrono::steady_clock::time_point last_critical_failure; // ACTION_RETHROW_EXCEPTION, or a failed connection attempt during failover
int connected_counter; // incremented after a successful asynchronous bind
int non_critical_error_counter;
int critical_failure_counter;
};
// Constructor without an io_context (no async adaptor is created).
// hosts: comma-separated list of LDAP host names.
// ca_file: CA certificate file installed as LDAP_OPT_X_TLS_CACERTFILE when connecting.
// bind_user / bind_password: credentials for the simple bind.
// port: port used in the ldaps:// URL.
// network_timeout_sec: LDAP_OPT_NETWORK_TIMEOUT; also the base of the wait inserted between failover rounds.
// search_timeout_sec: timeout handed to ldap_search_ext() and ldap_result().
// size_limit: LDAP_OPT_SIZELIMIT and the size limit of each search request.
ldap_search(std::string hosts, std::string ca_file, std::string bind_user, std::string bind_password,
int port = 636, time_t network_timeout_sec = 5, time_t search_timeout_sec = 5, int size_limit = 0)
: ldap(nullptr), ldap_version(LDAP_VERSION3), ca_file(ca_file), ldap_port(port),
network_timeout({network_timeout_sec, 0}), search_timeout({search_timeout_sec, 0}),
size_limit(size_limit), bind_user(bind_user), bind_password(bind_password) {
set_ldap_hosts(hosts);
}
#if ASYNC_LDAP_ACCESS
// Constructor for asynchronous use.
// Each instance is associated to an io_context and not shared among multiple threads
// context: io_context on which the resolver, sockets and timers of this instance run.
// max_concurrent_requests: forwarded to the request demultiplexer.
// The remaining parameters have the same meaning as in the constructor without an io_context.
ldap_search(boost::asio::io_context &context, std::string hosts, std::string ca_file, std::string bind_user, std::string bind_password,
int port = 636, time_t network_timeout_sec = 5, time_t search_timeout_sec = 5, int size_limit = 0, int max_concurrent_requests = 100)
: adaptor(std::make_shared<async_adaptor>(context)),
ldap(nullptr), ldap_version(LDAP_VERSION3), ca_file(ca_file), ldap_port(port),
network_timeout({network_timeout_sec, 0}), search_timeout({search_timeout_sec, 0}), demultiplexer(max_concurrent_requests),
size_limit(size_limit), bind_user(bind_user), bind_password(bind_password) {
set_ldap_hosts(hosts);
}
#endif // ASYNC_LDAP_ACCESS
~ldap_search() {
  // release the LDAP handle, if one is still open
  this->disconnect();
}
// Connects and binds to the current host; on an LDAP error falls back to the
// synchronous failover procedure. Returns *this.
ldap_search &sync_connect() {
  try {
    do_sync_connect();
  }
  catch (ldap_exception &) {
    sync_failover();
  }
  return *this;
}
// Unbinds and forgets the LDAP handle if one is open, and marks the
// asynchronous state as not connected. Returns *this.
ldap_search &disconnect() {
  if (ldap != nullptr) {
#if HOST_CONNECT_LOG
    std::cerr << "disconnect()" << std::endl;
#endif
    ldap_unbind_ext_s(ldap, nullptr, nullptr);
    ldap = nullptr;
  }
#if ASYNC_LDAP_ACCESS
  demultiplexer.connected = false;
#endif
  return *this;
}
// Runs `spec` synchronously. Depending on the LDAP error the search is retried on
// another host, finished with a null result, or the exception is rethrown.
void sync_search(query_spec &spec) {
  while (true) {
    try {
      do_sync_search(spec);
      current_host->last_successful_access = std::chrono::steady_clock::now();
      return;
    }
    catch (ldap_exception &e) {
      const action_on_ldap_exception action = get_action_for_ldap_error(e.err);
      if (action == ACTION_FAILOVER_TO_NEXT_HOST) {
        // reconnect elsewhere, then run the search again
        sync_failover();
        continue;
      }
      if (action == ACTION_RETURN_NULL_RESULT) {
        current_host->last_non_critical_error = std::chrono::steady_clock::now();
        current_host->non_critical_error_counter++;
        spec.result = nullptr;
        return;
      }
      // ACTION_RETHROW_EXCEPTION and anything unexpected
      current_host->last_critical_failure = std::chrono::steady_clock::now();
      current_host->critical_failure_counter++;
      throw;
    }
  }
}
protected:
// Splits the comma-separated `hosts` into ldap_hosts entries with cleared statistics
// and makes the first entry the current host.
void set_ldap_hosts(std::string &hosts) {
  // the appended ',' terminates the last token with a delimiter instead of end-of-stream
  std::istringstream host_stream(hosts + ",");
  for (std::string name; !std::getline(host_stream, name, ',').eof(); ) {
    const auto never = std::chrono::steady_clock::time_point::min();
    ldap_hosts.push_back(ldap_host{ name, never, never, never, 0, 0, 0 });
  }
#if HOST_CONNECT_LOG
  for (const auto &entry : ldap_hosts) {
    std::cerr << "ldap_host:" << entry.host_name << std::endl;
  }
#endif
  current_host = ldap_hosts.begin();
}
// URL of the current host in the form ldaps://<host>:<port>.
virtual std::string get_ldap_url() {
  std::string url("ldaps://");
  url += current_host->host_name;
  url += ":";
  url += std::to_string(ldap_port);
  return url;
}
// Creates the LDAP handle for the current host and applies the options shared by the
// synchronous and asynchronous connection paths. Throws ldap_exception when a libldap call fails.
// NOTE(review): a handle already stored in `ldap` is overwritten by ldap_initialize() without
// being released — callers appear to be expected to disconnect() first; confirm for sync_connect().
void do_connect_common() {
//int debug_level = 65535;
//ldap_set_option(ldap, LDAP_OPT_DEBUG_LEVEL, &debug_level);
ldap_exception::throw_on_error(ldap_initialize(&ldap, get_ldap_url().c_str()));
ldap_exception::throw_on_error(ldap_set_option(ldap, LDAP_OPT_PROTOCOL_VERSION, &ldap_version));
ldap_exception::throw_on_error(ldap_set_option(ldap, LDAP_OPT_NETWORK_TIMEOUT, &network_timeout));
ldap_exception::throw_on_error(ldap_set_option(ldap, LDAP_OPT_SIZELIMIT, &size_limit));
// NOTE(review): this sets the certificate-checking policy (LDAP_OPT_X_TLS_REQUIRE_CERT), not a StartTLS switch;
// per the OpenLDAP documentation LDAP_OPT_X_TLS_NEVER appears to disable server certificate checking — confirm this is intended
int opt = LDAP_OPT_X_TLS_NEVER; // original remark: no StartTLS
ldap_exception::throw_on_error(ldap_set_option(ldap, LDAP_OPT_X_TLS_REQUIRE_CERT, &opt));
// The CA file is read and written on the global (NULL handle) option set;
// it is only written when it differs from the value currently configured.
char *current_ca_file_ptr = nullptr;
ldap_exception::throw_on_error(ldap_get_option(NULL, LDAP_OPT_X_TLS_CACERTFILE, &current_ca_file_ptr));
bool preserve_current_ca_file = false;
if (current_ca_file_ptr) {
std::string current_ca_file(current_ca_file_ptr);
ldap_memfree(current_ca_file_ptr); // the string returned by ldap_get_option() is released here after copying
if (ca_file == current_ca_file) {
preserve_current_ca_file = true;
}
}
if (!preserve_current_ca_file) {
ldap_exception::throw_on_error(ldap_set_option(NULL, LDAP_OPT_X_TLS_CACERTFILE, ca_file.c_str()));
}
}
// Initialises the LDAP handle for the current host and performs a blocking simple bind
// with bind_user / bind_password. Throws ldap_exception on failure. Returns *this.
ldap_search &do_sync_connect() {
#if HOST_CONNECT_LOG
  std::cerr << "do_sync_connect() on " << get_ldap_url() << std::endl;
#endif
  do_connect_common();
  BerValue password_ber({bind_password.size(), &bind_password[0]});
  // Server credentials are not used, but anything the library hands back must be released;
  // start from nullptr so the pointer is never read uninitialised.
  BerValue *server_cred = nullptr;
  const int bind_err = ldap_sasl_bind_s(ldap, bind_user.c_str(), LDAP_SASL_SIMPLE, &password_ber, nullptr, nullptr, &server_cred);
  if (server_cred) {
    ber_bvfree(server_cred);
  }
  ldap_exception::throw_on_error(bind_err);
  return *this;
}
// Drops the current connection (if any) and connects again to the current host. Returns *this.
ldap_search &sync_reconnect() {
  this->disconnect();
  this->do_sync_connect();
  return *this;
}
// Walks the host list round-robin, starting after the current host, until a connection
// succeeds. A "round" is counted each time the walk comes back to the starting host.
// Rounds 1-3 insert a wait before reconnecting; from round 4 on a failed attempt rethrows.
ldap_search &sync_failover() {
  const auto origin = current_host;
  int round = 0;
  bool giving_up = false;
  for (;;) {
    // advance to the next host, wrapping around at the end of the list
    ++current_host;
    if (current_host == ldap_hosts.end()) {
      current_host = ldap_hosts.begin();
    }
    const bool at_origin = (current_host == origin);
    if (at_origin) {
      ++round;
    }
    try {
      if (round == 0) {
        // no wait on round 0
      }
      else if (round == 1 && at_origin) {
        // no wait on the starting host on round 1
      }
      else if (round >= 1 && round <= 3) {
        // insert a wait on round 1, 2, 3 for synchronous search
        std::this_thread::sleep_for(
          std::chrono::seconds(network_timeout.tv_sec) -
          (std::chrono::steady_clock::now() - current_host->last_critical_failure));
      }
      else {
        // give up connection on round 4 for synchronous search
        giving_up = true;
      }
      sync_reconnect();
      return *this;
    }
    catch (ldap_exception &e) {
      current_host->last_critical_failure = std::chrono::steady_clock::now();
      current_host->critical_failure_counter++;
      if (giving_up) {
        throw;
      }
    }
  }
}
// Maps an LDAP result code to the recovery action taken by the search paths.
// Codes not listed explicitly fall through to ACTION_RETHROW_EXCEPTION.
action_on_ldap_exception get_action_for_ldap_error(int err) {
  switch (err) {
  // ---- retry on another host ----
  /* LDAPv3 (RFC 4511) codes */
  case LDAP_TIMELIMIT_EXCEEDED: // Time limit exceeded
  case LDAP_ADMINLIMIT_EXCEEDED: // Administrative limit exceeded
  case LDAP_SASL_BIND_IN_PROGRESS: // SASL bind in progress
  case LDAP_BUSY: // Server is busy
  case LDAP_UNAVAILABLE: // Server is unavailable
  /* Proxied Authorization Control (RFC 4370 and I-D) codes */
  case LDAP_PROXIED_AUTHORIZATION_DENIED: // Proxied Authorization Denied
  case LDAP_X_PROXY_AUTHZ_FAILURE: // Proxy Authorization Failure (X)
  /* API codes - renumbered since draft-ietf-ldapext-ldap-c-api */
  case LDAP_SERVER_DOWN: // Can't contact LDAP server
  case LDAP_TIMEOUT: // Timed out
    return ACTION_FAILOVER_TO_NEXT_HOST;

  // ---- finish the request with a null result ----
  /* LDAPv3 (RFC 4511) codes */
  case LDAP_REFERRAL: // Referral
  case LDAP_NO_SUCH_OBJECT: // No such object
  case LDAP_LOOP_DETECT: // Loop detected
  /* LDAPv2 (RFC 1777) codes */
  case LDAP_PARTIAL_RESULTS: // Partial results and referral received
  case LDAP_IS_LEAF: // Entry is a leaf
    return ACTION_RETURN_NULL_RESULT;

  // ---- everything else is rethrown ----
  case LDAP_SUCCESS: // Success; unexpected here
#if EXPLICIT_CASE_ENTRIES_FOR_DEFAULT // ACTION_RETHROW_EXCEPTION
  /* LDAPv3 (RFC 4511) codes */
  case LDAP_OPERATIONS_ERROR: // Operations error
  case LDAP_PROTOCOL_ERROR: // Protocol error
  case LDAP_SIZELIMIT_EXCEEDED: // Size limit exceeded
  case LDAP_COMPARE_FALSE: // Compare False
  case LDAP_COMPARE_TRUE: // Compare True
  case LDAP_STRONG_AUTH_NOT_SUPPORTED: // Authentication method not supported
  case LDAP_STRONG_AUTH_REQUIRED: // Strong(er) authentication required
  case LDAP_UNAVAILABLE_CRITICAL_EXTENSION: // Critical extension is unavailable
  case LDAP_CONFIDENTIALITY_REQUIRED: // Confidentiality required
  case LDAP_NO_SUCH_ATTRIBUTE: // No such attribute
  case LDAP_UNDEFINED_TYPE: // Undefined attribute type
  case LDAP_INAPPROPRIATE_MATCHING: // Inappropriate matching
  case LDAP_CONSTRAINT_VIOLATION: // Constraint violation
  case LDAP_TYPE_OR_VALUE_EXISTS: // Type or value exists
  case LDAP_INVALID_SYNTAX: // Invalid syntax
  case LDAP_ALIAS_PROBLEM: // Alias problem
  case LDAP_INVALID_DN_SYNTAX: // Invalid DN syntax
  case LDAP_ALIAS_DEREF_PROBLEM: // Alias dereferencing problem
  case LDAP_INAPPROPRIATE_AUTH: // Inappropriate authentication
  case LDAP_INVALID_CREDENTIALS: // Invalid credentials
  case LDAP_INSUFFICIENT_ACCESS: // Insufficient access
  case LDAP_UNWILLING_TO_PERFORM: // Server is unwilling to perform
  case LDAP_NAMING_VIOLATION: // Naming violation
  case LDAP_OBJECT_CLASS_VIOLATION: // Object class violation
  case LDAP_NOT_ALLOWED_ON_NONLEAF: // Operation not allowed on non-leaf
  case LDAP_NOT_ALLOWED_ON_RDN: // Operation not allowed on RDN
  case LDAP_ALREADY_EXISTS: // Already exists
  case LDAP_NO_OBJECT_CLASS_MODS: // Cannot modify object class
  case LDAP_AFFECTS_MULTIPLE_DSAS: // Operation affects multiple DSAs
  /* Virtual List View draft */
  case LDAP_VLV_ERROR: // Virtual List View error
  case LDAP_OTHER: // Other (e.g., implementation specific) error
  /* Connection-less LDAP (CLDAP - RFC 1798) code */
  case LDAP_RESULTS_TOO_LARGE: // Results too large
  /* Cancel Operation (RFC 3909) codes */
  case LDAP_CANCELLED: // Cancelled
  case LDAP_NO_SUCH_OPERATION: // No Operation to Cancel
  case LDAP_TOO_LATE: // Too Late to Cancel
  case LDAP_CANNOT_CANCEL: // Cannot Cancel
  /* Assert Control (RFC 4528 and old internet-draft) codes */
  case LDAP_ASSERTION_FAILED: // Assertion Failed
  case LDAP_X_ASSERTION_FAILED: // Assertion Failed (X)
  /* Content Sync Operation (RFC 4533 and I-D) codes */
  case LDAP_SYNC_REFRESH_REQUIRED: // Content Sync Refresh Required
  case LDAP_X_SYNC_REFRESH_REQUIRED: // Content Sync Refresh Required (X)
  /* No-Op Control (draft-zeilenga-ldap-noop) code */
  case LDAP_X_NO_OPERATION: // No Operation (X)
  /* Client Update Protocol (RFC 3928) codes */
  case LDAP_CUP_RESOURCES_EXHAUSTED: // LCUP Resources Exhausted
  case LDAP_CUP_SECURITY_VIOLATION: // LCUP Security Violation
  case LDAP_CUP_INVALID_DATA: // LCUP Invalid Data
  case LDAP_CUP_UNSUPPORTED_SCHEME: // LCUP Unsupported Scheme
  case LDAP_CUP_RELOAD_REQUIRED: // LCUP Reload Required
  case LDAP_TXN_SPECIFY_OKAY: // TXN specify okay
  case LDAP_TXN_ID_INVALID: // TXN ID is invalid
  /* API codes - renumbered since draft-ietf-ldapext-ldap-c-api */
  case LDAP_LOCAL_ERROR: // Local error
  case LDAP_ENCODING_ERROR: // Encoding error
  case LDAP_DECODING_ERROR: // Decoding error
  case LDAP_AUTH_UNKNOWN: // Unknown authentication method
  case LDAP_FILTER_ERROR: // Bad search filter
  case LDAP_USER_CANCELLED: // User cancelled operation
  case LDAP_PARAM_ERROR: // Bad parameter to an ldap routine
  case LDAP_NO_MEMORY: // Out of memory
  case LDAP_CONNECT_ERROR: // Connect error
  case LDAP_NOT_SUPPORTED: // Not Supported
  case LDAP_CONTROL_NOT_FOUND: // Control not found
  case LDAP_NO_RESULTS_RETURNED: // No results returned
  case LDAP_MORE_RESULTS_TO_RETURN: // More results to return
  case LDAP_CLIENT_LOOP: // Client Loop
  case LDAP_REFERRAL_LIMIT_EXCEEDED: // Referral Limit Exceeded
  case LDAP_X_CONNECTING: // Connecting (X)
#endif
  default:
    return ACTION_RETHROW_EXCEPTION;
  }
}
// Issues the search described by `spec` and consumes its responses one message at a time
// until the final search result arrives. Values of attributes accepted by
// spec.is_target_attribute() are passed to spec.store().
// Throws ldap_exception on an LDAP error or when no message arrives within search_timeout.
void do_sync_search(query_spec &spec) {
int err;
int msg_id;
err = ldap_search_ext(ldap, spec.user_dn.c_str(), spec.scope, spec.search_filter.c_str(), spec.attributes(), 0, nullptr, nullptr, &search_timeout, size_limit, &msg_id);
ldap_exception::throw_on_error(err);
LDAPMessage *msg;
int msg_type;
BerElement *ber;
do {
msg = nullptr;
// wait for exactly one message belonging to this request
switch (msg_type = ldap_result(ldap, msg_id, LDAP_MSG_ONE, &search_timeout, &msg)) {
case _LDAP_RES_ERROR:
// the actual error code is read from the handle
ldap_get_option(ldap, LDAP_OPT_RESULT_CODE, &err);
ldap_exception::throw_on_error(err);
break;
case _LDAP_RES_TIMEOUT:
// abandon the outstanding request before reporting the timeout
ldap_abandon_ext(ldap, msg_id, nullptr, nullptr);
ldap_exception::throw_on_error(LDAP_TIMEOUT);
break;
case LDAP_RES_SEARCH_ENTRY:
ber = nullptr;
for (const char *attr = ldap_first_attribute(ldap, msg, &ber); attr; attr = ldap_next_attribute(ldap, msg, ber)) {
if (spec.is_target_attribute(attr)) {
BerValue **values = ldap_get_values_len(ldap, msg, attr);
int len = ldap_count_values_len(values);
for(int i = 0; i < len; i++) {
spec.store(ldap, msg, attr, values[i]->bv_val, values[i]->bv_len);
}
ldap_value_free_len(values);
}
// each attribute name is released once it has been processed
ber_memfree((void *)attr);
}
if (ber) {
ber_free(ber, FALSE);
}
ldap_msgfree(msg);
break;
// TODO: LDAP_RES_SEARCH_REFERENCE
case LDAP_RES_SEARCH_RESULT:
// final message: extract the result code, free the message, then report any error
ldap_parse_result(ldap, msg, &err, nullptr, nullptr, nullptr, nullptr, FALSE);
ldap_msgfree(msg);
ldap_exception::throw_on_error(err);
break;
default:
// other message types are discarded
if (msg) {
ldap_msgfree(msg);
}
break;
}
} while (msg_type != LDAP_RES_SEARCH_RESULT);
}
#if ASYNC_LDAP_ACCESS
public:
typedef std::function<void (const boost::system::error_code &err)> connect_cb;
typedef std::function<void (const boost::system::error_code &err, decltype(query_spec::result) result)> search_cb;
// Starts an asynchronous connect and reports the outcome through `cb`.
// - Already connected: `cb` is called immediately with a success code.
// - Connect or failover in flight: `cb` is chained behind failover_status.cb.
// - Otherwise: a new connect is started; on failure the failover procedure takes over.
void async_connect(connect_cb cb) {
  if (demultiplexer.connected) {
    // already connected
    static boost::system::error_code err;
    cb(err);
    return;
  }
  if (demultiplexer.failover_in_progress || demultiplexer.connecting) {
    auto original_cb = failover_status.cb;
    // hook backed-up original cb; this hooking may be nested by multiple calls of async_connect()
    // NOTE(review): while only `connecting` is set, nothing visible in this part of the file invokes
    // failover_status.cb on success, and async_failover() overwrites it on error — confirm that
    // callbacks queued here are actually notified.
    failover_status.cb = [this, cb, original_cb](const boost::system::error_code &err){
      // failover_status.cb may not have been set yet; calling an empty std::function would throw
      if (original_cb) {
        original_cb(err);
      }
      cb(err);
    };
  }
  else {
    demultiplexer.connecting = true;
    connect_status.cb = [this, cb](const boost::system::error_code &err){
      if (!err) {
        cb(err);
        handle_async_request(err); // in case async_member_of() has been called without calling async_connect(); does nothing if there are no such requests
      }
      else {
        async_failover(cb, err);
      }
    };
    do_async_connect();
  }
}
// Queues a search request. When connected the queue is processed right away; when neither
// connected nor connecting, a connect is started and the queue is processed once it completes.
void async_search(std::shared_ptr<query_spec> spec, search_cb cb) {
  auto request = std::make_shared<query_status>(query_status{spec, 0, cb});
  demultiplexer.request_wait_queue.push_back(request);
  if (demultiplexer.connected) {
    handle_async_request(request->err);
    return;
  }
  const bool connect_pending = demultiplexer.failover_in_progress || demultiplexer.connecting;
  if (connect_pending) {
    // the pending connect/failover will pick the request up from the queue
    return;
  }
  // not connected nor failover_in_progress, nor connecting
  async_connect([](const boost::system::error_code &){
    // handle_async_request() will handle the cb for the request
  });
}
protected:
// CALLBACK_ON_ERROR(cb, err): if `err` is set, pass it to `cb` and return from the enclosing function.
// Expands to a bare `if` statement, so it must be used as a statement of its own.
#define CALLBACK_ON_ERROR(cb, err) if (err) { cb(err); return; }
// CALLBACK_WITH_CAUGHT_EXCEPTION(cb): catch clauses to be placed directly after a `try` block.
// Asynchronous handlers cannot propagate exceptions to the original caller, which is waiting for a
// callback, so caught exceptions are converted into an error code and passed to `cb`:
// - ldap_exception   -> LDAP-category error code carrying e.err
// - std::system_error -> the numeric value re-tagged with boost's system category
// - anything else     -> LDAP_LOCAL_ERROR in the LDAP category
// As exceptions are exceptional, the overhead of these conversions should be negligible for normal operations.
#define CALLBACK_WITH_CAUGHT_EXCEPTION(cb) \
catch (ldap_exception &e) { cb(make_error_code(LdapErrc::Error, e.err)); }\
catch (std::system_error &e) { cb(boost::system::error_code((int)e.code().value(), boost::system::system_category())); }\
catch (...) { cb(make_error_code(LdapErrc::Error, LDAP_LOCAL_ERROR)); } // TODO: properly handle other error categories
void do_async_connect() {
#if HOST_CONNECT_LOG
std::cerr << "do_async_connect() on " << get_ldap_url() << std::endl;
#endif
try {
adaptor->async_resolver.async_resolve(
current_host->host_name, std::to_string(ldap_port),
boost::bind(&ldap_search::on_hostname_resolved, this, boost::asio::placeholders::error, boost::asio::placeholders::iterator));
}
CALLBACK_WITH_CAUGHT_EXCEPTION(connect_status.cb);
}
// The LDAP host name is resolved asynchronously before ldap_initialize so that the host IP address is cached locally.
// On success a TCP connect on the canary socket is started, guarded by a timer of network_timeout seconds.
void on_hostname_resolved(const boost::system::error_code &err, boost::asio::ip::tcp::resolver::iterator endpoint_iterator) {
  CALLBACK_ON_ERROR(connect_status.cb, err);
  try {
    connect_status.is_tcp_connected = false;
    connect_status.is_tcp_connect_timed_out = false;
    const auto connect_timeout = std::chrono::milliseconds(network_timeout.tv_sec * 1000);
    connect_status.timer = std::make_shared<boost::asio::steady_timer>(adaptor->context, connect_timeout);
    // start the connect first, then arm the timer that may cancel it
    boost::asio::async_connect(
      adaptor->canary_socket, endpoint_iterator,
      boost::bind(&ldap_search::on_tcp_port_connectable, this, boost::asio::placeholders::error));
    connect_status.timer->async_wait(
      boost::bind(&ldap_search::on_tcp_connect_timeout, this, boost::asio::placeholders::error));
  }
  CALLBACK_WITH_CAUGHT_EXCEPTION(connect_status.cb);
}
// Completion of the asynchronous connect to the ldaps tcp port made with the canary socket,
// which mitigates blocking SSL_connect() in ldap_sasl_bind().
// On success the canary socket is closed and the real LDAP connection is set up.
void on_tcp_port_connectable(const boost::system::error_code &err) {
  // set even on an error status so that on_tcp_connect_timeout can judge cancellation
  connect_status.is_tcp_connected = true;
  const bool timer_may_be_pending = !connect_status.is_tcp_connect_timed_out;
  if (timer_may_be_pending) {
    connect_status.timer->cancel();
  }
  CALLBACK_ON_ERROR(connect_status.cb, err);
  try {
    adaptor->canary_socket.close();
    on_ready_to_connect(err);
  }
  CALLBACK_WITH_CAUGHT_EXCEPTION(connect_status.cb);
}
// Timer handler guarding the canary connect: when the timer expired (no error) before the
// connect completed, the pending connect is cancelled.
void on_tcp_connect_timeout(const boost::system::error_code &err) {
  if (err || connect_status.is_tcp_connected) {
    // timer was cancelled/failed, or the connect already completed
    return;
  }
  connect_status.is_tcp_connect_timed_out = true;
  adaptor->canary_socket.cancel();
}
// Creates the LDAP handle, sends the simple bind request and starts waiting, with a timeout,
// for the LDAP socket to become writable. Continues in on_tls_connected().
// Errors and exceptions are reported through connect_status.cb.
void on_ready_to_connect(const boost::system::error_code &err) {
CALLBACK_ON_ERROR(connect_status.cb, err);
try {
do_connect_common();
ldap_exception::throw_on_error(ldap_set_option(ldap, LDAP_OPT_CONNECT_ASYNC, LDAP_OPT_ON));
int socket_fd = async_adaptor::SOCKET_NOT_ASSIGNED;
BerValue password_ber({bind_password.size(), &bind_password[0]});
// ldap_sasl_bind() blocks on SSL_connect(), whose side effects are mitigated by the tcp port connectivity check with the canary socket beforehand
ldap_exception::throw_on_error(ldap_sasl_bind(ldap, bind_user.c_str(), LDAP_SASL_SIMPLE, &password_ber, nullptr, nullptr, &connect_status.bind_msg_id));
// fetch the descriptor libldap uses for this connection so that asio can wait on it
ldap_exception::throw_on_error(ldap_get_option(ldap, LDAP_OPT_DESC, &socket_fd));
connect_status.timer = std::make_shared<boost::asio::steady_timer>(
adaptor->context, std::chrono::milliseconds(network_timeout.tv_sec * 1000));
adaptor->assign(socket_fd);
connect_status.is_tls_connected = false;
connect_status.is_tls_connect_timed_out = false;
// wait for writability, guarded by the timer armed right after
adaptor->async_socket.async_wait(boost::asio::ip::tcp::socket::wait_write,
boost::bind(&ldap_search::on_tls_connected, this, boost::asio::placeholders::error));
connect_status.timer->async_wait(
boost::bind(&ldap_search::on_tls_connect_timeout, this, boost::asio::placeholders::error));
}
CALLBACK_WITH_CAUGHT_EXCEPTION(connect_status.cb);
}
// Handler for the write-readiness wait started in on_ready_to_connect() (also invoked with an
// error when that wait is cancelled). Polls once, without blocking, for the bind response:
// - response present: the connection is marked connected and connect_status.cb is called;
// - nothing yet: waits for the socket to become readable and continues in on_bind().
// Errors and exceptions are reported through connect_status.cb.
void on_tls_connected(const boost::system::error_code &err) {
connect_status.is_tls_connected = true; // even on an error status for on_tls_connect_timeout to judge cancellation
if (!connect_status.is_tls_connect_timed_out) {
connect_status.timer->cancel();
}
CALLBACK_ON_ERROR(connect_status.cb, err);
try {
int _err;
LDAPMessage *msg = nullptr;
struct timeval zero_tv = {0,0}; // zero timeout: poll only
int msg_type = ldap_result(ldap, connect_status.bind_msg_id, LDAP_MSG_ONE, &zero_tv, &msg);
switch (msg_type) {
case _LDAP_RES_ERROR:
ldap_get_option(this->ldap, LDAP_OPT_RESULT_CODE, &_err);
ldap_exception::throw_on_error(_err);
break;
case _LDAP_RES_TIMEOUT:
// bind response not available yet: wait until there is something to read
adaptor->async_socket.async_wait(boost::asio::ip::tcp::socket::wait_read,
boost::bind(&ldap_search::on_bind, this, boost::asio::placeholders::error));
break;
case LDAP_RES_BIND:
ldap_parse_result(ldap, msg, &_err, nullptr, nullptr, nullptr, nullptr, FALSE);
ldap_msgfree(msg);
ldap_exception::throw_on_error(_err);
// bind succeeded: update the connection state before notifying
demultiplexer.connected = true;
demultiplexer.connecting = false;
if (demultiplexer.failover_in_progress) {
demultiplexer.failover_in_progress = false;
}
current_host->connected_counter++;
connect_status.cb(err);
break;
default:
if (msg) {
ldap_msgfree(msg);
}
ldap_exception::throw_on_error(LDAP_PROTOCOL_ERROR); // treat as a protocol error
break;
}
}
CALLBACK_WITH_CAUGHT_EXCEPTION(connect_status.cb);
}
// Timer handler guarding the wait on the LDAP socket: when the timer expired (no error)
// before on_tls_connected() ran, the pending socket wait is cancelled.
void on_tls_connect_timeout(const boost::system::error_code &err) {
  if (err || connect_status.is_tls_connected) {
    // timer was cancelled/failed, or the socket wait already completed
    return;
  }
  connect_status.is_tls_connect_timed_out = true;
  adaptor->async_socket.cancel();
}
// Handler for the read-readiness wait started in on_tls_connected(): fetches the bind response
// without blocking. On success the connection is marked connected and connect_status.cb is called;
// a missing response is treated as a timeout. Errors and exceptions are reported through connect_status.cb.
void on_bind(const boost::system::error_code &err) {
CALLBACK_ON_ERROR(connect_status.cb, err);
try {
int _err;
LDAPMessage *msg = nullptr;
struct timeval zero_tv = {0,0}; // zero timeout: poll only
int msg_type = ldap_result(ldap, connect_status.bind_msg_id, LDAP_MSG_ONE, &zero_tv, &msg);
switch (msg_type) {
case _LDAP_RES_ERROR:
ldap_get_option(ldap, LDAP_OPT_RESULT_CODE, &_err);
ldap_exception::throw_on_error(_err);
break;
case _LDAP_RES_TIMEOUT: // unexpected
// the socket was reported readable but no bind response is available: abandon the bind
ldap_abandon_ext(ldap, connect_status.bind_msg_id, nullptr, nullptr);
ldap_exception::throw_on_error(LDAP_TIMEOUT);
break;
case LDAP_RES_BIND:
ldap_parse_result(ldap, msg, &_err, nullptr, nullptr, nullptr, nullptr, FALSE);
ldap_msgfree(msg);
ldap_exception::throw_on_error(_err);
// bind succeeded: update the connection state before notifying
demultiplexer.connected = true;
demultiplexer.connecting = false;
if (demultiplexer.failover_in_progress) {
demultiplexer.failover_in_progress = false;
}
current_host->connected_counter++;
connect_status.cb(err); // callback on success
break;
default:
if (msg) {
ldap_msgfree(msg);
}
ldap_exception::throw_on_error(LDAP_PROTOCOL_ERROR); // treat as a protocol error
break;
}
}
CALLBACK_WITH_CAUGHT_EXCEPTION(connect_status.cb);
}
// Starts the asynchronous failover procedure. `cb` is backed up in failover_status.cb and
// connect_status.cb is replaced by a dispatcher that either retries on the next host or,
// on success / after giving up, restores and invokes the backed-up callback.
void async_failover(connect_cb cb, const boost::system::error_code &err) {
  failover_status.round = 0;
  failover_status.cb = cb; // backup the original cb
  failover_status.giving_up = false;
  failover_status.previous_host = current_host;
  connect_status.cb = [this](const boost::system::error_code &err){
    // This closure lives inside connect_status.cb. Restoring connect_status.cb below destroys it,
    // so everything needed afterwards is copied into locals first.
    auto *const self = this;
    if (!err) {
      const connect_cb original_cb = self->failover_status.cb;
      self->connect_status.cb = original_cb;
      original_cb(err);
    }
    else {
      if (self->failover_status.giving_up) {
        self->current_host->last_critical_failure = std::chrono::steady_clock::now();
        self->current_host->critical_failure_counter++;
        const connect_cb original_cb = self->failover_status.cb;
        self->connect_status.cb = original_cb;
        original_cb(err);
      }
      else {
        self->do_async_failover(err);
      }
    }
  };
  do_async_failover(err);
}
// One step of the asynchronous failover: records the failure on the current host, advances to
// the next host (round-robin) and reconnects, either immediately or after a timer-driven wait.
// A "round" is counted each time the walk comes back to failover_status.previous_host.
void do_async_failover(const boost::system::error_code &err) {
  cancel_async_waiters();
  disconnect();
  current_host->last_critical_failure = std::chrono::steady_clock::now();
  current_host->critical_failure_counter++;
  // advance to the next host, wrapping around at the end of the list
  ++current_host;
  if (current_host == ldap_hosts.end()) {
    current_host = ldap_hosts.begin();
  }
  const bool at_origin = (current_host == failover_status.previous_host);
  if (at_origin) {
    failover_status.round++;
  }
  const auto round = failover_status.round;
  // no wait on round 0, nor on the starting host on round 1
  const bool reconnect_now = (round == 0) || (round == 1 && at_origin);
  if (!reconnect_now) {
    if (round >= 1 && round <= 3) {
      // insert a wait on round 1, 2, 3 for asynchronous search
      failover_status.timer = std::make_shared<boost::asio::steady_timer>(
        adaptor->context,
        std::chrono::seconds(network_timeout.tv_sec) -
        (std::chrono::steady_clock::now() - current_host->last_critical_failure));
      failover_status.timer->async_wait(
        boost::bind(&ldap_search::async_reconnect, this, boost::asio::placeholders::error));
      return;
    }
    // give up connection on round 4 for asynchronous search
    failover_status.giving_up = true;
  }
  async_reconnect(err);
}
// Re-queues requests that were awaiting a response, drops the connection and starts a new
// asynchronous connect to the current host. The error argument is not inspected.
void async_reconnect(const boost::system::error_code &err) {
  (void)err;
  this->push_back_incomplete_requests();
  this->disconnect();
  this->do_async_connect();
}
// State of one asynchronous search request.
// Initialised positionally (spec, msg_id, cb), so the member order must not change.
struct query_status {
std::shared_ptr<query_spec> spec; // what to search for and where results are collected
int msg_id; // reset to -1 when the request is re-queued after a reconnect
search_cb cb; // completion callback of the request
boost::system::error_code err; // outcome of the request; default-constructed (no error) initially
};
// Bookkeeping shared by all asynchronous requests of one ldap_search instance:
// the queue of requests not yet sent, the map of requests awaiting a response,
// connection state flags and read-wait/timeout counters.
struct query_demultiplexer {
  // The initializer list follows the declaration order of the members, and read_timeout
  // is value-initialised so that it never holds an indeterminate value.
  query_demultiplexer(int max_concurrent_requests = 100)
    : active_requests_on_last_busy(0), max_concurrent_requests(max_concurrent_requests),
      busy_until(std::chrono::steady_clock::time_point::min()),
      waiting_for_reading(false), waiting_for_timeout(false),
      read_wait_issued_count(0), read_wait_fulfilled_count(0), read_wait_cancelled_count(0),
      read_timeout_issued_count(0), read_timeout_fulfilled_count(0), read_timeout_cancelled_count(0), read_timeout_cancelling_count(0),
      last_read_wait_issued(std::chrono::steady_clock::time_point::min()),
      read_timeout(0),
      read_timer_cancelled(false),
      connected(false), connecting(false), failover_in_progress(false) {}
  int active_requests_on_last_busy;
  int max_concurrent_requests;
  std::chrono::steady_clock::time_point busy_until;
  std::list<std::shared_ptr<query_status>> request_wait_queue;     // requests waiting to be sent
  std::map<int, std::shared_ptr<query_status>> response_wait_map;  // requests awaiting a response
  bool waiting_for_reading;
  bool waiting_for_timeout;
  int read_wait_issued_count;
  int read_wait_fulfilled_count;
  int read_wait_cancelled_count;
  int read_timeout_issued_count;
  int read_timeout_fulfilled_count;
  int read_timeout_cancelled_count;
  int read_timeout_cancelling_count;
  std::chrono::steady_clock::time_point last_read_wait_issued;
  std::shared_ptr<boost::asio::steady_timer> read_timer;
  std::chrono::milliseconds read_timeout;
  bool read_timer_cancelled;
  connect_cb failover_connect_cb;  // callback handed to async_failover() when a request triggers failover
  bool connected;                  // a bind has completed and the connection is usable
  bool connecting;                 // a connect started by async_connect() is in flight
  bool failover_in_progress;       // a request-triggered failover is in flight
};
// Moves every request that is still awaiting a response back to the front of the request
// queue, with its state reset. Walking the map backwards while pushing to the front keeps
// the requests in ascending key order in the queue.
void push_back_incomplete_requests() {
  auto &in_flight = demultiplexer.response_wait_map;
  for (auto it = in_flight.rbegin(); it != in_flight.rend(); ++it) {
    auto request = it->second;
    // reset status
    request->msg_id = -1;
    request->spec->clear_result();
    request->err = make_error_code(LdapErrc::Success, LDAP_SUCCESS);
    // push at the front of request_wait_queue
    demultiplexer.request_wait_queue.push_front(request);
  }
  in_flight.clear();
}
bool callback_or_async_failover(std::shared_ptr<query_status> status, bool force_failover = false) {
bool called_back = true;
if (!status->err) {
current_host->last_successful_access = std::chrono::steady_clock::now();
status->cb(status->err, status->spec->result); // success
}
else {
if (status->err.category() == LdapErrc_category()) {
action_on_ldap_exception action = get_action_for_ldap_error(status->err.value());
if (force_failover) {
action = ACTION_FAILOVER_TO_NEXT_HOST;
}
switch (action) {
case ACTION_FAILOVER_TO_NEXT_HOST:
demultiplexer.failover_connect_cb = [this](const boost::system::error_code &err){
if (!err) {
demultiplexer.failover_in_progress = false;
handle_async_request(err); // restart after failover
}
else {
cancel_all_waiting_requests(err); // clean up request_wait_queue
disconnect();
demultiplexer.failover_in_progress = false;
demultiplexer.connecting = false;
}
};
demultiplexer.failover_in_progress = true;
called_back = false;
async_failover(demultiplexer.failover_connect_cb, status->err);
break;
case ACTION_RETURN_NULL_RESULT:
current_host->last_non_critical_error = std::chrono::steady_clock::now();
current_host->non_critical_error_counter++;
status->cb(status->err, nullptr);
break;
case ACTION_RETHROW_EXCEPTION: // error is handed as a callback error_code for an async request
default:
current_host->last_critical_failure = std::chrono::steady_clock::now();
current_host->critical_failure_counter++;
status->cb(status->err, nullptr);
break;
}
}
else {
status->cb(status->err, status->spec->result);
}
}
return called_back;
}
// Fail every request that is still queued (not yet sent): each callback is
// invoked with `err` and a null result, then the request is dropped from
// request_wait_queue. The queue is empty when this returns.
void cancel_all_waiting_requests(const boost::system::error_code &err) {
  auto &pending = demultiplexer.request_wait_queue;
  while (!pending.empty()) {
    pending.front()->cb(err, nullptr);
    pending.pop_front();
  }
}
// Send queued requests to the server until the queue is empty or the
// concurrency throttle says stop.
// Does nothing while disconnected or while a failover is in progress.
// Throttle (number of requests in response_wait_map that may be outstanding):
//   - while busy_until is in the future: 80% of the in-flight count recorded at
//     the last LDAP_BUSY, capped by max_concurrent_requests
//   - never been busy: max_concurrent_requests
//   - after busy_until: the recorded in-flight count scaled by the time elapsed
//     since busy_until divided by search_timeout, capped by max_concurrent_requests
// `err` is accepted for handler-signature compatibility and is not inspected.
void handle_async_request(const boost::system::error_code &err) {
  auto wait_queue_end_it = demultiplexer.request_wait_queue.end();
  auto wait_queue_it = demultiplexer.request_wait_queue.begin();
  int _err = LDAP_SUCCESS;
  bool failover_in_progress = demultiplexer.failover_in_progress;
  if (!demultiplexer.connected) {
    return;
  }
  if (failover_in_progress) {
    return;
  }
  while (wait_queue_it != wait_queue_end_it) {
    auto now = std::chrono::steady_clock::now();
    if (demultiplexer.busy_until > now) {
      // busy_until has not come yet
      if (demultiplexer.response_wait_map.size() > std::min(demultiplexer.max_concurrent_requests, demultiplexer.active_requests_on_last_busy * 8 / 10)) {
        // active requests are more than 80% of the active requests on the last busy or the fixed limit
        break;
      }
    }
    else {
      if (demultiplexer.active_requests_on_last_busy == 0) {
        // has not reached busy state yet
        if (demultiplexer.response_wait_map.size() > demultiplexer.max_concurrent_requests) {
          // the fixed limit
          break;
        }
      }
      else {
        // ratio is the time passed since busy_until divided by search_timeout, in 1/1000 units.
        // Durations are converted explicitly so the result does not depend on the clock's tick period.
        const long long timeout_ms = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::seconds(search_timeout.tv_sec)).count();
        const long long elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>(now - demultiplexer.busy_until).count();
        long long limit = demultiplexer.max_concurrent_requests;
        if (timeout_ms > 0) { // a zero search_timeout would otherwise divide by zero; fall back to the fixed limit
          const long long ratio = 1000 * elapsed_ms / timeout_ms;
          if (ratio / 1000 < limit) {
            // The scaled value can only be below the fixed limit in this range, so it is
            // computed only here, in 64 bits and split so the product cannot overflow.
            // The previous code narrowed an ever-growing value to int, which eventually
            // wrapped negative and disabled the limit altogether.
            const long long last_busy = demultiplexer.active_requests_on_last_busy;
            const long long scaled = (ratio / 1000) * last_busy + (ratio % 1000) * last_busy / 1000;
            limit = std::min(limit, scaled);
          }
        }
        if (demultiplexer.response_wait_map.size() > static_cast<int>(limit)) {
          // active requests are more than ratio times of the active requests on the last busy or the fixed limit
          break;
        }
      }
    }
    auto status = *wait_queue_it;
    // search_timeout here does not mean a blocking operation but the server-side timeout threshold in query processing
    _err = ldap_search_ext(ldap, status->spec->user_dn.c_str(), status->spec->scope, status->spec->search_filter.c_str(), status->spec->attributes(), 0, nullptr, nullptr, &search_timeout, size_limit, &(status->msg_id));
    if (_err == LDAP_BUSY) {
      // remember when and at which load the server was busy; the request stays queued
      demultiplexer.busy_until = std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::seconds(search_timeout.tv_sec));
      demultiplexer.active_requests_on_last_busy = demultiplexer.response_wait_map.size(); // valid for search_timeout
      break;
    }
    if (_err) {
      status->err = make_error_code(LdapErrc::Error, _err);
      if (callback_or_async_failover(status)) {
        _err = LDAP_SUCCESS; // reset error
      }
      else {
        // failover started: leave the request at the head of the queue for the retry
        failover_in_progress = true;
        break;
      }
    }
    else {
      // sent: from now on the request is tracked by its message id
      demultiplexer.response_wait_map.emplace(status->msg_id, status);
    }
    demultiplexer.request_wait_queue.pop_front();
    wait_queue_it = demultiplexer.request_wait_queue.begin(); // pick up the next request
  }
  if (!failover_in_progress) {
    setup_async_waiters_if_necessary();
  }
}
// Completion handler of the periodic read timer.
// A wait that completed with an error, or that was flagged as cancelled through
// read_timer_cancelled, is only counted. Otherwise a pending socket read-wait
// is cancelled when no response is outstanding or when it was issued more than
// one second ago, and then the response pump runs.
void on_read_timeout(const boost::system::error_code &err) {
  demultiplexer.waiting_for_timeout = false;
  if (err || demultiplexer.read_timer_cancelled) {
    ++demultiplexer.read_timeout_cancelled_count;
    demultiplexer.read_timer_cancelled = false;
    return;
  }
  ++demultiplexer.read_timeout_fulfilled_count;
  if (demultiplexer.waiting_for_reading) {
    const bool nothing_in_flight = demultiplexer.response_wait_map.empty();
    bool read_wait_is_stale = false;
    if (!nothing_in_flight && demultiplexer.last_read_wait_issued != std::chrono::steady_clock::time_point::min()) {
      // cancel in every second
      const auto cancel_after = demultiplexer.last_read_wait_issued + std::chrono::milliseconds(1000);
      read_wait_is_stale = cancel_after < std::chrono::steady_clock::now();
    }
    if (nothing_in_flight || read_wait_is_stale) {
      adaptor->async_socket.cancel();
    }
  }
  on_any_response(err);
}
// Completion handler of the socket read-wait.
// A wait that completed with an error is only counted. Otherwise the read timer
// is cancelled if it is pending and no response is outstanding, and then the
// response pump runs.
void on_read_wait(const boost::system::error_code &err) {
  demultiplexer.waiting_for_reading = false;
  if (err) {
    ++demultiplexer.read_wait_cancelled_count;
    return;
  }
  ++demultiplexer.read_wait_fulfilled_count;
  if (demultiplexer.waiting_for_timeout && demultiplexer.response_wait_map.empty()) {
    ++demultiplexer.read_timeout_cancelling_count;
    // flag first: on_read_timeout() treats a set flag as a cancellation even when it is called without an error
    demultiplexer.read_timer_cancelled = true;
    demultiplexer.read_timer->cancel();
  }
  on_any_response(err);
}
// Response pump: polls ldap_result() with a zero timeout until no more messages
// are available (or a failover starts) and dispatches each message:
//   - search entry : values of the target attributes are stored into the spec of
//                    the request with the same message id
//   - search result: the request is completed through callback_or_async_failover()
//   - ldap_result() error: a failover is started unless the error maps to
//                    ACTION_RETURN_NULL_RESULT
// Afterwards, if requests are still outstanding and nothing has been heard from
// the current host for search_timeout seconds, an LDAP_TIMEOUT is raised.
// Finally queued requests are sent via handle_async_request(err).
void on_any_response(const boost::system::error_code &err) {
  // spec used for synthetic statuses that describe connection-level errors
  static auto empty_spec = std::make_shared<query_spec>("", "");
  struct timeval zero_tv = {0,0}; // zero timeout: ldap_result() only polls
  std::shared_ptr<query_status> status;
  bool no_more_response = false;
  bool failover_in_progress = demultiplexer.failover_in_progress;
  while (!no_more_response && !failover_in_progress) {
    LDAPMessage *msg = nullptr;
    const int msg_type = ldap_result(ldap, LDAP_RES_ANY, LDAP_MSG_ONE, &zero_tv, &msg);
    switch (msg_type) {
    case _LDAP_RES_ERROR: {
      int _err = LDAP_SUCCESS;
      if (ldap_get_option(ldap, LDAP_OPT_RESULT_CODE, &_err) != LDAP_OPT_SUCCESS) {
        // the error code could not be retrieved; do not act on an indeterminate value
        _err = LDAP_OTHER;
      }
      if (_err) {
        auto action = get_action_for_ldap_error(_err);
        bool force_failover = false;
        switch (action) {
        case ACTION_RETHROW_EXCEPTION:
        default:
          // there is no request callback to report to, so fail over instead
          force_failover = true;
          // fall through
        case ACTION_FAILOVER_TO_NEXT_HOST:
          status = std::make_shared<query_status>(query_status{empty_spec, 0, nullptr, make_error_code(LdapErrc::Error, _err)});
          callback_or_async_failover(status, force_failover);
          failover_in_progress = true;
          break;
        case ACTION_RETURN_NULL_RESULT:
          // NOTE(review): nothing changes here before the next poll; if ldap_result()
          // kept returning an error of this kind the loop would spin -- confirm
          break;
        }
      }
      break;
    }
    case _LDAP_RES_TIMEOUT: // no more message to receive non-blockingly
      no_more_response = true;
      break;
    case LDAP_RES_SEARCH_ENTRY: { // only 1 entry for each request
      auto status_it = demultiplexer.response_wait_map.find(ldap_msgid(msg));
      if (status_it != demultiplexer.response_wait_map.end()) {
        status = status_it->second;
        BerElement *ber = nullptr;
        for (const char *attr = ldap_first_attribute(ldap, msg, &ber); attr; attr = ldap_next_attribute(ldap, msg, ber)) {
          if (status->spec->is_target_attribute(attr)) {
            BerValue **values = ldap_get_values_len(ldap, msg, attr);
            int len = ldap_count_values_len(values);
            for(int i = 0; i < len; i++) {
              status->spec->store(ldap, msg, attr, values[i]->bv_val, values[i]->bv_len);
            }
            ldap_value_free_len(values);
          }
          ber_memfree((void *)attr);
        }
        if (ber) {
          ber_free(ber, FALSE);
        }
      }
      // an entry whose msg_id is not found in response_wait_map is dropped
      ldap_msgfree(msg);
      break;
    }
    // TODO: LDAP_RES_SEARCH_REFERENCE
    case LDAP_RES_SEARCH_RESULT: {
      auto status_it = demultiplexer.response_wait_map.find(ldap_msgid(msg));
      if (status_it == demultiplexer.response_wait_map.end()) {
        // msg_id not found in response_wait_map
        ldap_msgfree(msg);
      }
      else {
        status = status_it->second;
        int _err = LDAP_SUCCESS;
        const int parse_rc = ldap_parse_result(ldap, msg, &_err, nullptr, nullptr, nullptr, nullptr, FALSE);
        if (parse_rc != LDAP_SUCCESS) {
          // the result could not be parsed, so _err is not meaningful:
          // report the parse failure instead of treating the request as successful
          _err = parse_rc;
        }
        ldap_msgfree(msg);
        status->err = make_error_code((_err ? LdapErrc::Error : LdapErrc::Success), _err);
        if (callback_or_async_failover(status)) {
          demultiplexer.response_wait_map.erase(status_it);
        }
        else {
          // failover started: the request stays in response_wait_map
          failover_in_progress = true;
        }
      }
      break;
    }
    default:
      // any other message type is discarded
      if (msg) {
        ldap_msgfree(msg);
      }
      break;
    }
  }
  if (!failover_in_progress && demultiplexer.response_wait_map.size() > 0) {
    // the latest of the three per-host timestamps counts as the last sign of life
    auto last_response = current_host->last_successful_access;
    if (last_response < current_host->last_non_critical_error) {
      last_response = current_host->last_non_critical_error;
    }
    if (last_response < current_host->last_critical_failure) {
      last_response = current_host->last_critical_failure;
    }
    auto now = std::chrono::steady_clock::now();
    if (last_response == std::chrono::steady_clock::time_point::min()) {
      // timer timed out (100ms) at the first request; the server may be loaded
      // treat as a non-critical error to detect timeout after search_timeout duration
      // Note: actual timeout occurs 100ms later than search_timeout duration
      current_host->last_non_critical_error = now;
      last_response = current_host->last_non_critical_error;
    }
    auto response_timeout_at = last_response + std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::seconds(search_timeout.tv_sec));
    if (response_timeout_at < now) {
      // no response for search_timeout duration
      // NOTE(review): this status carries a null callback; this presumably relies on
      // LDAP_TIMEOUT being mapped to a failover action -- confirm
      status = std::make_shared<query_status>(query_status{empty_spec, 0, nullptr, make_error_code(LdapErrc::Error, LDAP_TIMEOUT)});
      callback_or_async_failover(status);
    }
  }
  if (!failover_in_progress) {
    handle_async_request(err);
  }
}
// Cancel whichever asynchronous waits are currently pending: the socket
// read-wait and/or the read timer. The corresponding waiting_* flag is cleared
// before the cancel call; for the timer, read_timer_cancelled is raised as well.
void cancel_async_waiters() {
  auto &dm = demultiplexer;
  if (dm.waiting_for_reading) {
    dm.waiting_for_reading = false;
    adaptor->async_socket.cancel();
  }
  if (dm.waiting_for_timeout) {
    dm.waiting_for_timeout = false;
    dm.read_timer_cancelled = true;
    dm.read_timer->cancel();
  }
}
void setup_async_waiters_if_necessary() {
if (!demultiplexer.waiting_for_reading && demultiplexer.response_wait_map.size() > 0) {
demultiplexer.waiting_for_reading = true;
demultiplexer.read_wait_issued_count++;
demultiplexer.last_read_wait_issued = std::chrono::steady_clock::now();
adaptor->async_socket.async_wait(boost::asio::ip::tcp::socket::wait_read,
boost::bind(&ldap_search::on_read_wait, this, boost::asio::placeholders::error));
}
if (!demultiplexer.waiting_for_timeout && demultiplexer.response_wait_map.size() > 0) {
demultiplexer.waiting_for_timeout = true;
demultiplexer.read_timeout_issued_count++;
demultiplexer.read_timeout = std::chrono::milliseconds(100);
demultiplexer.read_timer = std::make_shared<boost::asio::steady_timer>(adaptor->context);
demultiplexer.read_timer->expires_after(demultiplexer.read_timeout);
demultiplexer.read_timer->async_wait(
boost::bind(&ldap_search::on_read_timeout, this, boost::asio::placeholders::error)
);
}
}
#undef CALLBACK_ON_ERROR
#undef CALLBACK_WITH_CAUGHT_EXCEPTION
public:
// Write the demultiplexer's state to `out`, one item per line, each line
// starting with `prefix`.
//   concise = true  : only the number of active and queued requests
//   concise = false : additionally the wait flags, the read-wait / read-timeout
//                     counters with their outstanding balance, and one line of
//                     counters per configured LDAP host
void dump(std::ostream &out, std::string prefix = "", bool concise = true) {
  const auto &dm = demultiplexer;
  out << prefix << "active_requests = " << dm.response_wait_map.size() << std::endl;
  out << prefix << "queued requests = " << dm.request_wait_queue.size() << std::endl;
  if (concise) {
    return;
  }
  out << prefix << "waiting_for_reading = " << dm.waiting_for_reading << std::endl;
  out << prefix << "waiting_for_timeout = " << dm.waiting_for_timeout << std::endl;
  out << prefix << "read_wait_issued_count = " << dm.read_wait_issued_count << std::endl;
  out << prefix << "read_wait_fulfilled_count = " << dm.read_wait_fulfilled_count << std::endl;
  out << prefix << "read_wait_cancelled_count = " << dm.read_wait_cancelled_count << std::endl;
  // issued minus completed
  const int read_wait_balance = dm.read_wait_issued_count - (dm.read_wait_fulfilled_count + dm.read_wait_cancelled_count);
  out << prefix << "read_wait_issued_count - (read_wait_fulfilled_count + read_wait_cancelled_count) = "
      << read_wait_balance
      << std::endl;
  out << prefix << "read_timeout_issued_count = " << dm.read_timeout_issued_count << std::endl;
  out << prefix << "read_timeout_fulfilled_count = " << dm.read_timeout_fulfilled_count << std::endl;
  out << prefix << "read_timeout_cancelled_count = " << dm.read_timeout_cancelled_count << std::endl;
  out << prefix << "read_timeout_cancelling_count = " << dm.read_timeout_cancelling_count << std::endl;
  const int read_timeout_balance = dm.read_timeout_issued_count - (dm.read_timeout_fulfilled_count + dm.read_timeout_cancelled_count);
  out << prefix << "read_timeout_issued_count - (read_timeout_fulfilled_count + read_timeout_cancelled_count) = "
      << read_timeout_balance
      << std::endl;
  for (auto host = ldap_hosts.begin(); host != ldap_hosts.end(); ++host) {
    const char *marker = (host == current_host) ? " (current_host) " : " ";
    out << prefix << "ldap_host: " << host->host_name << marker
        << "connected_counter = " << host->connected_counter
        << " non_critical_error_counter = " << host->non_critical_error_counter
        << " critical_failure_counter = " << host->critical_failure_counter << std::endl;
  }
}
#endif
protected:
LDAP *ldap;
int ldap_version;
struct timeval network_timeout;
struct timeval search_timeout;
int size_limit;
std::string ca_file;
std::list<ldap_host> ldap_hosts;
using host_iterator = typename std::list<ldap_host>::iterator;
host_iterator current_host;
int ldap_port;
std::string bind_user;
std::string bind_password;
#if ASYNC_LDAP_ACCESS
std::shared_ptr<async_adaptor> adaptor;
// State of the asynchronous connect sequence.
// NOTE(review): the code that reads and writes these fields lies outside this
// part of the file; the per-field notes below are read off the names and the
// author's design notes and should be confirmed against that code.
struct _connect_status {
// TCP stage -- per the author's notes, TCP connectivity is probed with a throwaway
// "canary" socket before the actual LDAPS connection is made
bool is_tcp_connected;
bool is_tcp_connect_timed_out;
// TLS stage
bool is_tls_connected;
bool is_tls_connect_timed_out;
int bind_msg_id; // presumably the message id of the pending bind request -- confirm
connect_cb cb; // completion callback; per the author's notes the two failover modes use different connect_status.cb values
std::shared_ptr<boost::asio::steady_timer> timer; // presumably the connect timeout timer -- confirm
} connect_status;
// State of an asynchronous failover to another LDAP host.
// NOTE(review): the code that reads and writes these fields lies outside this
// part of the file; the per-field notes below are read off the names only and
// should be confirmed against that code.
struct _failover_status {
int round; // presumably counts passes over the host list -- confirm
bool giving_up; // presumably set once no further host will be tried -- confirm
connect_cb cb; // callback to invoke when the failover has finished
host_iterator previous_host; // iterator into ldap_hosts (same type as current_host)
std::shared_ptr<boost::asio::steady_timer> timer;
} failover_status;
query_demultiplexer demultiplexer;
#endif
};
// Query spec that reads the multi-valued "memberOf" attribute of one entry
// (base-scope search on user_dn). The values are collected, in arrival order,
// into a shared list of strings.
// The class can be extended to several kinds of queries by adding a query_type
// member and switching on it in attributes(), store() and is_target_attribute().
class ldap_member_of_query_spec: public ldap_query_spec {
public:
  // a simple result type for a single attribute with multiple values; no key (attribute name) is required
  using result_type = std::shared_ptr<std::list<std::string>>;

  // Name of the attribute this spec asks for.
  static std::string &member_of() {
    static std::string attribute_name = "memberOf";
    return attribute_name;
  }

  // Null-terminated attribute list; the first entry points into member_of()'s buffer.
  static char **_attributes() {
    static char *requested[] = { &member_of()[0], nullptr };
    return requested;
  }

  ldap_member_of_query_spec(std::string user_dn, std::string search_filter)
    : ldap_query_spec(user_dn, search_filter, LDAP_SCOPE_BASE),
      result(std::make_shared<std::list<std::string>>()) {
  }

  // Attribute list to request from the server.
  char **attributes() {
    return ldap_member_of_query_spec::_attributes();
  }

  // Append one attribute value (len bytes starting at value) to the result.
  // ldap and msg are unused here; they would allow e.g. ldap_get_dn() for
  // LDAP_SCOPE_ONELEVEL and LDAP_SCOPE_SUBTREE searches.
  void store(LDAP *ldap, LDAPMessage *msg, const char *attr, char *value, std::size_t len) {
    result->emplace_back(value, len);
  }

  // Drop all collected values; the list object itself is kept.
  void clear_result() {
    result->clear();
  }

  // True when attr is exactly "memberOf" (case-sensitive comparison).
  bool is_target_attribute(const char *attr) {
    return member_of().compare(attr) == 0;
  }

  result_type result;
};
// Convenience front end of ldap_search for "memberOf" lookups.
// Methods for further query types can be added next to the two below.
class ldap_member_of: public ldap_search<ldap_member_of_query_spec> {
public:
  using ldap_search<ldap_member_of_query_spec>::ldap_search;

  // Run the search through sync_search() and return the collected memberOf values of user_dn.
  ldap_member_of_query_spec::result_type sync_member_of(std::string user_dn, std::string search_filter = "(objectClass=user)") {
    ldap_member_of_query_spec query(user_dn, search_filter);
    sync_search(query);
    return query.result;
  }
#if ASYNC_LDAP_ACCESS
  // Hand the same search to async_search(), passing cb along with it.
  void async_member_of(std::string user_dn, search_cb cb, std::string search_filter = "(objectClass=user)") {
    auto query = std::make_shared<ldap_member_of_query_spec>(user_dn, search_filter);
    async_search(query, cb);
  }
#endif
};
#if ASYNC_LDAP_ACCESS
#define SYNC_REPEAT_ACCESS 0
#endif
// Command-line test driver.
// Usage: prog ldap_hosts_csv ca_file bind_user bind_password user_dn
//             [repetition] [max_concurrent_req] [num_workers]   (last three: async build only)
// 1. Connects synchronously and prints the memberOf values of user_dn.
// 2. In an async build, runs `num_workers` workers (threads when more than one),
//    each issuing `repetition` asynchronous lookups on its own io_context, and
//    prints timing and the per-worker state dump.
// Returns 0 normally, including after printing the usage text; returns
// EXIT_FAILURE when an optional numeric argument cannot be parsed.
int main(int argc, char * argv[])
{
  std::cout << argv[0] << " has to be linked against libldap_r library to work in threaded environments" << std::endl;
  if (argc < 6) {
#if ASYNC_LDAP_ACCESS
    std::cout << argv[0] << " [ldap_hosts_csv] [ca_file] [bind_user] [bind_password] [user_dn] [repetition] [max_concurrent_req] [num_workers]" << std::endl;
#else
    std::cout << argv[0] << " [ldap_hosts_csv] [ca_file] [bind_user] [bind_password] [user_dn]" << std::endl;
#endif
    return 0;
  }
  signal(SIGPIPE, SIG_IGN); // ad-hoc fix
  std::string ldap_hosts = argv[1];
  std::string ca_file = argv[2];
  std::string bind_user = argv[3];
  std::string bind_password = argv[4];
  std::string user_dn = argv[5];
  int ldap_port = 636;
  int network_timeout = 5; // sec
  int search_timeout = 5; // sec
  int size_limit = 0;
#if ASYNC_LDAP_ACCESS
  int repetition = 10000;
  int max_concurrent_request = 100;
  int num_workers = 1;
  try {
    // std::stoi throws on non-numeric or out-of-range input; report it instead of terminating
    if (argc > 6) {
      repetition = std::stoi(argv[6]);
    }
    if (argc > 7) {
      max_concurrent_request = std::stoi(argv[7]);
    }
    if (argc > 8) {
      num_workers = std::stoi(argv[8]);
    }
  }
  catch (std::exception &e) {
    std::cerr << "invalid numeric argument: " << e.what() << std::endl;
    return EXIT_FAILURE;
  }
#endif
  // --- synchronous lookup ---
  try {
    auto query = std::make_shared<ldap_member_of>(ldap_hosts, ca_file, bind_user, bind_password, ldap_port, network_timeout, search_timeout, size_limit);
    query->sync_connect();
    std::shared_ptr<std::list<std::string>> result;
    try {
      result = query->sync_member_of(user_dn);
      if (result) {
        for (const auto &group_dn: *result) {
          std::cout << "group_dn: " << group_dn << std::endl;
        }
      }
      else {
        std::cout << "null result" << std::endl;
      }
#if ASYNC_LDAP_ACCESS
#if SYNC_REPEAT_ACCESS
      boost::posix_time::ptime mst1;
      boost::posix_time::ptime mst2;
      mst1 = boost::posix_time::microsec_clock::local_time();
      for (int i = 0; i < repetition; i++) {
        result = query->sync_member_of(user_dn);
      }
      mst2 = boost::posix_time::microsec_clock::local_time();
      boost::posix_time::time_duration msdiff = mst2 - mst1;
      std::cout << "query->sync_member_of() sequentially called " << repetition << " times in " << msdiff.total_milliseconds() << "ms" << std::endl;
#endif
#endif
    }
    catch (ldap_exception &e) {
      // placeholder for per-error handling; every error is currently passed on
      switch (e.err) {
      case LDAP_TIMEOUT:
      default:
        throw;
      }
    }
  }
  catch (ldap_exception &e) {
    std::cerr << "ldap_exception thrown: " << e.err << " " << e.what() << std::endl;
  }
  catch (std::exception &e) {
    std::cerr << "exception thrown: " << e.what() << std::endl;
  }
#if ASYNC_LDAP_ACCESS
  // --- asynchronous load test ---
  try {
    // One worker: owns an io_context and a connection, issues `repetition` lookups and
    // delivers its state dump (or the exception that stopped it) through the promise.
    auto worker = [ldap_hosts, ca_file, bind_user, bind_password, ldap_port, user_dn, network_timeout, search_timeout, size_limit, max_concurrent_request, repetition](int i, std::promise<std::string> p){
      try {
        boost::asio::io_context io_context;
        std::string dump_prefix = "[" + std::to_string(i) + "] ";
        auto query = std::make_shared<ldap_member_of>(io_context, ldap_hosts, ca_file, bind_user, bind_password, ldap_port, network_timeout, search_timeout, size_limit, max_concurrent_request);
        auto requested = std::make_shared<int>(0);
        auto responded = std::make_shared<int>(0);
        auto errored = std::make_shared<int>(0);
        boost::posix_time::ptime mst1;
        boost::posix_time::ptime mst2;
        mst1 = boost::posix_time::microsec_clock::local_time();
        // counts responses; prints progress every 10000 responses and at the last one
        auto on_response_cb = [i, responded, errored, repetition, mst1, &query, &io_context, dump_prefix](const boost::system::error_code &err, ldap_member_of_query_spec::result_type result){
          (*responded)++;
          if (!err) {
            if (result) {
              if (*responded % 10000 == 0 || *responded == repetition) {
                std::cout << "[" << i << "] responded = " << *responded << " (including errored = " << *errored << ")" << std::endl;
                query->dump(std::cout, dump_prefix);
                boost::posix_time::ptime mst4;
                mst4 = boost::posix_time::microsec_clock::local_time();
                boost::posix_time::time_duration msdiff = mst4 - mst1;
                std::cout << "[" << i << "] query->async_member_of() callback called in " << msdiff.total_milliseconds() << "ms" << std::endl;
              }
            }
            else {
              std::cout << "null result" << std::endl; // unexpected in normal cases
            }
          }
          else {
            (*errored)++;
            if (err.category() == LdapErrc_category() && err.value() == LDAP_NO_SUCH_OBJECT) {
              std::cout << "no such object" << std::endl;
            }
            else {
              std::cout << "on_response_cb " << *responded << " err = " << (err.category() == LdapErrc_category() ? ldap_err2string(err.value()) : err.message()) << std::endl;
            }
          }
        };
        boost::posix_time::ptime mst5 = boost::posix_time::microsec_clock::local_time();
        // keeps the request timer and its self-rescheduling handler alive together
        struct timer_container {
          timer_container(boost::asio::io_context &io_context): timer(io_context, std::chrono::milliseconds(10)) {}
          boost::asio::steady_timer timer;
          std::function<void (const boost::system::error_code &err)> request_timer_cb;
        };
        auto container = std::make_shared<timer_container>(io_context);
        container->request_timer_cb = [i, requested, responded, repetition, max_concurrent_request, mst5, &query, &io_context, container, user_dn, on_response_cb](const boost::system::error_code &err) {
          while (*requested < repetition && *requested - *responded <= max_concurrent_request * 5) { // until 5 * max_concurrent_request are enqueued
            query->async_member_of(user_dn, on_response_cb); // request without explicit async_connect() call
            (*requested)++;
          }
          if (*requested < repetition) {
            container->timer.async_wait(container->request_timer_cb);
          }
          else {
            boost::posix_time::ptime mst6 = boost::posix_time::microsec_clock::local_time();
            boost::posix_time::time_duration msdiff2 = mst6 - mst5;
            std::cout << "[" << i << "] request completed in " << msdiff2.total_milliseconds() << "ms" << std::endl;
            container->request_timer_cb = nullptr; // drop the stored handler, which holds a reference to the container
          }
        };
        container->timer.async_wait(container->request_timer_cb);
        /*
        while (*requested < repetition) {
          query->async_member_of(user_dn, on_response_cb); // request without explicit async_connect() call
          (*requested)++;
        }
        */
        query->async_connect([&mst1, &mst2, &query, user_dn, &io_context, repetition](const boost::system::error_code &err){
          mst2 = boost::posix_time::microsec_clock::local_time();
          boost::posix_time::time_duration msdiff = mst2 - mst1;
          std::cout << "query->async_connect() callback called in " << msdiff.total_milliseconds() << "ms" << std::endl;
          if (err) {
            std::cout << "connect error_code " << err.message() << std::endl;
          }
          else {
            std::cout << "connect successful" << std::endl;
            query->async_member_of(user_dn, [](const boost::system::error_code &err, ldap_member_of_query_spec::result_type result){
              if (err) {
                std::cout << "async_member_of error_code " << err.message() << std::endl;
              }
              else {
                if (result) {
                  for (const auto &group_dn: *result) {
                    std::cout << "group_dn: " << group_dn << std::endl;
                  }
                }
                else {
                  std::cout << "null result" << std::endl;
                }
              }
            });
          }
        });
        io_context.run();
        std::cout << "[" << i << "] io_context.run() exited" << std::endl;
        std::ostringstream oss;
        query->disconnect();
        query->dump(oss, dump_prefix, false);
        p.set_value(oss.str());
      }
      catch (...) {
        // An exception leaving a thread's entry function terminates the process.
        // Forward it through the promise so that future::get() rethrows it in main().
        p.set_exception(std::current_exception());
      }
    };
    std::vector<std::thread> workers;
    std::vector<std::future<std::string>> futures;
    boost::posix_time::ptime mst1;
    boost::posix_time::ptime mst2;
    mst1 = boost::posix_time::microsec_clock::local_time();
    if (num_workers > 1) {
      for (int i = 0; i < num_workers; i++) {
        std::promise<std::string> p;
        auto f = p.get_future();
        futures.push_back(std::move(f));
        workers.push_back(std::thread(worker, i, std::move(p)));
      }
      for (int i = 0; i < num_workers; i++) {
        workers[i].join();
        std::cout << "[" << i << "] worker joined" << std::endl;
      }
      for (int i = 0; i < num_workers; i++) {
        std::cout << futures[i].get(); // rethrows a worker's exception, if any
      }
    }
    else {
      // single worker: run it on the calling thread
      std::promise<std::string> p;
      std::future<std::string> f = p.get_future();
      worker(0, std::move(p));
      std::cout << f.get();
    }
    mst2 = boost::posix_time::microsec_clock::local_time();
    boost::posix_time::time_duration msdiff = mst2 - mst1;
    std::cout << repetition << " requests x " << num_workers << " worker thread(s) = "
              << (repetition * num_workers) << " total requests completed in " << msdiff.total_milliseconds() << "ms" << std::endl;
  }
  catch (ldap_exception &e) {
    std::cerr << "ldap_exception thrown: " << e.err << " " << e.what() << std::endl;
  }
  catch (std::exception &e) {
    std::cerr << "exception thrown: " << e.what() << std::endl;
  }
#endif // ASYNC_LDAP_ACCESS
  return 0;
}
@t2ym
Copy link
Author

t2ym commented May 28, 2021

Issues

  • SIGPIPE signal on disconnect() for reconnection after session timeout from Active Directory Server (MaxConnIdleTime=900)
    • With the Ubuntu package, libgnutls is unexpectedly linked instead of OpenSSL; this is not the case when building from source with configure --with-tls=openssl
      • Package: libldap-2.4-2 Version: 2.4.49+dfsg-2ubuntu1.8 Depends: libc6 (>= 2.28), libgnutls30 (>= 3.6.12), libgssapi3-heimdal (>= 1.4.0+git20110226), libsasl2-2 (>= 2.1.27+dfsg), libldap-common
    • reproducible with openssl as well
    • Ad-hoc Fix signal(SIGPIPE, SIG_IGN)
      • sync_failover() works
    • under investigation: Are keepalive requests feasible? -> Needs threads
  • [Potential Issue] In sync_fallback(), sleep_for() might be called with a negative duration; with GNU libstdc++ and LLVM libc++ it then returns immediately without sleeping

Design Issues

@t2ym
Copy link
Author

t2ym commented Jun 5, 2021

[WIP] State machine diagram for async connection and searching

  • The "tcp connected" state is reached via a canary TCP socket that is used only to check TCP port connectivity; that socket is discarded before the actual LDAPS connection that follows. This mitigates the side effects of blocking SSL_connect() calls in ldap_sasl_bind() to a nonexistent or down host or port
  • async_member_of() can be called even in disconnected state, which triggers an async_connect process to establish a new connection
  • start failover state has 2 modes, triggered by an async_connect() failure or by an async_member_of() failure
    • They are not precisely represented in the diagram. The 2 modes have different connect_status.cb

@t2ym
Copy link
Author

t2ym commented Jun 9, 2021

Patch to cross-compile libldap from openldap-2.4.59 with Android NDK r21e

  • liblutil must come AFTER libldap in linking
diff -r -p openldap-2.4.59.orig/build/top.mk openldap-2.4.59/build/top.mk
*** openldap-2.4.59.orig/build/top.mk	2021-06-04 03:40:31.000000000 +0900
--- openldap-2.4.59/build/top.mk	2021-06-08 19:04:12.621833780 +0900
*************** LDAP_LIBREWRITE_A = $(LDAP_LIBDIR)/libre
*** 170,177 ****
  LDAP_LIBLUNICODE_A = $(LDAP_LIBDIR)/liblunicode/liblunicode.a
  LDAP_LIBLUTIL_A = $(LDAP_LIBDIR)/liblutil/liblutil.a
  
! LDAP_L = $(LDAP_LIBLUTIL_A) \
! 	$(LDAP_LIBLDAP_LA) $(LDAP_LIBLBER_LA)
  SLAPD_L = $(LDAP_LIBLUNICODE_A) $(LDAP_LIBREWRITE_A) \
  	$(LDAP_LIBLUTIL_A) $(LDAP_LIBLDAP_R_LA) $(LDAP_LIBLBER_LA)
  
--- 170,177 ----
  LDAP_LIBLUNICODE_A = $(LDAP_LIBDIR)/liblunicode/liblunicode.a
  LDAP_LIBLUTIL_A = $(LDAP_LIBDIR)/liblutil/liblutil.a
  
! LDAP_L = \
! 	$(LDAP_LIBLDAP_LA) $(LDAP_LIBLUTIL_A) $(LDAP_LIBLBER_LA)
  SLAPD_L = $(LDAP_LIBLUNICODE_A) $(LDAP_LIBREWRITE_A) \
  	$(LDAP_LIBLUTIL_A) $(LDAP_LIBLDAP_R_LA) $(LDAP_LIBLBER_LA)
  
diff -r -p openldap-2.4.59.orig/libraries/librewrite/Makefile.in openldap-2.4.59/libraries/librewrite/Makefile.in
*** openldap-2.4.59.orig/libraries/librewrite/Makefile.in	2021-06-04 03:40:31.000000000 +0900
--- openldap-2.4.59/libraries/librewrite/Makefile.in	2021-06-08 18:36:32.695442446 +0900
*************** LDAP_LIBDIR= ../../libraries
*** 28,35 ****
  
  LIBRARY = librewrite.a
  PROGRAMS	= rewrite
! XLIBS = $(LIBRARY) $(LDAP_LIBLUTIL_A) \
! 	$(LDAP_LIBLDAP_R_LA) $(LDAP_LIBLBER_LA)
  XXLIBS  = $(SECURITY_LIBS) $(LUTIL_LIBS)
  XXXLIBS = $(LTHREAD_LIBS)
  
--- 28,35 ----
  
  LIBRARY = librewrite.a
  PROGRAMS	= rewrite
! XLIBS = $(LIBRARY) \
! 	$(LDAP_LIBLDAP_R_LA) $(LDAP_LIBLUTIL_A) $(LDAP_LIBLBER_LA)
  XXLIBS  = $(SECURITY_LIBS) $(LUTIL_LIBS)
  XXXLIBS = $(LTHREAD_LIBS)

Configuration to build libldap_r

  • TARGET, PREFIX, CC, etc. have to be configured for Android NDK
  • Dependencies have to be installed into $PREFIX/include, $PREFIX/lib, and $PREFIX/bin
  • make install
    • strip command has to be linked to an appropriate version of llvm-strip in NDK and included in $PATH
./configure --prefix=$PREFIX \
    --host=$TARGET \
    --disable-shared \
    --disable-slapd \
    --with-tls=openssl \
    --with-threads=yes \
    --with-cyrus-sasl=no \
    --with-yielding_select=yes \
    CFLAGS="-g -O2 -I$PREFIX/include " LDFLAGS="-L$PREFIX/lib "

Issues

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment