License : Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International (CC BY-NC-SA 4.0)
Copyright : Frédéric Pennerath, CentraleSupelec
Last modified : April 19, 2024 10:22
Link to the source : multicore.md


Multicore programming based on futures

Futures and Promises

Let’s consider again mathematical vectors:

#include <array>
...
constexpr size_t n = 100000;

struct Vector : public std::array<double,n> {  // A vector is an array of n double
  static Vector random();                      // Builds a random vector
  double operator*(const Vector& other) const; // Dot product of two vectors
};
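
The two member functions are only declared above. A possible implementation sketch follows (the uniform distribution over [0, 1) is an arbitrary choice, not taken from the original listing); the generator is thread_local because random() is called from several threads at once in the examples below:

#include <random>

Vector Vector::random() {
  thread_local std::mt19937 gen{std::random_device{}()};   // One generator per thread, to avoid data races
  std::uniform_real_distribution<double> dist{0., 1.};
  Vector V;
  for(double& coef : V) coef = dist(gen);                   // Fill the n coefficients with random values
  return V;
}

double Vector::operator*(const Vector& other) const {       // Dot product
  double s = 0.;
  for(size_t i = 0; i < n; ++i) s += (*this)[i] * other[i];
  return s;
}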

Suppose we want to build two random vectors in parallel and compute their dot product:

#include <future>

std::promise<Vector> promise;                   // We promise a vector 
std::future<Vector>  V1 = promise.get_future(); // We get the future from the promise

std::thread collaborator {[&promise] () {   // Compute V1 in a dedicated thread
       Vector V = Vector::random();         // Do the computation
       promise.set_value(std::move(V));     // Move the result into the promise
}};

Vector V2 = Vector::random(); // In parallel compute V2 in the main thread

std::cout << "V1 * V2 = " << V1.get() * V2 << std::endl;
collaborator.join();

Remarks:

Management of exceptions

struct division_by_zero {};

double harmonic_mean(const Vector& V) {
  double s = 0.;
  for(double coef : V)
    if(coef == 0.) throw division_by_zero{};
    else s += 1. / coef;
  return V.size() / s;
}
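
For reference, this function computes the harmonic mean H(V) = n / (1/V₁ + … + 1/Vₙ); it is undefined as soon as one coefficient is zero, which is why the loop throws in that case.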

// We want to compute asynchronously the harmonic mean of a vector containing a zero
Vector V = Vector::random(); V[1234] = 0.;    // to cause an exception
std::promise<double> promise;
std::future<double>  mean = promise.get_future();

std::thread collaborator {[&promise, &V] () {
    try { 
      promise.set_value(harmonic_mean(V));
    } catch(...) {
      std::exception_ptr ex = std::current_exception();  // Get a pointer on the current exception
      promise.set_exception(ex);                         // Transfer the exception to the future
    }
}};

try {
  std::cout << "Harmonic mean " << std::flush << mean.get() << std::endl;   
} catch(const division_by_zero&) {
  std::cerr << "made a division by zero!" << std::endl;
}
collaborator.join();

Remarks:

async is your friend

Vector V = Vector::random(); V[1234] = 0.;    // We cause a division by zero

std::future<double> mean = std::async(std::launch::async, harmonic_mean, V); // std::launch::async runs the call as if in a new thread

try {
  std::cout << "Harmonic mean " << std::flush << mean.get() << std::endl;   
} catch(const division_by_zero&) {
  std::cerr << "made a division by zero!" << std::endl;
}
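
Note that std::async decay-copies its arguments, so the call above copies the whole vector into the asynchronous task. If that copy matters, a reference wrapper can be passed instead; a minimal sketch (the caller must then guarantee that V outlives the call to mean.get()):

#include <functional>   // std::cref
...
std::future<double> mean = std::async(std::launch::async, harmonic_mean, std::cref(V)); // Pass V by const reference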

Remarks:

Parallelization of algorithms on multi-core architectures

Sometimes futures are not the best solution: in-place parallel algorithms can do better. Let’s look at the example of sorting.

A simple sorting algorithm using two cores

#include <algorithm>   // std::sort, std::inplace_merge
#include <thread>
...
template <typename RandomIter>
void sort_with_two_cores(RandomIter const start, RandomIter const end) {
  // Split in two parts the sequence to sort
  RandomIter mid = start + (end - start)/2;

  // Assign the sorting of the first half to a thread
  std::thread secondThread([start, mid] () { std::sort(start, mid); });

  // Sort the second half in the main thread
  std::sort(mid, end);

  // Synchronisation point
  secondThread.join();

  // Merge the two halves into a sorted array
  std::inplace_merge(start, mid, end);
}
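
A usage sketch (the dataset and the final check are illustrative; this is not the benchmark harness used for the timings below):

#include <algorithm>
#include <cassert>
#include <random>
#include <vector>
...
std::vector<int> data(1'000'000);
std::mt19937 gen{42};                              // Fixed seed, arbitrary choice
std::uniform_int_distribution<int> dist;
for(int& x : data) x = dist(gen);                  // Fill with random integers

sort_with_two_cores(data.begin(), data.end());
assert(std::is_sorted(data.begin(), data.end()));  // The result must be globally sorted
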
Sorting 1000 integers 

Creating dataset ... ok
100 tests "Sort with one core"  ... computed on average in 0.000205664s.
100 tests "Sort with two cores" ... computed on average in 0.00010411s.

Sorting 10000 integers 

Creating dataset ... ok
10 tests "Sort with one core"  ... computed on average in 0.00109624s.
10 tests "Sort with two cores" ... computed on average in 0.000686576s.

Sorting 100000000 integers 

Creating dataset ... ok
1 test "Sort with one core"  ... computed on average in 21.0161s.
1 test "Sort with two cores" ... computed on average in 11.1098s.

A general sort with n cores

#include <algorithm>   // std::sort, std::inplace_merge, std::max
#include <cassert>
#include <thread>
#include <vector>
...
template <typename RandomIter>
void sort_with_n_cores(RandomIter const start, RandomIter const end, size_t nCores = std::thread::hardware_concurrency()) {
  // Compute the number of intervals to sort in parallel
  assert(start <= end);
  size_t n = end - start;
  if(nCores == 0) nCores = 1;                            // hardware_concurrency() may return 0
  size_t segmentSize = std::max<size_t>(1, n / nCores);  // At least one element per segment

  // Assign the sorting of every interval but the last one to a dedicated thread
  std::vector<std::thread> threads;
  RandomIter ptr;
  for(ptr = start; ptr + segmentSize < end; ptr += segmentSize)
    threads.emplace_back([ptr, segmentSize] () { std::sort(ptr, ptr + segmentSize); });

  // Sort the last interval in the main thread
  std::sort(ptr, end);

  // Wait until all threads have finished
  for(std::thread& thread : threads) thread.join(); 
  threads.clear();

  // While data contain more than one segment
  while(segmentSize < n) {
    // Merge intervals two by two
    auto mid = segmentSize;
    segmentSize *= 2;
    for(ptr = start; ptr + segmentSize < end; ptr += segmentSize)
      threads.emplace_back([ptr, mid, segmentSize] () { std::inplace_merge(ptr, ptr + mid, ptr + segmentSize); });

    // The main thread merges the last, possibly incomplete, pair of runs (if any)
    if(static_cast<size_t>(end - ptr) > mid)
      std::inplace_merge(ptr, ptr + mid, end);

    // Wait until all threads have finished
    for(std::thread& thread : threads) thread.join();   
    threads.clear();
  }
}
Sorting 1000 integers 

Creating dataset ... ok
1000 tests "Sort with one core" ... computed on average in 7.48429e-05s.
1000 tests "Sort with two cores" ... computed on average in 5.23481e-05s.
1000 tests "Sort with 16 cores" ... computed on average in 0.000346463s.

Sorting 10000 integers 

Creating dataset ... ok
1000 tests "Sort with one core" ... computed on average in 0.00108602s.
1000 tests "Sort with two cores" ... computed on average in 0.000611962s.
1000 tests "Sort with 16 cores" ... computed on average in 0.000536732s.

Sorting 100000000 integers 

Creating dataset ... ok
1 test "Sort with one core" ... computed on average in 21.0763s.
1 test "Sort with two cores" ... computed on average in 11.4s.
1 test "Sort with 16 cores" ... computed on average in 3.49781s.
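
As a point of comparison, since C++17 the standard library can parallelize its own algorithms through execution policies; a minimal sketch (whether and how the sort is spread over several cores is left to the implementation):

#include <algorithm>
#include <execution>
#include <vector>
...
std::vector<int> data(1'000'000);
// ... fill data ...
std::sort(std::execution::par, data.begin(), data.end()); // The library decides how to split the work across cores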