12/05/2013

C++ Multithreading Support


C++11 Multithreading overview 


The C++11 standard provides multithreading support, including the following features: thread creation and management, mutexes, atomic objects, thread local storage and condition variables.
Thread creation and management is provided by the std::thread class, which is defined in <thread>. The class works with regular functions, lambdas (anonymous functions written directly in your source code) and functors (classes with an overloaded () operator). Any number of parameters can be passed to the thread function. To start a thread you create an instance of the std::thread class and pass it the function to run. Let's look at an example:
#include <iostream>
#include <thread>

void testFunc()
{
       std::cout << "Hello from the thread!";
}

int main(int argc, char *argv[])
{
       std::thread my_thread(testFunc);
       my_thread.join();
       return 0;
} 
In this example a pointer to a trivial function is passed to the constructor of the thread class. After creating the thread we call join() to ensure that it has finished executing before the program exits. Strictly speaking, you should first check whether the thread is joinable by calling its joinable() member function. If you don't want to wait for the thread to finish, you can call the detach() member function to separate the thread of execution from the thread object; after calling detach() the thread object is no longer joinable. A lambda or a functor can be passed to the constructor in exactly the same way, as sketched below.
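Here is a minimal sketch of the same program written with a lambda; this is just an illustration and behaves like the function version above:
#include <iostream>
#include <thread>

int main()
{
       // the thread body is written inline as a lambda
       std::thread my_thread([]() {
              std::cout << "Hello from the lambda thread!";
       });
       my_thread.join();
       return 0;
}
Any number of arguments can also be passed to a thread: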
#include <iostream>
#include <thread>

void printEven(int printArray[], int size)
{
       for (int i = 0; i < size; ++i)
       {
             if (printArray[i] % 2 == 0)
                    std::cout << "Even number " << i << " is - " << printArray[i] << "\n";
       }
}

int main(int argc, char *argv[])
{
       const int SIZE = 7;
       int evenArray[SIZE] = { 0, 7, 8, 63, 81, 24, 369 };
       std::thread my_thread(printEven, evenArray, SIZE);

       if (my_thread.joinable())
             my_thread.join();

       return 0;
} 
If you need to pass parameters by reference, they should be wrapped in std::ref or std::cref.
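For example, a minimal sketch (the increment function and the counter variable are made up for illustration):
#include <iostream>
#include <thread>
#include <functional>   // std::ref

void increment(int &value)
{
       ++value;
}

int main()
{
       int counter = 0;
       // std::ref wraps counter in a reference_wrapper, so the thread
       // works with the original variable rather than with a copy
       std::thread my_thread(increment, std::ref(counter));
       my_thread.join();
       std::cout << "counter = " << counter << "\n";   // prints counter = 1
       return 0;
}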
Each thread has its own id. The std::thread class has a get_id() member function, which returns a unique id for the thread. To get the id of the current thread, std::this_thread::get_id() can be used. Thread ids are stored in the lightweight std::thread::id class.
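A small sketch showing both ways of obtaining an id (the output of the two threads may interleave):
#include <iostream>
#include <thread>

void worker()
{
       // id of the thread this function is currently running on
       std::cout << "Inside the thread, id is " << std::this_thread::get_id() << "\n";
}

int main()
{
       std::thread my_thread(worker);
       // id of the thread managed by the my_thread object
       std::cout << "Created a thread with id " << my_thread.get_id() << "\n";
       my_thread.join();
       return 0;
}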

Useful features 

There are several useful functions in the std::this_thread namespace. The yield() function notifies the thread scheduler that it should select another thread to run; using yield() is rarely necessary. sleep_until() expects an instance of the std::chrono::time_point class and blocks execution of the current thread until that point in time is reached. sleep_for() expects an instance of the std::chrono::duration class and blocks execution of the current thread for the given amount of time.
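sleep_for() is used in the larger listing below, so here is just a minimal sketch of sleep_until() on its own (the two-second delay is an arbitrary choice for illustration):
#include <thread>
#include <chrono>

int main()
{
       // block the current thread until a point in time two seconds from now
       auto wakeTime = std::chrono::system_clock::now() + std::chrono::seconds(2);
       std::this_thread::sleep_until(wakeTime);
       return 0;
}
The following listing starts several threads, each of which sleeps for a random number of seconds using sleep_for():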
#include <iostream>
#include <thread>
#include <memory>    // std::unique_ptr
#include <cstdlib>   // rand()

const int MAX_DELAY = 10;
const int THREADS_NUMBER = 5;

void raceThread()
{
       int delay = rand() % MAX_DELAY;
       std::thread::id currentID = std::this_thread::get_id();
       std::cout << "Thread with id " << currentID << " started. Delay " << delay << " seconds\n";
       std::this_thread::sleep_for(std::chrono::seconds(delay));
       std::cout << "Finishing thread...\n";
}

int main(int argc, char *argv[])
{
      
       std::unique_ptr<std::thread> ThreadsArray[THREADS_NUMBER];
       for (int i = 0; i < THREADS_NUMBER; ++i)
       {
             ThreadsArray[i].reset(new std::thread(raceThread));
       }

       for (int i = 0; i < THREADS_NUMBER; ++i)
       {
             ThreadsArray[i]->join();
       }
       return 0;
} 
Please try to run the listing above several times. As you can see, the results are different every time. That's because each thread prints to stdout, which is a shared resource. Since the output is not synchronized between the working threads, lines from different threads can interleave in unpredictable ways. In general, unsynchronized access to shared data from several threads is a data race and results in undefined behaviour.
A mutex is used to ensure that only one thread works with a shared resource at any given time, so let's talk a bit about how mutexes are defined in the C++11 standard. The std::mutex class is used for locking and unlocking shared resources. Besides it, there are three other kinds of mutexes. std::timed_mutex differs from the previous one only in its ability to try to lock a resource within a certain timeout. std::recursive_mutex allows a thread to lock a resource recursively by making additional calls to lock() or try_lock(); the period of ownership ends when the thread makes a matching number of calls to unlock(). Attempting to lock a regular mutex recursively is undefined behaviour and usually ends in a deadlock. Finally, std::recursive_timed_mutex is a recursive mutex with the timeout ability. Let's change the previous example so that access to the shared resource is managed the way we want it to be.
#include <iostream>
#include <thread>
#include <mutex>

const int MAX_DELAY = 10;
const int THREADS_NUMBER = 5;

std::mutex g_mutex;

void raceThread()
{
       std::lock_guard<std::mutex> lock(g_mutex);
       int delay = rand() % MAX_DELAY;
       std::thread::id currentId = std::this_thread::get_id();
       std::cout << "Thread with id " << currentId << " started. Delay " << delay << " seconds\n";
       std::this_thread::sleep_for(std::chrono::seconds(delay));
       std::cout << "Finishing thread...\n";
} 
The main function is the same as in the previous listing, so we've skipped it here.
std::lock_guard is a RAII-style wrapper for more convenient work with std::mutex: it locks the mutex on construction and releases the lock on destruction. Because of potential deadlock problems, it is strongly recommended to avoid locking several mutexes at the same time. Unfortunately, sometimes more than one resource has to be locked (e.g. you are working with two distinct data items and each of them is protected by its own mutex). In that case std::unique_lock can be helpful. It offers functionality similar to lock_guard, but it can be combined with std::lock(), which locks several mutexes using a deadlock-avoidance algorithm. An additional argument passed to the std::unique_lock constructor defines the locking strategy. By passing std::defer_lock you tell std::unique_lock not to acquire ownership of the mutex immediately, and you can then safely lock both resources with std::lock():
struct Account
{
    Account(float sum) : m_fValue(sum) {}

    float m_fValue;
    std::mutex m_Mutex;
};

void foo(Account &parentAcc, Account &childAcc, float amount)
{
    // don't actually take the locks yet
    std::unique_lock<std::mutex> lock1(childAcc.m_Mutex, std::defer_lock);
    std::unique_lock<std::mutex> lock2(parentAcc.m_Mutex, std::defer_lock);

    // lock both unique_locks without deadlock
    std::lock(lock1, lock2);

    childAcc.m_fValue -= amount;
    parentAcc.m_fValue += amount;
} 
Please keep in mind that the behaviour of a program is undefined if a mutex is destroyed while still owned by some thread. A thread owns a mutex from the moment it successfully locks it until it calls the unlock() method, so make sure a mutex is not locked by any thread before destroying it.
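Returning to the timed mutexes mentioned earlier, here is a minimal sketch of std::timed_mutex::try_lock_for(); the function name and the 100 millisecond timeout are made up for illustration:
#include <iostream>
#include <mutex>
#include <chrono>

std::timed_mutex g_timedMutex;

void tryToWork()
{
       // give up if the mutex cannot be acquired within 100 milliseconds
       if (g_timedMutex.try_lock_for(std::chrono::milliseconds(100)))
       {
              // ... work with the shared resource ...
              g_timedMutex.unlock();
       }
       else
       {
              std::cout << "Could not lock the mutex within the timeout\n";
       }
}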
Condition variables are another synchronization primitive provided by the C++11 standard. A condition variable is always used together with a mutex and allows threads to communicate with each other: multiple threads can wait for some condition to become true or for a specified timeout to expire. To work with a condition variable, a thread must acquire a lock first and pass it to the condition variable operations. The lock is released implicitly while the thread waits on the condition variable and is reacquired when the thread wakes up. std::condition_variable blocks threads until a notification is received, a timeout expires or a spurious wakeup occurs.
Besides std::condition_variable, there is std::condition_variable_any. The difference is that std::condition_variable works only with std::unique_lock<std::mutex>, whereas std::condition_variable_any works with any (possibly user-defined) type that meets the BasicLockable requirements. Apart from that, the functionality of both synchronization primitives is almost identical.
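Note that wait() also has an overload taking a predicate, which re-checks the condition after every wakeup and therefore handles spurious wakeups for you. A minimal sketch (the flag, mutex and function names are made up for illustration):
#include <mutex>
#include <condition_variable>

std::mutex g_mutex;
std::condition_variable g_condVar;
bool g_ready = false;

void waitUntilReady()
{
       std::unique_lock<std::mutex> lock(g_mutex);
       // equivalent to: while (!g_ready) g_condVar.wait(lock);
       g_condVar.wait(lock, [] { return g_ready; });
       // here the lock is held again and g_ready is true
}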

C++ threads vs. Boost threads

One of the most commonly used open-source libraries for C++ is Boost. This library also provides multithreading support, so let's compare C++11 threads with Boost threads. C++11 threads are part of the language standard, while Boost threads are essentially wrappers around platform-dependent thread implementations.
The first thing you may have noticed by now is that a C++ thread will not have its detach() method implicitly called when it is destroyed. You have to call join() or detach() manually before the thread object is destructed; if you don't, std::terminate() is called and your application is aborted. In Boost there is no such problem, because detach() is called automatically when a thread object is destroyed. Boost threads also support interruption (cancellation), while C++ standard threads don't.
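A minimal sketch illustrating the point:
#include <thread>

int main()
{
       std::thread my_thread([]() { /* some work */ });

       // If neither join() nor detach() is called before my_thread goes out
       // of scope, the std::thread destructor calls std::terminate() and the
       // program is aborted; boost::thread would simply detach instead.
       my_thread.join();   // comment this line out to observe the abort
       return 0;
}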
std::thread allows move-only types, such as std::unique_ptr, to be passed as arguments. Boost, because it uses boost::bind internally, requires copyable arguments. To clear things up a little, let's see an example:
void foo(std::unique_ptr<int>);
std::thread my_thread(foo, std::unique_ptr<int>(new int(228)));  
std::unique_ptr cannot be copied, so this example works with std::thread but won't work with boost::thread, whose semantics require copyable arguments. Migration from Boost threads to C++11 threads should not cause a lot of trouble. Of course, some Boost features are not supported by the C++ standard yet. For example, there is no analogue of boost::shared_mutex, which allows multiple-reader, single-writer locking. Such a mutex is planned for a future revision of the standard (from C++14), but currently it is not available. On the other hand, using the threads from the language standard gives a few advantages over Boost threads. No matter how convenient Boost is, it is still an additional library, which means you depend on the Boost libraries during compilation and linking, and sometimes these libraries need to be updated. When using C++ threads, which are already part of the standard, you don't need to worry about such things.
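To illustrate the kind of reader/writer locking boost::shared_mutex offers, here is a hedged sketch (assuming the Boost.Thread headers are available; the names are made up for illustration):
#include <boost/thread/shared_mutex.hpp>
#include <boost/thread/locks.hpp>

boost::shared_mutex g_rwMutex;
int g_sharedValue = 0;

int readValue()
{
       // many readers may hold the shared lock at the same time
       boost::shared_lock<boost::shared_mutex> lock(g_rwMutex);
       return g_sharedValue;
}

void writeValue(int newValue)
{
       // a writer needs exclusive ownership of the mutex
       boost::unique_lock<boost::shared_mutex> lock(g_rwMutex);
       g_sharedValue = newValue;
}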

How does it work?


We've seen the main features of the C++ threading model, so let's try to use them to solve a practical task. Producer-consumer is a well-known multithreading problem. It describes two threads working with a single queue: one of them produces items and puts them into the queue, and the other consumes items and removes them from the queue. The main problem is to ensure that the producer won't try to add to the queue when it is full and the consumer won't try to consume from the queue when it is empty.
Let's start with the necessary includes and some global variables which will be used by both threads.
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>

bool g_isReady = false;
std::mutex g_Mutex;
std::condition_variable g_ConditionVar;

std::queue<int> g_MessagesQueue;

const int QUEUE_SIZE = 10;
const int SLEEP_TIME = 500; 
We declared a boolean flag to indicate the ready status of the queue, the global message queue to put data into, and a mutex for locking the shared data while working with it. We also declare a condition variable for communication between the threads. To avoid hardcoding, constants for the queue size and the sleep time are declared as well. The producer function will look like this:
void Producer()
{
       std::cout << "\\\\\\\\\\ Producer is ready ///// \n";
      
       for( int i = 0; ; ++i )
       {
             std::this_thread::sleep_for(std::chrono::milliseconds(SLEEP_TIME));                   
             std::unique_lock<std::mutex> lock(g_Mutex);  
            
                            
             if ( g_MessagesQueue.size() < QUEUE_SIZE )
             {
                    std::cout << "Produced " << i << "\n";
                    g_MessagesQueue.push(i);    
             }
             else
             {
                    g_isReady = true; 
                    g_ConditionVar.notify_one();             
             }
       }
} 
Here we have an infinite loop which pushes data into the queue. On each iteration there is a small delay, after which the current number of queue elements is compared with the maximum queue size. If it is less than the maximum, data is pushed into the queue; otherwise the status flag is set and the condition variable is used to send a notification to the consumer thread. The lock is acquired whenever the shared resources are used. In the consumer function we have one more infinite loop. It uses the status flag, which protects it from spurious wakeups, and waits for a notification from the producer thread. After a notification has been received, the consumer pops the data from the queue and sets the status flag back to false:
void Consumer()
{
       std::cout << "\\\\\\\\\\ Consumer is ready ///// \n";

       while (true)
       {
             std::unique_lock<std::mutex> lock(g_Mutex);

             while (!g_isReady)                            
             {
                    g_ConditionVar.wait(lock);           
             }

             while (!g_MessagesQueue.empty())            
             {
                    std::cout << "Consumed -" << g_MessagesQueue.front() << "\n";
                    g_MessagesQueue.pop();
             }

             g_isReady = false;                           
       }

} 
The main routine only creates the threads and waits for them to finish.
int main(int argc, char *argv[])
{
       std::thread producer(Producer);
       std::thread consumer(Consumer);

       if (producer.joinable())
             producer.join();

       if (consumer.joinable())
             consumer.join();
}  
As you can see, the C++11 standard provides some powerful features for multithreading support. These features are expected to be enhanced in future revisions of the standard.
