License : Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International (CC BY-NC-SA 4.0)
Copyright :
Frédéric Pennerath,
CentraleSupelec
Last modified : April 19, 2024 10:22
Link to the source : synchronization.md
Resources shared between threads require mutex protection to avoid corruption or race conditions. Let’s consider the example of a task queue:
#include <queue>
#include <mutex>
#include "Task.hpp"

class TaskQueue {
  std::queue<Task> queue_;  // Queue of tasks
  std::mutex mutex_;        // Mutex for protecting critical sections
public :
  struct EmptyQueue {};     // Exception to signal an empty queue

  // Push a task into the queue
  void push(const Task& t) {
    // Start of critical section: mutex is locked here
    std::lock_guard<std::mutex> lock(mutex_);
    queue_.push(t);
    // End of critical section: lock destructor releases mutex here
  }

  // Pop a task from the queue
  Task pop() {
    // Start of critical section: mutex is locked here
    std::lock_guard<std::mutex> lock(mutex_);
    // Note: since C++17 one can just write:
    // std::lock_guard lock(mutex_);
    // Destruction of the lock_guard guarantees the mutex is released
    // even if an exception is thrown.
    if(queue_.empty()) throw EmptyQueue {};
    // One can safely pop a new task from the queue
    Task res = queue_.front();
    queue_.pop();
    return res;
    // End of critical section: lock destructor releases mutex here
  }

  size_t size() {
    // Start of critical section: mutex is locked here
    std::lock_guard<std::mutex> lock(mutex_);
    return queue_.size();
    // End of critical section: lock destructor releases mutex here
  }
};
TaskQueue queue;
size_t nbrTasks = 10, nbrThreads = 3;
// Runs tasks withdrawn from the queue
auto consumer = [&queue] () {
  try {
    while(true) queue.pop().run();
  } catch(const TaskQueue::EmptyQueue&) {
    std::cerr << "Thread " << std::this_thread::get_id() << " terminates (empty queue)" << std::endl;
  }
};
// Adds a few tasks into the queue
using namespace std::literals::chrono_literals;
for(size_t t = 0; t != nbrTasks; ++t) queue.push(Task{ std::to_string(t), 100ms });
// Launches some consumer threads
std::vector<std::thread> threads;
for(size_t t = 0; t != nbrThreads; ++t) threads.emplace_back(consumer);
// Wait for thread completion
for(auto& thread : threads) thread.join();
There are more evolved versions of mutexes than std::mutex:

- std::recursive_mutex can be locked/unlocked multiple times by the same thread.
- std::timed_mutex can block only for a given finite amount of time.
- std::shared_mutex provides two levels of access: shared access (for read operations) and exclusive access (for write operations).

There are also more evolved versions of locks than std::lock_guard:

- std::unique_lock implements a movable lock (other locks cannot be moved) and supports deferred locking, locking with a maximum timeout, etc.
- std::shared_lock allows locking a shared mutex in shared mode (standard locks can be used to lock a shared mutex in exclusive mode).
- std::scoped_lock allows locking several mutexes at once using a deadlock-avoidance algorithm.

Let’s now implement a true producer–consumer model based on TaskQueue:
TaskQueue queue;
size_t nbrTasks = 10, nbrProducers = 3, nbrConsumers = 3;
// Runs tasks withdrawn from the queue
auto consumer = [&queue] () {
  using namespace std::literals::chrono_literals;
  while(true)
    try {
      queue.pop().run();
    } catch(const TaskQueue::EmptyQueue&) {
      std::cout << "(empty)" << std::flush;
      std::this_thread::sleep_for(10ms);
    }
};
// Posts some tasks into the queue
auto producer = [&queue, nbrTasks] () {
  using namespace std::literals::chrono_literals;
  for(size_t t = 0; t != nbrTasks; ++t) {
    queue.push(Task{ std::to_string(t), 100ms });
    std::this_thread::sleep_for(500ms);
  }
};
// Launches some producer and consumer threads
std::vector<std::thread> threads;
for(size_t t = 0; t != nbrProducers; ++t) threads.emplace_back(producer);
for(size_t t = 0; t != nbrConsumers; ++t) threads.emplace_back(consumer);
// Wait for thread completion
for(auto& thread : threads) thread.join();
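The shared and scoped locks presented above can be sketched as follows. This is a minimal sketch; the Counter class and updateBoth function are hypothetical names used only for illustration (C++17 is assumed for class template argument deduction on the locks):

```cpp
#include <mutex>
#include <shared_mutex>

// Hypothetical read-mostly counter protected by a std::shared_mutex
class Counter {
  mutable std::shared_mutex mutex_;
  int value_ = 0;
public :
  int get() const {
    std::shared_lock lock(mutex_);  // shared mode: concurrent readers allowed
    return value_;
  }
  void increment() {
    std::unique_lock lock(mutex_);  // exclusive mode: the writer is alone
    ++value_;
  }
};

// std::scoped_lock acquires both mutexes with a deadlock-avoidance algorithm,
// whatever the order in which other threads lock them
std::mutex mutexA, mutexB;
void updateBoth(int& a, int& b) {
  std::scoped_lock lock(mutexA, mutexB);
  ++a;
  ++b;
}
```

With a std::shared_mutex, many readers can call get() concurrently while increment() still gets exclusive access, which can reduce contention on read-mostly data.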
On their own, mutexes cannot avoid polling, which is inefficient (it costs either latency or excessive CPU consumption).
Let’s integrate a condition variable into class TaskQueue:
#include <queue>
#include <mutex>
#include <condition_variable>
#include "Task.hpp"

class TaskQueue {
  std::queue<Task> queue_;        // Queue of tasks
  std::mutex mutex_;              // Mutex for protecting critical sections
  std::condition_variable cond_;  // Condition variable to signal a pending task
public :
  // Push a task into the queue
  void push(const Task& t) {
    // Start of critical section: mutex is locked here
    std::lock_guard<std::mutex> lock(mutex_);
    queue_.push(t);
    // Signal there is a new pending task
    cond_.notify_one();
    // End of critical section: lock destructor releases mutex here
  }

  // Pop a task from the queue
  Task pop() {
    // Start of critical section: mutex is locked here
    // Condition variables require a unique_lock,
    // which provides lock() and unlock() methods
    std::unique_lock<std::mutex> lock(mutex_);
    // While there is no pending task
    while(queue_.empty()) {
      // Wait for a signal (mutex is released)
      cond_.wait(lock);
      // Woken up: a signal has been received (mutex is reacquired);
      // loop to check there is really a pending task in the queue
      // and this is not a spurious wakeup
    }
    // One can safely pop a new task from the queue
    Task res = queue_.front();
    queue_.pop();
    return res;
    // End of critical section: lock destructor releases mutex here
  }

  size_t size() {
    // Start of critical section: mutex is locked here
    std::lock_guard<std::mutex> lock(mutex_);
    return queue_.size();
    // End of critical section: lock destructor releases mutex here
  }
};
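As a side note, the while loop guarding against spurious wakeups can equivalently be written with the predicate overload of std::condition_variable::wait, which performs the check internally. A minimal self-contained sketch, storing std::string instead of Task so that it compiles on its own:

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <string>

// Minimal variant of the queue above, using the predicate overload of wait()
class WaitingQueue {
  std::queue<std::string> queue_;
  std::mutex mutex_;
  std::condition_variable cond_;
public :
  void push(const std::string& s) {
    std::lock_guard<std::mutex> lock(mutex_);
    queue_.push(s);
    cond_.notify_one();
  }
  std::string pop() {
    std::unique_lock<std::mutex> lock(mutex_);
    // Equivalent to: while(queue_.empty()) cond_.wait(lock);
    cond_.wait(lock, [this] { return !queue_.empty(); });
    std::string res = queue_.front();
    queue_.pop();
    return res;
  }
};
```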
When the condition variable is notified (via notify_one or notify_all), one or all of the waiting threads are woken up and acquire the mutex in turn. Together, mutexes and condition variables address most usage scenarios. C++20 introduces additional synchronization objects for specific usages:
Counting semaphores (std::counting_semaphore) contain a non-negative counter. A thread can increment and decrement the counter. Decrementing a counter equal to 0 blocks the calling thread until some other thread increments it (the blocked thread then decrements it back to 0). Contrary to condition variables, which are built on top of a mutex, counting semaphores are not necessarily associated with a shared resource.
Latches (std::latch) and barriers (std::barrier) are synchronization barriers that ensure an arbitrary number of threads have all completed their task before going further.