说明:本篇旨在介绍并发库怎么使用,介绍的不会太全面。前置条件是了解 Linux/window 的相应的系统调用再看。这是文档:cplusplus。
1、<thread>库
1.1 说明
其实 C++ 的 <thread> 库就是对系统调用封装了一层(其实不仅是它,我下面介绍的所有库都是这样),其实用系统调用也是可以实现的,不过它支持跨平台,这是一个很大的优势,另外它还融合了一些 C++ 的特性总体来说用起来会比系统调用要舒服。。
1.2 函数说明
1.2.1 构造函数
下面是构造一个线程对象的方式:
可以看到它支持创建一个空线程,后续可通过移动构造为线程函数体赋值,注意它不支持拷贝构造!一般是第二种用的最多。下面是定义线程的一些方式:
#include <iostream> #include <thread> void print(int num) { std::cout << std::this_thread::get_id() << ": " << num << std::endl; } int main() { std::thread t1; t1 = std::thread(print, 1); //这里是移动赋值 std::thread t2(std::thread(print, 2)); //这里是移动构造 std::thread t3(print, 3); //这里使用了第二种构造 t1.join(); t2.join(); t3.join(); return 0; }大家应该发现了 std::thread t3(print, 3) 它没有写显示的写类型。这是因为它是函数编译器会自动推导类型。
1.2.2joinable()
用于判断一个线程是否可被 join。可 join 了返回 true,否则返回 false。
1.2.3 get_id()
它就是返回线程的 id 号。注意到返回值是 id 它其实是一个类:
正常来说返回一个无符号整数就行,这里之所以返回类是为了在不同环境下做适配。它支持比较大小(像:>、<、==、>=等)、流插入和提取、特化了hash仿函数用于被哈希存储从而实现快速查找。
它一般是线程对象用于获取线程 id 时使用,对于在执行体里面获取 id 需要用到 this_tread。
1.3 this_thread
它是一个命名空间:
它的 get_id() 用于在执行体中获取线程 id 。yield() 是主动让出 CPU,让其他线程先执行,该线程会被放到同一优先级线程的末尾。
sleep_for 和 sleep_until 都是让线程休眠。sleep_for 是休眠多久,sleep_until 是休眠到什么时候。下面介绍它的时候需要用到 C++ 的 <chrono>库它是时间库(它在文章末尾,看完这个在继续看)。
void print(int num) { std::this_thread::sleep_for(std::chrono::seconds(1)); // 该线程休眠一秒 std::cout << std::this_thread::get_id() << ": " << num << std::endl; }void print(int num) { auto start = std::chrono::steady_clock::now(); auto end = start + std::chrono::seconds(5); std::this_thread::sleep_until(end); std::cout << std::this_thread::get_id() << ": " << num << std::endl; }2、<mutex>库
2.1 说明
其内部主要封装了互斥锁机制。下面是它的常见的类:
2.2 mutexes
最常见的就是 mutex 即互斥锁。
可以看到它仅支持默认构造,不支持拷贝构造。下面是他的成员函数:
其中 lock() 是加锁、try_lock() 是尝试去加锁,如果能加锁,它会加锁然后返回 true,不能加锁这直接返回 false、unlock() 是解锁。下面是一个用例:
int cnt = 0; std::mutex mtx; void print(int num) { mtx.lock(); for (int i = 0; i < num; i++) cnt++; mtx.unlock(); } int main() { std::vector<std::thread> ths(2); for (int i = 0; i < 2; i++) ths[i] = std::thread(print, 10000000); for (int i = 0; i < 2; i++) ths[i].join(); std::cout << cnt; return 0; }可以看到 cnt 和 mtx 都被定义到了全局,如果想在函数内部使用会怎么样呢?也就是下面的这种写法:
void print(int num, int& cnt, std::mutex& mtx) { mtx.lock(); for (int i = 0; i < num; i++) cnt++; mtx.unlock(); } int main() { int cnt = 0; std::mutex mtx; std::vector<std::thread> ths(2); for (int i = 0; i < 2; i++) ths[i] = std::thread(print, 10000000, cnt, mtx); for (int i = 0; i < 2; i++) ths[i].join(); std::cout << cnt; return 0; }运行发现报错了:
这是因为:在感官上我们认为是直接把引用传递,实际并不是。因为这里用到了可变参数在底层会创建一个结构体用来存放函数指针和它的参数,这就相当于把值拷贝了一份,但是由于 mutex 不支持拷贝,所以就会报错。所以就引入了 ref 函数。所以赋值的那行就可以这么写:
ths[i] = std::thread(print, 10000000, std::ref(cnt), std::ref(mtx));由于 C++11 引入了 lambda 表达式所以就可以这样写:
int main() { int cnt = 0; std::mutex mtx; std::vector<std::thread> ths(2); auto print = [&cnt, &mtx](int num) { mtx.lock(); for (int i = 0; i < num; i++) cnt++; mtx.unlock(); }; for (int i = 0; i < 2; i++) ths[i] = std::thread(print, 10000000); for (int i = 0; i < 2; i++) ths[i].join(); std::cout << cnt; return 0; }lambda 表达式本质上就是仿函数也就是类,捕捉列表相当于使它的成员变量,因为是引用捕捉所以它们的类型是引用也就相当于持有本身。在打包的时候只是拷贝了这个 lambda 对象本身,它的成员并没有被拷贝。
timed_mutex 和 mutex 没有什么大的区别,他只是多加了两个函数 try_lock_for 和 try_lock_until 它们和 try_lock 类似,只不过多加了个休眠。try_lock_for 是如果没有申请到锁就休眠一段时间如果在这段时间内申请到锁就返回true(没锁的时候就会被挂起,不会一直占据CPU,有锁的时候在给它唤醒,让他去申请锁),时间到还没有申请到就返回 false。try_lock_until 它和 try_lock_for一样不过他是到某个时间点被唤醒。
recursive_mutex 和 mutex 没有什么大的区别,它是避免由于递归造成的死锁。就像一个线程申请到了锁,因为该函数体是递归调用,所以会再次去申请它已经申请到的锁,从而造成死锁。
2.3 lock_guard
lock_guard 是 C++11 提供的支持 RAII 方式管理互斥锁资源的类。也就是在构造的时候申请锁,析构的时候释放锁,这样可以有效的帮助因为异常而造成死锁问题。
第二个构造可以传递 adopt_lock_t 类型的 adopt_lock 对象用来管理已经 lock 的锁。下面是它的定义:
可以看到它是一个空类并且定义了一个 adopt_lock 常量。
int cnt = 0; std::mutex mtx; // 法一 void print(int num) { { std::lock_guard<std::mutex> _lock(mtx); for (int i = 0; i < num; i++) cnt++; } } // 法二 //void print(int num) //{ // mtx.lock(); // { // std::lock_guard<std::mutex> _lock(mtx, std::adopt_lock); // for (int i = 0; i < num; i++) // cnt++; // } //} int main() { std::vector<std::thread> ths(2); for (int i = 0; i < 2; i++) ths[i] = std::thread(print, 10000000); for (int i = 0; i < 2; i++) ths[i].join(); std::cout << cnt; return 0; }它没有成员函数!
2.4 unique_lock
它就是 lock_guard 的升级版:
可以看到在构造的时候多了很多种,新增了时间它是用来管理 timed_mutex 的构造时会调用try_lock_for 或 try_lock_until,还有多了两个类型:try_to_lock_t 和 defer_lock_t,它们和上面的adopt_lock_t 类似都只是一个空类,没有任何数据,只用来区分调用哪个构造函数。在它的内部有申请锁的成员函数:
try_to_lock 是去尝试申请锁,调用 try_lock 函数去申请。但如何判断是否成功拿到锁?unique_lock中重载了 bool,可以用它来判断,具体如下:
void print(int num) { std::unique_lock<std::mutex> _lock(mtx, std::try_to_lock); if (_lock) std::cout << "拿到锁了" << std::endl; else std::cout << "没拿到锁" << std::endl; }defer_lock_t 是延迟申请锁:
std::mutex mtx; void func1() { mtx.lock(); std::unique_lock<std::mutex> lock(mtx, std::adopt_lock); } void func2() { std::unique_lock<std::mutex> lock; lock = std::unique_lock<std::mutex>(mtx, std::defer_lock); lock.lock(); } int main() { std::thread t1(func1); std::thread t2(func2); t1.join(); t2.join(); return 0; }2.5 lock() 和 try_lock()
它的版本有很多,像上面的 unique_lock、mutexs 中都有它的身影。不过这里我要介绍一下全局的。全局的 lock 和 try_lock 是支持申请多个锁(最少也是两个)。
在申请多个锁的时候如果其中某一个锁申请失败,会立即释放掉之前所有申请成功的锁,然后再次重新申请。程序会一直卡在这里直至所有锁申请成功。
它和 lock 基本一样不过它如果申请失败会返回申请失败锁的下标(从0开始),如果申请成功会返回 -1。
2.6 call_once()
该函数只会被线程执行一次。std::once_flag它是一个类,保证某段代码,在整个程序生命周期里,只执行 1 次!。
std::once_flag flag; void print() { std::cout << "我执行了"; } void func1() { std::call_once(flag, print); } int main() { std::thread t1(func1); std::thread t2(func1); t1.join(); t2.join(); return 0; }三、<atomic>库
3.1 说明
因为锁的消耗较大所以就有了原子库,用来实现无锁编程。它里面的所有成员函数都是原子操作!但是并不是所有类型都可以被当作原子类型,必须得符合以下判断:
std::is_trivially_copyable<T>::value std::is_copy_constructible<T>::value std::is_move_constructible<T>::value std::is_copy_assignable<T>::value std::is_move_assignable<T>::value std::is_same<T, typename std::remove_cv<T>::type>::value只要有一个返回false,那它就不能被认为可以当作原子类型。一般来说:int、bool、指针可以当作原子类型。还有一个原因就是要支持加减操作。
atomic 的原理主要是通过硬件实现的,现代处理器提供了原子指令来支持原子操作。原子指令可以实现对内存读取、比较和写入操作是不可分的简称 CAS 操作。另外硬件也采用了缓存一致性,因为有多个处理器时它们的缓存可能有数据一致性问题。Linux/windows 分别提供了CAS的系统调用接口,在C++11中对它又进行了封装:
// C++11⽀持的CAS接⼝ template <class T> bool atomic_compare_exchange_weak (atomic<T>* obj, T* expected, T val) noexcept; template <class T> bool atomic_compare_exchange_strong (atomic<T>* obj, T* expected, T val) noexcept;obj 是原子对象,expected是期望值,val是要改的新值。如果原子对象的值等于期望值,那么就把原子对象的值换成新值;如果原子对象的值不等于期望值,那就代表原子对象的值被其他线程修改了,那么就把期望值改成原子对象的值,然后再次进行判断。大概的样子如下:
std::atomic<int> acnt = 0; void Add(std::atomic<int>& cnt) { int old = cnt.load();// 获取原子变量的值 while (std::atomic_compare_exchange_weak(&cnt, &old, old + 1)); } void func1() { for (int i = 0; i < 10; i++) { ++acnt; //可以认为是这样实现的 Add(acnt); } }下面是<atomic>库中的CAS接口:
// C++11中atomic类的成员函数 bool compare_exchange_weak (T& expected, T val, memory_order sync = memory_order_seq_cst) noexcept; bool compare_exchange_strong (T& expected, T val, memory_order sync = memory_order_seq_cst) noexcept;memory_order 是内存顺序( memory_order )选项,CPU和编译器会偷偷乱序执行代码(在多线程下,一个线程内部,前面和后面代码的执行顺序被颠倒了),本质是为了榨干硬件性能。由于它比较复杂,感兴趣的可以自己去看看。
3.2 compare_exchange_weak() 和 compare_exchange_strong() 区别
compare_exchange_weak() 在某些平台下,即使原子变量的值等于预期值也会返回 false 这叫伪失败。在某些架构下就可能会出现像:RISCV 架构它的LL/SC 指令,LL/SC 指令天生就会 “伪失败”!即使值一样在读取和写入的时候被中断,线程被切走等SC就会失败也就是指令失败了所以返回 false!(LL和SC是一对指令,在LL执行完线程可能会被切走,即使这样它也是原子的,因为要么后面SC更新值,要么直接返回旧值,所以还是原子的)
compare_exchange_strong() 保证只要相等就一定会返回 true,因为它内部自带循环,自动把指令失败重试到成功为止!通常来说 compare_exchange_weak() 要比 compare_exchange_strong() 要更快。
3.3 自旋锁实现
下面是用 atomic 极简实现了一个自旋锁:
class spinlock { public: spinlock() :flag(false) {} void lock() { while (flag.exchange(true)) ; } void unlock() { flag.store(false); } private: std::atomic<bool> flag; };exchange是修改原子变量值,然后返回它上一次值。store是修改原子变量值。这段代码就是当lock() 时,将变量改为 true 然后返回 false,由于返回 false 循环结束即代表加锁成功。如果变量的值本来就是 true 那么会一直循环下去。解锁就是把变量的值设置为 false 即可。
其实还有 atomic_flag,它就是一个标志,它是专门用来实现自旋锁。
test_and_set 就类似于 exchange 它是把值设为 true 然后返回 false。clear 就是把值设置为 false。
class spinlock { public: void lock() { while (flag.test_and_set()) ; } void unlock() { flag.clear(); } private: std::atomic_flag flag = ATOMIC_FLAG_INIT; };四、<condition_variable>库
4.1 说明
它就是在系统调用的基础上又封装了一层,没什么好说的:
wait() 需要传入unique_lock<mutex>类型的互斥锁。它会阻塞线程,阻塞前先把锁释放然后在休眠,直至被 notify 唤醒,被唤醒后会再次去申请锁,然后在判断。对 wait_for()、wait_until() 它和 wait() 一样不过就只是加了个时间而已。wait_for() 在等待的期间内如果被唤醒则返回 true,时间到了还没被唤醒就返回 false。wait_until() 是判断是否到某一时间点。
notify_one() 是随机唤醒一个线程,notify_all() 是唤醒所有线程。
提一下对于想使用其他类型的锁有 condition_variable_any,不过这里我就不展开说了,感兴趣的可以自己去查查文档。
4.2 经典面试题:两个线程交替打印奇数和偶数
大家可以先思考一下,下面是实现代码:
int size = 100; bool flag = true; std::mutex mtx; std::condition_variable cv; //打印奇数 void odd() { int i = 1; while (i < size) { std::unique_lock<std::mutex> lock(mtx); while (flag) cv.wait(lock); std::cout << i << std::endl; i += 2; flag = true; cv.notify_one(); } } //打印偶数 void even() { int i = 0; while (i < size) { std::unique_lock<std::mutex> lock(mtx); while (!flag) cv.wait(lock); std::cout << i << std::endl; i += 2; flag = false; cv.notify_one(); } }上面的代码其实写的很巧妙,线程的启动有三种情况:1.odd 线程先启动;2.even 线程先启动;3.同时启动。
第一种情况:odd 申请到了锁,even 没有申请到锁他被放到阻塞队列中。odd 继续向下执行在 while 中判断发现符合条件则休眠,这时候它会释放申请的锁,然后 even 被唤醒它拿到锁向下执行,发现不符合 while 条件则继续向下执行,之后将 flag 改为 false 然后唤醒一个线程也就是odd。这时候lock锁也会被释放(因为单次循环执行完毕)。然后它们接着竞争锁如果是odd申请到那就执行,如果还是 even 那么会从新开始运行,注意这里的 flag 已经是 false 了在 while 判断时会失败,该线程进入阻塞。接着唤醒 odd 线程,重复上述的步骤。
对于剩下的第二种,也类似。第三种细分情况就是第一种或第二种。
五、future
这个我之前写过,这是链接:C++11——异步_c++有没有异步操作-CSDN博客
六、线程池
如果线程池的实现也在这篇文章就会造成该篇文章太长了,我会把它放到我的下一篇中。
七、<chrono>库
7.1 介绍
对于它的使用主要就是要用到 duartion 和 time_point 这两个类。
7.2 duartion
它表示一段时间间隔。Req 是数值类型(存储时长的计数,如 int/ long long 等);Period 是时间单位(默认 ratio<1> 是1秒)。为了方便使用库已经定义好了常用的时长(就相当于是 typedef了一下):
std::chrono::seconds s(2); // 2秒7.3 time_point
它表示一个具体的时间点。Clock 是时钟类型(提供时间基准,常用:system_clock / steady_clock);Duartion 是时间精度(有纳秒、秒等)。
system_clock:它是类模板,表示系统实时时钟,对应北京时间,可转成日期。下面是它的成员函数:
now:获取当前时间。
to_time_t :把 time_point 类型 time_t 从而适配C语言。
steady_clock:它是类模板,表示单调时钟,只增不减,适合计算代码耗时。下面是它的成员函数:
定义 time_point 对象:
using namespace std::chrono; time_point<system_clock> start = system_clock::now();不过这种方式挺不爽的,在system_clock 和 steady_clock 中的内部有这样的代码(以system_clock举例)
namespace std::chrono { struct system_clock { // 别名:把 time_point<system_clock> 简写为 system_clock::time_point using time_point = std::chrono::time_point<std::chrono::system_clock>; }; }所以就可以这么写:
using namespace std::chrono; system_clock::time_point start = system_clock::now();time_point 允许加减 duartion 类型它的返回值是 time_point, time_point 相互减的返回值是 duartion 类型。
system_clock::time_point now = system_clock::now(); // 时间点 + 时长 = 新时间点 auto future = now + seconds(10); ------------------------------------------------------------- auto start = steady_clock::now(); auto end = steady_clock::now(); // 两个时间点相减 → 返回时长类型 duration steady_clock::duration diff = end - start;