// License : Creative Commons Attribution 4.0 International (CC BY-NC-SA 4.0)
// Copyright :
// Hervé Frezza-Buet,
// CentraleSupelec
// Last modified : April 19, 2024 10:22
// Link to the source : condition-variables.md
/*
g++ -o test example-condition-variable-001.cpp -std=c++20 -lpthread
*/
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <iomanip>
#include <iostream>
#include <mutex>
#include <random>
#include <stdexcept>
#include <string>
#include <syncstream>
#include <thread>
#include <vector>
// Placeholder type standing in for a huge piece of data. It carries no
// member: only its identity as "the shared data" matters here.
struct some_data {};
// Stands in for the read-only processing of a data instance. Nothing
// is actually read from data: the call only blocks the calling thread
// for a random duration, drawn uniformly in [500, 2000] milliseconds.
inline void process(const some_data& data) {
  std::random_device entropy;
  std::uniform_int_distribution<unsigned int> delay_ms(500, 2000);
  const std::chrono::milliseconds pause(delay_ms(entropy));
  std::this_thread::sleep_for(pause);
}
// Stands in for a modification of data. Nothing is written into data:
// the call blocks until one character can be extracted from the
// standard input (you hit the <enter> key), which simulates the end
// of the writing.
inline void update(some_data& data) {
  // The extracted character itself is of no interest.
  static_cast<void>(std::cin.get());
}
// Stands in for some unrelated computation, with a variable duration
// as well: the calling thread sleeps for a random time, drawn
// uniformly in [10, 500] milliseconds.
inline void any_process() {
  std::random_device entropy;
  std::uniform_int_distribution<unsigned int> delay_ms(10, 500);
  const std::chrono::milliseconds pause(delay_ms(entropy));
  std::this_thread::sleep_for(pause);
}
// The goal is to encapsulate an instance of data, so that it is
// available in read-only mode for several threads at the same
// time. Nevertheless, if the data is to be changed, we have to wait
// for the end of the current readings before starting the writing.
// While a writing is requested or ongoing, new readings are blocked.
// We suppose here that we have a single writer, and many readers:
// concurrent calls to set() are NOT serialized against each other.
// The class also prints things on the terminal, for pedagogical
// purposes.
template<typename VALUE>
class multi_readable {
private:
  VALUE content; // This is the data handled.

  // This is the current number of active readers. It is incremented
  // with write_mutex held and decremented with read_mutex held; it is
  // atomic so that safe_print_bar can read it without any lock.
  mutable std::atomic<unsigned int> nb_readers = 0;

  // This tells if a writing is requested or in progress. It is only
  // modified with write_mutex held.
  mutable std::atomic<bool> writing = false;

  // Prints message followed by a newline. The std::osyncstream keeps
  // lines printed by different threads from interleaving.
  void safe_print(const std::string& message) const {
    std::osyncstream os(std::cout);
    os << message << std::endl;
  }

  // Prints a '|' followed by one '#' per currently active reader.
  void safe_print_bar() const {
    std::osyncstream os(std::cout);
    os << '|' << std::string(nb_readers, '#') << std::endl;
  }

  // We need to wait on two conditions.

  // One is for the writer, since it has to wait until no reading is
  // in progress.
  mutable std::mutex read_mutex;
  mutable std::condition_variable nobody_reads;

  // The other is for the readers, that can start reading only if no
  // writing is requested or in progress.
  mutable std::mutex write_mutex;
  mutable std::condition_variable no_writing_in_progress;

  // Unregisters one reader when it goes out of scope, and notifies
  // the eventual pending writer if that reader was the last one. Doing
  // this in a destructor guarantees that a reading function that
  // throws does not leave nb_readers stuck above 0, which would block
  // the writer forever.
  struct reader_exit {
    const multi_readable& owner;
    explicit reader_exit(const multi_readable& o) : owner(o) {}
    reader_exit(const reader_exit&)            = delete;
    reader_exit& operator=(const reader_exit&) = delete;
    ~reader_exit() {
      std::lock_guard<std::mutex> lock(owner.read_mutex);
      if(--owner.nb_readers == 0) owner.nobody_reads.notify_all();
    }
  };

  // Clears the writing flag when it goes out of scope, and awakes the
  // readers waiting for it. Doing this in a destructor guarantees that
  // a writing function that throws does not leave the readers blocked
  // forever.
  struct writer_exit {
    multi_readable& owner;
    explicit writer_exit(multi_readable& o) : owner(o) {}
    writer_exit(const writer_exit&)            = delete;
    writer_exit& operator=(const writer_exit&) = delete;
    ~writer_exit() {
      std::lock_guard<std::mutex> lock(owner.write_mutex);
      owner.writing = false;
      owner.no_writing_in_progress.notify_all();
    }
  };

public:
  multi_readable() = default;

