License : Creative Commons Attribution 4.0 International (CC BY-NC-SA 4.0)
Copyright : Frédéric Pennerath, CentraleSupelec
Last modified : April 19, 2024 10:22
Link to the source : synchronization.md

Mutexes and locks

Resources shared between threads require mutex protection to avoid corruption or race conditions. Let’s consider the example of a task queue:

#include <queue>
#include <mutex>

class TaskQueue {
  std::queue<Task> queue_; // Queue of tasks
  std::mutex mutex_;       // Mutex for protecting critical sections

public :

  struct EmptyQueue {}; // Exception to signal an empty queue

  // Push a task into the queue
  void push(const Task& t) {
    // Start of critical section: mutex is locked here
    std::lock_guard<std::mutex> lock(mutex_);
    
    queue_.push(t);
    // End of critical section: lock destructor releases mutex here
  }

  // Pop a task from the queue
  Task pop() {
    // Start of critical section: mutex is locked here
    std::lock_guard<std::mutex> lock(mutex_);
    
    // Note: since C++17, one can just write:
    // std::lock_guard lock(mutex_);
    
    // Destruction of lock_guard guarantees the mutex is released if an exception is thrown.
    if(queue_.empty()) throw EmptyQueue {};

    // One can safely pop a new task from the queue
    Task res = queue_.front();
    queue_.pop();

    return res;
    // End of critical section: lock destructor releases mutex here
  }

  size_t size() {
    // Start of critical section: mutex is locked here
    std::lock_guard<std::mutex> lock(mutex_);
    
    return queue_.size();
    // End of critical section: lock destructor releases mutex here
  }
};

The queue can then be shared by several consumer threads:

  using namespace std::chrono_literals; // For the 100ms literal

  TaskQueue queue;
  size_t nbrTasks = 10, nbrThreads = 3;

  // Runs tasks withdrawn from the queue until it is empty
  auto consumer = [&queue] () {
    try {
      while(true) queue.pop().run();
    } catch(const TaskQueue::EmptyQueue&) {
      std::cerr << "Thread " << std::this_thread::get_id() << " terminates (empty queue)" << std::endl;
    }
  };

  // Add a few tasks into the queue
  for(size_t t = 0; t != nbrTasks; ++t) queue.push(Task{ std::to_string(t), 100ms });

  // Launches some consumer threads
  std::vector<std::thread> threads;
  for(size_t t = 0; t != nbrThreads; ++t) threads.emplace_back(consumer);

  // Wait for thread completion
  for(auto& thread : threads) thread.join();
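
The snippets above rely on a Task type whose definition is not shown in this section. As a minimal sketch, assuming a task is just a name plus a simulated duration (the member names `name` and `duration` are hypothetical, only `run()` and the two-argument initialization appear in the code above), it could look like this:

```cpp
#include <chrono>
#include <iostream>
#include <string>
#include <thread>

// Hypothetical Task type (the real Task.hpp is not shown here):
// a named unit of work that simulates its workload by sleeping.
struct Task {
  std::string name;                    // Label used in log messages
  std::chrono::milliseconds duration;  // Simulated processing time

  // Simulate the work, then report completion
  void run() const {
    std::this_thread::sleep_for(duration);
    std::cout << "Task " << name << " done\n";
  }
};
```

With this definition, `Task{ std::to_string(t), 100ms }` is a plain aggregate initialization, and `queue.pop().run()` can be called on the temporary returned by `pop()`.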

The expert’s corner:

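The standard library offers more advanced mutex types than std::mutex, such as std::recursive_mutex, std::timed_mutex and std::shared_mutex (C++17).

There are also more advanced locks than std::lock_guard, such as std::unique_lock, std::shared_lock (C++14) and std::scoped_lock (C++17).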
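
As an illustration of such variants, the sketch below combines std::shared_mutex with std::shared_lock so that several readers can proceed concurrently, and uses std::scoped_lock to lock two mutexes at once. The class `NameRegistry` and the function `swapContents` are hypothetical names chosen for this example; C++17 is assumed.

```cpp
#include <cstddef>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <vector>

// Hypothetical registry of task names, read often and written rarely
class NameRegistry {
  std::vector<std::string> names_;
  mutable std::shared_mutex mutex_;  // Readers-writer mutex (C++17)

public:
  // Writers take exclusive ownership of the mutex
  void add(const std::string& name) {
    std::unique_lock lock(mutex_);
    names_.push_back(name);
  }

  // Readers take shared ownership and may run concurrently
  std::size_t count() const {
    std::shared_lock lock(mutex_);
    return names_.size();
  }
};

// std::scoped_lock (C++17) locks several mutexes at once,
// using a deadlock-avoidance algorithm
void swapContents(std::vector<int>& a, std::mutex& ma,
                  std::vector<int>& b, std::mutex& mb) {
  std::scoped_lock lock(ma, mb);
  a.swap(b);
}
```

Whether a readers-writer mutex actually pays off depends on the ratio of reads to writes; for short critical sections such as those of TaskQueue, a plain std::mutex is usually sufficient.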

Condition variables

The problem of polling

Let’s now implement a true producer–consumer model based on TaskQueue:

  TaskQueue queue;
  size_t nbrTasks = 10, nbrProducers = 3, nbrConsumers = 3;

  using namespace std::literals::chrono_literals;

  // Runs tasks withdrawn from the queue, polling every 10ms while it is empty
  // (this loop never terminates)
  auto consumer = [&queue] () {
    while(true) {
      try {
        queue.pop().run();
      } catch(const TaskQueue::EmptyQueue&) {
        std::cout << "(empty)" << std::flush;
        std::this_thread::sleep_for(10ms);
      }
    }
  };

  // Posts some tasks into the queue
  auto producer = [&queue, nbrTasks] () {
    for(size_t t = 0; t != nbrTasks; ++t) {
      queue.push(Task{ std::to_string(t), 100ms } );
      std::this_thread::sleep_for(500ms);
    }
  };

  // Launches some producer and consumer threads
  std::vector<std::thread> threads;

  for(size_t t = 0; t != nbrProducers; ++t) threads.emplace_back(producer);
  for(size_t t = 0; t != nbrConsumers; ++t) threads.emplace_back(consumer);

  // Wait for thread completion
  for(auto& thread : threads) thread.join();

Mutexes on their own cannot avoid polling, which is inefficient: a long pause between two polls adds latency, while a short one wastes CPU time.

A good solution requires a condition variable

Let’s integrate a condition variable into class TaskQueue:

#include <queue>
#include <mutex>
#include <condition_variable>

#include "Task.hpp"

class TaskQueue {
  std::queue<Task> queue_;       // Queue of tasks
  std::mutex mutex_;             // Mutex for protecting critical sections
  std::condition_variable cond_; // Condition variable to signal a pending task

public :

  // Push a task into the queue
  void push(const Task& t) {
    // Start of critical section: mutex is locked here
    std::lock_guard<std::mutex> lock(mutex_);
    queue_.push(t);

    // Signal there is a new pending task
    cond_.notify_one();

    // End of critical section: lock destructor releases mutex here
  }

  // Pop a task from the queue
  Task pop() {
    // Start of critical section: mutex is locked here
    // Condition variables require a unique_lock,
    // which provides lock() and unlock() methods
    std::unique_lock<std::mutex> lock(mutex_);

    // While there is no pending task
    while(queue_.empty()) {
      // Wait for a signal (the mutex is released while waiting)
      cond_.wait(lock);
      // Woken up (the mutex has been re-acquired): the loop condition
      // checks that there really is a pending task in the queue
      // and that this is not a spurious wakeup
    }

    // One can safely pop a new task from the queue
    Task res = queue_.front();
    queue_.pop();

    return res;
    // End of critical section: lock destructor releases mutex here
  }

  size_t size() {
    // Start of critical section: mutex is locked here
    std::lock_guard<std::mutex> lock(mutex_);
    return queue_.size();
    // End of critical section: lock destructor releases mutex here
  }
};

More on the problem of spurious wakeup

Summary:

The expert’s corner

Together, mutexes and condition variables cover most usage scenarios. C++20 introduces additional synchronization objects for specific uses, such as std::counting_semaphore, std::latch and std::barrier.
