Skip to content

Instantly share code, notes, and snippets.

@t2ym
Last active June 9, 2021 06:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save t2ym/a185c7daafbd4f938321a7e4018f90a3 to your computer and use it in GitHub Desktop.
Save t2ym/a185c7daafbd4f938321a7e4018f90a3 to your computer and use it in GitHub Desktop.
Prototype for accessor of memberOf attributes for LDAP users to be used for nicexprs.h middleware
/*
* Copyright (c) 2021, Tetsuya Mori <t2y3141592@gmail.com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include <stdio.h>
#include <ldap.h>
#include <inttypes.h>
#include <stdlib.h>
#include <signal.h>
#include <iostream>
#include <sstream>
#include <list>
#include <memory>
#include <chrono>
#include <thread>
#include <future>
#define HOST_CONNECT_LOG 1
#define ASYNC_LDAP_ACCESS 1
#if ASYNC_LDAP_ACCESS
#include <boost/asio.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
// boost custom error_code based on the boilerplate
// at https://www.boost.org/doc/libs/1_76_0/libs/outcome/doc/html/motivation/plug_error_code2.html
#include <boost/system/error_code.hpp>
enum class LdapErrc { // actually a bool
Success = 0,
Error = 1 // not enumerate all errors from LDAP
};
namespace boost {
namespace system {
template <> struct is_error_code_enum<LdapErrc> : std::true_type {};
}
}
namespace detail {
class LdapErrc_category : public boost::system::error_category {
public:
virtual const char *name() const noexcept override final { return "LdapError"; }
virtual std::string message(int err) const override final {
char *err_msg = ldap_err2string(err);
return err_msg ? err_msg : "unknown LDAP error";
}
virtual boost::system::error_condition default_error_condition(int err) const noexcept override final {
// I have no mapping for this code
return boost::system::error_condition(err, *this);
}
};
}
#define THIS_MODULE_API_DECL extern inline
THIS_MODULE_API_DECL const detail::LdapErrc_category &LdapErrc_category() {
static detail::LdapErrc_category err;
return err;
}
inline boost::system::error_code make_error_code(LdapErrc err, int ldap_err) {
// trick to make an error_code instance with ldap_err; err is actually ignored
return {static_cast<int>(ldap_err), LdapErrc_category()};
}
class async_adaptor {
public:
enum { SOCKET_NOT_ASSIGNED = -1 };
typedef std::function<void (const boost::system::error_code &err)> async_cb;
async_adaptor(boost::asio::io_context &context)
: socket_fd(SOCKET_NOT_ASSIGNED), context(context), async_socket(context), canary_socket(context), async_resolver(context) {}
~async_adaptor() {
assign(SOCKET_NOT_ASSIGNED);
}
void assign(int fd) {
if (socket_fd >= 0) {
async_socket.cancel();
async_socket.release();
}
if ((socket_fd = fd) >= 0) {
async_socket.assign(boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 0).protocol(), socket_fd);
}
else {
socket_fd = SOCKET_NOT_ASSIGNED;
}
}
int socket_fd;
boost::asio::io_context &context;
boost::asio::ip::tcp::socket async_socket;
boost::asio::ip::tcp::socket canary_socket; // check tcp port connectivity before connection to mitigate side-effects of blocking SSL_connect()
boost::asio::ip::tcp::resolver async_resolver;
};
#endif // ASYNC_LDAP_ACCESS
class ldap_exception: public std::exception {
public:
static void throw_on_error(int err) { if (err != LDAP_SUCCESS) throw ldap_exception(err); }
using std::exception::exception;
ldap_exception(int err): err(err) {
char *err_msg = ldap_err2string(err);
if (err_msg) {
message = err_msg;
}
else {
message = "unknown error ";
message += err;
}
}
virtual const char *what() const noexcept { return message.c_str(); }
int err;
std::string message;
};
class ldap_query_spec {
public:
ldap_query_spec(std::string user_dn, std::string search_filter, int scope = LDAP_SCOPE_BASE)
: user_dn(user_dn), search_filter(search_filter), scope(scope) {}
virtual ~ldap_query_spec() {}
virtual char **attributes() = 0;
virtual void store(LDAP *ldap, LDAPMessage *msg, const char *attr, char *value, std::size_t len) = 0;
virtual void clear_result() = 0;
virtual bool is_target_attribute(const char *attr) = 0;
std::string user_dn;
std::string search_filter;
int scope;
// result must be defined in derived classes
};
template <class query_spec>
class ldap_search {
public:
typedef enum {
_LDAP_RES_TIMEOUT = 0,
_LDAP_RES_ERROR = -1
} ldap_result_err;
typedef enum {
TRUE = 1 /* not comparable */, FALSE = 0
} ldap_boolean_int;
typedef enum {
ACTION_RETHROW_EXCEPTION,
ACTION_FAILOVER_TO_NEXT_HOST,
ACTION_RETURN_NULL_RESULT
} action_on_ldap_exception;
struct ldap_host {
std::string host_name;
std::chrono::steady_clock::time_point last_successful_access;
std::chrono::steady_clock::time_point last_non_critical_error; // ACTION_RETURN_NULL_RESULT
std::chrono::steady_clock::time_point last_critical_failure; // ACTION_RETHROW_EXCEPTION
int connected_counter;
int non_critical_error_counter;
int critical_failure_counter;
};
ldap_search(std::string hosts, std::string ca_file, std::string bind_user, std::string bind_password,
int port = 636, time_t network_timeout_sec = 5, time_t search_timeout_sec = 5, int size_limit = 0)
: ldap(nullptr), ldap_version(LDAP_VERSION3), ca_file(ca_file), ldap_port(port),
network_timeout({network_timeout_sec, 0}), search_timeout({search_timeout_sec, 0}),
size_limit(size_limit), bind_user(bind_user), bind_password(bind_password) {
set_ldap_hosts(hosts);
}
#if ASYNC_LDAP_ACCESS
// Each instance is associated to an io_context and not shared among multiple threads
ldap_search(boost::asio::io_context &context, std::string hosts, std::string ca_file, std::string bind_user, std::string bind_password,
int port = 636, time_t network_timeout_sec = 5, time_t search_timeout_sec = 5, int size_limit = 0, int max_concurrent_requests = 100)
: adaptor(std::make_shared<async_adaptor>(context)),
ldap(nullptr), ldap_version(LDAP_VERSION3), ca_file(ca_file), ldap_port(port),
network_timeout({network_timeout_sec, 0}), search_timeout({search_timeout_sec, 0}), demultiplexer(max_concurrent_requests),
size_limit(size_limit), bind_user(bind_user), bind_password(bind_password) {
set_ldap_hosts(hosts);
}
#endif // ASYNC_LDAP_ACCESS
~ldap_search() {
disconnect();
}
ldap_search &sync_connect() {
try {
return do_sync_connect();
}
catch (ldap_exception &e) {
return sync_failover();
}
}
ldap_search &disconnect() {
if (ldap) {
#if HOST_CONNECT_LOG
std::cerr << "disconnect()" << std::endl;
#endif
ldap_unbind_ext_s(ldap, nullptr, nullptr);
ldap = nullptr;
}
#if ASYNC_LDAP_ACCESS
demultiplexer.connected = false;
#endif
return *this;
}
void sync_search(query_spec &spec) {
for (;;) {
try {
do_sync_search(spec);
current_host->last_successful_access = std::chrono::steady_clock::now();
return;
}
catch (ldap_exception &e) {
action_on_ldap_exception action = get_action_for_ldap_error(e.err);
switch (action) {
case ACTION_FAILOVER_TO_NEXT_HOST:
sync_failover();
break;
case ACTION_RETURN_NULL_RESULT:
current_host->last_non_critical_error = std::chrono::steady_clock::now();
current_host->non_critical_error_counter++;
spec.result = nullptr;
return;
case ACTION_RETHROW_EXCEPTION:
default:
current_host->last_critical_failure = std::chrono::steady_clock::now();
current_host->critical_failure_counter++;
throw;
}
}
}
}
protected:
void set_ldap_hosts(std::string &hosts) {
std::istringstream iss(hosts + ",");
std::string host;
while (!std::getline(iss, host, ',').eof()) {
ldap_hosts.push_back(ldap_host{
host,
std::chrono::steady_clock::time_point::min(),
std::chrono::steady_clock::time_point::min(),
std::chrono::steady_clock::time_point::min(),
0, 0, 0
});
}
#if HOST_CONNECT_LOG
for (auto i: ldap_hosts) {
std::cerr << "ldap_host:" << i.host_name << std::endl;
}
#endif
current_host = ldap_hosts.begin();
}
virtual std::string get_ldap_url() { return "ldaps://" + current_host->host_name + ":" + std::to_string(ldap_port); }
void do_connect_common() {
//int debug_level = 65535;
//ldap_set_option(ldap, LDAP_OPT_DEBUG_LEVEL, &debug_level);
ldap_exception::throw_on_error(ldap_initialize(&ldap, get_ldap_url().c_str()));
ldap_exception::throw_on_error(ldap_set_option(ldap, LDAP_OPT_PROTOCOL_VERSION, &ldap_version));
ldap_exception::throw_on_error(ldap_set_option(ldap, LDAP_OPT_NETWORK_TIMEOUT, &network_timeout));
ldap_exception::throw_on_error(ldap_set_option(ldap, LDAP_OPT_SIZELIMIT, &size_limit));
int opt = LDAP_OPT_X_TLS_NEVER; // no StartTLS
ldap_exception::throw_on_error(ldap_set_option(ldap, LDAP_OPT_X_TLS_REQUIRE_CERT, &opt));
char *current_ca_file_ptr = nullptr;
ldap_exception::throw_on_error(ldap_get_option(NULL, LDAP_OPT_X_TLS_CACERTFILE, &current_ca_file_ptr));
bool preserve_current_ca_file = false;
if (current_ca_file_ptr) {
std::string current_ca_file(current_ca_file_ptr);
ldap_memfree(current_ca_file_ptr);
if (ca_file == current_ca_file) {
preserve_current_ca_file = true;
}
}
if (!preserve_current_ca_file) {
ldap_exception::throw_on_error(ldap_set_option(NULL, LDAP_OPT_X_TLS_CACERTFILE, ca_file.c_str()));
}
}
ldap_search &do_sync_connect() {
#if HOST_CONNECT_LOG
std::cerr << "do_sync_connect() on " << get_ldap_url() << std::endl;
#endif
do_connect_common();
BerValue *cred_p, password_ber({bind_password.size(), &bind_password[0]});
ldap_exception::throw_on_error(ldap_sasl_bind_s(ldap, bind_user.c_str(), LDAP_SASL_SIMPLE, &password_ber, nullptr, nullptr, &cred_p));
return *this;
}
ldap_search &sync_reconnect() {
disconnect();
return do_sync_connect();
}
ldap_search &sync_failover() {
auto previous_host = current_host;
int round = 0;
bool connected = false;
bool giving_up = false;
while (!connected) {
current_host++;
if (current_host == ldap_hosts.end()) {
current_host = ldap_hosts.begin();
}
if (current_host == previous_host) {
round++;
}
try {
switch (round) {
case 0:
// no wait on round 0
break;
case 1:
if (current_host == previous_host) {
// no wait on previous_host on round 1
break;
}
// fall-through
case 2:
case 3:
// insert a wait on round 1, 2, 3 for synchronous search
std::this_thread::sleep_for(
std::chrono::seconds(network_timeout.tv_sec) -
(std::chrono::steady_clock::now() - current_host->last_critical_failure));
break;
default:
// give up connection on rount 4 for synchronous search
giving_up = true;
break;
}
sync_reconnect();
connected = true;
}
catch (ldap_exception &e) {
current_host->last_critical_failure = std::chrono::steady_clock::now();
current_host->critical_failure_counter++;
if (giving_up) {
throw;
}
}
}
return *this;
}
action_on_ldap_exception get_action_for_ldap_error(int err) {
action_on_ldap_exception action = ACTION_RETHROW_EXCEPTION;
switch (err) {
/* LDAPv3 (RFC 4511) codes */
case LDAP_SUCCESS: // Success
break; // unexpected
case LDAP_TIMELIMIT_EXCEEDED: // Time limit exceeded
action = ACTION_FAILOVER_TO_NEXT_HOST; break;
case LDAP_REFERRAL: // Referral
action = ACTION_RETURN_NULL_RESULT; break;
case LDAP_ADMINLIMIT_EXCEEDED: // Administrative limit exceeded
action = ACTION_FAILOVER_TO_NEXT_HOST; break;
case LDAP_SASL_BIND_IN_PROGRESS: // SASL bind in progress
action = ACTION_FAILOVER_TO_NEXT_HOST; break;
case LDAP_NO_SUCH_OBJECT: // No such object
action = ACTION_RETURN_NULL_RESULT; break;
case LDAP_BUSY: // Server is busy
case LDAP_UNAVAILABLE: // Server is unavailable
action = ACTION_FAILOVER_TO_NEXT_HOST; break;
case LDAP_LOOP_DETECT: // Loop detected
action = ACTION_RETURN_NULL_RESULT; break;
/* LDAPv2 (RFC 1777) codes */
case LDAP_PARTIAL_RESULTS: // Partial results and referral received
case LDAP_IS_LEAF: // Entry is a leaf
action = ACTION_RETURN_NULL_RESULT; break;
/* Proxied Authorization Control (RFC 4370 and I-D) codes */
case LDAP_PROXIED_AUTHORIZATION_DENIED: // Proxied Authorization Denied
case LDAP_X_PROXY_AUTHZ_FAILURE: // Proxy Authorization Failure (X)
action = ACTION_FAILOVER_TO_NEXT_HOST; break;
/* API codes - renumbered since draft-ietf-ldapext-ldap-c-api */
case LDAP_SERVER_DOWN: // Can't contact LDAP server
action = ACTION_FAILOVER_TO_NEXT_HOST; break;
case LDAP_TIMEOUT: // Timed out
action = ACTION_FAILOVER_TO_NEXT_HOST; break;
#if EXPLICIT_CASE_ENTRIES_FOR_DEFAULT // ACTION_RETHROW_EXCEPTION
/* LDAPv3 (RFC 4511) codes */
case LDAP_OPERATIONS_ERROR: // Operations error
case LDAP_PROTOCOL_ERROR: // Protocol error
case LDAP_SIZELIMIT_EXCEEDED: // Size limit exceeded
case LDAP_COMPARE_FALSE: // Compare False
case LDAP_COMPARE_TRUE: // Compare True
case LDAP_STRONG_AUTH_NOT_SUPPORTED: // Authentication method not supported
case LDAP_STRONG_AUTH_REQUIRED: // Strong(er) authentication required
case LDAP_UNAVAILABLE_CRITICAL_EXTENSION: // Critical extension is unavailable
case LDAP_CONFIDENTIALITY_REQUIRED: // Confidentiality required
case LDAP_NO_SUCH_ATTRIBUTE: // No such attribute
case LDAP_UNDEFINED_TYPE: // Undefined attribute type
case LDAP_INAPPROPRIATE_MATCHING: // Inappropriate matching
case LDAP_CONSTRAINT_VIOLATION: // Constraint violation
case LDAP_TYPE_OR_VALUE_EXISTS: // Type or value exists
case LDAP_INVALID_SYNTAX: // Invalid syntax
case LDAP_ALIAS_PROBLEM: // Alias problem
case LDAP_INVALID_DN_SYNTAX: // Invalid DN syntax
case LDAP_ALIAS_DEREF_PROBLEM: // Alias dereferencing problem
case LDAP_INAPPROPRIATE_AUTH: // Inappropriate authentication
case LDAP_INVALID_CREDENTIALS: // Invalid credentials
case LDAP_INSUFFICIENT_ACCESS: // Insufficient access
case LDAP_UNWILLING_TO_PERFORM: // Server is unwilling to perform
case LDAP_NAMING_VIOLATION: // Naming violation
case LDAP_OBJECT_CLASS_VIOLATION: // Object class violation
case LDAP_NOT_ALLOWED_ON_NONLEAF: // Operation not allowed on non-leaf
case LDAP_NOT_ALLOWED_ON_RDN: // Operation not allowed on RDN
case LDAP_ALREADY_EXISTS: // Already exists
case LDAP_NO_OBJECT_CLASS_MODS: // Cannot modify object class
case LDAP_AFFECTS_MULTIPLE_DSAS: // Operation affects multiple DSAs
/* Virtual List View draft */
case LDAP_VLV_ERROR: // Virtual List View error
case LDAP_OTHER: // Other (e.g., implementation specific) error
/* Connection-less LDAP (CLDAP - RFC 1798) code */
case LDAP_RESULTS_TOO_LARGE: // Results too large
/* Cancel Operation (RFC 3909) codes */
case LDAP_CANCELLED: // Cancelled
case LDAP_NO_SUCH_OPERATION: // No Operation to Cancel
case LDAP_TOO_LATE: // Too Late to Cancel
case LDAP_CANNOT_CANCEL: // Cannot Cancel
/* Assert Control (RFC 4528 and old internet-draft) codes */
case LDAP_ASSERTION_FAILED: // Assertion Failed
case LDAP_X_ASSERTION_FAILED: // Assertion Failed (X)
/* Content Sync Operation (RFC 4533 and I-D) codes */
case LDAP_SYNC_REFRESH_REQUIRED: // Content Sync Refresh Required
case LDAP_X_SYNC_REFRESH_REQUIRED: // Content Sync Refresh Required (X)
/* No-Op Control (draft-zeilenga-ldap-noop) code */
case LDAP_X_NO_OPERATION: // No Operation (X)
/* Client Update Protocol (RFC 3928) codes */
case LDAP_CUP_RESOURCES_EXHAUSTED: // LCUP Resources Exhausted
case LDAP_CUP_SECURITY_VIOLATION: // LCUP Security Violation
case LDAP_CUP_INVALID_DATA: // LCUP Invalid Data
case LDAP_CUP_UNSUPPORTED_SCHEME: // LCUP Unsupported Scheme
case LDAP_CUP_RELOAD_REQUIRED: // LCUP Reload Required
case LDAP_TXN_SPECIFY_OKAY: // TXN specify okay
case LDAP_TXN_ID_INVALID: // TXN ID is invalid
/* API codes - renumbered since draft-ietf-ldapext-ldap-c-api */
case LDAP_LOCAL_ERROR: // Local error
case LDAP_ENCODING_ERROR: // Encoding error
case LDAP_DECODING_ERROR: // Decoding error
case LDAP_AUTH_UNKNOWN: // Unknown authentication method
case LDAP_FILTER_ERROR: // Bad search filter
case LDAP_USER_CANCELLED: // User cancelled operation
case LDAP_PARAM_ERROR: // Bad parameter to an ldap routine
case LDAP_NO_MEMORY: // Out of memory
case LDAP_CONNECT_ERROR: // Connect error
case LDAP_NOT_SUPPORTED: // Not Supported
case LDAP_CONTROL_NOT_FOUND: // Control not found
case LDAP_NO_RESULTS_RETURNED: // No results returned
case LDAP_MORE_RESULTS_TO_RETURN: // More results to return
case LDAP_CLIENT_LOOP: // Client Loop
case LDAP_REFERRAL_LIMIT_EXCEEDED: // Referral Limit Exceeded
case LDAP_X_CONNECTING: // Connecting (X)
break;
#endif
default:
break;
}
return action;
}
void do_sync_search(query_spec &spec) {
int err;
int msg_id;
err = ldap_search_ext(ldap, spec.user_dn.c_str(), spec.scope, spec.search_filter.c_str(), spec.attributes(), 0, nullptr, nullptr, &search_timeout, size_limit, &msg_id);
ldap_exception::throw_on_error(err);
LDAPMessage *msg;
int msg_type;
BerElement *ber;
do {
msg = nullptr;
switch (msg_type = ldap_result(ldap, msg_id, LDAP_MSG_ONE, &search_timeout, &msg)) {
case _LDAP_RES_ERROR:
ldap_get_option(ldap, LDAP_OPT_RESULT_CODE, &err);
ldap_exception::throw_on_error(err);
break;
case _LDAP_RES_TIMEOUT:
ldap_abandon_ext(ldap, msg_id, nullptr, nullptr);
ldap_exception::throw_on_error(LDAP_TIMEOUT);
break;
case LDAP_RES_SEARCH_ENTRY:
ber = nullptr;
for (const char *attr = ldap_first_attribute(ldap, msg, &ber); attr; attr = ldap_next_attribute(ldap, msg, ber)) {
if (spec.is_target_attribute(attr)) {
BerValue **values = ldap_get_values_len(ldap, msg, attr);
int len = ldap_count_values_len(values);
for(int i = 0; i < len; i++) {
spec.store(ldap, msg, attr, values[i]->bv_val, values[i]->bv_len);
}
ldap_value_free_len(values);
}
ber_memfree((void *)attr);
}
if (ber) {
ber_free(ber, FALSE);
}
ldap_msgfree(msg);
break;
// TODO: LDAP_RES_SEARCH_REFERENCE
case LDAP_RES_SEARCH_RESULT:
ldap_parse_result(ldap, msg, &err, nullptr, nullptr, nullptr, nullptr, FALSE);
ldap_msgfree(msg);
ldap_exception::throw_on_error(err);
break;
default:
if (msg) {
ldap_msgfree(msg);
}
break;
}
} while (msg_type != LDAP_RES_SEARCH_RESULT);
}
#if ASYNC_LDAP_ACCESS
public:
typedef std::function<void (const boost::system::error_code &err)> connect_cb;
typedef std::function<void (const boost::system::error_code &err, decltype(query_spec::result) result)> search_cb;
void async_connect(connect_cb cb) {
if (demultiplexer.connected) {
// already connected
static boost::system::error_code err;
cb(err);
return;
}
if (demultiplexer.failover_in_progress || demultiplexer.connecting) {
auto original_cb = failover_status.cb;
// hook backed-up original cb; this hooking may be nested by multiple calls of async_connect()
failover_status.cb = [this, cb, original_cb](const boost::system::error_code &err){
original_cb(err);
cb(err);
};
}
else {
demultiplexer.connecting = true;
connect_status.cb = [this, cb](const boost::system::error_code &err){
if (!err) {
cb(err);
handle_async_request(err); // in case async_member_of() has been called without calling async_connect(); does nothing if there are no such requests
}
else {
async_failover(cb, err);
}
};
do_async_connect();
}
}
void async_search(std::shared_ptr<query_spec> spec, search_cb cb) {
auto status = std::make_shared<query_status>(query_status{spec, 0, cb});
demultiplexer.request_wait_queue.push_back(status);
if (demultiplexer.connected) {
handle_async_request(status->err);
}
else {
if (demultiplexer.failover_in_progress || demultiplexer.connecting) {
// callback will be called
}
else {
// not connected nor failover_in_progress, nor connecting
async_connect([](const boost::system::error_code &err){
// handle_async_request() will handle the cb for the request
});
}
}
}
protected:
// A macro to call a callback with an error code if it is not successful
#define CALLBACK_ON_ERROR(cb, err) if (err) { cb(err); return; }
// A catch-clause macro to call a callback with an error code created from an exception,
// which has been created from the same error code,
// so that the error code can be propagated to the original caller which is waiting for a callback call,
// which cannot catch exceptions during the async process
// As exceptions are exceptional, performance overheads by these conversions should be negligible for normal operations
#define CALLBACK_WITH_CAUGHT_EXCEPTION(cb) \
catch (ldap_exception &e) { cb(make_error_code(LdapErrc::Error, e.err)); }\
catch (std::system_error &e) { cb(boost::system::error_code((int)e.code().value(), boost::system::system_category())); }\
catch (...) { cb(make_error_code(LdapErrc::Error, LDAP_LOCAL_ERROR)); } // TODO: properly handle other error categories
void do_async_connect() {
#if HOST_CONNECT_LOG
std::cerr << "do_async_connect() on " << get_ldap_url() << std::endl;
#endif
try {
adaptor->async_resolver.async_resolve(
current_host->host_name, std::to_string(ldap_port),
boost::bind(&ldap_search::on_hostname_resolved, this, boost::asio::placeholders::error, boost::asio::placeholders::iterator));
}
CALLBACK_WITH_CAUGHT_EXCEPTION(connect_status.cb);
}
// resolve ldap hostname asynchronously before ldap_initialize to cache the host IP address locally
void on_hostname_resolved(const boost::system::error_code &err, boost::asio::ip::tcp::resolver::iterator endpoint_iterator) {
CALLBACK_ON_ERROR(connect_status.cb, err);
try {
connect_status.is_tcp_connected = false;
connect_status.is_tcp_connect_timed_out = false;
connect_status.timer = std::make_shared<boost::asio::steady_timer>(
adaptor->context, std::chrono::milliseconds(network_timeout.tv_sec * 1000));
boost::asio::async_connect(adaptor->canary_socket, endpoint_iterator,
boost::bind(&ldap_search::on_tcp_port_connectable, this, boost::asio::placeholders::error));
connect_status.timer->async_wait(
boost::bind(&ldap_search::on_tcp_connect_timeout, this, boost::asio::placeholders::error));
}
CALLBACK_WITH_CAUGHT_EXCEPTION(connect_status.cb);
}
// connect aynchronously to ldaps tcp port with the canary socket to mitigate blocking SSL_connect() in ldap_sasl_bind()
void on_tcp_port_connectable(const boost::system::error_code &err) {
connect_status.is_tcp_connected = true; // even on an error status for on_tcp_connect_timeout to judge cancellation
if (!connect_status.is_tcp_connect_timed_out) {
connect_status.timer->cancel();
}
CALLBACK_ON_ERROR(connect_status.cb, err);
try {
adaptor->canary_socket.close();
on_ready_to_connect(err);
}
CALLBACK_WITH_CAUGHT_EXCEPTION(connect_status.cb);
}
void on_tcp_connect_timeout(const boost::system::error_code &err) {
if (!err && !connect_status.is_tcp_connected) {
connect_status.is_tcp_connect_timed_out = true;
adaptor->canary_socket.cancel();
}
}
void on_ready_to_connect(const boost::system::error_code &err) {
CALLBACK_ON_ERROR(connect_status.cb, err);
try {
do_connect_common();
ldap_exception::throw_on_error(ldap_set_option(ldap, LDAP_OPT_CONNECT_ASYNC, LDAP_OPT_ON));
int socket_fd = async_adaptor::SOCKET_NOT_ASSIGNED;
BerValue password_ber({bind_password.size(), &bind_password[0]});
// ldap_sasl_bind() blocks on SSL_connect(), whose side effects are mitigated by the tcp port connectivity check with the canary socket beforehand
ldap_exception::throw_on_error(ldap_sasl_bind(ldap, bind_user.c_str(), LDAP_SASL_SIMPLE, &password_ber, nullptr, nullptr, &connect_status.bind_msg_id));
ldap_exception::throw_on_error(ldap_get_option(ldap, LDAP_OPT_DESC, &socket_fd));
connect_status.timer = std::make_shared<boost::asio::steady_timer>(
adaptor->context, std::chrono::milliseconds(network_timeout.tv_sec * 1000));
adaptor->assign(socket_fd);
connect_status.is_tls_connected = false;
connect_status.is_tls_connect_timed_out = false;
adaptor->async_socket.async_wait(boost::asio::ip::tcp::socket::wait_write,
boost::bind(&ldap_search::on_tls_connected, this, boost::asio::placeholders::error));
connect_status.timer->async_wait(
boost::bind(&ldap_search::on_tls_connect_timeout, this, boost::asio::placeholders::error));
}
CALLBACK_WITH_CAUGHT_EXCEPTION(connect_status.cb);
}
void on_tls_connected(const boost::system::error_code &err) {
connect_status.is_tls_connected = true; // even on an error status for on_tls_connect_timeout to judge cancellation
if (!connect_status.is_tls_connect_timed_out) {
connect_status.timer->cancel();
}
CALLBACK_ON_ERROR(connect_status.cb, err);
try {
int _err;
LDAPMessage *msg = nullptr;
struct timeval zero_tv = {0,0};
int msg_type = ldap_result(ldap, connect_status.bind_msg_id, LDAP_MSG_ONE, &zero_tv, &msg);
switch (msg_type) {
case _LDAP_RES_ERROR:
ldap_get_option(this->ldap, LDAP_OPT_RESULT_CODE, &_err);
ldap_exception::throw_on_error(_err);
break;
case _LDAP_RES_TIMEOUT:
adaptor->async_socket.async_wait(boost::asio::ip::tcp::socket::wait_read,
boost::bind(&ldap_search::on_bind, this, boost::asio::placeholders::error));
break;
case LDAP_RES_BIND:
ldap_parse_result(ldap, msg, &_err, nullptr, nullptr, nullptr, nullptr, FALSE);
ldap_msgfree(msg);
ldap_exception::throw_on_error(_err);
demultiplexer.connected = true;
demultiplexer.connecting = false;
if (demultiplexer.failover_in_progress) {
demultiplexer.failover_in_progress = false;
}
current_host->connected_counter++;
connect_status.cb(err);
break;
default:
if (msg) {
ldap_msgfree(msg);
}
ldap_exception::throw_on_error(LDAP_PROTOCOL_ERROR); // treat as a protocol error
break;
}
}
CALLBACK_WITH_CAUGHT_EXCEPTION(connect_status.cb);
}
void on_tls_connect_timeout(const boost::system::error_code &err) {
if (!err && !connect_status.is_tls_connected) {
connect_status.is_tls_connect_timed_out = true;
adaptor->async_socket.cancel();
}
}
void on_bind(const boost::system::error_code &err) {
CALLBACK_ON_ERROR(connect_status.cb, err);
try {
int _err;
LDAPMessage *msg = nullptr;
struct timeval zero_tv = {0,0};
int msg_type = ldap_result(ldap, connect_status.bind_msg_id, LDAP_MSG_ONE, &zero_tv, &msg);
switch (msg_type) {
case _LDAP_RES_ERROR:
ldap_get_option(ldap, LDAP_OPT_RESULT_CODE, &_err);
ldap_exception::throw_on_error(_err);
break;
case _LDAP_RES_TIMEOUT: // unexpected
ldap_abandon_ext(ldap, connect_status.bind_msg_id, nullptr, nullptr);
ldap_exception::throw_on_error(LDAP_TIMEOUT);
break;
case LDAP_RES_BIND:
ldap_parse_result(ldap, msg, &_err, nullptr, nullptr, nullptr, nullptr, FALSE);
ldap_msgfree(msg);
ldap_exception::throw_on_error(_err);
demultiplexer.connected = true;
demultiplexer.connecting = false;
if (demultiplexer.failover_in_progress) {
demultiplexer.failover_in_progress = false;
}
current_host->connected_counter++;
connect_status.cb(err); // callback on success
break;
default:
if (msg) {
ldap_msgfree(msg);
}
ldap_exception::throw_on_error(LDAP_PROTOCOL_ERROR); // treat as a protocol error
break;
}
}
CALLBACK_WITH_CAUGHT_EXCEPTION(connect_status.cb);
}
void async_failover(connect_cb cb, const boost::system::error_code &err) {
connect_status.cb = cb;
failover_status.round = 0;
failover_status.cb = connect_status.cb; // backup the original cb
failover_status.giving_up = false;
failover_status.previous_host = current_host;
connect_status.cb = [this](const boost::system::error_code &err){
if (!err) {
connect_status.cb = failover_status.cb;
connect_status.cb(err);
}
else {
if (failover_status.giving_up) {
current_host->last_critical_failure = std::chrono::steady_clock::now();
current_host->critical_failure_counter++;
connect_status.cb = failover_status.cb;
connect_status.cb(err);
}
else {
do_async_failover(err);
}
}
};
do_async_failover(err);
}
void do_async_failover(const boost::system::error_code &err) {
cancel_async_waiters();
disconnect();
current_host->last_critical_failure = std::chrono::steady_clock::now();
current_host->critical_failure_counter++;
current_host++;
if (current_host == ldap_hosts.end()) {
current_host = ldap_hosts.begin();
}
if (current_host == failover_status.previous_host) {
failover_status.round++;
}
switch (failover_status.round) {
case 0:
// no wait on round 0
break;
case 1:
if (current_host == failover_status.previous_host) {
// no wait on previous_host on round 1
break;
}
// fall-through
case 2:
case 3:
// insert a wait on round 1, 2, 3 for asynchronous search
failover_status.timer = std::make_shared<boost::asio::steady_timer>(
adaptor->context,
std::chrono::seconds(network_timeout.tv_sec) -
(std::chrono::steady_clock::now() - current_host->last_critical_failure));
failover_status.timer->async_wait(
boost::bind(&ldap_search::async_reconnect, this, boost::asio::placeholders::error));
return;
default:
// give up connection on rount 4 for asynchronous search
failover_status.giving_up = true;
break;
}
async_reconnect(err);
}
void async_reconnect(const boost::system::error_code &err) {
push_back_incomplete_requests();
disconnect();
do_async_connect();
}
struct query_status {
std::shared_ptr<query_spec> spec;
int msg_id;
search_cb cb;
boost::system::error_code err;
};
struct query_demultiplexer {
query_demultiplexer(int max_concurrent_requests = 100)
: active_requests_on_last_busy(0), max_concurrent_requests(max_concurrent_requests),
busy_until(std::chrono::steady_clock::time_point::min()),
waiting_for_reading(false), waiting_for_timeout(false), read_timer_cancelled(false), connected(false), connecting(false), failover_in_progress(false),
read_wait_issued_count(0), read_wait_fulfilled_count(0), read_wait_cancelled_count(0), last_read_wait_issued(std::chrono::steady_clock::time_point::min()),
read_timeout_issued_count(0), read_timeout_fulfilled_count(0), read_timeout_cancelled_count(0), read_timeout_cancelling_count(0) {}
int active_requests_on_last_busy;
int max_concurrent_requests;
std::chrono::steady_clock::time_point busy_until;
std::list<std::shared_ptr<query_status>> request_wait_queue;
std::map<int, std::shared_ptr<query_status>> response_wait_map;
bool waiting_for_reading;
bool waiting_for_timeout;
int read_wait_issued_count;
int read_wait_fulfilled_count;
int read_wait_cancelled_count;
int read_timeout_issued_count;
int read_timeout_fulfilled_count;
int read_timeout_cancelled_count;
int read_timeout_cancelling_count;
std::chrono::steady_clock::time_point last_read_wait_issued;
std::shared_ptr<boost::asio::steady_timer> read_timer;
std::chrono::milliseconds read_timeout;
bool read_timer_cancelled;
connect_cb failover_connect_cb;
bool connected;
bool connecting;
bool failover_in_progress;
};
void push_back_incomplete_requests() {
auto wait_map_it = demultiplexer.response_wait_map.rbegin();
auto wait_map_end_it = demultiplexer.response_wait_map.rend();
while (wait_map_it != wait_map_end_it) {
auto status = wait_map_it->second;
// reset status
status->msg_id = -1;
status->spec->clear_result();
status->err = make_error_code(LdapErrc::Success, LDAP_SUCCESS);
// push at the front of request_wait_queue
demultiplexer.request_wait_queue.push_front(status);
wait_map_it++;
}
demultiplexer.response_wait_map.clear();
}
bool callback_or_async_failover(std::shared_ptr<query_status> status, bool force_failover = false) {
bool called_back = true;
if (!status->err) {
current_host->last_successful_access = std::chrono::steady_clock::now();
status->cb(status->err, status->spec->result); // success
}
else {
if (status->err.category() == LdapErrc_category()) {
action_on_ldap_exception action = get_action_for_ldap_error(status->err.value());
if (force_failover) {
action = ACTION_FAILOVER_TO_NEXT_HOST;
}
switch (action) {
case ACTION_FAILOVER_TO_NEXT_HOST:
demultiplexer.failover_connect_cb = [this](const boost::system::error_code &err){
if (!err) {
demultiplexer.failover_in_progress = false;
handle_async_request(err); // restart after failover
}
else {
cancel_all_waiting_requests(err); // clean up request_wait_queue
disconnect();
demultiplexer.failover_in_progress = false;
demultiplexer.connecting = false;
}
};
demultiplexer.failover_in_progress = true;
called_back = false;
async_failover(demultiplexer.failover_connect_cb, status->err);
break;
case ACTION_RETURN_NULL_RESULT:
current_host->last_non_critical_error = std::chrono::steady_clock::now();
current_host->non_critical_error_counter++;
status->cb(status->err, nullptr);
break;
case ACTION_RETHROW_EXCEPTION: // error is handed as a callback error_code for an async request
default:
current_host->last_critical_failure = std::chrono::steady_clock::now();
current_host->critical_failure_counter++;
status->cb(status->err, nullptr);
break;
}
}
else {
status->cb(status->err, status->spec->result);
}
}
return called_back;
}
void cancel_all_waiting_requests(const boost::system::error_code &err) {
auto it = demultiplexer.request_wait_queue.begin();
auto end_it = demultiplexer.request_wait_queue.end();
while (it != end_it) {
(*it)->cb(err, nullptr);
demultiplexer.request_wait_queue.pop_front();
it = demultiplexer.request_wait_queue.begin();
}
}
void handle_async_request(const boost::system::error_code &err) {
auto wait_queue_end_it = demultiplexer.request_wait_queue.end();
auto wait_queue_it = demultiplexer.request_wait_queue.begin();
int _err = LDAP_SUCCESS;
bool failover_in_progress = demultiplexer.failover_in_progress;
if (!demultiplexer.connected) {
return;
}
if (failover_in_progress) {
return;
}
while (wait_queue_it != wait_queue_end_it) {
auto now = std::chrono::steady_clock::now();
if (demultiplexer.busy_until > now) {
// busy_until has not come yet
if (demultiplexer.response_wait_map.size() > std::min(demultiplexer.max_concurrent_requests, demultiplexer.active_requests_on_last_busy * 8 / 10)) {
// active requests are more than 80% of the active requests on the last busy or the fixed limit
break;
}
}
else {
if (demultiplexer.active_requests_on_last_busy == 0) {
// has not reached busy state yet
if (demultiplexer.response_wait_map.size() > demultiplexer.max_concurrent_requests) {
// the fixed limit
break;
}
}
else {
auto time_passed_since_busy = now - demultiplexer.busy_until;
auto ratio = 1000 * time_passed_since_busy.count() / std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::seconds(search_timeout.tv_sec)).count();
if (demultiplexer.response_wait_map.size() > std::min(demultiplexer.max_concurrent_requests, (int)(demultiplexer.active_requests_on_last_busy * ratio / 1000))) {
// active requests are more than ratio times of the active requests on the last busy or the fixed limit
// ratio is the time passed since busy_until divided by search_timeout
break;
}
}
}
auto status = *wait_queue_it;
// search_timeout here does not mean a blocking operation but the server-side timeout threshold in query processing
_err = ldap_search_ext(ldap, status->spec->user_dn.c_str(), status->spec->scope, status->spec->search_filter.c_str(), status->spec->attributes(), 0, nullptr, nullptr, &search_timeout, size_limit, &(status->msg_id));
if (_err == LDAP_BUSY) {
demultiplexer.busy_until = std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::seconds(search_timeout.tv_sec));
demultiplexer.active_requests_on_last_busy = demultiplexer.response_wait_map.size(); // valid for search_timeout
break;
}
if (_err) {
status->err = make_error_code(LdapErrc::Error, _err);
if (callback_or_async_failover(status)) {
_err = LDAP_SUCCESS; // reset error
}
else {
failover_in_progress = true;
break;
}
}
else {
demultiplexer.response_wait_map.emplace(status->msg_id, status);
}
demultiplexer.request_wait_queue.pop_front();
wait_queue_it = demultiplexer.request_wait_queue.begin(); // pick up the next request
}
if (!failover_in_progress) {
setup_async_waiters_if_necessary();
}
}
void on_read_timeout(const boost::system::error_code &err) {
demultiplexer.waiting_for_timeout = false;
if (err || demultiplexer.read_timer_cancelled) {
demultiplexer.read_timeout_cancelled_count++;
demultiplexer.read_timer_cancelled = false;
return;
}
demultiplexer.read_timeout_fulfilled_count++;
if (demultiplexer.waiting_for_reading) {
bool to_cancel_read_wait = false;
auto now = std::chrono::steady_clock::now();
if (demultiplexer.response_wait_map.size() == 0) {
to_cancel_read_wait = true;
}
else if (demultiplexer.last_read_wait_issued != std::chrono::steady_clock::time_point::min()) {
auto time_to_cancel = demultiplexer.last_read_wait_issued + std::chrono::milliseconds(1000); // cancel in every second
if (time_to_cancel < now) {
to_cancel_read_wait = true;
}
}
if (to_cancel_read_wait) {
adaptor->async_socket.cancel();
}
}
on_any_response(err);
}
void on_read_wait(const boost::system::error_code &err) {
demultiplexer.waiting_for_reading = false;
if (err) {
demultiplexer.read_wait_cancelled_count++;
return;
}
else {
demultiplexer.read_wait_fulfilled_count++;
}
if (demultiplexer.waiting_for_timeout) {
if (demultiplexer.response_wait_map.size() == 0) {
demultiplexer.read_timeout_cancelling_count++;
demultiplexer.read_timer_cancelled = true;
demultiplexer.read_timer->cancel();
}
}
on_any_response(err);
}
void on_any_response(const boost::system::error_code &err) {
static auto empty_spec = std::make_shared<query_spec>("", "");
auto status_end_it = demultiplexer.response_wait_map.end();
struct timeval zero_tv = {0,0};
int _err;
LDAPMessage *msg;
std::shared_ptr<query_status> status;
typename std::map<int, std::shared_ptr<query_status>>::iterator status_it;
int msg_type;
int msg_id;
BerElement *ber;
bool no_more_response = false;
bool failover_in_progress = demultiplexer.failover_in_progress;
while (!no_more_response && !failover_in_progress) {
msg = nullptr;
msg_type = ldap_result(ldap, LDAP_RES_ANY, LDAP_MSG_ONE, &zero_tv, &msg);
switch (msg_type) {
case _LDAP_RES_ERROR:
ldap_get_option(ldap, LDAP_OPT_RESULT_CODE, &_err);
if (_err) {
auto action = get_action_for_ldap_error(_err);
bool force_failover = false;
switch (action) {
case ACTION_RETHROW_EXCEPTION:
default:
force_failover = true;
// fall through
case ACTION_FAILOVER_TO_NEXT_HOST:
status = std::make_shared<query_status>(query_status{empty_spec, 0, nullptr, make_error_code(LdapErrc::Error, _err)});
callback_or_async_failover(status, force_failover);
failover_in_progress = true;
status_it = status_end_it;
break;
case ACTION_RETURN_NULL_RESULT:
break;
}
}
break;
case _LDAP_RES_TIMEOUT: // no more message to receive non-blockingly
no_more_response = true;
break;
case LDAP_RES_SEARCH_ENTRY: // only 1 entry for each request
msg_id = ldap_msgid(msg);
status_it = demultiplexer.response_wait_map.find(msg_id);
if (status_it == status_end_it) {
// msg_id not found in response_wait_map
}
else {
status = status_it->second;
ber = nullptr;
for (const char *attr = ldap_first_attribute(ldap, msg, &ber); attr; attr = ldap_next_attribute(ldap, msg, ber)) {
if (status->spec->is_target_attribute(attr)) {
BerValue **values = ldap_get_values_len(ldap, msg, attr);
int len = ldap_count_values_len(values);
for(int i = 0; i < len; i++) {
status->spec->store(ldap, msg, attr, values[i]->bv_val, values[i]->bv_len);
}
ldap_value_free_len(values);
}
ber_memfree((void *)attr);
}
if (ber) {
ber_free(ber, FALSE);
}
}
ldap_msgfree(msg);
break;
// TODO: LDAP_RES_SEARCH_REFERENCE
case LDAP_RES_SEARCH_RESULT:
msg_id = ldap_msgid(msg);
status_it = demultiplexer.response_wait_map.find(msg_id);
if (status_it == status_end_it) {
// msg_id not found in response_wait_map
ldap_msgfree(msg);
}
else {
status = status_it->second;
ldap_parse_result(ldap, msg, &_err, nullptr, nullptr, nullptr, nullptr, FALSE);
ldap_msgfree(msg);
status->err = make_error_code((_err ? LdapErrc::Error : LdapErrc::Success), _err);
if (callback_or_async_failover(status)) {
demultiplexer.response_wait_map.erase(status_it);
}
else {
failover_in_progress = true;
}
status_it = status_end_it;
}
break;
default:
if (msg) {
ldap_msgfree(msg);
}
break;
}
}
if (!failover_in_progress && demultiplexer.response_wait_map.size() > 0) {
auto last_response = current_host->last_successful_access;
if (last_response < current_host->last_non_critical_error) {
last_response = current_host->last_non_critical_error;
}
if (last_response < current_host->last_critical_failure) {
last_response = current_host->last_critical_failure;
}
auto now = std::chrono::steady_clock::now();
if (last_response == std::chrono::steady_clock::time_point::min()) {
// timer timed out (100ms) at the first request; the server may be loaded
// treat as a non-critical error to detect timeout after search_timeout duration
// Note: actual timeout occurs 100ms later than search_timeout duration
current_host->last_non_critical_error = now;
last_response = current_host->last_non_critical_error;
}
auto response_timeout_at = last_response + std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::seconds(search_timeout.tv_sec));
if (response_timeout_at < now) {
// no response for search_timeout duration
status = std::make_shared<query_status>(query_status{empty_spec, 0, nullptr, make_error_code(LdapErrc::Error, LDAP_TIMEOUT)});
callback_or_async_failover(status);
}
}
if (!failover_in_progress) {
handle_async_request(err);
}
}
void cancel_async_waiters() {
if (demultiplexer.waiting_for_reading) {
demultiplexer.waiting_for_reading = false;
adaptor->async_socket.cancel();
}
if (demultiplexer.waiting_for_timeout) {
demultiplexer.waiting_for_timeout = false;
demultiplexer.read_timer_cancelled = true;
demultiplexer.read_timer->cancel();
}
}
void setup_async_waiters_if_necessary() {
if (!demultiplexer.waiting_for_reading && demultiplexer.response_wait_map.size() > 0) {
demultiplexer.waiting_for_reading = true;
demultiplexer.read_wait_issued_count++;
demultiplexer.last_read_wait_issued = std::chrono::steady_clock::now();
adaptor->async_socket.async_wait(boost::asio::ip::tcp::socket::wait_read,
boost::bind(&ldap_search::on_read_wait, this, boost::asio::placeholders::error));
}
if (!demultiplexer.waiting_for_timeout && demultiplexer.response_wait_map.size() > 0) {
demultiplexer.waiting_for_timeout = true;
demultiplexer.read_timeout_issued_count++;
demultiplexer.read_timeout = std::chrono::milliseconds(100);
demultiplexer.read_timer = std::make_shared<boost::asio::steady_timer>(adaptor->context);
demultiplexer.read_timer->expires_after(demultiplexer.read_timeout);
demultiplexer.read_timer->async_wait(
boost::bind(&ldap_search::on_read_timeout, this, boost::asio::placeholders::error)
);
}
}
#undef CALLBACK_ON_ERROR
#undef CALLBACK_WITH_CAUGHT_EXCEPTION
public:
void dump(std::ostream &out, std::string prefix = "", bool concise = true) {
out << prefix << "active_requests = " << demultiplexer.response_wait_map.size() << std::endl;
out << prefix << "queued requests = " << demultiplexer.request_wait_queue.size() << std::endl;
if (!concise) {
out << prefix << "waiting_for_reading = " << demultiplexer.waiting_for_reading << std::endl;
out << prefix << "waiting_for_timeout = " << demultiplexer.waiting_for_timeout << std::endl;
out << prefix << "read_wait_issued_count = " << demultiplexer.read_wait_issued_count << std::endl;
out << prefix << "read_wait_fulfilled_count = " << demultiplexer.read_wait_fulfilled_count << std::endl;
out << prefix << "read_wait_cancelled_count = " << demultiplexer.read_wait_cancelled_count << std::endl;
out << prefix << "read_wait_issued_count - (read_wait_fulfilled_count + read_wait_cancelled_count) = "
<< (demultiplexer.read_wait_issued_count - (demultiplexer.read_wait_fulfilled_count + demultiplexer.read_wait_cancelled_count))
<< std::endl;
out << prefix << "read_timeout_issued_count = " << demultiplexer.read_timeout_issued_count << std::endl;
out << prefix << "read_timeout_fulfilled_count = " << demultiplexer.read_timeout_fulfilled_count << std::endl;
out << prefix << "read_timeout_cancelled_count = " << demultiplexer.read_timeout_cancelled_count << std::endl;
out << prefix << "read_timeout_cancelling_count = " << demultiplexer.read_timeout_cancelling_count << std::endl;
out << prefix << "read_timeout_issued_count - (read_timeout_fulfilled_count + read_timeout_cancelled_count) = "
<< (demultiplexer.read_timeout_issued_count - (demultiplexer.read_timeout_fulfilled_count + demultiplexer.read_timeout_cancelled_count))
<< std::endl;
auto host_it = ldap_hosts.begin();
while (host_it != ldap_hosts.end()) {
out << prefix << "ldap_host: " << host_it->host_name << (host_it == current_host ? " (current_host) " : " ")
<< "connected_counter = " << host_it->connected_counter
<< " non_critical_error_counter = " << host_it->non_critical_error_counter
<< " critical_failure_counter = " << host_it->critical_failure_counter << std::endl;
host_it++;
}
}
}
#endif
protected:
LDAP *ldap;
int ldap_version;
struct timeval network_timeout;
struct timeval search_timeout;
int size_limit;
std::string ca_file;
std::list<ldap_host> ldap_hosts;
using host_iterator = typename std::list<ldap_host>::iterator;
host_iterator current_host;
int ldap_port;
std::string bind_user;
std::string bind_password;
#if ASYNC_LDAP_ACCESS
std::shared_ptr<async_adaptor> adaptor;
struct _connect_status {
bool is_tcp_connected;
bool is_tcp_connect_timed_out;
bool is_tls_connected;
bool is_tls_connect_timed_out;
int bind_msg_id;
connect_cb cb;
std::shared_ptr<boost::asio::steady_timer> timer;
} connect_status;
struct _failover_status {
int round;
bool giving_up;
connect_cb cb;
host_iterator previous_host;
std::shared_ptr<boost::asio::steady_timer> timer;
} failover_status;
query_demultiplexer demultiplexer;
#endif
};
class ldap_member_of_query_spec: public ldap_query_spec {
public:
typedef std::shared_ptr<std::list<std::string>> result_type; // this is a simple result type for a single attribute with multiple values; no key (attribute name) is required
//enum class query_type { member_of };
static std::string &member_of() { static std::string member_of = "memberOf"; return member_of; }
static char **_attributes() { static char *attributes[] = { &member_of()[0], nullptr }; return attributes; }
ldap_member_of_query_spec(std::string user_dn, std::string search_filter /*, query_type type = query_type::member_of */)
: ldap_query_spec(user_dn, search_filter, LDAP_SCOPE_BASE), result(std::make_shared<std::list<std::string>>()) /*, type(type) */ {
}
char **attributes() { /* switch (type) {...} */ return ldap_member_of_query_spec::_attributes(); }
void store(LDAP *ldap, LDAPMessage *msg, const char *attr, char *value, std::size_t len) {
//char *dn = ldap_get_dn(ldap, msg); ldap_memfree(dn); // useful for LDAP_SCOPE_ONELEVEL and LDAP_SCOPE_SUBTREE
/* switch (type) {...} */
result->push_back(std::move(std::string(value, len)));
}
void clear_result() {
result->clear();
}
bool is_target_attribute(const char *attr) { /* switch (type) {...} */ return member_of() == attr; }
result_type result;
//query_type type; // optionally define a query_type member to distinguish multiple types of queries in a single query_spec class
};
class ldap_member_of: public ldap_search<ldap_member_of_query_spec> {
public:
using ldap_search<ldap_member_of_query_spec>::ldap_search;
ldap_member_of_query_spec::result_type sync_member_of(std::string user_dn, std::string search_filter = "(objectClass=user)") {
ldap_member_of_query_spec spec{user_dn, search_filter /*, ldap_member_of_query_spec::query_type::member_of */};
sync_search(spec);
return spec.result;
}
#if ASYNC_LDAP_ACCESS
void async_member_of(std::string user_dn, search_cb cb, std::string search_filter = "(objectClass=user)") {
auto spec = std::make_shared<ldap_member_of_query_spec>(user_dn, search_filter /*, ldap_member_of_query_spec::query_type::member_of */);
async_search(spec, cb);
}
#endif
// optionally define methods with query_types other than ldap_member_of_query_spec::query_type::member_of
};
#if ASYNC_LDAP_ACCESS
#define SYNC_REPEAT_ACCESS 0
#endif
int main(int argc, char * argv[])
{
std::cout << argv[0] << " has to be linked against libldap_r library to work in threaded environments" << std::endl;
if (argc < 6) {
#if ASYNC_LDAP_ACCESS
std::cout << argv[0] << " [ldap_hosts_csv] [ca_file] [bind_user] [bind_password] [user_dn] [repetition] [max_concurrent_req] [num_workers]" << std::endl;
#else
std::cout << argv[0] << " [ldap_hosts_csv] [ca_file] [bind_user] [bind_password] [user_dn]" << std::endl;
#endif
return 0;
}
signal(SIGPIPE, SIG_IGN); // ad-hoc fix
std::string ldap_hosts = argv[1];
std::string ca_file = argv[2];
std::string bind_user = argv[3];
std::string bind_password = argv[4];
std::string user_dn = argv[5];
int ldap_port = 636;
int network_timeout = 5; // sec
int search_timeout = 5; // sec
int size_limit = 0;
#if ASYNC_LDAP_ACCESS
int repetition = std::stoi(argc > 6 ? argv[6] : "10000");
int max_concurrent_request = std::stoi(argc > 7 ? argv[7] : "100");
int num_workers = std::stoi(argc > 8 ? argv[8] : "1");
#endif
try {
auto query = std::make_shared<ldap_member_of>(ldap_hosts, ca_file, bind_user, bind_password, ldap_port, network_timeout, search_timeout, size_limit);
query->sync_connect();
std::shared_ptr<std::list<std::string>> result;
try {
result = query->sync_member_of(user_dn);
if (result) {
for (auto group_dn: *result) {
std::cout << "group_dn: " << group_dn << std::endl;
}
}
else {
std::cout << "null result" << std::endl;
}
#if ASYNC_LDAP_ACCESS
#if SYNC_REPEAT_ACCESS
boost::posix_time::ptime mst1;
boost::posix_time::ptime mst2;
mst1 = boost::posix_time::microsec_clock::local_time();
for (int i = 0; i < repetition; i++) {
result = query->sync_member_of(user_dn);
}
mst2 = boost::posix_time::microsec_clock::local_time();
boost::posix_time::time_duration msdiff = mst2 - mst1;
std::cout << "query->sync_member_of() sequentially called " << repetition << " times in " << msdiff.total_milliseconds() << "ms" << std::endl;
#endif
#endif
}
catch (ldap_exception &e) {
switch (e.err) {
case LDAP_TIMEOUT:
default:
throw;
}
}
}
catch (ldap_exception &e) {
std::cerr << "ldap_exception thrown: " << e.err << " " << e.what() << std::endl;
}
catch (std::exception &e) {
std::cerr << "exception thrown: " << e.what() << std::endl;
}
#if ASYNC_LDAP_ACCESS
try {
auto worker = [ldap_hosts, ca_file, bind_user, bind_password, ldap_port, user_dn, network_timeout, search_timeout, size_limit, max_concurrent_request, repetition](int i, std::promise<std::string> p){
boost::asio::io_context io_context;
std::string dump_prefix = "[" + std::to_string(i) + "] ";
auto query = std::make_shared<ldap_member_of>(io_context, ldap_hosts, ca_file, bind_user, bind_password, ldap_port, network_timeout, search_timeout, size_limit, max_concurrent_request);
auto requested = std::make_shared<int>(0);
auto responded = std::make_shared<int>(0);
auto errored = std::make_shared<int>(0);
boost::posix_time::ptime mst1;
boost::posix_time::ptime mst2;
mst1 = boost::posix_time::microsec_clock::local_time();
auto on_response_cb = [i, responded, errored, repetition, mst1, &query, &io_context, dump_prefix](const boost::system::error_code &err, ldap_member_of_query_spec::result_type result){
(*responded)++;
if (!err) {
if (result) {
if (*responded % 10000 == 0 || *responded == repetition) {
std::cout << "[" << i << "] responded = " << *responded << " (including errored = " << *errored << ")" << std::endl;
query->dump(std::cout, dump_prefix);
boost::posix_time::ptime mst4;
mst4 = boost::posix_time::microsec_clock::local_time();
boost::posix_time::time_duration msdiff = mst4 - mst1;
std::cout << "[" << i << "] query->async_member_of() callback called in " << msdiff.total_milliseconds() << "ms" << std::endl;
}
}
else {
std::cout << "null result" << std::endl; // unexpected in normal cases
}
}
else {
(*errored)++;
if (err.category() == LdapErrc_category() && err.value() == LDAP_NO_SUCH_OBJECT) {
std::cout << "no such object" << std::endl;
}
else {
std::cout << "on_response_cb " << *responded << " err = " << (err.category() == LdapErrc_category() ? ldap_err2string(err.value()) : err.message()) << std::endl;
}
}
};
boost::posix_time::ptime mst5 = boost::posix_time::microsec_clock::local_time();
struct timer_container {
timer_container(boost::asio::io_context &io_context): timer(io_context, std::chrono::milliseconds(10)) {}
boost::asio::steady_timer timer;
std::function<void (const boost::system::error_code &err)> request_timer_cb;
};
auto container = std::make_shared<timer_container>(io_context);
container->request_timer_cb = [i, requested, responded, repetition, max_concurrent_request, mst5, &query, &io_context, container, user_dn, on_response_cb](const boost::system::error_code &err) {
while (*requested < repetition && *requested - *responded <= max_concurrent_request * 5) { // until 5 * max_concurrent_request are enqueued
query->async_member_of(user_dn, on_response_cb); // request without explicit async_connect() call
(*requested)++;
}
if (*requested < repetition) {
container->timer.async_wait(container->request_timer_cb);
}
else {
boost::posix_time::ptime mst6 = boost::posix_time::microsec_clock::local_time();
boost::posix_time::time_duration msdiff2 = mst6 - mst5;
std::cout << "[" << i << "] request completed in " << msdiff2.total_milliseconds() << "ms" << std::endl;
container->request_timer_cb = nullptr;
}
};
container->timer.async_wait(container->request_timer_cb);
/*
while (*requested < repetition) {
query->async_member_of(user_dn, on_response_cb); // request without explicit async_connect() call
(*requested)++;
}
*/
query->async_connect([&mst1, &mst2, &query, user_dn, &io_context, repetition](const boost::system::error_code &err){
mst2 = boost::posix_time::microsec_clock::local_time();
boost::posix_time::time_duration msdiff = mst2 - mst1;
std::cout << "query->async_connect() callback called in " << msdiff.total_milliseconds() << "ms" << std::endl;
if (err) {
std::cout << "connect error_code " << err.message() << std::endl;
}
else {
std::cout << "connect successful" << std::endl;
query->async_member_of(user_dn, [](const boost::system::error_code &err, ldap_member_of_query_spec::result_type result){
if (err) {
std::cout << "async_member_of error_code " << err.message() << std::endl;
}
else {
if (result) {
for (auto group_dn: *result) {
std::cout << "group_dn: " << group_dn << std::endl;
}
}
else {
std::cout << "null result" << std::endl;
}
}
});
}
});
io_context.run();
std::cout << "[" << i << "] io_context.run() exited" << std::endl;
std::ostringstream oss;
query->disconnect();
query->dump(oss, dump_prefix, false);
p.set_value(oss.str());
};
std::vector<std::thread> workers;
std::vector<std::future<std::string>> futures;
boost::posix_time::ptime mst1;
boost::posix_time::ptime mst2;
mst1 = boost::posix_time::microsec_clock::local_time();
if (num_workers > 1) {
for (int i = 0; i < num_workers; i++) {
std::promise<std::string> p;
auto f = p.get_future();
futures.push_back(std::move(f));
workers.push_back(std::thread(worker, i, std::move(p)));
}
for (int i = 0; i < num_workers; i++) {
workers[i].join();
std::cout << "[" << i << "] worker joined" << std::endl;
}
for (int i = 0; i < num_workers; i++) {
std::cout << futures[i].get();
}
}
else {
std::promise<std::string> p;
std::future<std::string> f = p.get_future();
worker(0, std::move(p));
std::cout << f.get();
}
mst2 = boost::posix_time::microsec_clock::local_time();
boost::posix_time::time_duration msdiff = mst2 - mst1;
std::cout << repetition << " requests x " << num_workers << " worker thread(s) = "
<< (repetition * num_workers) << " total requests completed in " << msdiff.total_milliseconds() << "ms" << std::endl;
}
catch (ldap_exception &e) {
std::cerr << "ldap_exception thrown: " << e.err << " " << e.what() << std::endl;
}
catch (std::exception &e) {
std::cerr << "exception thrown: " << e.what() << std::endl;
}
#endif // ASYNC_LDAP_ACCESS
return 0;
}
@t2ym
Copy link
Author

