您的位置: 网界网 > 网络学院-技术开发 > 正文

C++11线程、锁和条件变量(1)

2014年06月19日 20:41:58 | 作者:佚名 | 来源:51CTO | 查看本文手机版

摘要:C++11标准使得C++开发人员能够以一种标准的和平台独立的方式来编写多线程代码。本文一一讲述了标准所支持的线程和同步机制。 头文件提供了名为thread的类(另外还包含了一些辅助类或方法),该类代表了一个执行线程。头文件 提...

标签
C++11C++条件变量

std::thread类代表了一个可执行的线程,它来自头文件。与其它创建线程的API(比如 Windows API中的CreateThread)不同的是, 它可以使用普通函数、lambda函数以及仿函数(实现了operator()函数的类)。另外,它还允许向线程函数传递任意数量的参数。

  1. #include  void func()  
  2. // do some work } int main()  
  3. {  
  4.    std::thread t(func);  
  5.    t.join(); return 0;  

在上面的例子中,t是一个线程对象,函数func()运行于该线程之中。调用join函数后,该调用线程(本例中指的就是主线程)就会在join进来进行执行的线程t结束执行之前,一直处于阻塞状态。如果该线程函数执行结束后返回了一个值,该值也将被忽略。不过,该函数可以接受任意数量的参数。

  1. void func(int i, double d, const std::string& s)  
  2. {  
  3.     std::cout << i << ", " << d << ", " << s << std::endl;  
  4. int main()  
  5. {  
  6.    std::thread t(func, 1, 12.50, "sample");  
  7.    t.join(); return 0;  

尽管我们可以向线程函数传递任意数量的参数,但是,所有的参数都是按值传递的。如果需要将参数按引用进行传递,那么就一定要象下例所示一样,把该参数封装到 std::ref或者std::cref之中。

  1. void func(int& a)  
  2. {  
  3.    a ;  
  4. int main()  
  5. int a = 42;  
  6.    std::thread t(func, std::ref(a));  
  7.    t.join();  
  8.    
  9.    std::cout << a << std::endl; return 0;  

上面程序打印结果为43,但要不是将a封装到std::ref之中的话,输出的将是42。

除join方法之外,这个线程类还提供了另外几个方法:

  • swap: 将两个线程对象的底层句柄进行交换
  • detatch: 允许执行该方法的线程独立于本线程对象的执行而继续执行。脱离后的线程就再也不能执行join了(你不能等待到它执行结束了)
  1. "font-family:'Courier New', Arial;font-size:9pt;line-height:1.5;">int"font-family:'Courier New', Arial;font-size:9pt;line-height:1.5;"> main() {  
  2.     std::thread t(funct);  
  3.     t.detach(); return 0;  

有一点非常重要,值得注意:线程函数中要是抛出了异常的话,使用通常的try-catch方式是捕获不到该异常的。换句话说,下面这种做法行不通:

  1. try {  
  2.     std::thread t1(func);  
  3.     std::thread t2(func);  
  4.    
  5.     t1.join();  
  6.     t2.join();  
  7. catch(const std::exception& ex)  
  8. {  
  9.     std::cout << ex.what() << std::endl;  

要在线程间传递异常,你可以先在线程函数中捕获它们,然后再将它们保存到一个合适的地方,随后再让另外一个线程从这个地方取得这些异常。

  1. std::vector  g_exceptions; void throw_function()  
  2. throw std::exception("something wrong happened");  
  3. void func()  
  4. try {  
  5.       throw_function();  
  6.    } catch(...)  
  7.    {  
  8.       std::lock_guard lock(g_mutex);  
  9.       g_exceptions.push_back(std::current_exception());  
  10.    }  
  11. int main()  
  12. {  
  13.    g_exceptions.clear();  
  14.  
  15.    std::thread t(func);  
  16.    t.join(); for(auto& e : g_exceptions)  
  17.    { try { if(e != nullptr)  
  18.          {  
  19.             std::rethrow_exception(e);  
  20.          }  
  21.       } catch(const std::exception& e)  
  22.       {  
  23.          std::cout << e.what() << std::endl;  
  24.       }  
  25.    } return 0;  

要获得更多关于捕获并传递异常的知识,你可以阅读在主线程中处理工作线程抛出的C 异常以及怎样才能在线程间传递异常?。

在深入讨论之前还有一点值得注意,头文件里还在命名空间std::this_thread中提供了一些辅助函数:

  • get_id: 返回胆怯线程的id
  • yield: 让调度器先运行其它的线程,这在忙于等待状态时很有用
  • sleep_for: 将当前线程置于阻塞状态,时间不少于参数所指定的时间段
  • sleep_util: 在指定的时刻来临前,一直将当前的线程置于阻塞状态

在上一个例子中,我需要对g_exceptions这个vector进行同步访问,以确保同一个时刻只能有一个线程向其中压入新元素。为了实现同步,我使用了一个互斥量,并在该互斥量上进行了锁定。互斥量是一个核心的同步原语,C 11的头文件中包含了四种不同的互斥量。

  • mutex: 提供了核心的lock()函数和unlock()函数,以及非阻塞式的try_lock()方法,该方法在互斥量不可用时会立即返回。
  • recursive_mutex: 运行在同一线程中,多次获得同一个互斥量。
  • timed_mutex: 同第一条中的mutex类似,但它还带来了另外两个方法try_lock_for()和try_lock_until(),分别用于在某个时间段内或在某个时刻到来之前获得该互斥量。
  • recursive_timed_mutex: 结合了第二条的timed_mutex和第三条的recusive_mutex。

以下所列就是一个使用std::mutex(注意其中get_id()和sleep_for()这两个前文所述的辅助函数的用法)的例子。

  1. #include   
  2. #include   
  3. #include   
  4. #include   
  5.    
  6. std::mutex g_lock; void func()  
  7. {  
  8.     g_lock.lock();  
  9.    
  10.     std::cout << "entered thread " << std::this_thread::get_id() << std::endl;  
  11.     std::this_thread::sleep_for(std::chrono::seconds(rand() % 10));  
  12.     std::cout << "leaving thread " << std::this_thread::get_id() << std::endl;  
  13.    
  14.     g_lock.unlock();  
  15. int main()  
  16. {  
  17.     srand((unsigned int)time(0));  
  18.    
  19.     std::thread t1(func);  
  20.     std::thread t2(func);  
  21.     std::thread t3(func);  
  22.    
  23.     t1.join();  
  24.     t2.join();  
  25.     t3.join(); return 0;  

其输出将类似如下所示:

  1. entered thread 10144 leaving thread 10144 entered thread 4188 leaving thread 4188 entered thread 3424 leaving thread 3424  

lock()和unlock()这两个方法顾名思义,头一个方法用来对互斥量进行加锁,如果互斥量不可得便会处于阻塞状态;第二个方法用来对互斥量进行解锁。

接下来的这个例子演示的是一个简单的线程安全的容器(内部使用的是std::vector)。这个容器具有添加单个元素的add()方法以及添加一批元素的addrange()方法,addrange()方法内只是简单的调用了add()方法。 

  1. template <typename T> class container   
  2. {  
  3.     std::mutex _lock;  
  4.     std::vector _elements; publicvoid add(T element)   
  5.     {  
  6.         _lock.lock();  
  7.         _elements.push_back(element);  
  8.         _lock.unlock();  
  9.     } void addrange(int num, ...)  
  10.     {  
  11.         va_list arguments;  
  12.    
  13.         va_start(arguments, num); for (int i = 0; i < num; i )  
  14.         {  
  15.             _lock.lock();  
  16.             add(va_arg(arguments, T));  
  17.             _lock.unlock();  
  18.         }  
  19.    
  20.         va_end(arguments);   
  21.     } void dump()  
  22.     {  
  23.         _lock.lock(); for(auto e : _elements)  
  24.             std::cout << e << std::endl;  
  25.         _lock.unlock();  
  26.     }  
  27. }; void func(container<int>& cont)  
  28. {  
  29.     cont.addrange(3, rand(), rand(), rand());  
  30. int main()  
  31. {  
  32.     srand((unsigned int)time(0));  
  33.    
  34.     container<int> cont;  
  35.    
  36.     std::thread t1(func, std::ref(cont));  
  37.     std::thread t2(func, std::ref(cont));  
  38.     std::thread t3(func, std::ref(cont));  
  39.    
  40.     t1.join();  
  41.     t2.join();  
  42.     t3.join();  
  43.    
  44.     cont.dump(); return 0;  

这个程序执行起来会进入死锁状态。其原因在于,该容器多次尝试获取同一个互斥量而之前却并没有释放该互斥量,这么做是行不通的。这正是std::recursive_mutex的用武之地,它允许同一个线程多次获得同一个互斥量,可重复获得的最大次数并未具体说明,但一旦查过一定次数,再对lock进行调用就会抛出std::system错误。为了修复上面所列代码的死锁问题(不通过修改addrange方法的实现,让它不对lock和unlock方法进行调用),我们可以将互斥量改为std::recursive_mutex。

  1. template <typename T> class container   
  2. {  
  3.     std::recursive_mutex _lock; // ...   
  4. }; 

经过修改之后,该程序的输出会同如下所示类似:

  1. 6334 18467 41 6334 18467 41 6334 18467 41  

明眼的读者可能已经发现了,每次调用func()所产生的数字序列都完全相同。这是因为对srad的初始化是要分线程进行的,对srand()的调用只是在主线程中进行了初始化。在其它的工作线程中,srand并没有得到初始化,所以每次产生的数字序列就是完全相同的了。

显式的加锁和解锁可能会导致一定的问题,比如忘了解锁或者加锁的顺序不对都有可能导致死锁。本标准提供了几个类和函数用于帮助解决这类问题。使用这些封装类就能够以相互一致的、RAII风格的方式使用互斥量了,它们可以在相应的代码块的范围内进行自动的加锁和解锁动作。这些封装类包括:

  • lock_guard: 该类的对象在构造之时会试图获得互斥量的拥有权(通过调用lock()实现),而在析构之时会自动释放它所获得的互斥量(通过调用unlock()实现)。这是一个不可复制的类。
  • unique_lock: 是一个通用的互斥量封装类。与lock_quard不同,它还支持延迟加锁、时间锁、递归锁、锁所有权的转移并且还支持使用条件变量。这也是一个不可复制的类,但它是可以移动的类。

使用这些封装类,我们可以象这样来改写我们的容器:

  1. template <typename T> class container   
  2. {  
  3.     std::recursive_mutex _lock;  
  4.     std::vector _elements; publicvoid add(T element)   
  5.     {  
  6.         std::lock_guard locker(_lock);  
  7.         _elements.push_back(element);  
  8.     } void addrange(int num, ...)  
  9.     {  
  10.         va_list arguments;  
  11.    
  12.         va_start(arguments, num); for (int i = 0; i < num; i )  
  13.         {  
  14.             std::lock_guard locker(_lock);  
  15.             add(va_arg(arguments, T));  
  16.         }  
  17.    
  18.         va_end(arguments);   
  19.     } void dump()  
  20.     {  
  21.         std::lock_guard locker(_lock); for(auto e : _elements)  
  22.             std::cout << e << std::endl;  
  23.     }  
  24. }; 

有人会说,既然dump()方法并不会对容器的状态做出任何修改,所以它应该定义为congst的方法。但要是你真的这么改了之后,编译器就会报告出如下的错误:

  1. ‘std::lock_guard<_Mutex>::lock_guard(_Mutex &)' : cannot convert parameter 1 from ‘const std::recursive_mutex' to ‘std::recursive_mutex &'  

互斥量(无论使用的是哪一种实现)必须要获得和释放,这就意味着要调用非常量型的lock()和unlock()方法。所以,从逻辑上讲,lock_guard不能在定义中添加const(因为该方法定义为const的话,互斥量也就必需是const的了)这个问题有个解决办法,可以让 mutex变为mutable的。成为 mutable之后就可以在const函数中对状态进行修改了。不过,这种用法应该只用于隐藏的或者“元”状态(比如,对计算结果或者查询到的数据进行缓存,以供下次调用时直接使用而无需再次计算或查询;再比如,对 只是对对象的实际状态起着辅助作用的互斥量中的位进行修改)。

  1. template <typename T> class container   
  2. {  
  3.    mutable std::recursive_mutex _lock;  
  4.    std::vector _elements; publicvoid dump() const {  
  5.       std::lock_guard locker(_lock); for(auto e : _elements)  
  6.          std::cout << e << std::endl;  
  7.    }  
  8. }; 

这些封装类都具有可以接受一个用来指导加锁策略的参数的构造器,可用的加锁策略有:

  • defer_lockof typedefer_lock_t: 不要取得互斥量的拥有权
  • try_to_lockof typetry_to_lock_t: 在不会被阻塞的情况下尝试获得互斥量的拥有权
  • adopt_lockof typeadopt_lock_t: 假设调用线程已经获得了互斥量的拥有权

这些策略的定义如下所示:

  1. struct defer_lock_t { };   
  2. struct try_to_lock_t { };   
  3. struct adopt_lock_t { };   
  4. constexpr std::defer_lock_t defer_lock = std::defer_lock_t();   
  5. constexpr std::try_to_lock_t try_to_lock = std::try_to_lock_t();   
  6. constexpr std::adopt_lock_t adopt_lock = std::adopt_lock_t(); 

除了这些互斥量的封装类,本标准还提供了几个用来对一个或多个互斥量进行加锁的方法。

  • lock: 使用一种可避免死锁的算法对互斥量进行加锁(通过调用tolock()、try_lock()以及unlock())。
  • try_lock: 通过调用try_lock()i按照参数里指定的互斥量的顺序对多个互斥量进行加锁。

这里举一个造成死锁的例子:我们有一个保存元素的容器,还有一个叫做exchange()的方法,用来将一个元素从一个容器中取出来放入另外一个容器。为了成为线程安全的函数,这个函数通过获得每个容器的互斥量,对两个容器的访问进行了同步处理。

  1. template <typename T> class container   
  2. public:  
  3.     std::mutex _lock;  
  4.     std::set _elements; void add(T element)   
  5.     {  
  6.         _elements.insert(element);  
  7.     } void remove(T element)   
  8.     {  
  9.         _elements.erase(element);  
  10.     }  
  11. }; void exchange(container<int>& cont1, container<int>& cont2, int value)  
  12. {  
  13.     cont1._lock.lock();  
  14.     std::this_thread::sleep_for(std::chrono::seconds(1)); // <-- forces context switch to simulate the deadlock  cont2._lock.lock();      
  15.    
  16.     cont1.remove(value);  
  17.     cont2.add(value);  
  18.    
  19.     cont1._lock.unlock();  
  20.     cont2._lock.unlock();  

假设这个函数是从两个不同的线程中进行调用的,在第一个线程中有一个元素从第一个容器中取出来,放到了第二个容器中,在第二个线程中该元素又从第二个容器中取出来放回到了第一个容器中。这样会导致死锁(如果线程上下文正好在获得第一个锁的时候从一个线程切换到了另一个线程的时候就会发生死锁)。

  1. int main()  
  2. {  
  3.     srand((unsigned int)time(NULL));  
  4.    
  5.     container<int> cont1;   
  6.     cont1.add(1);  
  7.     cont1.add(2);  
  8.     cont1.add(3);  
  9.    
  10.     container<int> cont2;   
  11.     cont2.add(4);  
  12.     cont2.add(5);  
  13.     cont2.add(6);  
  14.    
  15.     std::thread t1(exchange, std::ref(cont1), std::ref(cont2), 3);  
  16.     std::thread t2(exchange, std::ref(cont2), std::ref(cont1), 6);  
  17.    
  18.     t1.join();  
  19.     t2.join(); return 0;  

要解决该问题,你可以使用以能够避免死锁的方式获得锁的std::lock:

  1. void exchange(container<int>& cont1, container<int>& cont2, int value)  
  2. {  
  3.     std::lock(cont1._lock, cont2._lock);   
  4.    
  5.     cont1.remove(value);  
  6.     cont2.add(value);  
  7.    
  8.     cont1._lock.unlock();  
  9.     cont2._lock.unlock();  

12
[责任编辑:孙可 sun_ke@cnw.com.cn]