Wednesday, 7 December 2016

Step by step multithreading : Chapter 8, Condition Variable.

Intro -

In some cases a mutex alone is not enough to coordinate threads around a shared resource. In those cases we use condition variables.

Deep understanding through C++ code -

I will build up an understanding of condition variables through a classic producer-subscriber example. First pay attention to the code below and its comments –
#include<iostream>
#include<queue>
#include<thread>
#include<mutex>
#include<chrono>
using namespace std;

queue<int> MyQueue;
mutex locker;

// this function will push numbers in a queue and sleep for a second.
void producer()
{
    int count = 10;
    while(count > 0)
    {
        unique_lock<mutex> locked(locker);
        MyQueue.push(count);
        locked.unlock();
        std::this_thread::sleep_for(chrono::seconds(1)); // put the current thread to sleep for a second.
        count--;
    }
}
// this function keeps checking whether MyQueue is empty; if it is not, it pops
// the data out and prints it.
void subscriber()
{
    int data = 0;
    while(data != 1) // keep looping until the last value (1) arrives.
    {
        unique_lock<mutex> locked(locker);
        if(!MyQueue.empty())
        {
            data = MyQueue.front(); // pop() removes the front element, so read front(), not back().
            MyQueue.pop();
            cout << "t2 got a value from t1 " << data << endl;
        }
        else
        {
            locked.unlock();
        }
    }
}

// producer produces the data and subscriber consumes it.
// As MyQueue is shared between the two threads, both loops lock the mutex
// before they access it. The problem is that subscriber keeps checking whether MyQueue
// has data, so it locks the mutex again and again in a tight loop, which is not good.
int main()
{
    thread t1(producer);
    thread t2(subscriber);
    t1.join();
    t2.join();
    return 0;
}

As mentioned in the code comments, subscriber keeps checking whether producer has added a value. It locks and unlocks the mutex again and again in a tight loop, burning CPU and competing with producer for the lock, which is not good coding practice. The simple question here is: if subscriber keeps grabbing the mutex, when will producer get a chance to add a new value?

One easy solution could be to use the std::this_thread::sleep_for function to make subscriber wait. When subscriber finds the queue empty, it releases the lock, sleeps for 10 seconds, then locks again and checks for a value.

So now producer gets 10 seconds to generate a value. Look at the program below for more detail.
#include<iostream>
#include<queue>
#include<thread>
#include<mutex>
#include<chrono>
using namespace std;

queue<int> MyQueue;
mutex locker;

// this function will push numbers in a queue and sleep for a second.
void producer()
{
    int count = 10;
    while(count > 0)
    {
        unique_lock<mutex> locked(locker);
        MyQueue.push(count);
        locked.unlock();
        std::this_thread::sleep_for(chrono::seconds(1)); // put the current thread to sleep for a second.
        count--;
    }
}
// this function keeps checking whether MyQueue is empty; if it is not, it pops
// the data out and prints it.
void subscriber()
{
    int data = 0;
    while(data != 1) // keep looping until the last value (1) arrives.
    {
        unique_lock<mutex> locked(locker);
        if(!MyQueue.empty())
        {
            data = MyQueue.front(); // pop() removes the front element, so read front(), not back().
            MyQueue.pop();
            cout << "t2 got a value from t1 " << data << endl;
        }
        else
        {
            locked.unlock();
            std::this_thread::sleep_for(chrono::seconds(10));
        }
    }
}

int main()
{
    thread t1(producer);
    thread t2(subscriber);
    t1.join();
    t2.join();
    return 0;
}

We can see one issue there. Is 10 seconds too long? Or too short?
What if producer has produced a value after 1 second and subscriber still waits for the extra 9 seconds? That is of course not a good design. Or, in some other case, what if producer is not able to generate a value within 10 seconds? The right delay changes from requirement to requirement (what kind of value are we generating?).

Desired solution -

We want subscriber to pick up the value as soon as producer generates it. To achieve that we have to use a condition variable.

In C++, condition.wait(locked); puts the thread to sleep, and it is woken up by the signal given by condition.notify_one();. Please look at the program below for more detail.
#include<iostream>
#include<queue>
#include<thread>
#include<mutex>
#include<chrono>
#include<condition_variable>
using namespace std;

queue<int> MyQueue;
mutex locker;
std::condition_variable condition;

// this function will push numbers in a queue and sleep for a second.
void producer()
{
    int count = 10;
    while(count > 0)
    {
        unique_lock<mutex> locked(locker);
        MyQueue.push(count);
        locked.unlock();
        condition.notify_one(); // this wakes up one thread that is waiting on this condition,
                           // if there is any.
        std::this_thread::sleep_for(chrono::seconds(1)); // put the current thread to sleep for a second.
        count--;
    }   
}
// this function sleeps until producer notifies it, then pops
// the data out and prints it.
void subscriber()
{
    int data = 0;
    while(data != 1) // keep looping until the last value (1) arrives.
    {
        unique_lock<mutex> locked(locker);
        condition.wait(locked);// this puts thread 2 to sleep until it is notified by thread 1.
                           // so the condition variable makes thread 2 look for data
                           // only after thread 1 has pushed it and sent a notification.
        /*
        Before wait() puts thread 2 to sleep, it unlocks the mutex locker. Otherwise
        thread 2 would sleep while holding the lock, and thread 1 would be stuck
        waiting for the mutex.
        
        Once thread 2 gets the notification from thread 1, wait() locks the mutex again
        and then the code below runs. The mutex is released after the business logic,
        when locked goes out of scope.
        
        All goes well as long as thread 2 wakes up only when thread 1 notifies it.
        But thread 2 can also wake up on its own, which is called a spurious wake-up.
        If that happens, we have to put it back to sleep.
        */
        if(!MyQueue.empty())
        {
            data = MyQueue.front(); // pop() removes the front element, so read front(), not back().
            MyQueue.pop();
            cout << "t2 got a value from t1 " << data << endl;
        }
        else
        {
            locked.unlock();
            std::this_thread::sleep_for(chrono::seconds(10));
        }
    }
}

int main()
{
    thread t1(producer);
    thread t2(subscriber);
    t1.join();
    t2.join();
    return 0;
}

As you might have seen in the comments, this program works fine as long as thread t2 wakes up only when it is notified. But what if thread t2 wakes up on its own (this is called a spurious wake-up) before producer has pushed a value? Then t2 runs with an empty queue. There is a second problem too: if producer calls notify_one() before t2 has reached wait(), that notification is lost and t2 sleeps even though the queue already holds a value.

Actually condition.wait() can take two parameters, and we will now explore the second one. The second parameter is a predicate, for which we can pass a lambda expression. wait() evaluates the lambda before sleeping and again on every wake-up. If the lambda returns false the thread goes (back) to sleep, and wait() returns only when it returns true. Look at the program below for more detail –
#include<iostream>
#include<queue>
#include<thread>
#include<mutex>
#include<chrono>
#include<condition_variable>
using namespace std;

queue<int> MyQueue;
mutex locker;
std::condition_variable condition;

// this function will push numbers in a queue and sleep for a second.
void producer()
{
    int count = 10;
    while(count > 0)
    {
        unique_lock<mutex> locked(locker);
        MyQueue.push(count);
        locked.unlock();
        condition.notify_one(); // this wakes up one thread that is waiting on this condition,
                           // if there is any.
        std::this_thread::sleep_for(chrono::seconds(1)); // put the current thread to sleep for a second.
        count--;
    }   
}
// this function waits until MyQueue is not empty, then pops
// the data out and prints it.
void subscriber()
{
    int data = 0;
    while(data != 1) // keep looping until the last value (1) arrives.
    {
        unique_lock<mutex> locked(locker);
        // the lambda is the wake-up condition: wait() returns only when it is true.
        condition.wait(locked, [](){return !MyQueue.empty();});
        // the queue cannot be empty here, so no extra check or sleep is needed.
        data = MyQueue.front(); // pop() removes the front element, so read front(), not back().
        MyQueue.pop();
        cout << "t2 got a value from t1 " << data << endl;
    }
}

int main()
{
    thread t1(producer);
    thread t2(subscriber);
    t1.join();
    t2.join();
    return 0;
}

Now the last important point. condition.notify_one(); notifies only one thread, as we have seen in the programs above. What if there are more threads (subscribers) waiting for producer? In that case we can use condition.notify_all(); which wakes up all the threads waiting on the condition variable.

void producer()
{
    int count = 10;
    while(count > 0)
    {
        unique_lock<mutex> locked(locker);
        MyQueue.push(count);
        locked.unlock();
        condition.notify_all(); // wake up every thread waiting on this condition.
        std::this_thread::sleep_for(chrono::seconds(1)); // put the current thread to sleep for a second.
        count--;
    }   
}


Thanks for reading this, please write questions or some more useful information related to this topic on comment section. To learn more about threading, see the full learning index here, or join multithreading learning page on FB or on G++.

Sunday, 4 December 2016

Step by step multithreading : Chapter 7, unique_lock.

We have seen the very useful RAII class lock_guard for locking a mutex. There are other ways to lock a mutex. One easy way is to call the mutex’s own lock() and unlock() functions, which is not recommended because the mutex stays locked if an exception is thrown or we return early. Another very effective way to lock a mutex is unique_lock. unique_lock is also RAII and as safe as lock_guard, and on top of that it provides flexibility that lock_guard doesn’t offer.

Flexibilities provided by unique_lock –

  1. In the same scope, we may want to lock the mutex initially, but then execute the rest of the statements without the mutex. Please look at the program below (inside the executeYourQuery function we first lock the mutex, then unlock it in the same scope, and then we can run the rest of the statements without the lock).
    #include<iostream>
    #include<memory>
    #include<string>
    #include<thread>
    #include<mutex>
    using namespace std;
    
    class DB   // class responsible for all DB operation.
    {
        public:
            void execute(const string& str)
            {
                //execute the sql in respective DB
            }
    };
    class DBConnection
    {
        public:
            DBConnection() : m_connection(new DB)
            {
            }
            //execute your query 
            void executeYourQuery(const string& str)
            {
                unique_lock<mutex> nowLockWithGuard(m_lock);
                m_connection->execute(str);
                nowLockWithGuard.unlock();
                //After unlocking the mutex, now we can execute other logic
                //which we wanted to execute without mutex.
            }
        private:
            unique_ptr<DB> m_connection; // auto_ptr is deprecated since C++11
            std::mutex m_lock;
    };
    
    int main(int argc, char **argv)
    {
        DBConnection connection;
        string input = "select * from emp;";
        connection.executeYourQuery(input);
        return 0;
    }
    
  2. We can lock/unlock as many times as we want. So if you have a business requirement where you need to lock and unlock the mutex several times in the same scope, unique_lock will help you do so. Look at the program below for more clarity. Also see how we can defer the locking through std::defer_lock.
    #include<iostream>
    #include<memory>
    #include<string>
    #include<thread>
    #include<mutex>
    using namespace std;
    
    class DB   // class responsible for all DB operation.
    {
        public:
            void execute(const string& str)
            {
                //execute the sql in respective DB
            }
    };
    class DBConnection
    {
        public:
            DBConnection() : m_connection(new DB)
            {
            }
            //execute your query 
            void executeYourQuery(const string& str)
            {
                /*
                we can make a unique_lock variable without locking the mutex.
                we can lock and unlock any number of times in the code, which is 
                safe and provided by the standard library.
                This we cannot do with lock_guard<>
                */
                unique_lock<mutex> nowLockWithGuard(m_lock, std::defer_lock);
                // defer_lock means the mutex is NOT locked yet, so lock it explicitly.
                nowLockWithGuard.lock();
                m_connection->execute(str);
                nowLockWithGuard.unlock();
                //After unlocking the mutex, now we can execute other logic
                //which we wanted to execute without mutex.
                nowLockWithGuard.lock();
                // we locked it again, execute some logic..
                // ...
                nowLockWithGuard.unlock();
                // we unlocked it again.
            }
        private:
            unique_ptr<DB> m_connection; // auto_ptr is deprecated since C++11
            std::mutex m_lock;
    };
    
    int main(int argc, char **argv)
    {
        DBConnection connection;
        string input = "select * from emp;";
        connection.executeYourQuery(input);
        return 0;
    }
    


Like a thread object, unique_lock cannot be copied. We can only move it from one unique_lock to another unique_lock.
#include<iostream>
#include<memory>
#include<string>
#include<thread>
#include<mutex>
#include<utility>
using namespace std;

class DB   // class responsible for all DB operation.
{
    public:
        void execute(const string& str)
        {
            //execute the sql in respective DB
        }
};
class DBConnection
{
    public:
        DBConnection() : m_connection(new DB)
        {
        }
        //execute your query 
        void executeYourQuery(const string& str)
        {
            unique_lock<mutex> nowLockWithGuard(m_lock, std::defer_lock);
            // defer_lock means the mutex is NOT locked yet, so lock it explicitly.
            nowLockWithGuard.lock();
            m_connection->execute(str);
            nowLockWithGuard.unlock();
            //After unlocking the mutex, now we can execute other logic
            //which we wanted to execute without mutex.
            nowLockWithGuard.lock();
            // we locked it again, execute some logic..
            // ...
            nowLockWithGuard.unlock();
            // we unlocked it again.
            
            unique_lock<mutex> nowLockWithGuard02 = std::move(nowLockWithGuard);
            // this moves the association with the mutex from nowLockWithGuard to
            // nowLockWithGuard02, which is not possible with lock_guard<>
        }
    private:
        unique_ptr<DB> m_connection; // auto_ptr is deprecated since C++11
        std::mutex m_lock;
};

int main(int argc, char **argv)
{
    DBConnection connection;
    string input = "select * from emp;";
    connection.executeYourQuery(input);
    return 0;
}

Now the question is: if unique_lock is that good and flexible, why don’t we use unique_lock everywhere? The reason is that it is a slightly heavier object. To offer its flexibility it has to remember whether it currently owns the mutex and check that on every operation, so using it all over the place adds a small cost. lock_guard offers much less flexibility, but when all you need is to hold the lock for a whole scope it is the best choice.

Now let’s implement a thread-safe singleton, a class that has at most one object. This will give you a step by step understanding of how to apply multithreading in a real problem-solving scenario.
#include<iostream>
#include<thread>
#include<mutex>
using namespace std;

std::mutex m_lock;
    
class Singleton
{
public:
    static Singleton* GetInstance();
    
private:
    //default (zero argument) constructor.
    Singleton(){}
    //copy constructor, private so nobody can copy the object.
    Singleton(Singleton &){}
    // overloaded assignment operator, private for the same reason.
    const Singleton& operator = (Singleton &){ return *this; }
    // single pointer which will get referred to in all the places.
    static Singleton *mp_singleton;

};

Singleton* Singleton::mp_singleton = NULL;

Singleton* Singleton::GetInstance()
{
    if(mp_singleton == NULL)
    {
        unique_lock<mutex> lockMutex(m_lock);
        mp_singleton = new Singleton;
        return mp_singleton;
    }
    else 
    {
        return mp_singleton;
    }
    
}
void doSomeWork()
{
    Singleton *ptr1 = Singleton::GetInstance();
    cout<<"Address of the local pointer variable is - "<< &ptr1 <<endl;
    cout<<"Address of the Singleton object is       - "<< ptr1 <<endl;
}
int main(int argc, char ** argv)
{
    // start both threads before joining, so they really run at the same time.
    thread t1(doSomeWork);
    thread t2(doSomeWork);
    t1.join();
    t2.join();
    return 0;
}

Let’s discuss the problem with this approach. What if two different threads reach the statement if(mp_singleton == NULL) in GetInstance at the same time, and at that moment mp_singleton is NULL (meaning this is the first call)?

Both threads will enter the if block. The first thread gets the lock, while the second thread sits at the same line (unique_lock<mutex> lockMutex(m_lock);) waiting for the first thread to release it. The first thread creates the Singleton object and returns. As soon as it releases the lock, the second thread gets the lock and creates Singleton again.

So in this case we end up with two Singleton objects. For every later call mp_singleton is no longer NULL, so the condition if(mp_singleton == NULL) is false and no more objects are created.

Or to put it another way: what if thread one is in the middle of creating the singleton object, but has not finished yet, when thread two arrives? At that moment thread two finds mp_singleton still NULL, so it also enters the if block and creates a second object.

So is that what we want, two objects of our singleton class? Of course not.

How can we solve this problem? One simple way is to move the locking outside the if block. See the code below –
#include<iostream>
#include<thread>
#include<mutex>
using namespace std;

std::mutex m_lock;
    
class Singleton
{
public:
    static Singleton* GetInstance();
    
private:
    //default (zero argument) constructor.
    Singleton(){}
    //copy constructor, private so nobody can copy the object.
    Singleton(Singleton &){}
    // overloaded assignment operator, private for the same reason.
    const Singleton& operator = (Singleton &){ return *this; }
    // single pointer which will get referred to in all the places.
    static Singleton *mp_singleton;

};

Singleton* Singleton::mp_singleton = NULL;

Singleton* Singleton::GetInstance()
{
    unique_lock<mutex> lockMutex(m_lock);
    if(mp_singleton == NULL)
    {
        mp_singleton = new Singleton;
        return mp_singleton;
    }
    else 
    {
        return mp_singleton;
    }
    
}
void doSomeWork()
{
    Singleton *ptr1 = Singleton::GetInstance();
    cout<<"Address of the local pointer variable is - "<< &ptr1 <<endl;
    cout<<"Address of the Singleton object is       - "<< ptr1 <<endl;
}
int main(int argc, char ** argv)
{
    // start both threads before joining, so they really run at the same time.
    thread t1(doSomeWork);
    thread t2(doSomeWork);
    t1.join();
    t2.join();
    return 0;
}

Hooray, we did it. This code is now thread safe, but a big issue remains. Since the lock moved outside the if block, every thread that calls GetInstance locks the mutex.

We only need the mutex while the first singleton object is being created. After the first object has been created successfully, we don’t need locking anymore.

Locking and unlocking have their own cost. Even though the singleton object already exists, we now pay that cost on each and every call to GetInstance, which is not good programming practice.

Look at the code below, which checks the pointer before locking and again after locking, to solve this problem -
#include<iostream>
#include<thread>
#include<mutex>
using namespace std;

std::mutex m_lock;
std::once_flag m_flagOneTime;
class Singleton
{
public:
    static Singleton* GetInstance();
    
private:
    //default (zero argument) constructor.
    Singleton(){}
    //copy constructor, private so nobody can copy the object.
    Singleton(Singleton &){}
    // overloaded assignment operator, private for the same reason.
    const Singleton& operator = (Singleton &){ return *this; }
    // single pointer which will get referred to in all the places.
    static Singleton *mp_singleton;

};

Singleton* Singleton::mp_singleton = NULL;

Singleton* Singleton::GetInstance()
{
    if(mp_singleton == NULL)
    {
        unique_lock<mutex> lockMutex(m_lock);
        if(mp_singleton == NULL) // check same condition again
        {
            mp_singleton = new Singleton;
        }
    }
    return mp_singleton;
}
void doSomeWork()
{
    Singleton *ptr1 = Singleton::GetInstance();
    cout<<"Address of the local pointer variable is - "<< &ptr1 <<endl;
    cout<<"Address of the Singleton object is       - "<< ptr1 <<endl;
}
int main(int argc, char ** argv)
{
    // start both threads before joining, so they really run at the same time.
    thread t1(doSomeWork);
    thread t2(doSomeWork);
    t1.join();
    t2.join();
    return 0;
}

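The double check removes most of the locking cost, but it is easy to get wrong: mp_singleton is still read outside the lock while another thread may be writing it, which the C++ memory model treats as a data race for a plain pointer. What we really want is for the statement mp_singleton = new Singleton; to execute exactly one time, and C++ gives us a way to do that: the std::call_once() function. Look at the program below –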

#include<iostream>
#include<thread>
#include<mutex>
using namespace std;

std::mutex m_lock;
std::once_flag m_flagOneTime;
class Singleton
{
public:
    static Singleton* GetInstance();
    
private:
    //default (zero argument) constructor.
    Singleton(){}
    //copy constructor, private so nobody can copy the object.
    Singleton(Singleton &){}
    // overloaded assignment operator, private for the same reason.
    const Singleton& operator = (Singleton &){ return *this; }
    // single pointer which will get referred to in all the places.
    static Singleton *mp_singleton;

};

Singleton* Singleton::mp_singleton = NULL;

Singleton* Singleton::GetInstance()
{
    std::call_once(m_flagOneTime, [&]() {mp_singleton = new Singleton;});
    return mp_singleton;
}
void doSomeWork()
{
    Singleton *ptr1 = Singleton::GetInstance();
    cout<<"Address of the local pointer variable is - "<< &ptr1 <<endl;
    cout<<"Address of the Singleton object is       - "<< ptr1 <<endl;
}
int main(int argc, char ** argv)
{
    // start both threads before joining, so they really run at the same time.
    thread t1(doSomeWork);
    thread t2(doSomeWork);
    t1.join();
    t2.join();
    return 0;
}

The std::call_once function lets us call a function exactly one time, even when several threads reach it together, and that is exactly what we were looking for in our code. I have used a lambda expression to create the Singleton object inside call_once.
