Showing posts with label multithreading. Show all posts

12/18/2013

Solving the Santa Claus problem using Ocaml

Santa’s problem

Not much time left till Xmas. Haven’t you ever wondered how Santa manages all his business with those elves and reindeer? And why?
Even if you don’t care, there are people who do. At least one. John Trono of St. Michael’s College in Vermont was so worried about how Santa handles all that stuff that he decided to model his work and wrote a rather simplified scenario of Santa’s life:

“Santa Claus sleeps in his shop at the North Pole and can only be awakened by either (1) all nine reindeer being back from their vacation in the South Pacific, or (2) some of the elves having difficulty making toys; to allow Santa to get some sleep, the elves can only wake him when three of them have problems. When three elves are having their problems solved, any other elves wishing to visit Santa must wait for those elves to return. If Santa wakes up to find three elves waiting at his shop’s door, along with the last reindeer having come back from the tropics, Santa has decided that the elves can wait until after Christmas, because it is more important to get his sleigh ready. (It is assumed that the reindeer do not want to leave the tropics, and therefore they stay there until the last possible moment.) The last reindeer to arrive must get Santa while the others wait in a warming hut before being harnessed to the sleigh.”

Besides the given scenario, let’s add some specifications:
  • After the ninth reindeer arrives, Santa must invoke prepare_sleigh, and then all nine reindeer must invoke get_hitched
  • After the third elf arrives, Santa must invoke help_elves. Concurrently, all three elves should invoke get_help.
  • All three elves must invoke get_help before any additional elves enter

Not very complicated, as you can see, at least until you try to implement it. To make the solution less boring, I’ve decided to implement it in Ocaml rather than on an enterprise platform like .NET or Java. At the moment I’m writing this post, I haven’t managed to find an Ocaml solution on the internet. Ocaml is an ML-derived functional language with static typing, pattern matching and automatic garbage collection. It has a fairly big standard library and a nice native code compiler for a number of platforms. However, I’ve chosen it just to make solving Santa’s problem a bit more challenging and interesting, in other words, just for fun. I’ll try to comment on the lines of code that look weird, so don’t panic.

Pseudo-code solution

First, let’s solve Santa’s problem using pseudo-code. We’ll use elf and reindeer counters protected by a mutex, a semaphore for Santa (he waits until either an elf or a reindeer signals him), a semaphore for the reindeer (they wait until Santa signals them to get hitched), a semaphore for the elves (they wait until Santa helps them) and a mutex that prevents additional elves from entering while three elves are being helped.

Santa’s code is the easiest one (it runs in an infinite loop)
santa_sem.wait()
mutex.wait()
if reindeer == 9
{
prepare_sleigh()
reindeer_sem.signal(9)
reindeer = 0
}
else if elves == 3
{
help_elves()
elf_sem.signal(3)
}
mutex.signal()

Santa checks two conditions and deals either with the reindeer or with the elves. If there are nine reindeer waiting, Santa prepares the sleigh, signals the reindeer semaphore nine times, allowing the reindeer to invoke get_hitched, and resets the reindeer counter so that the check does not fire again on his next wake-up. Otherwise, if there are three elves waiting, Santa invokes help_elves and signals the elf semaphore three times.

Code for reindeer is also not very complicated:
mutex.wait()
reindeer += 1
if reindeer == 9
{
santa_sem.signal()
}
mutex.signal()

reindeer_sem.wait()
get_hitched()

The ninth reindeer signals Santa and then joins the others waiting for reindeer_sem. When it’s signalled, they invoke get_hitched.

The code for elves is quite similar, but it uses another turnstile for three-elves-stuff:

elf_mutex.wait()
mutex.wait()
elves += 1
if elves == 3
{
santa_sem.signal()
}
else
{
elf_mutex.signal()
}
mutex.signal()

elf_sem.wait()
get_help()

mutex.wait()
elves -= 1
if elves == 0
{
elf_mutex.signal()
}
mutex.signal()


The first two elves release elf_mutex at the same time they release the mutex, but the last elf holds elf_mutex, preventing other elves from entering until all three elves have invoked get_help. The last elf to leave releases elf_mutex, allowing the next batch of elves to enter.

The Ocaml part

Now the time has come to have some fun with Ocaml. The first thing to mention is that the Ocaml version used here has no built-in semaphore class-or-something (that’s because of its rich standard library). But it’s not a big issue, since the Threads library has the Mutex and Condition modules, and we can use them to write our own semaphore as a class (yeah, Ocaml is Objective Caml and it does have classes). To make the semaphore more or less serious, let’s put it in a separate module.

module Semaphore = struct
  class semaphore initial_count (initial_name : string) =
    object (self)
      val mutable count = initial_count
      val name = initial_name
      val sync = Mutex.create()
      val cond = Condition.create()
          
      method inc n = count <- count + n
      method dec n = count <- count - n

      method signal ?(n=1) () =
        Mutex.lock sync;
        self#inc n;
        for i = 1 to n do
          Condition.signal cond
        done;
        Mutex.unlock sync

      method wait =
        Mutex.lock sync;
        while count = 0 do
          Condition.wait cond sync
        done;
        self#dec 1;
        Mutex.unlock sync
    end
end;;


My semaphore has an internal mutable field count (yeah, Ocaml is not a pure functional language like Haskell), which is the “gate width”, i.e. how many threads may still pass the semaphore; an internal name (used for logging, when I was looking for a deadlock in my code some time ago); one mutex and one condition variable. It has two primary methods, signal (with an optional parameter n, 1 by default) and wait, which are the usual semaphore operations: signal adds n to the internal counter and wakes up to n waiting threads, while wait blocks the calling thread for as long as the counter is zero and then decrements it.

If an Ocaml program is split into modules, you can expect some fun compiling it. Here I first generate an interface for the module and then compile both the interface and the module itself:

ocamlc -thread unix.cma threads.cma -i semaphore.ml > semaphore.mli
ocamlc -thread unix.cma threads.cma -c semaphore.mli
ocamlc -thread unix.cma threads.cma -c semaphore.ml


To enable multithreading support (in terms of POSIX-compatible threads), you have to compile your code with the -thread flag and include the unix and threads libraries.

Now let’s write the main program. Since we’re dealing with logging and multithreading, I’ve written a helper function which uses our Semaphore class to synchronize printing to stdout:

let stdout_sem = new Semaphore.semaphore 1 "stdout_sem";;
let puts s =
  stdout_sem#wait;
  Printf.printf "%s\n" s;
  flush stdout;
  stdout_sem#signal ();;


Next, I’m using some kind of transport structure for Santa, reindeer and elves functions (shared between them and protected by semaphore). This structure (a record in terms of Ocaml) contains counters and semaphores as discussed earlier:

type santa_counters = { mutable elves : int;
                        mutable reindeer : int;
                        santa_sem : Semaphore.semaphore;
                        reindeer_sem : Semaphore.semaphore;
                        elf_sem : Semaphore.semaphore;
                        elf_mutex : Semaphore.semaphore;
                        mutex : Semaphore.semaphore };;


and a simple initializer:

let new_santa_counters () = { elves = 0;
                              reindeer = 0;
                              santa_sem = new Semaphore.semaphore 0 "santa_sem";
                              reindeer_sem = new Semaphore.semaphore 0 "reindeer_sem";
                              elf_sem = new Semaphore.semaphore 0 "elf_sem";
                              elf_mutex = new Semaphore.semaphore 1 "elf_mutex";
                              mutex = new Semaphore.semaphore 1 "mutex" };;


To make our example more realistic, I’ve implemented prepare_sleigh and the other functions using my helper for synchronized printing, so that we can see what actually happens:

let prepare_sleigh () = puts "Prepare sleigh";;
let help_elves () = puts "Help Elves";;
let get_hitched () = puts "Get Hitched";;
let get_help () = puts "Get Help";;

You might think that the parentheses () at the end of each function name are the usual call parentheses as in Java, C++ etc., but actually () is an argument of type unit passed to each function. Please refer to the tutorials for more details.

Let’s take a look at our pseudo-code solutions implemented in Ocaml:

let santa_role_func c =
  c.santa_sem#wait;
  c.mutex#wait;

  if c.reindeer = 9 then (
    prepare_sleigh ();
    c.reindeer_sem#signal ~n:9 ();
    c.reindeer <- 0;
   )
  else if c.elves = 3 then (
    help_elves ();
    c.elf_sem#signal ~n:3 ()
   );

  c.mutex#signal ();;


let reindeer_role_func (c, i) =
  let s = Printf.sprintf "Starting reindeer (%d)" i in
  puts s;
 
  c.mutex#wait;
  c.reindeer <- c.reindeer + 1;
  if c.reindeer = 9 then c.santa_sem#signal ();
  c.mutex#signal ();

  c.reindeer_sem#wait;
  get_hitched ();;


let elves_role_func (c, i) =
  let s = Printf.sprintf "Starting elf [%d]" i in
  puts s;
 
  c.elf_mutex#wait;
  c.mutex#wait;
  c.elves <- c.elves + 1;
  if c.elves = 3 then
    c.santa_sem#signal ()
  else
    c.elf_mutex#signal ();
  c.mutex#signal ();
 
  c.elf_sem#wait;
  get_help ();

  c.mutex#wait;
  c.elves <- c.elves - 1;
  if c.elves = 0 then c.elf_mutex#signal ();
  c.mutex#signal ();;


You may notice that santa_role_func accepts just our transport structure c, while the other two accept a pair (c, i). Thread.create passes exactly one argument to the thread function, so the structure and the index travel together as a tuple. Santa’s role function runs in a loop and the others run just once; the second component in the elf and reindeer functions is the index of the thread in which they’re running (for debug and visualization purposes).

The last step (apart from compilation) is to make all this stuff work together:

let c = new_santa_counters () in
let santa_loop () =
  puts "Starting Santa loop";
  while true do
    santa_role_func c;
  done
in
let santa_array = [| Thread.create santa_loop () |]
and reindeer_array =
  Array.init 9 (fun i -> Thread.create reindeer_role_func (c, i))
and elf_array =
  Array.init 20 (fun i -> Thread.create elves_role_func (c, i))
in
Array.iter Thread.join
  (Array.concat [santa_array; reindeer_array; elf_array]);;


The code above creates three arrays of threads: santa_array (which always contains just one element), reindeer_array (always contains 9 reindeer threads) and elf_array (which contains 20 (fairly chosen) elf threads). After all threads are started, the main program joins them using humble functional magic with Array.iter.

What happened at the North Pole

I’ve copied typical stdout of the santa_problem solution below (along with the ocaml version, for clarity).

> ocaml -version

The OCaml toplevel, version 4.01.0

> ./build.sh
> ./santa_problem
Starting santa loop
Starting reindeer (4)
Starting reindeer (5)
Starting reindeer (6)
Starting reindeer (3)
Starting reindeer (7)
Starting reindeer (8)
Starting elf [0]
Starting reindeer (2)
Starting elf [1]
Starting elf [2]
Starting elf [3]
Starting elf [4]
Starting reindeer (1)
Starting elf [5]
Starting elf [6]
Starting elf [7]
Starting elf [8]
Starting elf [9]
Starting elf [10]
Starting elf [11]
Starting elf [12]
Starting reindeer (0)
Starting elf [13]
Starting elf [14]
Starting elf [15]
Starting elf [19]
Prepare sleigh
Starting elf [16]
Starting elf [18]
Get Hitched
Get Hitched
Get Hitched
Get Hitched
Get Hitched
Get Hitched
Get Hitched
Get Hitched
Get Hitched
Starting elf [17]
Help Elves
Get Help
Get Help
Get Help
Help Elves
Get Help
Get Help
Get Help
Help Elves
Get Help
Get Help
Get Help
Help Elves
Get Help
Get Help
Get Help
Help Elves
Get Help
Get Help
Get Help
Help Elves
Get Help
Get Help
Get Help
……

Merry Xmas

Santa’s problem is one of the classical synchronization problems and is worth looking into. A lot of solutions exist nowadays. For example, there is an interesting approach that solves it in Haskell using Software Transactional Memory (STM). Despite the fact that Ocaml does not provide us with features as cool as STM, we can see that building concurrent programs in Ocaml is easy fun! As far as I can see, the solution above is kind of the first pure Ocaml solution of Santa’s problem.
You can download all code from the Santa's gist.

12/05/2013

C++ Multithreading Support


C++11 Multithreading overview 


The C++11 standard provides multithreading support, including the following features: thread creation and management, mutexes, atomic objects, thread local storage and condition variables.
Thread creation and management is provided by the std::thread class, which is defined in <thread>. The class works with regular functions, lambdas (anonymous functions written inline in your source code) or functors (classes with an overloaded () operator). Any number of parameters can be passed to the thread function. To work with a thread, you create an instance of the std::thread class and pass it the function to run. Let’s see an example:
#include <iostream>
#include <thread>

void testFunc()
{
       std::cout << "Hello from the thread!";
}

int main(int argc, char *argv [])
{
       std::thread my_thread(testFunc);
       my_thread.join();
       return 0;
} 
In this example a pointer to a trivial function is passed to the constructor of the thread class. After creating the thread we call join() to ensure that it has finished executing before the program exits. Strictly speaking, you should first check whether the thread is joinable by calling its joinable() method. If you don’t want to wait for the thread to finish, you can call the detach() member function to separate the thread of execution from the thread object. After calling detach() the thread object is no longer joinable. Any number of arguments can be passed to a thread:
#include <iostream>
#include <thread>

void printEven(int printArray[], int size)
{
       for (int i = 0; i < size; ++i)
       {
             if (printArray[i] % 2 == 0)
                    std::cout << "Even number " << i << " is - " << printArray[i] << "\n";
       }
}

int main(int argc, char *argv [])
{
       const int SIZE = 7;
       int evenArray[SIZE] = { 0, 7, 8, 63, 81, 24, 369 };
       std::thread my_thread(printEven, evenArray, SIZE);

       if (my_thread.joinable())
             my_thread.join();

       return 0;
} 
If parameters need to be passed by reference, they should be wrapped in std::ref or std::cref; otherwise std::thread copies them.
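To make the std::ref / std::cref remark concrete, here is a minimal sketch. The names sumInto and sumOnThread are made up for illustration and are not part of the listings above.

```cpp
#include <cassert>
#include <functional>  // std::ref, std::cref
#include <thread>
#include <vector>

// Hypothetical worker: adds up the values and accumulates the result in `total`.
void sumInto(const std::vector<int>& values, long& total)
{
    for (int v : values)
        total += v;
}

// Runs sumInto on a separate thread and returns what it computed.
long sumOnThread(const std::vector<int>& values)
{
    long total = 0;
    // Without the wrappers std::thread would copy its arguments. For `total`
    // the code would not even compile, because the copy cannot bind to the
    // non-const reference parameter of sumInto.
    std::thread worker(sumInto, std::cref(values), std::ref(total));
    worker.join();  // after join() the worker's writes to `total` are visible here
    assert(!worker.joinable());
    return total;
}
```

Calling sumOnThread({1, 2, 3, 4}) from main should return 10. The caller has to keep the referenced objects alive until the thread is joined, which is why the join happens before total goes out of scope.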
Each thread has its own id. The std::thread class has a get_id() member function, which returns a unique id for the thread. To get the id of the current thread, std::this_thread::get_id() can be used. Thread ids are stored in the lightweight std::thread::id class.
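A small sketch of how the two get_id() flavours relate to each other; workerHasOwnId is a hypothetical helper name used only for this illustration.

```cpp
#include <cassert>
#include <thread>

// Returns true when a freshly started thread reports the same id from the
// inside (std::this_thread::get_id) as from the outside (std::thread::get_id),
// and that id differs from the id of the calling thread.
bool workerHasOwnId()
{
    std::thread::id workerSelfId;  // default-constructed id means "no thread"
    std::thread worker([&workerSelfId] {
        workerSelfId = std::this_thread::get_id();
    });
    // Must be read before join(): afterwards the object represents no thread.
    const std::thread::id seenFromOutside = worker.get_id();
    worker.join();
    assert(worker.get_id() == std::thread::id());
    return workerSelfId == seenFromOutside
        && workerSelfId != std::this_thread::get_id();
}
```

std::thread::id objects can be compared and streamed to std::cout, but the printed representation is implementation-defined.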

Useful features 

There are several useful functions in the std::this_thread namespace. The yield() function hints to the thread scheduler that it may select another thread to run. Using yield() is rarely necessary. sleep_until() expects an instance of the std::chrono::time_point class and blocks execution until the clock it is measured against reaches that point. sleep_for() expects an instance of the std::chrono::duration class and blocks thread execution for at least that amount of time.
#include <chrono>
#include <cstdlib>
#include <iostream>
#include <thread>

const int MAX_DELAY = 10;
const int THREADS_NUMBER = 5;

void raceThread()
{
       int delay = std::rand() % MAX_DELAY;
       std::thread::id currentID = std::this_thread::get_id();
       std::cout << "Thread with id " << currentID << " started. Delay " << delay << " seconds\n";
       std::this_thread::sleep_for(std::chrono::seconds(delay));
       std::cout << "Finishing thread...\n";
}

int main(int argc, char *argv [])
{
       // std::thread is movable, so a plain array of thread objects is enough
       // (std::auto_ptr is deprecated since C++11)
       std::thread ThreadsArray[THREADS_NUMBER];
       for (int i = 0; i < THREADS_NUMBER; ++i)
       {
             ThreadsArray[i] = std::thread(raceThread);
       }

       for (int i = 0; i < THREADS_NUMBER; ++i)
       {
             ThreadsArray[i].join();
       }
       return 0;
} 
Please try to run the listing above several times. As you can see, the results are different every time. That's because each thread prints its information to stdout, which is a shared resource. Because output is not synchronized between the worker threads, one thread's output can be interrupted by another's. Concurrent writes to std::cout are not a data race in themselves, but the pieces of text produced by different threads may come out interleaved.
A mutex is used to ensure that only one thread works with a shared resource at a time, so let's talk a bit about how mutexes are defined in the C++11 standard. The std::mutex class is used for locking and unlocking shared resources. Besides it, there are three other types of mutexes. std::timed_mutex differs from the previous one only in that an attempt to lock it can be limited by a timeout. std::recursive_mutex allows a thread to lock a resource recursively by making additional calls to lock or try_lock; the period of ownership ends when the thread makes a matching number of calls to unlock. Attempting to recursively lock a regular std::mutex is undefined behaviour and in practice usually ends in a deadlock. std::recursive_timed_mutex is a recursive mutex with the timeout ability. Let's change the previous example so that access to the shared resource is managed the way we want it to be.
#include <chrono>
#include <cstdlib>
#include <iostream>
#include <mutex>
#include <thread>

const int MAX_DELAY = 10;
const int THREADS_NUMBER = 5;

std::mutex g_mutex;

void raceThread()
{
       // the guard holds g_mutex until the function returns, so the threads
       // execute this body strictly one after another
       std::lock_guard<std::mutex> lock(g_mutex);
       int delay = std::rand() % MAX_DELAY;
       std::thread::id currentId = std::this_thread::get_id();
       std::cout << "Thread with id " << currentId << " started. Delay " << delay << " seconds\n";
       std::this_thread::sleep_for(std::chrono::seconds(delay));
       std::cout << "Finishing thread...\n";
} 
The main function is the same as in the previous listing, so we've skipped it here.
std::lock_guard is a RAII-style wrapper for more convenient work with std::mutex: it locks the mutex on creation and releases it on destruction. Because of potential deadlock problems, it is strongly recommended to avoid holding several mutexes at one time. Unfortunately, sometimes more than one resource has to be locked (e.g. you are working with two distinct data items and each of them is protected by its own mutex). In that case std::unique_lock can be helpful. It offers functionality similar to lock_guard, but it also accepts an additional constructor argument which defines the locking strategy. By passing std::defer_lock you tell std::unique_lock not to acquire ownership of the mutex immediately; both mutexes can then be locked safely with std::lock, which uses a deadlock-avoidance algorithm:
struct Account
{
    Account(float sum) : m_fValue(sum) {}

    float m_fValue;
    std::mutex m_Mutex;
};

void foo(Account &parentAcc, Account &childAcc, float amount)
{
    // don't actually take the locks yet
    std::unique_lock<std::mutex> lock1(childAcc.m_Mutex, std::defer_lock);
    std::unique_lock<std::mutex> lock2(parentAcc.m_Mutex, std::defer_lock);

    // lock both unique_locks without deadlock
    std::lock(lock1, lock2);

    childAcc.m_fValue -= amount;
    parentAcc.m_fValue += amount;
} 
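To see why the deferred locks matter, the sketch below runs two threads that move money in opposite directions, which is the classic setup for a lock-ordering deadlock. It is a self-contained variation on the listing above, not the original code: the names transfer and runOpposingTransfers are hypothetical, and the balance is an integer so that the final check is exact.

```cpp
#include <cassert>
#include <mutex>
#include <thread>

struct Account
{
    explicit Account(long sum) : m_Value(sum) {}

    long m_Value;  // integer units (an assumption of this sketch)
    std::mutex m_Mutex;
};

// Same locking scheme as above: defer both locks, then take them together.
void transfer(Account& from, Account& to, long amount)
{
    std::unique_lock<std::mutex> lockFrom(from.m_Mutex, std::defer_lock);
    std::unique_lock<std::mutex> lockTo(to.m_Mutex, std::defer_lock);
    std::lock(lockFrom, lockTo);  // acquires both without risking a deadlock

    from.m_Value -= amount;
    to.m_Value += amount;
}

// Two threads transfer in opposite directions; returns the final balance of `a`.
long runOpposingTransfers(int rounds)
{
    Account a(1000), b(1000);
    std::thread t1([&] { for (int i = 0; i < rounds; ++i) transfer(a, b, 1); });
    std::thread t2([&] { for (int i = 0; i < rounds; ++i) transfer(b, a, 2); });
    t1.join();
    t2.join();
    assert(a.m_Value + b.m_Value == 2000);  // no money was created or lost
    return a.m_Value;
}
```

With rounds equal to 1000, account a loses 1000 units and gains 2000, so the function should return 2000. If each thread locked the two mutexes one by one in its own order instead, the same test could hang.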
Please keep in mind that the behaviour of a program is undefined if a mutex is destroyed while it is still owned by some thread. A thread owns the mutex from the moment it successfully locks it until it calls unlock. So you have to make sure that no thread holds the mutex before destroying it.
Condition variables are another synchronization primitive provided by the C++11 standard. A condition variable is always associated with a mutex and allows threads to communicate with each other. It lets multiple threads wait until some condition becomes true or a specified timeout expires. To work with a condition variable, a thread must acquire a lock first and pass that lock to the subsequent wait operation. The lock is implicitly released when the thread starts to wait on the condition variable and is re-acquired before the wait returns. A wait on std::condition_variable returns when a notification is received, when the timeout expires, or occasionally because of a spurious wakeup, which is why the awaited condition should always be re-checked.
Besides std::condition_variable, there is std::condition_variable_any. The difference is that std::condition_variable works only with std::unique_lock<std::mutex>, whereas std::condition_variable_any works with any (possibly user-defined) lock type that meets the BasicLockable requirements. Apart from that, the functionality of both synchronization primitives is almost identical.
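The wait protocol described above can be sketched as follows. The function name waitForValue and the value 42 are made up for the illustration. The two-argument overload of wait() re-checks the predicate after every wakeup, which takes care of spurious wakeups.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// One thread publishes a value, the caller waits for it on a condition variable.
int waitForValue()
{
    std::mutex m;
    std::condition_variable cv;
    bool ready = false;  // the condition the waiter is interested in
    int value = 0;

    std::thread producer([&] {
        {
            std::lock_guard<std::mutex> lock(m);  // modify shared state under the lock
            value = 42;
            ready = true;
        }
        cv.notify_one();  // notify after the lock has been released
    });

    int received = 0;
    {
        std::unique_lock<std::mutex> lock(m);
        // wait() releases the lock while blocked and re-acquires it before
        // returning; it keeps waiting until the predicate yields true.
        cv.wait(lock, [&] { return ready; });
        received = value;
    }
    producer.join();
    return received;
}
```

The predicate also covers the case where the producer finishes before the caller starts waiting: ready is already true, so wait() returns immediately instead of missing the notification.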

C++ threads vs. Boost threads

One of the most commonly used open-source libraries for C++ is Boost. This library also provides multithreading support, so let's compare C++11 threads with Boost threads. C++11 threads are part of the language standard, which also defines a memory model for them, while Boost threads are basically wrappers around platform-dependent thread implementations.
The first thing you may have noticed by now is that a C++ thread does not have its detach() method implicitly called when it is destroyed. You have to call join() or detach() manually before a thread object is destroyed. If you don’t, its destructor calls std::terminate() and your application is aborted. In Boost there is no such problem because detach() is called automatically on thread destruction. Boost threads support cancelation (called interruption in Boost), but C++ standard threads don’t.
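One common way to avoid forgetting join() is a small RAII wrapper that joins in its destructor. The class below is a hypothetical sketch (JoiningThread is not a standard or Boost class), shown only to illustrate the idea.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <utility>

// Hypothetical wrapper: owns a std::thread and joins it on destruction, so the
// std::thread destructor never sees a joinable thread.
class JoiningThread
{
public:
    explicit JoiningThread(std::thread t) : m_thread(std::move(t)) {}

    ~JoiningThread()
    {
        if (m_thread.joinable())
            m_thread.join();
    }

    JoiningThread(const JoiningThread&) = delete;
    JoiningThread& operator=(const JoiningThread&) = delete;

private:
    std::thread m_thread;
};

// The worker is joined when `guard` leaves scope, even on an early return
// or an exception thrown inside the block.
int countWithGuard()
{
    std::atomic<int> counter(0);
    {
        JoiningThread guard(std::thread([&counter] { counter += 5; }));
    }
    assert(counter.load() == 5);
    return counter.load();
}
```

Joining in the destructor means the scope exit blocks until the worker finishes, so this pattern suits workers that are known to terminate.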
std::thread allows move-only types, such as std::unique_ptr, to be passed as arguments. Because of the use of boost::bind internally, Boost requires copyable arguments. To clear things up a little bit, let's see an example:
void foo(std::unique_ptr<int>);
std::thread my_thread(foo, std::unique_ptr<int>(new int(228)));  
std::unique_ptr cannot be copied, only moved. The example therefore works with std::thread, but it won’t work with boost::thread, whose semantics require copyable arguments.
Migration from Boost threads to C++11 should not cause a lot of trouble. Of course, some of the Boost features are not supported in the C++ standard yet. For example, there is no analogue of boost::shared_mutex, which allows multiple-reader, single-writer locking. A mutex of this kind is planned for a future C++ standard (C++14), but currently it is not available. Also, using threads from the language standard gives a few advantages compared to Boost threads. No matter how convenient Boost is, it’s still an additional library, which means you have to rely on the Boost libraries during compilation and linkage. Sometimes these libraries need to be updated. When using C++ threads, which are already part of the standard, you don’t need to worry about such things.
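Coming back to the move-only argument case, here is a complete sketch of the std::thread side. storeValue and passUniquePtr are hypothetical names; the point is that an existing std::unique_ptr has to be handed over with std::move.

```cpp
#include <cassert>
#include <memory>
#include <thread>
#include <utility>

// Hypothetical thread function: takes ownership of the pointer and copies the
// pointed-to value to `output`.
void storeValue(std::unique_ptr<int> input, int* output)
{
    *output = *input;
}

int passUniquePtr()
{
    std::unique_ptr<int> value(new int(228));
    int result = 0;
    // Ownership is moved into the thread; passing `value` without std::move
    // would not compile because std::unique_ptr has no copy constructor.
    std::thread worker(storeValue, std::move(value), &result);
    worker.join();
    assert(value == nullptr);  // the caller no longer owns the integer
    return result;
}
```

A call to passUniquePtr() should return 228.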

How does it work?


We’ve seen the main features of the C++ threading model. Let’s try to use these features to solve a practical task. Producer-consumer is a well-known multithreading problem. It describes two threads working with a single queue. One of them produces something and puts it into the queue, and the other consumes items and removes them from the queue. The main problem is to ensure that the producer won't try to add to the queue when it's full and the consumer won't try to consume from the queue when it's empty.
Let's add some global variables which will be used by both threads.
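#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

bool g_isReady = false;
std::mutex g_Mutex;
std::condition_variable g_ConditionVar;

std::queue<int> g_MessagesQueue;

const int QUEUE_SIZE = 10;
const int SLEEP_TIME = 500; 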
We declared a boolean flag to indicate the ready status of the queue, the global message queue to put data into, and a mutex for locking the shared data while working with it. We also declare a condition variable for communication between the threads. To avoid hardcoding, we declare constants for the queue size and the sleep time (in milliseconds). The producer function looks like this:
void Producer()
{
       std::cout << "\\\\\\\\\\ Producer is ready ///// \n";
      
       for( int i = 0; ; ++i )
       {
             std::this_thread::sleep_for(std::chrono::milliseconds(SLEEP_TIME));                   
             std::unique_lock<std::mutex> lock(g_Mutex);  
            
                            
             if ( g_MessagesQueue.size() < QUEUE_SIZE )
             {
                    std::cout << "Produced " << i << "\n";
                    g_MessagesQueue.push(i);    
             }
             else
             {
                    g_isReady = true; 
                    g_ConditionVar.notify_one();             
             }
       }
} 
Here we have an infinite loop which pushes data to the queue. On each iteration it sleeps for a short time and then compares the number of queued elements with the queue size. If it is less than the maximum queue size, the data is pushed to the queue; otherwise the status flag is set and the condition variable is used to notify the consumer thread. The lock is held while working with the shared resources.
Now in the consumer function we have one more infinite loop. It waits for a notification from the producer thread and uses the status flag to guard against spurious wakeups. After a notification has been received, the consumer pops all the data from the queue and sets the status flag to false:
void Consumer()
{
       std::cout << "\\\\\\\\\\ Consumer is ready ///// \n";

       while (true)
       {
             std::unique_lock<std::mutex> lock(g_Mutex);

             while (!g_isReady)                            
             {
                    g_ConditionVar.wait(lock);           
             }

             while (!g_MessagesQueue.empty())            
             {
                    std::cout << "Consumed -" << g_MessagesQueue.front() << "\n";
                    g_MessagesQueue.pop();
             }

             g_isReady = false;                           
       }

} 
The main routine only creates the threads and waits for them to finish.
int main(int argc, char *argv[])
{
       std::thread producer(Producer);
       std::thread consumer(Consumer);

       if (producer.joinable())
             producer.join();

       if (consumer.joinable())
             consumer.join();
}  
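The listings above run forever and wake the consumer only when the queue is full. As a point of comparison, here is a sketch of a terminating variant that blocks the producer while the queue is full and the consumer while it is empty, using two condition variables. This is an alternative design, not the code of this post; BoundedQueue and produceAndConsume are hypothetical names.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical bounded queue: push() blocks while full, pop() blocks while empty.
class BoundedQueue
{
public:
    explicit BoundedQueue(std::size_t capacity) : m_capacity(capacity) {}

    void push(int value)
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_notFull.wait(lock, [this] { return m_items.size() < m_capacity; });
        m_items.push(value);
        lock.unlock();
        m_notEmpty.notify_one();  // wake a consumer waiting for data
    }

    int pop()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_notEmpty.wait(lock, [this] { return !m_items.empty(); });
        int value = m_items.front();
        m_items.pop();
        lock.unlock();
        m_notFull.notify_one();  // wake a producer waiting for free space
        return value;
    }

private:
    const std::size_t m_capacity;
    std::mutex m_mutex;
    std::condition_variable m_notFull;
    std::condition_variable m_notEmpty;
    std::queue<int> m_items;
};

// Produces 1..count on one thread, sums the consumed values on another.
long produceAndConsume(int count)
{
    assert(count >= 0);
    BoundedQueue queue(10);
    long sum = 0;
    std::thread producer([&] { for (int i = 1; i <= count; ++i) queue.push(i); });
    std::thread consumer([&] { for (int i = 0; i < count; ++i) sum += queue.pop(); });
    producer.join();
    consumer.join();
    return sum;
}
```

For count equal to 100 the consumer should end up with 5050, the sum of 1 to 100. Because both threads know how many items to expect, they finish on their own and join() returns.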
As you can see, the C++11 standard offers some powerful features for multithreading support. These features are expected to be enhanced in the next C++ standards.