Wednesday, May 27, 2009

Useful ASIO patterns

Or maybe I should title this "On how I found a use for boost::protect".

The asio library has been immensely helpful in popularizing patterns that were previously rare in C++ (e.g. the proactor pattern). It is therefore not surprising that people are starting to use it beyond networking.

Take for instance this recipe you can find on the official (non-boost) website:
asio::io_service io_service;
asio::io_service::work work(io_service);
boost::thread_group threads;

for (std::size_t i = 0; i < my_thread_count; ++i)
    threads.create_thread(boost::bind(&asio::io_service::run, &io_service));

io_service.post(boost::bind(an_expensive_calculation, 42));
io_service.post(boost::bind(a_long_running_task, 123));

io_service.stop(); // careful: run() returns as soon as possible, so tasks still queued may never execute
threads.join_all();
In just a handful of lines we have a thread pool that executes arbitrary tasks as soon as a thread is available. No need to hand-roll the usual queueing and synchronization machinery for many concurrent jobs sharing a few threads: just post what you need and let io_service take care of dispatching it!

I used this approach recently when I needed to asynchronously update a backup server with incoming operations.
First a bit of background: at last.fm we often use Thrift to power our services. Thrift is a powerful RPC framework that works across multiple languages. When you specify the methods the server is going to answer, Thrift generates the code for both the server and the client in whatever language you want. For instance, this definition
void ping()
void foo(1: i32 value)
will generate a client class that can be used (simplified) this way:
apache::MyClient client;
// connection stuff
client.ping();
client.foo(10);
What I wanted was to queue an arbitrary number of ping and foo calls and execute them on a pool of MyClient objects, using a set of n threads.

The pool of objects can be handled by a simple stack that implements the RAII pattern to safely pull objects when needed and push them back in when their job is done. I might write something about it in the future, but for the sake of simplicity I will skip that in my example. What's mostly interesting is the boost+asio part!
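
For the curious, here is a minimal sketch of what such a pool could look like. It is not the code we run in production: ObjectPool and Lease are hypothetical names, and it assumes C++11 standard library facilities (std::mutex, std::unique_ptr) rather than Boost. The pool keeps free objects on a mutex-protected stack, and the Lease destructor pushes the object back.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical pool: a mutex-protected stack of reusable objects.
template <typename T>
class ObjectPool
{
public:
    explicit ObjectPool(std::size_t size)
    {
        for (std::size_t i = 0; i < size; ++i)
            m_free.push_back(std::unique_ptr<T>(new T()));
    }

    // RAII handle: pulls an object on construction, pushes it back on destruction.
    class Lease
    {
    public:
        explicit Lease(ObjectPool& pool) : m_pool(pool), m_obj(pool.pull()) {}
        ~Lease() { m_pool.push(std::move(m_obj)); }
        Lease(const Lease&) = delete;
        Lease& operator=(const Lease&) = delete;
        T& operator*() { return *m_obj; }
        T* operator->() { return m_obj.get(); }
    private:
        ObjectPool& m_pool;
        std::unique_ptr<T> m_obj;
    };

    // Number of objects currently sitting in the pool.
    std::size_t available()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_free.size();
    }

private:
    std::unique_ptr<T> pull()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        assert(!m_free.empty()); // sketch only: a real pool would wait or grow
        std::unique_ptr<T> obj = std::move(m_free.back());
        m_free.pop_back();
        return obj;
    }

    void push(std::unique_ptr<T> obj)
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_free.push_back(std::move(obj));
    }

    std::vector<std::unique_ptr<T> > m_free;
    std::mutex m_mutex;
};
```

With something like this in place, a task would create an ObjectPool<MyClient>::Lease on the stack and use the leased client through it; the object goes back to the pool when the lease leaves scope, even if the task throws.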

So here's how. First I define my BackupHandler class:

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>

class BackupHandler
{
public:

    BackupHandler(int poolSize = 4)
    {
        // the work object keeps run() from returning while the queue is empty
        m_pWork.reset( new boost::asio::io_service::work(m_io_service) );

        for ( int i = 0; i < poolSize; ++i )
            m_threadGroup.create_thread( boost::bind(&boost::asio::io_service::run, &m_io_service) );
    }

    ~BackupHandler()
    {
        m_pWork.reset();          // let run() return once the queue is drained
        m_threadGroup.join_all(); // wait for all threads to complete
    }

    // this returns immediately
    template <typename TFunc>
    void enqueue(TFunc fun)
    {
        m_io_service.post(
            boost::bind( &BackupHandler::execute<TFunc>, this, fun )
        );
    }

private:

    // this will be executed when one of the threads is available
    template <typename TFunc>
    void execute(TFunc fun)
    {
        apache::MyClient client; // here I should use RAII to get the client resource
        fun(client);
    }

private:

    boost::asio::io_service m_io_service;
    boost::shared_ptr<boost::asio::io_service::work> m_pWork;

    boost::thread_group m_threadGroup;
};

Note how client becomes an argument of the call rather than the object the method is invoked on. This is possible because, when binding a pointer to member function, boost::bind lets you put a placeholder (_1) in the position of the object itself.

I can now enqueue calls whenever I need, fully asynchronously, with bind:

BackupHandler bkp(10);
//.. code
bkp.enqueue(
    boost::protect(
        boost::bind(&apache::MyClient::ping, _1) // who's gonna call that? _1!
    )
);
//.. other code
bkp.enqueue(
    boost::protect(
        boost::bind(&apache::MyClient::foo, _1, 42)
    )
);
// ..more stuff enqueued later..
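Make sure you use boost::protect (from boost/bind/protect.hpp). enqueue wraps your function in another bind, and without protect that outer bind treats the inner one as a subexpression and tries to evaluate it, placeholder included, before its time is due. The handler is invoked with no arguments, so there is nothing to substitute for _1 and the compiler will cry (or crash).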

3 comments:

  1. WOWWWW... DAMMMNNNN... this is soooooo smart!

  2. You need to use a strand in these two calls:
    io_service.post(boost::bind(an_expensive_calculation, 42));
    io_service.post(boost::bind(a_long_running_task, 123));

  3. Hi all, I have an io_service object created in my Connection class, and from the Connection class I'm spawning multiple threads, each of which is handed a copy of the same io_service object.

    My current problem is actually if I call io_service.stop on one of those spawned threads it subsequently stops my io_service.run() loop in my main controlling thread which is my Connection class instance.

    I'm trying to set up a work object using the declaration auto_ptr<boost::asio::io_service::work> work(new boost::asio::io_service::work(io_service)), so that each of the spawned threads has a work object on which I can just call work.reset(); the Boost documentation says this will stop processing in the io_service.run() event loop.

    But I still can't seem to exit from the io_service run() loop, even after I explicitly call work.reset() to tell io_service to stop processing the run event loop.

    Please I need your help and guidance I'm just quite new to boost Asio.


    Regards,
    alein