t2ym commented Jun 9, 2021

Patch to cross-compile libldap from openldap-2.4.59 with Android NDK r21e

  • liblutil must come AFTER libldap in linking
diff -r -p openldap-2.4.59.orig/build/top.mk openldap-2.4.59/build/top.mk
*** openldap-2.4.59.orig/build/top.mk	2021-06-04 03:40:31.000000000 +0900
--- openldap-2.4.59/build/top.mk	2021-06-08 19:04:12.621833780 +0900
*************** LDAP_LIBREWRITE_A = $(LDAP_LIBDIR)/libre
*** 170,177 ****
  LDAP_LIBLUNICODE_A = $(LDAP_LIBDIR)/liblunicode/liblunicode.a
  LDAP_LIBLUTIL_A = $(LDAP_LIBDIR)/liblutil/liblutil.a
  
! LDAP_L = $(LDAP_LIBLUTIL_A) \
! 	$(LDAP_LIBLDAP_LA) $(LDAP_LIBLBER_LA)
  SLAPD_L = $(LDAP_LIBLUNICODE_A) $(LDAP_LIBREWRITE_A) \
  	$(LDAP_LIBLUTIL_A) $(LDAP_LIBLDAP_R_LA) $(LDAP_LIBLBER_LA)
  
--- 170,177 ----
  LDAP_LIBLUNICODE_A = $(LDAP_LIBDIR)/liblunicode/liblunicode.a
  LDAP_LIBLUTIL_A = $(LDAP_LIBDIR)/liblutil/liblutil.a
  
