Question

I'm experimenting with the Boost library and C++. I want to create a multithreaded program that contains a producer, a consumer, and a stack. The producer fills the stack and the consumer removes items (int) from it. Everything works (pop, push, mutex), but when I call pop/push within a thread, it has no effect.

I wrote this simple code:

#include "stdafx.h"
#include <stack>
#include <iostream>
#include <algorithm>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/date_time.hpp> 
#include <boost/signals2/mutex.hpp>
#include <ctime>

using namespace std;

/*
 * This class represents a stack that is protected by a mutex.
 * Only one thread at a time executes pop or push.
 */
class ProtectedStack{
private : 
stack<int> m_Stack;
boost::signals2::mutex m;

public : 
ProtectedStack(){
}
ProtectedStack(const ProtectedStack & p){

}
void push(int x){
    m.lock();
    m_Stack.push(x);
    m.unlock();
}

void pop(){
    m.lock();
    //return m_Stack.top();
    if(!m_Stack.empty())
        m_Stack.pop();
    m.unlock(); 
}
int size(){
    return m_Stack.size();
}
bool isEmpty(){
    return m_Stack.empty();
}
int top(){
    return m_Stack.top();
}
};

/*
* The producer is the class that fills the stack. It encapsulates the thread object.
*/

class Producer{
public:
Producer(int number ){
    //create thread here but don't start here
m_Number=number;


}
void fillStack (ProtectedStack& s ) {
    int object = 3; //random value
    s.push(object);
    //cout<<"push object\n";
}

void produce (ProtectedStack & s){
    //call fill within a thread 
    m_Thread = boost::thread(&Producer::fillStack,this, s);  
}

 private :
int m_Number;
boost::thread m_Thread;

};


/* The consumer will consume the products produced by the producer */ 

class Consumer {
private : 
int m_Number;
boost::thread m_Thread;
public:
Consumer(int n){
    m_Number = n;
}

void remove(ProtectedStack &s ) {

     if(s.isEmpty()){ // if the stack is empty, sleep and wait for the producer to fill the stack
        //cout<<"stack is empty\n";
        boost::posix_time::seconds workTime(1); 
        boost::this_thread::sleep(workTime);
     }
     else{
        s.pop(); //pop it
        //cout<<"pop object\n";

     }

}

void consume (ProtectedStack & s){
    //call remove within a thread 
    m_Thread = boost::thread(&Consumer::remove, this, s);  
}

};


int main(int argc, char* argv[])  
{  



ProtectedStack s;


    Producer p(0);
    p.produce(s);

    Producer p2(1);
    p2.produce(s);

    cout<<"size after production "<<s.size()<<endl;
    Consumer c(0);
    c.consume(s);
    Consumer c2(1);
    c2.consume(s);
    cout<<"size after consumption  "<<s.size()<<endl;

getchar();
return 0;  
}  

After running this in VC++ 2010 on Windows 7, I got: 0 0

Could you please help me understand why calling the fillStack function from main has an effect, but calling it from a thread does nothing? Thank you.


Solution

The major problem with your code is that your threads are not synchronized. Remember that by default thread execution is neither ordered nor sequenced, so consumer threads can finish (and in your particular case do finish) before any producer thread has produced any data.

To make sure the consumers run after the producers have finished their work, call thread::join() on the producer threads. It blocks the main thread until the producers exit:

// Start producers
...

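// Note: m_Thread is private in the posted classes, so make it accessible
// or add a public member function that calls m_Thread.join().
p.m_Thread.join();  // Wait for p to complete
p2.m_Thread.join(); // Wait for p2 to complete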

// Start consumers
...

This will do the trick, but it is probably not a good fit for a typical producer-consumer use case.

For a more useful setup, you need to fix the consumer function. It does not actually wait for produced data: if the stack is empty, it sleeps once and exits, so it never consumes anything if no data has been produced yet.

It should look like this:
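As a fuller sketch of the join approach, the following uses std::thread and std::mutex from C++11 in place of their Boost counterparts (this assumes a C++11 compiler; boost::thread also provides join() and Boost.Thread also provides lock_guard). LockedStack and produce_then_consume are hypothetical names for illustration and are not part of the question's code.

```cpp
#include <cstddef>
#include <mutex>
#include <stack>
#include <thread>
#include <utility>

// Hypothetical minimal stand-in for ProtectedStack: every member takes the lock.
class LockedStack {
public:
    void push(int x) {
        std::lock_guard<std::mutex> lock(m_);
        s_.push(x);
    }
    // Pops one element if there is one; returns false when the stack is empty.
    bool try_pop() {
        std::lock_guard<std::mutex> lock(m_);
        if (s_.empty()) return false;
        s_.pop();
        return true;
    }
    std::size_t size() const {
        std::lock_guard<std::mutex> lock(m_);
        return s_.size();
    }
private:
    mutable std::mutex m_;
    std::stack<int> s_;
};

// Returns {size after production, size after consumption}.
std::pair<std::size_t, std::size_t> produce_then_consume() {
    LockedStack s;

    // Start two producers; each pushes one item.
    std::thread p1([&s] { s.push(3); });
    std::thread p2([&s] { s.push(3); });
    p1.join();  // Block until both producers have finished,
    p2.join();  // so the size read below is reliable.
    std::size_t after_production = s.size();

    // Only now start the consumers; each pops one item.
    std::thread c1([&s] { s.try_pop(); });
    std::thread c2([&s] { s.try_pop(); });
    c1.join();
    c2.join();
    return std::make_pair(after_production, s.size());
}
```

Calling produce_then_consume() from main and printing the two values should show that both pushes are visible before any consumer starts, because the sizes are read only after the corresponding joins.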

void remove(ProtectedStack &s)
{
   // Place your actual exit condition here,
   // e.g. count of consumed elements or some event
   // raised by producers meaning no more data available etc.
   // For testing/educational purpose it can be just while(true)
   while(!_some_exit_condition_)
   {
      if(s.isEmpty())
      {
          // A one-second sleep is too long; use milliseconds instead
          boost::posix_time::milliseconds workTime(1); 
          boost::this_thread::sleep(workTime);               
      }               
      else
      {
         s.pop();
      }
   }
} 
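A minimal sketch of such a polling consumer with a concrete exit condition follows. It assumes the exit condition is a known item count, and it uses std::thread and std::chrono from C++11 rather than Boost. PollableStack, consume_until, and run_consumer_first are hypothetical names. The sketch also merges the emptiness check and the pop into one locked try_pop call, since with several consumers a separate isEmpty() followed by pop() can race.

```cpp
#include <chrono>
#include <mutex>
#include <stack>
#include <thread>

// Hypothetical locked stack whose pop reports whether it removed anything.
class PollableStack {
public:
    void push(int x) {
        std::lock_guard<std::mutex> lock(m_);
        s_.push(x);
    }
    bool try_pop() {
        std::lock_guard<std::mutex> lock(m_);
        if (s_.empty()) return false;
        s_.pop();
        return true;
    }
private:
    std::mutex m_;
    std::stack<int> s_;
};

// Polls the stack until `expected` items have been consumed.
// Returns the number of items actually popped.
int consume_until(PollableStack& s, int expected) {
    int consumed = 0;
    while (consumed < expected) {          // the exit condition
        if (s.try_pop()) {
            ++consumed;
        } else {
            // Nothing to consume yet: back off briefly instead of spinning.
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    }
    return consumed;
}

// Starts the consumer BEFORE the producer to show that ordering no longer matters.
int run_consumer_first(int items) {
    PollableStack s;
    int consumed = 0;
    std::thread consumer([&] { consumed = consume_until(s, items); });
    std::thread producer([&] {
        for (int i = 0; i < items; ++i) s.push(i);
    });
    producer.join();
    consumer.join();  // after this join, reading `consumed` is safe
    return consumed;
}
```

Because the consumer keeps looping until it has seen every item, it no longer matters whether it starts before or after the producer.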

Another problem is incorrect use of the thread constructor:

m_Thread = boost::thread(&Producer::fillStack, this, s);  

Quote from Boost.Thread documentation:

Thread Constructor with arguments

template <class F,class A1,class A2,...> thread(F f,A1 a1,A2 a2,...);

Preconditions: F and each An must be copyable or movable.

Effects: As if thread(boost::bind(f,a1,a2,...)). Consequently, f and each an are copied into internal storage for access by the new thread.

This means that each of your threads receives its own copy of s, so all modifications are applied to the thread-local copies and not to s itself. It is the same as passing an object to a function by value. You need to pass s by reference instead, using boost::ref:

void produce(ProtectedStack& s)
{
   m_Thread = boost::thread(&Producer::fillStack, this, boost::ref(s));
}

void consume(ProtectedStack& s)
{
   m_Thread = boost::thread(&Consumer::remove, this, boost::ref(s));
}  
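The copy-versus-reference behavior can be reproduced in isolation. The sketch below uses std::bind, std::ref, and std::thread as stand-ins for boost::bind, boost::ref, and boost::thread (an assumption made so that it builds without Boost). The bind call is spelled out explicitly because std::thread rejects passing a copy to a non-const reference parameter at compile time, whereas the bind-based behavior quoted above copies silently. The function names are hypothetical.

```cpp
#include <cstddef>
#include <functional>
#include <thread>
#include <vector>

// Appends one element to the container it is given.
void fill(std::vector<int>& v) { v.push_back(3); }

// bind stores its own copy of v, so the thread modifies that copy only.
std::size_t size_after_fill_by_copy() {
    std::vector<int> v;
    std::thread t(std::bind(fill, v));
    t.join();
    return v.size();  // the caller's v was never touched
}

// std::ref makes bind store a reference wrapper, so the thread modifies v itself.
std::size_t size_after_fill_by_ref() {
    std::vector<int> v;
    std::thread t(std::bind(fill, std::ref(v)));
    t.join();
    return v.size();  // the caller's v received the element
}
```

The first function mirrors what happens in the question: the push succeeds, but on a copy that is destroyed with the thread's stored arguments.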

Another issue is your mutex usage, which could be improved:

  1. Why do you use the mutex from the Signals2 library? Just use boost::mutex from Boost.Thread and remove the unneeded dependency on Signals2.

  2. Use the RAII wrapper boost::lock_guard instead of direct lock/unlock calls.

  3. As other people mentioned, you should protect all members of ProtectedStack with the lock.

Sample:

boost::mutex m;

void push(int x)
{ 
   boost::lock_guard<boost::mutex> lock(m);
   m_Stack.push(x);
} 

void pop()
{
   boost::lock_guard<boost::mutex> lock(m);
   if(!m_Stack.empty()) m_Stack.pop();
}              

int size()
{
   boost::lock_guard<boost::mutex> lock(m);
   return m_Stack.size();
}

bool isEmpty()
{
   boost::lock_guard<boost::mutex> lock(m);
   return m_Stack.empty();
}

int top()
{
   boost::lock_guard<boost::mutex> lock(m);
   return m_Stack.top();
}

Other tips

Your example code suffers from a few synchronization issues, as others have noted:

  • Missing locks on calls to some of the members of ProtectedStack.
  • Main thread could exit without allowing worker threads to join.
  • The producer and consumer do not loop as you would expect. Producers should always (when they can) be producing, and consumers should keep consuming as new elements are pushed onto the stack.
  • The cout calls on the main thread may well run before the producers or consumers have had a chance to do any work.

I would recommend looking at using a condition variable for synchronization between your producers and consumers. Take a look at the producer/consumer example here: http://en.cppreference.com/w/cpp/thread/condition_variable. It is a rather new feature in the standard library as of C++11 and is supported as of VS2012. Before VS2012, you would need either Boost or Win32 calls.

Using a condition variable to tackle a producer/consumer problem is nice because it almost enforces the use of a mutex to lock shared data, and it provides a signaling mechanism to let consumers know something is ready to be consumed so they don't have to spin (polling is always a trade-off between the responsiveness of the consumer and the CPU usage spent polling the queue). Releasing the mutex and starting to wait also happen atomically, which prevents threads from missing a signal that there is something to consume, as explained here: https://en.wikipedia.org/wiki/Sleeping_barber_problem

To give a brief run-down of how a condition variable takes care of this...

  • A producer does all time-consuming activities on its thread without owning the mutex.
  • The producer locks the mutex, adds the item it produced to a global data structure (probably a queue of some sort), lets go of the mutex and signals a single consumer to go -- in that order.
  • A consumer that is waiting on the condition variable re-acquires the mutex automatically, removes the item from the queue and does some processing on it. During this time, the producer is already working on producing a new item, but it has to wait until the consumer releases the mutex before it can queue the item up.
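The steps above can be sketched with std::condition_variable from C++11 (boost::condition_variable is the Boost counterpart). Channel, produce, consume, and run_pipeline are hypothetical names, and the done flag is an assumed way of telling the consumer that no more items will arrive. The processing here is a trivial sum, so it is done while holding the lock; real work would normally be moved outside it.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

// Shared state: the queue, the mutex guarding it, and the condition variable.
struct Channel {
    std::mutex m;
    std::condition_variable cv;
    std::queue<int> items;
    bool done = false;  // set by the producer when it will push nothing more
};

// Pushes 1..count, signaling one waiting consumer after each push.
void produce(Channel& ch, int count) {
    for (int i = 1; i <= count; ++i) {
        {
            std::lock_guard<std::mutex> lock(ch.m);  // lock, add the item...
            ch.items.push(i);
        }                                            // ...release the mutex...
        ch.cv.notify_one();                          // ...then signal.
    }
    {
        std::lock_guard<std::mutex> lock(ch.m);
        ch.done = true;
    }
    ch.cv.notify_all();  // wake any consumer so it can observe `done`
}

// Consumes until the producer is done and the queue is drained.
// Returns the sum of all consumed items.
long consume(Channel& ch) {
    long sum = 0;
    std::unique_lock<std::mutex> lock(ch.m);
    for (;;) {
        // wait() releases the mutex while blocked and re-acquires it on wake-up.
        // The predicate guards against spurious wake-ups and missed signals.
        ch.cv.wait(lock, [&ch] { return !ch.items.empty() || ch.done; });
        if (ch.items.empty()) break;  // done and nothing left to consume
        sum += ch.items.front();
        ch.items.pop();
    }
    return sum;
}

// Runs one producer and one consumer and returns the consumer's sum.
long run_pipeline(int count) {
    Channel ch;
    long sum = 0;
    std::thread consumer([&] { sum = consume(ch); });
    std::thread producer([&] { produce(ch, count); });
    producer.join();
    consumer.join();
    return sum;
}
```

Unlike the polling loop, the consumer here sleeps inside wait() until it is notified, so there is no trade-off between sleep interval and responsiveness.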

This would have the following impact on your code:

  • No more need for ProtectedStack, a normal stack/queue data structure will do.
  • No need for boost if you are using a new enough compiler - removing build dependencies is always a nice thing.

I get the feeling that threading is rather new to you so I can only offer the advice to look at how others have solved synchronization issues as it is very difficult to wrap your mind around. Confusion about what is going on in an environment with multiple threads and shared data typically leads to issues like deadlocks down the road.

You're not checking that the producing thread has executed before you try to consume. You're also not locking around size/empty/top... that's not safe if the container's being updated.

Licensed under: CC-BY-SA with attribution