  // This is the reading method. It blocks while a writing is requested
  // or in progress, then calls read(content), where content is
  // provided as a const reference. Several threads may be inside read
  // at the same time. Exceptions thrown by read propagate to the
  // caller, the reader being unregistered first.
  template<typename READING_FUNC>
  void get(const READING_FUNC& read) const {
    // Let us wait for the end of an eventual writing. Testing the flag
    // and registering as a reader are done under the same lock, so
    // that the writer cannot raise the flag in between.
    {
      std::unique_lock<std::mutex> lock(write_mutex);
      // The predicate form of wait handles spurious awakenings.
      no_writing_in_progress.wait(lock, [this] { return !writing; });
      ++nb_readers;
    }

    // From now on, whatever happens, we update the number of readers
    // when leaving, and if we are the last one, we notify the
    // eventual pending writer that all readers are done.
    reader_exit leave(*this);

    // We process the reading.
    safe_print_bar(); // For display only.
    read(content);
    safe_print_bar(); // For display only.
  }

  // This is the writing method. It blocks new readers, waits for the
  // active ones to finish, then calls write(content), where content is
  // provided as a modifiable (non const) reference. After write has
  // returned, it sleeps 3 seconds for the sake of display before
  // letting the readers in again. Exceptions thrown by write propagate
  // to the caller, the readers being released first.
  template<typename WRITING_FUNC>
  void set(const WRITING_FUNC& write) {
    safe_print("A writing operation is requested.");

    // We tell every new reader starting reading from now to wait. The
    // flag is raised with write_mutex held: a reader is thus either
    // already counted in nb_readers, or it will see the flag. Raising
    // it without the lock would let a reader that has just tested the
    // flag start reading after we have seen nb_readers == 0.
    {
      std::lock_guard<std::mutex> lock(write_mutex);
      writing = true;
    }

    // From now on, whatever happens, the flag is cleared and the
    // waiting readers are awakened when leaving.
    writer_exit leave(*this);

    // Let us wait for the end of the eventual ongoing readings.
    {
      std::unique_lock<std::mutex> lock(read_mutex);
      // The predicate form of wait handles spurious awakenings.
      nobody_reads.wait(lock, [this] { return nb_readers == 0; });
    }

    safe_print(">>> Writing starts");
    write(content);
    safe_print(">>> Writing ends, waiting 3s more for the sake of display.");
    std::this_thread::sleep_for(std::chrono::seconds(3));
  }
};
// Entry point. The first argument is the number of reader threads;
// without it, the usage is printed and the program returns 0. An
// argument that is not a non-negative integer is reported on the
// standard error and the program returns 1.
//
// The readers keep on reading the shared data, while the main thread
// acts as the single writer: each <enter> key hit triggers a writing.
// When the standard input is exhausted, the readers are asked to stop
// and are joined before returning.
int main(int argc, char* argv[]) {
  if(argc == 1) {
    std::cout << "Usage : " << argv[0] << " <nb-readers>" << std::endl;
    return 0;
  }

  // The argument is checked first, so that a bad value is reported
  // properly rather than ending in an uncaught exception.
  unsigned long nb_threads = 0;
  try {
    const std::string arg = argv[1];
    std::size_t nb_parsed = 0;
    nb_threads = std::stoul(arg, &nb_parsed);
    // std::stoul accepts trailing garbage, as well as a minus sign
    // (the value then wraps around): both are rejected here.
    if(nb_parsed != arg.size() || arg.find('-') != std::string::npos)
      throw std::invalid_argument(arg);
  }
  catch(const std::logic_error&) { // invalid_argument or out_of_range.
    std::cerr << "Invalid number of readers : " << argv[1] << std::endl;
    return 1;
  }

  std::cout << std::endl
            << std::endl
            << std::endl
            << "During the running, press <enter> to simulate a writing. Then the writing starts," << std::endl
            << "and takes time. Indeed, you have to press <enter> again to simulate the end of " << std::endl
            << "the writing. Type <enter> again to start a new writing, and so on..." << std::endl
            << std::endl
            << "press <enter> to start the simulation." << std::endl;
  char key;
  std::cin.get(key); // This waits for the <enter> key hit.

  multi_readable<some_data> data;

  // This is raised when the standard input is exhausted, so that the
  // readers leave their loop.
  std::atomic<bool> stop_requested = false;

  // We start the readers.
  std::vector<std::thread> readers;
  for(unsigned long i = 0; i < nb_threads; ++i)
    readers.emplace_back([&data, &stop_requested]() {
      while(!stop_requested) { // The thread keeps on requiring data reads.
        data.get([](const some_data& d) {process(d);});
        any_process();
      }
    });

  // Now we write the data each time the user hits the <enter> key. The
  // loop ends when no more input is available; otherwise, a failed
  // read would return at once and trigger writings forever.
  while(std::cin.get(key)) {
    data.set([](some_data& d){update(d);});
  }

  // Each reader finishes its current iteration, then stops.
  stop_requested = true;
  for(auto& reader : readers) reader.join();
  return 0;
}