Wednesday 7 December 2016

Step by step multithreading: Chapter 8, Condition Variable.

Intro -

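In some specific cases a mutex alone is not enough to coordinate access to a shared resource: it protects the data, but it cannot tell a waiting thread when the data is ready. In those cases we use condition variables.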

Deep understanding through C++ code -

I will give you a deep understanding of condition variables through a classic producer-subscriber example. First pay attention to the code below and its comments:
#include<iostream>
#include<queue>
#include<thread>
#include<mutex>
#include<chrono>
using namespace std;

queue<int> MyQueue;
mutex locker;

// this function will push numbers in a queue and sleep for a second.
void producer()
{
    int count = 10;
    while(count > 0)
    {
        unique_lock<mutex> locked(locker);
        MyQueue.push(count);
        locked.unlock();
        std::this_thread::sleep_for(chrono::seconds(1)); // put the current thread to sleep for one second.
        count--;
    }
}
// this function keeps checking whether MyQueue is empty; if it is not, it pops
// the oldest value and prints it.
void subscriber()
{
    int data = 0;
    while(data != 1) // loop until the last value (1) has been received.
    {
        unique_lock<mutex> locked(locker);
        if(!MyQueue.empty())
        {
            data = MyQueue.front(); // front() is the element that pop() removes.
            MyQueue.pop();
            cout << "t2 got a value from t1 " << data << endl;
        }
        else
        {
            locked.unlock();
        }
    }
}

// producer is the producer of the data and subscriber is the consumer of that data.
// As MyQueue is shared between the two threads, we lock the mutex (in both loops)
// before we access it. The problem is that subscriber keeps checking whether MyQueue
// has data, so it locks the mutex over and over in a busy loop, which wastes CPU time.
int main()
{
    thread t1(producer);
    thread t2(subscriber);
    t1.join();
    t2.join();
    return 0;
}

As mentioned in the code, the subscriber keeps checking whether the producer has added a value. That locks the mutex again and again in a tight loop, which is not good coding practice. The simple question here is: if the subscriber keeps grabbing the mutex, when will the producer get a chance to add a new value?
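
To get a feel for how wasteful this polling is, the sketch below counts how often a polling subscriber locks the mutex while it waits for a handful of values. The names `PollStats` and `run_polling_demo`, the 5 ms producer delay, and the `yield()` call are assumptions made for this illustration and are not part of the program above.

```cpp
#include <chrono>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical result type for this sketch.
struct PollStats {
    long lock_count = 0; // how many times the subscriber locked the mutex
    int  received   = 0; // how many values it actually got
};

// Runs one producer thread and a busy-polling subscriber (on the calling
// thread) and reports how much locking was needed to receive `items` values.
PollStats run_polling_demo(int items)
{
    std::queue<int> q;
    std::mutex m;
    PollStats stats;

    std::thread producer([&] {
        for (int i = items; i > 0; --i) {
            {
                std::lock_guard<std::mutex> lock(m);
                q.push(i);
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(5));
        }
    });

    // Subscriber: lock, check, unlock, repeat, with no waiting in between.
    while (stats.received < items) {
        {
            std::lock_guard<std::mutex> lock(m);
            ++stats.lock_count;
            if (!q.empty()) {
                q.pop();
                ++stats.received;
            }
        }
        std::this_thread::yield(); // give the producer a chance to take the lock
    }

    producer.join();
    return stats;
}
```

Calling `run_polling_demo(3)` and printing `lock_count` next to `received` usually shows far more lock acquisitions than values received; the exact number depends on the machine and the scheduler.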

One easy solution to this issue could be to use the std::this_thread::sleep_for function to make the subscriber wait. After the subscriber finds the queue empty and releases the lock, it sleeps for 10 seconds, then locks again and checks for a value.

So now the producer has 10 seconds to generate values. Look at the program below for a more detailed understanding.
#include<iostream>
#include<queue>
#include<thread>
#include<mutex>
#include<chrono>
using namespace std;

queue<int> MyQueue;
mutex locker;

// this function will push numbers in a queue and sleep for a second.
void producer()
{
    int count = 10;
    while(count > 0)
    {
        unique_lock<mutex> locked(locker);
        MyQueue.push(count);
        locked.unlock();
        std::this_thread::sleep_for(chrono::seconds(1)); // put the current thread to sleep for one second.
        count--;
    }
}
// this function checks whether MyQueue is empty; if it is not, it pops the oldest
// value and prints it, otherwise it sleeps for a while before checking again.
void subscriber()
{
    int data = 0;
    while(data != 1) // loop until the last value (1) has been received.
    {
        unique_lock<mutex> locked(locker);
        if(!MyQueue.empty())
        {
            data = MyQueue.front(); // front() is the element that pop() removes.
            MyQueue.pop();
            cout << "t2 got a value from t1 " << data << endl;
        }
        else
        {
            locked.unlock();
            std::this_thread::sleep_for(chrono::seconds(10));
        }
    }
}

int main()
{
    thread t1(producer);
    thread t2(subscriber);
    t1.join();
    t2.join();
    return 0;
}

We can see one issue here. Is 10 seconds too long, or too short?
What if the producer produces a value in 1 second? Then the subscriber just sits there for the remaining 9 seconds, which is of course not a good design. Or, in some other case, the producer may not be able to generate a value within 10 seconds. The right delay varies from requirement to requirement (and with the kind of value we are generating), so no fixed number works well.
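
The cost of a fixed sleep can be measured directly. The sketch below records how long a value sits in the queue before a sleeping subscriber notices it. The function name `measure_pickup_delay_ms`, the value 42, and the timings in the note that follows are assumptions chosen for illustration.

```cpp
#include <chrono>
#include <mutex>
#include <queue>
#include <thread>

// Returns how many milliseconds a value waited in the queue before the
// polling subscriber picked it up. Hypothetical helper for this sketch.
long long measure_pickup_delay_ms(std::chrono::milliseconds produce_after,
                                  std::chrono::milliseconds poll_interval)
{
    using clock = std::chrono::steady_clock;
    std::queue<int> q;
    std::mutex m;
    clock::time_point pushed_at;

    std::thread producer([&] {
        std::this_thread::sleep_for(produce_after);
        std::lock_guard<std::mutex> lock(m);
        q.push(42);
        pushed_at = clock::now(); // recorded while holding the lock
    });

    // Subscriber: check the queue; if it is empty, sleep a fixed interval.
    clock::time_point picked_at;
    for (;;) {
        {
            std::lock_guard<std::mutex> lock(m);
            if (!q.empty()) {
                q.pop();
                picked_at = clock::now();
                break;
            }
        }
        std::this_thread::sleep_for(poll_interval);
    }

    producer.join();
    return std::chrono::duration_cast<std::chrono::milliseconds>(
               picked_at - pushed_at).count();
}
```

With, say, a producer that pushes after 20 ms and a poll interval of 200 ms, the value typically waits most of the interval before it is picked up, though the exact figure depends on the scheduler. Shrinking the interval reduces the delay but brings back the busy polling from the first program.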

Desired solution -

We want something like this: as soon as the producer generates a value, the subscriber should pick it up. To achieve that we have to use a condition variable.

In C++, condition.wait(locked); puts the calling thread to sleep and releases the mutex while it sleeps. The thread is woken up by the signal that another thread sends with condition.notify_one();. Please look at the program below for more understanding.
#include<iostream>
#include<queue>
#include<thread>
#include<mutex>
#include<chrono>
#include<condition_variable>
using namespace std;

queue<int> MyQueue;
mutex locker;
std::condition_variable condition;

// this function will push numbers in a queue and sleep for a second.
void producer()
{
    int count = 10;
    while(count > 0)
    {
        unique_lock<mutex> locked(locker);
        MyQueue.push(count);
        locked.unlock();
        condition.notify_one(); // wakes up one thread that is waiting on this condition,
                           // if there is any.
        std::this_thread::sleep_for(chrono::seconds(1)); // put the current thread to sleep for one second.
        count--;
    }   
}
// this function waits until it is notified, then, if MyQueue is not empty, pops
// the oldest value and prints it.
void subscriber()
{
    int data = 0;
    while(data != 1) // loop until the last value (1) has been received.
    {
        unique_lock<mutex> locked(locker);
        condition.wait(locked);// puts thread 2 to sleep until it is notified by thread 1,
                           // so thread 2 only looks at the queue after thread 1 has
                           // pushed a value and notified it.
        /*
        Before wait() puts thread 2 to sleep, it unlocks the mutex locker. Otherwise
        thread 2 would sleep while holding the lock, and the other thread would be
        stuck waiting for the mutex.
        
        Once it gets the notification from thread 1, wait() locks the mutex again
        and then the code below runs. The mutex has to be unlocked again after
        your business logic is done (here the unique_lock does that at the end of
        the loop body).
        
        All goes well as long as thread 2 only wakes up when thread 1 notifies it.
        The problem is that thread 2 can also wake up on its own, which is called a
        spurious wake-up. If that happens, we have to put it back to sleep.
        */
        if(!MyQueue.empty())
        {
            data = MyQueue.front(); // front() is the element that pop() removes.
            MyQueue.pop();
            cout << "t2 got a value from t1 " << data << endl;
        }
        else
        {
            locked.unlock();
            std::this_thread::sleep_for(chrono::seconds(10));
        }
    }
}

int main()
{
    thread t1(producer);
    thread t2(subscriber);
    t1.join();
    t2.join();
    return 0;
}

As you might have seen in the comment section, this program works fine as long as thread t2 only wakes up when t1 notifies it. What if thread t2 wakes up on its own (this is called a spurious wake-up) before the producer has added a value? Then t2 finds an empty queue, and in this program it ends up in the else branch and sleeps for 10 seconds, which is exactly the fixed delay we were trying to get rid of.
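
A common way to handle this by hand is to re-check the condition in a loop around wait(), so that a wake-up with nothing in the queue simply sends the thread back to sleep. The following is a minimal sketch of that idea; `demo_queue`, `demo_mutex`, `demo_cv`, `wait_and_pop` and `push_and_notify` are hypothetical names, not taken from the programs above.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical shared state for this sketch.
std::queue<int> demo_queue;
std::mutex demo_mutex;
std::condition_variable demo_cv;

// Blocks until a value is available, then removes and returns it.
int wait_and_pop()
{
    std::unique_lock<std::mutex> lock(demo_mutex);
    // Re-check after every wake-up: if the queue is still empty, the wake-up
    // was spurious (or another thread took the value), so wait again.
    while (demo_queue.empty()) {
        demo_cv.wait(lock);
    }
    int value = demo_queue.front();
    demo_queue.pop();
    return value;
}

// Adds a value under the lock, then wakes one waiting thread.
void push_and_notify(int value)
{
    {
        std::lock_guard<std::mutex> lock(demo_mutex);
        demo_queue.push(value);
    }
    demo_cv.notify_one();
}
```

Because the loop tests the queue before the first call to wait(), the subscriber also copes with a value that was pushed before it started waiting.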

Actually, condition.wait() can take two parameters; let us explore the second one. As the second parameter we can pass a predicate, for example a lambda expression. wait() evaluates it before going to sleep and again every time the thread wakes up, whether by notification or on its own; if the lambda returns false, the thread goes back to sleep. Look at the program below for more detail:
#include<iostream>
#include<queue>
#include<thread>
#include<mutex>
#include<chrono>
#include<condition_variable>
using namespace std;

queue<int> MyQueue;
mutex locker;
std::condition_variable condition;

// this function will push numbers in a queue and sleep for a second.
void producer()
{
    int count = 10;
    while(count > 0)
    {
        unique_lock<mutex> locked(locker);
        MyQueue.push(count);
        locked.unlock();
        condition.notify_one(); // wakes up one thread that is waiting on this condition,
                           // if there is any.
        std::this_thread::sleep_for(chrono::seconds(1)); // put the current thread to sleep for one second.
        count--;
    }   
}
// this function sleeps until MyQueue is not empty, then pops the oldest value
// and prints it.
void subscriber()
{
    int data = 0;
    while(data != 1) // loop until the last value (1) has been received.
    {
        unique_lock<mutex> locked(locker);
        // passing a lambda expression as the predicate: wait() returns only when
        // it is true, so the queue is guaranteed to be non-empty afterwards.
        condition.wait(locked, [](){return !MyQueue.empty();});
        data = MyQueue.front(); // front() is the element that pop() removes.
        MyQueue.pop();
        cout << "t2 got a value from t1 " << data << endl;
    }
}

int main()
{
    thread t1(producer);
    thread t2(subscriber);
    t1.join();
    t2.join();
    return 0;
}
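
One detail of the two-parameter wait() is worth spelling out: the predicate is checked before the thread blocks at all. A notification that is sent while nobody is waiting is not stored anywhere, so a plain one-parameter wait() called afterwards would block until the next notification, whereas the predicate version returns at once if the condition already holds. The sketch below illustrates this on a single thread; the helper name `wait_after_early_notify` and the `ready` flag are assumptions for this example.

```cpp
#include <condition_variable>
#include <mutex>

// Hypothetical helper: notifies first, waits afterwards, and returns the
// state of the flag once wait() has come back.
bool wait_after_early_notify()
{
    std::mutex m;
    std::condition_variable cv;
    bool ready = false;

    {
        std::lock_guard<std::mutex> lock(m);
        ready = true;            // the "value" is produced here
    }
    cv.notify_one();             // nobody is waiting yet, so nothing is woken

    std::unique_lock<std::mutex> lock(m);
    // The predicate is evaluated before blocking. It is already true, so
    // wait() returns immediately instead of waiting for another notify.
    cv.wait(lock, [&] { return ready; });
    return ready;
}
```

This is why the shared state (the queue, or a flag such as `ready`) carries the real information, and the notification is only a hint to go and look at it.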

Now the last important point. condition.notify_one(); notifies only one thread, as we have seen in the programs above. What if more threads (subscribers) are waiting for the producer? In that case we can use condition.notify_all(); which wakes up all the threads that are waiting on the condition.

void producer()
{
    int count = 10;
    while(count > 0)
    {
        unique_lock<mutex> locked(locker);
        MyQueue.push(count);
        locked.unlock();
        condition.notify_all(); // wakes up every thread that is waiting on this condition.
        std::this_thread::sleep_for(chrono::seconds(1)); // put the current thread to sleep for one second.
        count--;
    }   
}
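
With several subscribers, the `while(data != 1)` exit test from the earlier programs no longer works, because only one subscriber will ever receive the value 1. The sketch below shows one possible arrangement, assuming a shared `done` flag that the producer sets when it has finished. The function `run_with_subscribers`, the flag, and the running `total` are hypothetical additions for illustration.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Runs one producer (on the calling thread) and several subscribers, and
// returns the sum of all values the subscribers consumed.
int run_with_subscribers(int subscriber_count, int items)
{
    std::queue<int> q;
    std::mutex m;
    std::condition_variable cv;
    bool done = false;  // set by the producer when no more values will come
    int total = 0;      // protected by m

    auto subscriber = [&] {
        for (;;) {
            std::unique_lock<std::mutex> lock(m);
            cv.wait(lock, [&] { return !q.empty() || done; });
            if (q.empty()) {
                return; // woken because of done and nothing is left to take
            }
            total += q.front();
            q.pop();
        }
    };

    std::vector<std::thread> subscribers;
    for (int i = 0; i < subscriber_count; ++i) {
        subscribers.emplace_back(subscriber);
    }

    for (int value = items; value > 0; --value) {
        {
            std::lock_guard<std::mutex> lock(m);
            q.push(value);
        }
        cv.notify_all(); // wake every waiter; each one re-checks the predicate
    }
    {
        std::lock_guard<std::mutex> lock(m);
        done = true;
    }
    cv.notify_all();     // let every subscriber see done and exit

    for (auto& t : subscribers) {
        t.join();
    }
    return total;
}
```

Every value is consumed exactly once because the queue is only touched under the mutex, so `run_with_subscribers(3, 10)` returns 55 (the sum 10 + 9 + ... + 1), whichever subscriber happens to take each value.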


Thanks for reading. Please write questions or any other useful information related to this topic in the comment section. To learn more about threading, see the full learning index here, or join the multithreading learning page on FB or on G++.
