Concurrent object wrapper – C++11 – Part I

In this excellent presentation on C++ concurrency, Herb Sutter shows the importance of non-blocking application software, and goes on to develop a very useful “concurrent wrapper”. We present a working version of it, implemented using the concurrent queue from an earlier post, and tweaked to compile and run with GCC 4.7.2.

I am not sure what name to give this device, or even whether any name can fully capture its simplicity and power. (One of my attempts at a descriptive name was “asynchronous concurrent transactional message serializer”, which only goes to show how difficult the naming task is, at least for me.) Sutter refers to it simply as concurrent<T>, which is catchy but not descriptive enough. Anyway, enough about naming. The bottom line is that here we will show a working version of it, and we shall dub it The ConcurrentWrapper.

Let’s first see what this class should look like and what we are to expect from it. The main idea is to have a means to allow many threads to access a shared resource concurrently, in a non-blocking manner. So the callers should be able to request access to the resource and then carry on with their work while, in the background, the access is performed. For simplicity we restrict “access” to mean any action that queries and/or modifies the resource but does not require a return value on the caller side. Here’s an example of a possible use of this “wrapper”:


// wrap an output file stream
ConcurrentWrapper<std::ofstream> log("log.txt");

// Thread 1 prints a message to the log file
log( [](std::ostream& o){ o << "Hello 1..."; o << " Bye 1\n"; } );

// Thread 2 prints a message to the log file
log( [](std::ostream& o){ o << "HELLO 2..."; o << " BYE 2\n"; } );

where we have used C++11 lambda expressions to pass a function equivalent to

void foo(std::ostream& o)
{
  o << "Hello N...";
  o << " Bye N\n";
}

By passing function objects that act on the wrapped type, we can access its full public interface while keeping that of the wrapper minimal.
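As a rough illustration of that last point, here is a single-threaded sketch. The names DirectWrapper, greet, Tag and demo are made up for this example, and the class runs the callable immediately instead of handing it to a worker thread; only the calling convention matches the wrapper described in this post. It shows a free function, a hand-written function object and lambdas all going through the same one-member interface, including a call to a stream member (precision) that the wrapper itself knows nothing about.

```cpp
#include <cassert>
#include <sstream>
#include <string>

// Hypothetical, single-threaded stand-in: the only public member is operator().
template <typename T>
class DirectWrapper
{
 public:
  template <typename F>
  void operator()(F f) { f(resource_); }  // run the callable on the wrapped object

 private:
  T resource_;
};

// A free function with the same signature as the lambdas in the text.
void greet(std::ostream& o)
{
  o << "Hello N...";
  o << " Bye N\n";
}

// A hand-written function object that carries its own state.
struct Tag
{
  std::string text;
  void operator()(std::ostream& o) const { o << '[' << text << "]\n"; }
};

// Passes three kinds of callable and returns what ended up in the stream.
std::string demo()
{
  DirectWrapper<std::ostringstream> out;
  std::string result;

  out(greet);                          // free function
  out(Tag{"marker"});                  // function object
  out([](std::ostream& o){             // lambda using the stream's own interface
    o.precision(3);
    o << 3.14159 << '\n';
  });
  out([&result](std::ostringstream& o){  // read the accumulated text back
    assert(o.good());
    result = o.str();
  });
  return result;
}
```

The wrapper never grows a write(), a precision() or a str() member of its own; whatever the wrapped type offers is reachable through the callable.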

The calls to ConcurrentWrapper::operator() should not block, and each call should result in the code passed being executed as an atomic unit. There is to be no interleaving in the execution of the code from the different caller threads. So we can only expect the following outputs:

Hello 1... Bye 1
HELLO 2... BYE 2

or

HELLO 2... BYE 2
Hello 1... Bye 1

but not

Hello 1... BYE 2
HELLO 2... Bye 1

or any mangled variant thereof. Note that our example involves two calls to ostream& operator<<, and that these must be executed in the order specified, with no interleaving. In this way the caller can create arbitrarily complex call sequences that are treated as single “transactions”:

void foo(std::ostream& o);
void bar(std::ostream& o);
void baz(std::ostream& o);

log( [](std::ostream& o)
     {
       baz(o);
       foo(o);
       baz(o);
       bar(o);
       o << "MARKER\n";
     }
   );

To be non-blocking to the caller, it makes sense for ConcurrentWrapper to perform the work in a separate thread. To avoid interleaving and data races, the work must be performed sequentially. This can be achieved by putting each work unit into a queue and having a single worker thread execute the units one at a time. This is the approach taken here. These are the main elements:

  1. A concurrent queue of std::function<void()> to hold work elements
  2. An std::thread to do the work in
  3. A loop that pops work elements off the queue and executes them in the worker thread
  4. A means to stop the loop when the concurrent wrapper object goes out of scope
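Item 1 relies on the concurrent queue from the earlier post, which is not reproduced here. As a rough idea of what is assumed of it, the sketch below is a stand-in built on std::mutex and std::condition_variable; it is not the code from that post. The only interface taken from this article is push() and a pop() that blocks until an item is available and returns it by value. The helper sum_through_queue is a made-up name used only to exercise the class.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>

// Stand-in for the Queue from the earlier post (an assumption, not that code):
// push() never waits for space, pop() blocks until an item is available.
template <typename T>
class Queue
{
 public:
  void push(T item)
  {
    {
      std::lock_guard<std::mutex> lock(mutex_);  // held only while inserting
      items_.push_back(std::move(item));
    }
    ready_.notify_one();                         // wake one waiting consumer
  }

  T pop()
  {
    std::unique_lock<std::mutex> lock(mutex_);
    ready_.wait(lock, [this]{ return !items_.empty(); });  // sleep until non-empty
    T item = std::move(items_.front());
    items_.pop_front();
    return item;
  }

 private:
  std::mutex mutex_;
  std::condition_variable ready_;
  std::deque<T> items_;
};

// One producer thread pushes 0..n-1; the caller pops n items and sums them.
int sum_through_queue(int n)
{
  assert(n >= 0);
  Queue<int> q;
  std::thread producer([&q, n]{ for (int i = 0; i < n; ++i) q.push(i); });
  int sum = 0;
  for (int i = 0; i < n; ++i) sum += q.pop();
  producer.join();
  return sum;
}
```

In this sketch push() holds the lock only for the duration of a single push_back, so a caller is delayed at most briefly while another thread is inside push() or pop().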

This is pretty minimal, so let’s look at the completed product:


#include <functional>     // for std::function
#include <thread>         // for std::thread
#include "Queue/Queue.h"  // for concurrent Queue from earlier blog post

template <typename T>
class ConcurrentWrapper
{
 public:

  ConcurrentWrapper(T resource = T{})
  :
  resource_{resource},
  done_{false}
  {
    // Start worker thread. Loops, popping function object off queue and executing
    // until done. 
    worker_ = std::thread([this]{ while (!done_) queue_.pop()();} );
  }

  ~ConcurrentWrapper()
  {
    queue_.push([this]{done_ = true;}); // push code to set done to true in worker thread
    worker_.join();                     // we're done, join worker thread!
  }

  template <typename F>
  void operator()(F f) const
  {
    queue_.push([=]{f(resource_);});
  }

 private:

  mutable Queue<std::function<void()>> queue_;
  mutable T resource_;
  std::thread worker_;
  bool done_;

};

The only potential for blocking here is if the call to Queue::push() blocks. This can only happen in the fleeting moment when an item is being popped off the queue. Besides that, note that the shared resource is held as a mutable data member. Conceptually, the wrapper is managing a mutable shared resource, and although a call to ConcurrentWrapper::operator() does affect the internal state, the reasoning goes that in a concurrent application, const denotes something that can be safely called concurrently. Our ConcurrentWrapper::operator() satisfies that criterion. This is explained in another excellent talk by Herb Sutter. My single threaded brain is still processing the implications of this.
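The “const means safe to call concurrently” convention may be easier to see in a smaller setting. The following is only a sketch of the idea, not code from the talk: HitCounter and count_hits are made-up names. Every member function is const, the state they touch is mutable, and a mutex makes concurrent calls safe, much as the concurrent queue does for ConcurrentWrapper::operator().

```cpp
#include <cassert>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical example class: every member function is const, and every one
// is safe to call from several threads at once.
class HitCounter
{
 public:
  HitCounter() : hits_(0) {}

  // Changes internal state, but is internally synchronized, so callers may
  // treat it like any other const operation.
  void record() const
  {
    std::lock_guard<std::mutex> lock(mutex_);
    ++hits_;
  }

  int hits() const
  {
    std::lock_guard<std::mutex> lock(mutex_);
    return hits_;
  }

 private:
  mutable std::mutex mutex_;  // mutable: must be lockable inside const members
  mutable int hits_;          // mutable: guarded by mutex_
};

// Calls record() on one const object from several threads and returns the total.
int count_hits(int threads, int calls_per_thread)
{
  assert(threads >= 0 && calls_per_thread >= 0);
  const HitCounter counter;
  std::vector<std::thread> pool;
  for (int t = 0; t < threads; ++t)
    pool.emplace_back([&counter, calls_per_thread]{
      for (int i = 0; i < calls_per_thread; ++i) counter.record();
    });
  for (auto& th : pool) th.join();
  return counter.hits();
}
```

Without the lock, record() would still compile as a const member thanks to mutable, but it would no longer honour the convention, because concurrent calls would race on hits_.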

Finally, the working implementation, together with some simple demo code, can be found on GitHub. It has been compiled and tested with GCC 4.7.2. At the time of writing, it was making my particular snapshot of GCC 4.8 crash.

2 comments

  1. When I was writing my own blog about this *concurrent* wrapper I stumbled on your blog. Nicely written Juan!

    Herb definitely has improved his original Active Object pattern thanks to C++11.

    Keep up the good writing. Cheers!
    – Kjell
