Concurrent queue – C++11

This is an implementation of the classic multiple producer, multiple consumer thread-safe queue concept. Much has been said about this before, but I need a simple, pure C++ version of it for future posts, so I thought I might as well say a few words about it. I had set a variant on this theme as an exercise in a C++11 workshop, and I also encountered a simplified variant of it in a job interview assessment question. It asked to implement a thread-safe queue for a single producer and a single consumer thread. The interface was minimal and clean:

template <typename T>
struct Queue
{
  // pop an element and return a copy. Block if queue empty.
  T pop();
  //  push an element to the back of the queue.
  void push(const T& item);
};

Let’s spend a moment looking at the advantages and disadvantages of this interface. For reference, we look at a part of the std::queue container adapter. A simplified version, with the methods needed for enqueueing and popping elements, would look something like this:

template <typename T>
class queue
{
  T& front();
  void push( const T& value );
  void pop();
  bool empty() const; // we need this to avoid UB when calling front()
};

We see immediately that we have twice as many methods to achieve the same two operations. There are good reasons for this:

  • Exception safety: the standard library call to front() does not return a copy of the element and pop it off the queue, because if the resulting copy construction throws an exception, the element is lost: it has been removed from the queue, and the caller has failed to obtain a copy. This would break the strong exception guarantee. We are required to copy the front() element first, and only then pop() it from the queue.
  • Non-blocking access: a call to front() is synchronous and must return a valid reference to an element of the queue. If the queue is empty, the call cannot block, and results in undefined behaviour. The empty() method allows us to check before calling front(), as illustrated in the sketch after this list.
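
Even if each individual call were protected by its own mutex, the empty()-then-front() pattern would still be racy: another consumer can remove the last element between the check and the access. A hypothetical sketch (the function name and the per-call locking are mine, not from the interview question):

// Hypothetical: per-call locking around std::queue is NOT enough.
// Assumes #include <queue> and #include <mutex>.
std::queue<int> q;
std::mutex m;

bool unsafe_try_pop(int& out)
{
  {
    std::lock_guard<std::mutex> lock(m);
    if (q.empty())
      return false;
  }                      // lock released here...
  // ...another consumer may pop the last element right now
  std::lock_guard<std::mutex> lock(m);
  out = q.front();       // undefined behaviour if the queue is now empty
  q.pop();
  return true;
}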

These requirements are at odds with concurrent access: we have to carefully synchronise four operations (std::queue has even more methods that would require synchronization, amongst them size() and back()). In fact, the standard library containers and container adapters have not been designed with thread safety in mind in the presence of race conditions (that is, more than one thread accessing them with at least one of them performing write operations). The Queue interface in the first code sample simplifies the problem by condensing the de-queueing operations into a single blocking pop() member function that returns the element by value, at the cost of some exception safety.

There is no clean way to solve this last problem. One option is to modify Queue::pop() to take a reference to an element:

template <typename T>
struct Queue
{
 ...
  void pop(T& firstInQueue);
};

This makes it ugly on the caller side:

Queue<SomeType> q;
...
SomeType elem;
q.pop(elem);

We have a choice between elegance and the strong exception safety guarantee. We can opt for the former, and only provide the basic guarantee, meaning that the invariants of the Queue component are maintained and no resources are leaked. Or we can provide both the safe and the unsafe methods and let the caller decide. In any case, let's move on from exception safety and onto thread safety.

Since I want to use this queue for examples requiring more than one writer and one reader thread, let’s up the requirements to deal with multi-writer, multi-reader threads. It is a good moment to go over the requirements:

  • Concurrent access from multiple writer and reader threads
  • pop() blocks if queue is empty
  • At least one means to pop an element from queue satisfying the strong exception guarantee
  • No user actions should result in deadlock
  • Performance: minimize lock contention and avoid polling.

In terms of the implementation, there are three key issues to consider:

  1. Only one thread can be pushing an element at any given time
  2. Only one thread can be popping an element at any given time
  3. Threads must wait for elements to be in the queue before they can pop

We can implement this functionality using a single std::mutex, together with an std::condition_variable, following this excellent example.

The mutex prevents concurrent reads and writes, and the condition variable allows consuming threads to wait for elements to be available in the queue without excessive mutex contention and without using expensive and inefficient polling.

#include <mutex>
#include <condition_variable>
#include <queue>

template <typename T>
class Queue
{
 ...
 std::queue<T> queue_;
 std::mutex mutex_;
 std::condition_variable cond_;
};

The core of our pop() methods looks like this:

  std::unique_lock<std::mutex> mlock(mutex_); // mutex scope lock
  while (queue_.empty()) // check condition to be safe against spurious wakes
  {
    cond_.wait(mlock); // release lock and go join the waiting thread queue
  }
  // copy front(), pop() and return

The method is protected by a mutex, which is locked using the scope guard std::unique_lock. If the queue is empty, we wait on the condition variable cond_. This releases the lock to other threads and blocks until we are notified that the condition has been met. In this case, the call to wait() returns and we check the while condition again. This extra check is needed because condition variables may experience spurious wake-ups: we could be woken while the queue is still empty.
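
An equivalent way to write the same loop, shown here only for reference (the queue in this post keeps the explicit while loop), is to hand the predicate to wait(), which performs the check-and-wait loop internally:

  std::unique_lock<std::mutex> mlock(mutex_);
  // the predicate overload loops internally, guarding against spurious wake-ups
  cond_.wait(mlock, [this] { return !queue_.empty(); });
  // copy front(), pop() and return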

The condition notification will be provided when an element is pushed into the queue, as shown here in the push() method:

  void push(const T& item)
  {
    std::unique_lock<std::mutex> mlock(mutex_);
    queue_.push(item);
    mlock.unlock();     // unlock before notification to minimize mutex contention
    cond_.notify_one(); // notify one waiting thread
  }

We notify with every call to push(), so that no thread is left waiting even though there is data in the queue.

Our final queue looks like this:

#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>

template <typename T>
class Queue
{
 public:

  T pop()
  {
    std::unique_lock<std::mutex> mlock(mutex_);
    while (queue_.empty())
    {
      cond_.wait(mlock);
    }
    auto item = queue_.front();
    queue_.pop();
    return item;
  }

  void pop(T& item)
  {
    std::unique_lock<std::mutex> mlock(mutex_);
    while (queue_.empty())
    {
      cond_.wait(mlock);
    }
    item = queue_.front();
    queue_.pop();
  }

  void push(const T& item)
  {
    std::unique_lock<std::mutex> mlock(mutex_);
    queue_.push(item);
    mlock.unlock();
    cond_.notify_one();
  }

  void push(T&& item)
  {
    std::unique_lock<std::mutex> mlock(mutex_);
    queue_.push(std::move(item));
    mlock.unlock();
    cond_.notify_one();
  }

 private:
  std::queue<T> queue_;
  std::mutex mutex_;
  std::condition_variable cond_;
};

where we have provided the two versions of the pop() method discussed earlier, plus an overload of push() taking an rvalue reference T&& to benefit from move semantics. We can think of a few useful extensions to this queue class. For example, it would be nice to have a timeout pop() facility, and it could be wise to restrict the queue to a maximum length, beyond which calls to push() would block. These improvements may form the basis of a future post.
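
As a rough sketch of the first extension (the pop_for() member and its name are hypothetical, not part of the class above), a timed pop could be built on the wait_for() member of std::condition_variable, returning false on timeout instead of blocking forever:

  // Hypothetical extension; requires #include <chrono>.
  // Returns false if no element arrived within 'timeout'.
  bool pop_for(T& item, std::chrono::milliseconds timeout)
  {
    std::unique_lock<std::mutex> mlock(mutex_);
    if (!cond_.wait_for(mlock, timeout, [this] { return !queue_.empty(); }))
      return false;      // timed out, queue still empty
    item = queue_.front();
    queue_.pop();
    return true;
  }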

Sample code can be found on github.
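
For illustration, a minimal multi-producer, multi-consumer usage sketch might look like the following (the thread counts, element type and main() function are arbitrary choices of mine, not the demo from the repository):

#include <iostream>
#include <thread>

int main()
{
  Queue<int> q;

  // two producers push disjoint ranges of integers
  std::thread p1([&q] { for (int i = 0;   i < 100; ++i) q.push(i); });
  std::thread p2([&q] { for (int i = 100; i < 200; ++i) q.push(i); });

  // two consumers each pop 100 elements; output from the two may interleave
  auto consume = [&q] {
    for (int i = 0; i < 100; ++i)
      std::cout << q.pop() << '\n';
  };
  std::thread c1(consume);
  std::thread c2(consume);

  p1.join(); p2.join();
  c1.join(); c2.join();
}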

Comments

    1. Currently, it only works for copyable and assignable types. I might extend it to non-copyable, movable types. Do you have a particular use-case where this would be beneficial?

    1. empty() should not modify the queue, so it would make no sense for it not to be const. But note that the real concurrent queue developed in this post does not have an empty() member, because it doesn’t make much sense in a multi-threaded context to ask if something is empty when some other thread can add elements before you use the result of the call to empty().

  1. I’m confused, I don’t understand why the “pop”s don’t need “mlock.unlock();”s.

  2. Hi, for my application I needed a fixed-size queue so that only a limited number of elements can be pushed into it. How can I add this synchronization mechanism to void push(const T& item) so that the writer thread will wait when the queue is filled to the given size? Should I use another condition variable besides cond_ which will enable the thread to wait if the queue is full?

    1. Use a separate condition_variable which waits until one place is available in the queue.
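
      A minimal sketch of that suggestion (the capacity_ member and the not_full_ condition variable are hypothetical additions, not part of the queue in the post):

      // bounded push: blocks while the queue already holds capacity_ elements
      void push(const T& item)
      {
        std::unique_lock<std::mutex> mlock(mutex_);
        not_full_.wait(mlock, [this] { return queue_.size() < capacity_; });
        queue_.push(item);
        mlock.unlock();
        cond_.notify_one();     // wake a consumer
      }

      // pop() additionally signals that space has become available
      T pop()
      {
        std::unique_lock<std::mutex> mlock(mutex_);
        cond_.wait(mlock, [this] { return !queue_.empty(); });
        auto item = queue_.front();
        queue_.pop();
        mlock.unlock();
        not_full_.notify_one(); // wake a producer waiting for space
        return item;
      }

      // extra members: std::condition_variable not_full_; std::size_t capacity_;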

  3. Interesting article – thanks!
    Few questions:
    1. Any reason not to use shared_ptr with this implementation?
    2. You are using a single lock here. I wonder if you are familiar with an implementation which will be more efficient in terms of popping and pushing. After all, you can design a queue that will require a full lock when the size is more than two, but otherwise, you don’t have to lock the readers and the writers (only readers or writers).

    and last, but not least, with what license is this being released?
    (otherwise we cannot use this code at all)

    1. 1. I simply didn’t see a reason to use one. Did you have any specific use-case in mind?
      2. I wanted to keep it as simple as possible, and never used it enough to run into performance issues that would merit making the code more complex. I may look into C++14 shared locking for the readers.
      3. I released it under the BSD 2-clause license, which I believe is quite permissive. There’s a copy of the license on the git repository where the code is hosted (see link at the bottom of the post.) So, feel free to pick it up, try it out, improve/fix it if necessary. I haven’t really had the time to develop this beyond a simple demo.

  4. Your design is broken though because you don’t define a thread-safe copy constructor and assignment operator:
    Queue( const Queue& other)
    {
    std::lock_guard guard( other.mutex_ );
    queue_ = other.queue_;
    }

    Queue& operator= (Queue& other)
    {
    if(&other == this)
    return *this;
    std::unique_lock lock1( mutex_, std::defer_lock);
    std::unique_lock lock2( other.mutex_, std::defer_lock);
    std::lock( lock1, lock2);
    queue_ = other.queue_;
    return *this;
    }

    1. I wouldn’t say it is broken. I have decided the queue should not be copyable or assignable, so those operations are not permitted. Neither copy construction nor assignment would compile.

      1. In such a case, declare it non copyable and not assignable by adding:
        Queue(const Queue &) = delete;
        Queue &operator=(const Queue &) = delete;

  5. Great article!

    May I go on and ask: Is there any way to make sure that, if the queue is destroyed, a blocked thread trying to pop() gets released?

    1. I can’t think of an easy way to do that. But I think that is OK. The assumption is that the queue will outlive the threads, so client code should ensure that happens. I don’t think it is worth being too clever about this.

  6. Juan, you can combine the 2 push functions into one using perfect forwarding:

    void push(T&& item)
    {
    std::unique_lock mlock(mutex_);
    queue_.push(std::forward(item));
    mlock.unlock();
    cond_.notify_one();
    }

    Google for “perfect forwarding” and “universal references”. My $0.02.

    1. It could certainly benefit from less code duplication, but for perfect forwarding I would have to make `push` a function template, then ensure that its template parameter is the same as T. I’m not sure it is worth it.

      1. Better yet, do this:
        void push(T item)
        {
        std::unique_lock mlock(mutex_);
        queue_.push(std::move(item));
        mlock.unlock();
        cond_.notify_one();
        }

      2. See my other answer below. This will take care of both r- and lvalues in one function and with no templates.

      3. Template deduction is required for perfect forwarding. There’s no template deduction unless push is a function template.
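
        For reference, a perfect-forwarding push would have to look something like this (a hypothetical function template, not part of the queue in the post):

        // U&& is a forwarding reference because U is deduced at the call site
        template <typename U>
        void push(U&& item)
        {
          std::unique_lock<std::mutex> mlock(mutex_);
          queue_.push(std::forward<U>(item));
          mlock.unlock();
          cond_.notify_one();
        }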

  7. juan, I meant my 2nd version. It takes item by value. No templates involved. If you pass rvalue to it, it will be moved into item and then into queue_. If you pass lvalue, it will be copied into item and then moved into queue. One function handles both versions. No code duplication.

    1. This is the one I am talking about:
      void push(T item)
      {
      std::unique_lock mlock(mutex_);
      queue_.push(std::move(item));
      mlock.unlock();
      cond_.notify_one();
      }
