License : Creative Commons Attribution 4.0 International (CC BY-NC-SA 4.0)
Copyright : Hervé Frezza-Buet, CentraleSupelec
Last modified : April 19, 2024 10:22
Link to the source : condition-variables.md

Condition variables

/*

  g++ -o test example-condition-variable-001.cpp -std=c++20 -lpthread

*/

#include <mutex>
#include <thread>
#include <condition_variable>
#include <atomic>

#include <random>
#include <chrono>

#include <vector>
#include <string>

#include <iostream>
#include <iomanip>
#include <syncstream>

// This simulates a hudge data.
struct some_data {
};

// This simulates the processing of a data instance, with a variable
// duration, in read-only mode.
inline void process(const some_data& data) {
  std::random_device rd;
  auto sleep = std::uniform_int_distribution<unsigned int>(500, 2000)(rd);
  std::this_thread::sleep_for(std::chrono::milliseconds(sleep));
}

// This simulates an update. Indeed, the function waits for you to hit
// the <enter> key to smulate the end of the writing into data.
inline void update(some_data& data) {
  char c; std::cin.get(c);
}

// This simulates other computation, with a variable duration. as
// well.
inline void any_process() {
  std::random_device rd;
  auto sleep = std::uniform_int_distribution<unsigned int>(10, 500)(rd);
  std::this_thread::sleep_for(std::chrono::milliseconds(sleep));
}

// The goal is to encapsulate an instance of data, so that it is
// available in read-only mode for several threads at the same
// time. Nevertheless, if the data is to be changed, we have to wait
// the end of the current readings before starting the writing. When
// the writing is ongoing, readings are blocked. We suppose here that
// we have a single writer, and many readers. The following class also
// prints things on the terminal, for pedagogical purpose.

template<typename VALUE>
class multi_readable {
private:
  VALUE content; // This is the data handled.

  // This is the current number of active readers.
  mutable std::atomic<unsigned int> nb_readers = 0;

  // This tells if writing is in progress.
  mutable std::atomic<bool> writing = false;
  
  void safe_print(const std::string& message) const {
    std::osyncstream os(std::cout);
    os <<  message << std::endl;
  }

  void safe_print_bar() const {
    std::osyncstream os(std::cout);
    os << '|' << std::string(nb_readers, '#') << std::endl;
  }

  // We need to wait on two conditions.

  // One is for the writer, since it has to wait that no reading is in
  // progress.
  mutable std::mutex              read_mutex;
  mutable std::condition_variable nobody_reads;

  // The other is for the readers, that can start reading if writing
  // is not in progress.
  mutable std::mutex              write_mutex;
  mutable std::condition_variable no_writing_in_progress;

public:

  multi_readable() = default;
  
  // This is the reading method.
  template<typename READING_FUNC>
  void get(const READING_FUNC& read) const {

    // Let us wait the end of an eventual writing.
    {
      std::unique_lock<std::mutex> lock(write_mutex);
      // The while prevents from spurious awake...
      while(writing) no_writing_in_progress.wait(lock);
      ++nb_readers;
    }

    // We process the reading.
    safe_print_bar(); // For display only.
    read(content);    // content is provided as a const reference to read.
    safe_print_bar(); // For display only.

    // Now, we update the number of readers, and if we are the last
    // one, we notify the eventual pending writer that all readers are
    // done.
    {
      std::unique_lock<std::mutex> lock(read_mutex);
      if(--nb_readers == 0) nobody_reads.notify_all();
    }
  }

  // This is the writing method.
  template<typename WRITING_FUNC>
  void set(const WRITING_FUNC& write) {
    safe_print("A writing operation is requested.");
    
    // We tell every new reader starting reading from now to wait.
    writing = true;
    
    // Let us wait the end of an eventual readings.
    {
      std::unique_lock<std::mutex> lock(read_mutex);
      // The while prevents from spurious awake...
      while(nb_readers != 0) nobody_reads.wait(lock);
    }

    safe_print(">>> Writing starts");
    write(content); // content is provided as a modifyable (non const) reference to read.
    safe_print(">>> Writing ends, waiting 3s more for the sake of display.");
    std::this_thread::sleep_for(std::chrono::seconds(3));
    

    // Now, we awake eventual readers that are waiting for the writing
    // to be over.
    {
      std::unique_lock<std::mutex> lock(write_mutex);
      writing = false;
      no_writing_in_progress.notify_all();
    }
  }
};

int main(int argc, char* argv[]) {
  if(argc == 1) {
    std::cout << "Usage : " << argv[0] << " <nb-readers>" << std::endl;
    return 0;
  }

  std::cout << std::endl
	    << std::endl
	    << std::endl
	    << "During the running, press <enter> to simulate a writing. Then the writing starts," << std::endl
	    << "and takes time. Indeed, you have to press <enter> again to simulate the end of " << std::endl
	    << "the writing. Type <enter> again to start a new writing, and so on..." << std::endl
	    << std::endl
	    << "press <enter> to start the simulation." << std::endl;

  char c; std::cin.get(c); // This waits for the <enter> key hit.

  unsigned int nb_threads = std::stoul(argv[1]);
  
  multi_readable<some_data> data;

  // We start the readers.
  std::vector<std::thread> readers;
  for(unsigned int i = 0; i < nb_threads; ++i)
    readers.emplace_back([&data](){
	while(true) { // The thread keeps on requiring data reads.
	  data.get([](const some_data& d) {process(d);});
	  any_process();
	}
      });

  // Now we write the data when the user hits the <enter> key.
  while(true) {
    char c; std::cin.get(c);
    data.set([](some_data& d){update(d);});
  }

  // This is useless since all threads are while(true)....
  for(auto& reader : readers) reader.join();
  return 0;
}
Hervé Frezza-Buet,