Skip to content

Instantly share code, notes, and snippets.

@vyskocilm
Last active January 26, 2016 14:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save vyskocilm/c9963b3fe95180bfc7f8 to your computer and use it in GitHub Desktop.
Save vyskocilm/c9963b3fe95180bfc7f8 to your computer and use it in GitHub Desktop.
pool of mlm_client_t
#include <memory>
#include <iostream>
#include <cstdio>
#include <cxxtools/pool.h>
#include <malamute.h>
using namespace cxxtools;
class MlmClient
{
public:
MlmClient ()
{
_client = mlm_client_new ();
}
~MlmClient ()
{
mlm_client_destroy (&_client);
}
int connect (const char *endpoint, uint32_t timeout, const char *address)
{
_address = address;
return mlm_client_connect (_client, endpoint, timeout, address);
}
bool connected () const
{
return mlm_client_connected (_client);
}
std::string address() const
{
return _address;
}
protected:
mlm_client_t* _client;
std::string _address;
};
class MlmConnector
{
public:
explicit MlmConnector(const char *endpoint, uint32_t timeout):
_endpoint(endpoint),
_counter(0),
_template("ui."),
_timeout(timeout)
{
}
explicit MlmConnector (const char *endpoint, uint32_t timeout, const char *templ):
_endpoint(endpoint),
_counter(0),
_template(templ),
_timeout(timeout)
{
}
MlmClient* operator() ()
{
MlmClient* c = new MlmClient();
if (!c)
return NULL;
std::string address = _template + std::to_string (++_counter);
std::cerr << "D: address: " << address << std::endl;
int r = c->connect (_endpoint.c_str(), _timeout, address.c_str());
if (r == -1) {
delete c;
return NULL;
}
return c;
}
protected:
std::string _endpoint;
std::string _template;
unsigned _counter;
uint32_t _timeout;
};
typedef Pool<MlmClient, MlmConnector> MlmClientPool;
// test from threads
class MyThread
{
cxxtools::AttachedThread thread;
MlmClientPool& pool;
unsigned threadNum;
unsigned sec;
public:
explicit MyThread (MlmClientPool& pool_, unsigned threadNum_, unsigned sec_ = 1)
: thread( cxxtools::callable(*this, &MyThread::run) ),
pool(pool_),
threadNum(threadNum_),
sec(sec_)
{ }
void create()
{ thread.start(); }
void join()
{ thread.join(); }
void run()
{
printf("start thread %d\n", threadNum);
sleep(sec);
// We fetch a object from the pool, and call a method pool.get() does
// not return a connection, but a proxy object, so we have to take
// care not to assign the object to a Connection, but use that proxy
// directy.
//
// This would be wrong:
// Connection conn = *pool.get(); // convert the proxy-object
// conn.doSomething(threadNum); // the connection is back in the pool here :-(
//
// The reason is, that the proxy object is destroyed too early.
// The proxy object puts the connection back to the free-list of the
// pool, before we use the connection.
//
printf("doSomething in thread %d\n", threadNum);
printf ("ptr->address(): %s\n", pool.get()->address().c_str());
printf("doSomething ends in thread %d\n", threadNum);
}
};
int main() {
MlmClientPool pool{3, MlmConnector {"ipc://@/malamute", 1000, "test-client."}};
MyThread th1 {pool, 1, 2};
MyThread th2 {pool, 2, 1};
MyThread th3 {pool, 3, 4};
MyThread th4 {pool, 4, 1};
MyThread th5 {pool, 5, 3};
MyThread th6 {pool, 6, 4};
th1.create ();
th2.create ();
th3.create ();
th4.create ();
th5.create ();
th6.create ();
printf ("threads created\n");
th1.join ();
printf ("th1.join())\n");
th2.join ();
th3.join ();
th4.join ();
th5.join ();
th6.join ();
pool.drop ();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment