C++ Reactive Programming
上QQ阅读APP看书,第一时间看更新

Condition variables

We already know that mutexes can be used to share common resources and synchronize operations between threads. But synchronization using mutexes is a little complex and deadlock-prone if you are not careful. In this section, we will discuss how to wait for events with condition variables and how to use them for synchronization in an easier way.

When it comes to synchronization using mutexes, if the waiting thread has acquired a lock over a mutex, it can't be locked by any other thread. Also, waiting for one thread to complete its execution by checking on a status flag periodically that is protected by a mutex is a waste of CPU resources. This is because these resources can be effectively utilized by other threads in the system rather than having to wait for a longer time.

To address these problems, the C++ standard library has provided two implementations of conditional variables: std::condition_variable and std::condition_variable_any. Both are declared inside the <condition_variable> library header, and both the implementations need to work with a mutex to synchronize threads. The implementation of std::condition_variable is limited to working with std::mutex. On the other hand, std::condition_variable_any can work with anything that meets mutex-like criteria (mutex-like semantics), hence suffix _any. Because of its generic behavior, std::condition_variable_any ends up consuming more memory and degrades performance. It is not recommended unless a real, tailored requirement is in place.

The following program is an implementation of odd-even threads that we discussed when we talked about mutexes, which is now being re-implemented using condition variables:

std::mutex numMutex; 
std::condition_variable syncCond; 
auto bEvenReady = false; 
auto bOddReady  = false; 
void printEven(int max) 
{ 
    for (int i = 0; i <= max; i +=2) 
    { 
        std::unique_lock<std::mutex> lk(numMutex); 
        syncCond.wait(lk, []{return bEvenReady;}); 
         
        std::cout << i << ","; 
         
        bEvenReady = false; 
        bOddReady  = true; 
        syncCond.notify_one(); 
    } 
}

The program starts with the declaration of a mutex, a conditional variable, and two Boolean flags globally so that we can synchronize them between two threads. The printEven function gets executed in a worker thread and prints only even numbers starting from 0. Here, when it enters the loop, the mutex is protected with std::unique_lock instead of std::lock_guard; we will see the reason for that in a moment. The thread then calls the wait() function in std::condition_variable, passing the lock object and a Lambda predicate function that expresses the condition being waited for. This can be replaced with any callable object that returns bool. In this function, the predicate function returns the bEvenReady flag, so that the function continues execution when it becomes true. If the predicate returns false, the wait() function will unlock the mutex and wait for another thread to notify it, hence the std::unique_lock object comes handy here with the provided flexibility to lock and unlock.

As soon as std::cout prints the loop index, the bEvenReady flag is raised to false and bOddReady is raised to true. Then, the call to the notify_one() function associated with syncCond signals the waiting odd thread to write an odd number into the standard output stream:

void printOdd(int max) 
{ 
    for (int i = 1; i <= max; i +=2) 
    { 
        std::unique_lock<std::mutex> lk(numMutex); 
        syncCond.wait(lk, []{return bOddReady;}); 
         
        std::cout << i << ","; 
         
        bEvenReady = true; 
        bOddReady  = false; 
        syncCond.notify_one(); 
    } 
} 

The printOdd function gets executed in another worker thread and prints only odd numbers starting from 1. Like the printEven function, a loop iterates and prints the index that is protected by the globally declared conditional variable and mutex. Unlike the printEven function, the predicate used in the wait() function of a condition variable returns bOddReady, and the bEvenReady flag is raised to true and the bOddReady flag is raised to false. Followed by that, calling the notify_one() function associated with syncCond signals the waiting even thread to write an even number into the standard output stream. This interleaved printing of even and odd numbers continues until the max value:

int main() 
{ 
    auto max = 10; 
    bEvenReady = true; 
     
    std::thread t1(printEven, max); 
    std::thread t2(printOdd, max); 
     
    if (t1.joinable()) 
        t1.join(); 
    if (t2.joinable()) 
        t2.join(); 
     
} 

The main function launches two background threads, t1, which is associated with the printEven function and t2, which is associated with the printOdd function. The output starts when even parity is confirmed by raising the bEvenReady flag to true before the threads are launched.