Concurrent object wrapper – C++11 – Part II

In a previous post I presented an implementation of a concurrent object wrapper. In this post, the implementation is extended to deal with one obvious short-coming of the original: the lack of a means to return values from the actions passed to the wrapper.

As a re-cap, this is a typical call to the original wrapper:


// wrap an output file stream
Concurrent<std::ofstream> log("log.txt");

// Thread 1 prints a message to the log file
log( [](std::ostream& o){ o << "Hello 1..."; o << " Bye 1"; } );

// Thread 2 prints a message to the log file
log( [](std::ostream& o){ o << "HELLO 2..."; o << " BYE 2"; } );

Now, we would like to have return values, while maintaining the non-blocking nature of the calls. This is a possible use:


// wrap an std::vector<int>
Concurrent<std::vector<int>> cv{std::vector<int>(10)};

auto f1 = cv([](std::vector<int>& v)->bool{
               std::iota(v.begin(), v.end(), 0);
               return true;
             });

auto f2 = cv([](std::vector<int>& v){
               return std::accumulate(v.begin(), v.end(), 0);
             });

auto f3 = cv([](std::vector<int>& v){ return v.size(); });

where the results of each call are stores in variables f1, f2 and f3. The non-blocking return semantics can be expressed using std::future, so all we need to do is re-engineer Concurrent::operator() to set up a future-promise pair, and return the future to the caller. Recall the original operator:

  template <typename F>
  void operator()(F f) const
  {
    queue_.push([=]{f(resource_);});
  }

Our new version should return an std::future<decltype(f(WrappedType))>. This can be achieved using trailing return type syntax. Here, we set up the promise-future pair, and add a simple lambda function that sets the up the asynchronous call:

template <typename F>
auto operator()(F f) const -> std::future<decltype(f(resource_))>
{
  auto prom = std::make_shared<std::promise<decltype(f(resource_))>>();
  auto fut = prom->get_future();
  queue_.push( [=]{ prom->set_value(f(resource_)); } );
  return fut;
}

Note that, since the trailing return type is part of the member function declaration, and not its definition, names involved in it must be previously declared. In this particular case, it means that the declaration of data member resource_ must come before our call operator.

There is a catch: in the above code, prom->set_value(f(resource_)) will not compile for void f(T). We can fix this with a level of indirection: a couple of non-member helper functions to call the relevant std::promise::set_value() overload depending on whether f() returns void or not:

template <typename Fut, typename Fun, typename T>
void promise_set_value(std::promise<Fut>& prom, Fun& f, T& t)
{
  prom.set_value(f(t));
}

template <typename Fun, typename T>
void promise_set_value(std::promise<void>& prom, Fun& f, T& t)
{
  f(t);
  prom.set_value();
}

Armed with this, and taking care of propagating any exceptions thrown in the worked thread to the caller thread, we arrive at this implementation of the call operator:

template <typename F>
auto operator()(F f) const -> std::future<decltype(f(resource_))>
{
  auto prom = std::make_shared<std::promise<decltype(f(resource_))>>();
  auto fut = prom->get_future();     
  queue_.push([=]{
                   try { promise_set_value(*prom, f, resource_); }
                   catch(std::exception&) { prom->set_exception(std::current_exception());}
                 }
             );
  return fut;
}

A working implementation, together with some simple demo code, can be found on github. It has been compiled and tested with GCC 4.7.3.

Advertisements

One comment

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s