! LDAP_L = \
! 	$(LDAP_LIBLDAP_LA) $(LDAP_LIBLUTIL_A) $(LDAP_LIBLBER_LA)
  SLAPD_L = $(LDAP_LIBLUNICODE_A) $(LDAP_LIBREWRITE_A) \
  	$(LDAP_LIBLUTIL_A) $(LDAP_LIBLDAP_R_LA) $(LDAP_LIBLBER_LA)
  
diff -r -p openldap-2.4.59.orig/libraries/librewrite/Makefile.in openldap-2.4.59/libraries/librewrite/Makefile.in
*** openldap-2.4.59.orig/libraries/librewrite/Makefile.in	2021-06-04 03:40:31.000000000 +0900
--- openldap-2.4.59/libraries/librewrite/Makefile.in	2021-06-08 18:36:32.695442446 +0900
*************** LDAP_LIBDIR= ../../libraries
*** 28,35 ****
  
  LIBRARY = librewrite.a
  PROGRAMS	= rewrite
! XLIBS = $(LIBRARY) $(LDAP_LIBLUTIL_A) \
! 	$(LDAP_LIBLDAP_R_LA) $(LDAP_LIBLBER_LA)
  XXLIBS  = $(SECURITY_LIBS) $(LUTIL_LIBS)
  XXXLIBS = $(LTHREAD_LIBS)
  
--- 28,35 ----
  
  LIBRARY = librewrite.a
  PROGRAMS	= rewrite
! XLIBS = $(LIBRARY) \
! 	$(LDAP_LIBLDAP_R_LA) $(LDAP_LIBLUTIL_A) $(LDAP_LIBLBER_LA)
  XXLIBS  = $(SECURITY_LIBS) $(LUTIL_LIBS)
  XXXLIBS = $(LTHREAD_LIBS)

Configuration to build libldap_r

  • TARGET, PREFIX, CC, etc. have to be configured for Android NDK
  • Dependencies have to be installed into $PREFIX/include, $PREFIX/lib, and $PREFIX/bin
  • make install
    • strip command has to be linked to an appropriate version of llvm-strip in NDK and included in $PATH
./configure --prefix=$PREFIX \
    --host=$TARGET \
    --disable-shared \
    --disable-slapd \
    --with-tls=openssl \
    --with-threads=yes \
    --with-cyrus-sasl=no \
    --with-yielding_select=yes \
    CFLAGS="-g -O2 -I$PREFIX/include " LDFLAGS="-L$PREFIX/lib "

Issues

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment