一、为什么需要多线程?
在现代计算机体系结构中,多核处理器已成为标准配置。多线程编程允许我们充分利用这些计算资源,通过并行执行任务来提升程序性能。C++11之前,多线程编程依赖于平台特定的API(如POSIX pthreads、Windows线程API),C++11标准引入了<thread>等头文件,为多线程编程提供了统一、可移植的解决方案。
二、C++多线程基础
2.1 第一个多线程程序
#include<iostream>#include<thread>#include<chrono>// 线程函数voidthreadFunction(intid){std::cout<<"线程 "<<id<<" 开始执行"<<std::endl;std::this_thread::sleep_for(std::chrono::seconds(1));std::cout<<"线程 "<<id<<" 结束执行"<<std::endl;}intmain(){std::cout<<"主线程开始,创建3个子线程"<<std::endl;// 创建并启动线程std::threadt1(threadFunction,1);std::threadt2(threadFunction,2);std::threadt3(threadFunction,3);// 等待线程完成t1.join();t2.join();t3.join();std::cout<<"所有线程执行完毕"<<std::endl;return0;}2.2 线程管理的基本操作
#include<thread>#include<iostream>voidworker(){std::cout<<"工作线程ID: "<<std::this_thread::get_id()<<std::endl;}intmain(){// 获取硬件支持的并发线程数unsignedintn=std::thread::hardware_concurrency();std::cout<<"硬件支持的最大并发线程数: "<<n<<std::endl;// 创建线程std::threadt(worker);// 线程IDstd::cout<<"主线程ID: "<<std::this_thread::get_id()<<std::endl;std::cout<<"子线程ID: "<<t.get_id()<<std::endl;// 检查线程是否可joinif(t.joinable()){t.join();// 等待线程完成}// 分离线程(主线程不等待)// t.detach(); // 谨慎使用!return0;}三、数据共享与同步
3.1 竞态条件与数据竞争
#include<iostream>#include<thread>#include<vector>// 有数据竞争的错误示例intcounter=0;voidincrement(){for(inti=0;i<100000;++i){counter++;// 非原子操作,存在数据竞争}}intmain(){std::threadt1(increment);std::threadt2(increment);t1.join();t2.join();// 结果可能不是200000std::cout<<"计数器值: "<<counter<<std::endl;return0;}3.2 互斥锁(Mutex)
#include<iostream>#include<thread>#include<mutex>#include<vector>intcounter=0;std::mutex mtx;voidsafeIncrement(){for(inti=0;i<100000;++i){std::lock_guard<std::mutex>lock(mtx);// 自动加锁解锁counter++;// lock_guard离开作用域,自动释放锁}}voidtryLockExample(){std::timed_mutex timed_mtx;for(inti=0;i<5;++i){// 尝试获取锁,非阻塞if(timed_mtx.try_lock()){std::cout<<"线程 "<<std::this_thread::get_id()<<" 获取到锁"<<std::endl;std::this_thread::sleep_for(std::chrono::milliseconds(100));timed_mtx.unlock();break;}else{std::cout<<"线程 "<<std::this_thread::get_id()<<" 未能获取锁,等待重试"<<std::endl;std::this_thread::sleep_for(std::chrono::milliseconds(50));}}}intmain(){std::threadt1(safeIncrement);std::threadt2(safeIncrement);t1.join();t2.join();std::cout<<"安全的计数器值: "<<counter<<std::endl;// 尝试锁示例std::threadt3(tryLockExample);std::threadt4(tryLockExample);t3.join();t4.join();return0;}3.3 递归锁与锁保护
#include<iostream>#include<thread>#include<mutex>std::recursive_mutex rec_mtx;voidrecursiveFunction(intdepth){if(depth<=0)return;rec_mtx.lock();std::cout<<"深度: "<<depth<<", 线程ID: "<<std::this_thread::get_id()<<std::endl;// 递归调用,使用递归锁避免死锁recursiveFunction(depth-1);rec_mtx.unlock();}// 使用unique_lock提供更灵活的锁管理voidflexibleLockExample(){std::mutex mtx;std::unique_lock<std::mutex>lock(mtx,std::defer_lock);// 延迟锁定std::cout<<"准备获取锁..."<<std::endl;lock.lock();std::cout<<"获取到锁,执行临界区操作"<<std::endl;lock.unlock();// 可以重新锁定lock.lock();std::cout<<"重新锁定"<<std::endl;// 自动解锁}intmain(){std::threadt1(recursiveFunction,3);std::threadt2(flexibleLockExample);t1.join();t2.join();return0;}3.4 条件变量
#include<iostream>#include<thread>#include<mutex>#include<condition_variable>#include<queue>std::mutex mtx;std::condition_variable cv;std::queue<int>dataQueue;boolfinished=false;// 生产者线程voidproducer(intitems){for(inti=0;i<items;++i){std::this_thread::sleep_for(std::chrono::milliseconds(100));std::lock_guard<std::mutex>lock(mtx);dataQueue.push(i);std::cout<<"生产: "<<i<<std::endl;cv.notify_one();// 通知一个等待的消费者}{std::lock_guard<std::mutex>lock(mtx);finished=true;}cv.notify_all();// 通知所有消费者}// 消费者线程voidconsumer(intid){while(true){std::unique_lock<std::mutex>lock(mtx);// 等待条件:队列不为空或生产结束cv.wait(lock,[]{return!dataQueue.empty()||finished;});if(finished&&dataQueue.empty()){break;}if(!dataQueue.empty()){intvalue=dataQueue.front();dataQueue.pop();lock.unlock();// 提前解锁,减少锁持有时间std::cout<<"消费者 "<<id<<" 消费: "<<value<<std::endl;}}std::cout<<"消费者 "<<id<<" 结束"<<std::endl;}intmain(){std::threadprod(producer,10);std::threadcons1(consumer,1);std::threadcons2(consumer,2);prod.join();cons1.join();cons2.join();return0;}四、原子操作
4.1 原子类型
#include<iostream>#include<thread>#include<atomic>#include<vector>std::atomic<int>atomicCounter(0);std::atomic<bool>ready(false);voidatomicIncrement(){// 等待信号while(!ready.load(std::memory_order_acquire)){std::this_thread::yield();// 让出CPU时间片}for(inti=0;i<100000;++i){atomicCounter.fetch_add(1,std::memory_order_relaxed);}}voidtestAtomicOperations(){std::atomic<int>value(10);// 原子操作示例intexpected=10;boolsuccess=value.compare_exchange_strong(expected,20);std::cout<<"CAS操作: "<<(success?"成功":"失败")<<", 当前值: "<<value.load()<<std::endl;// 原子标志测试std::atomic_flag flag=ATOMIC_FLAG_INIT;// 测试并设置boolwas_set=flag.test_and_set();std::cout<<"第一次test_and_set: "<<was_set<<std::endl;flag.clear();was_set=flag.test_and_set();std::cout<<"清除后test_and_set: "<<was_set<<std::endl;}intmain(){constintnum_threads=4;std::vector<std::thread>threads;// 启动线程for(inti=0;i<num_threads;++i){threads.emplace_back(atomicIncrement);}// 让所有线程开始执行ready.store(true,std::memory_order_release);// 等待所有线程完成for(auto&t:threads){t.join();}std::cout<<"原子计数器最终值: "<<atomicCounter.load()<<std::endl;// 测试其他原子操作testAtomicOperations();return0;}4.2 内存顺序
#include<atomic>#include<thread>#include<iostream>std::atomic<int>x(0),y(0);std::atomic<int>z(0);voidwrite_x_then_y(){x.store(1,std::memory_order_relaxed);// 1y.store(1,std::memory_order_release);// 2}voidread_y_then_x(){while(!y.load(std::memory_order_acquire));// 3if(x.load(std::memory_order_relaxed)){// 4z.fetch_add(1);}}voidmemoryOrderDemo(){std::threadt1(write_x_then_y);std::threadt2(read_y_then_x);t1.join();t2.join();// z应该为1std::cout<<"z = "<<z.load()<<std::endl;}intmain(){// 多次运行观察结果for(inti=0;i<10;++i){x=0;y=0;z=0;memoryOrderDemo();}return0;}五、高级多线程模式
5.1 线程池实现
#include<iostream>#include<vector>#include<queue>#include<thread>#include<mutex>#include<condition_variable>#include<functional>#include<future>#include<memory>classThreadPool{public:ThreadPool(size_t numThreads):stop(false){for(size_t i=0;i<numThreads;++i){workers.emplace_back([this]{while(true){std::function<void()>task;{std::unique_lock<std::mutex>lock(queueMutex);condition.wait(lock,[this]{returnstop||!tasks.empty();});if(stop&&tasks.empty()){return;}task=std::move(tasks.front());tasks.pop();}task();}});}}template<classF,class...Args>autoenqueue(F&&f,Args&&...args)->std::future<typenamestd::result_of<F(Args...)>::type>{usingreturn_type=typenamestd::result_of<F(Args...)>::type;autotask=std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f),std::forward<Args>(args)...));std::future<return_type>res=task->get_future();{std::unique_lock<std::mutex>lock(queueMutex);if(stop){throwstd::runtime_error("线程池已停止");}tasks.emplace([task](){(*task)();});}condition.notify_one();returnres;}~ThreadPool(){{std::unique_lock<std::mutex>lock(queueMutex);stop=true;}condition.notify_all();for(std::thread&worker:workers){worker.join();}}private:std::vector<std::thread>workers;std::queue<std::function<void()>>tasks;std::mutex queueMutex;std::condition_variable condition;boolstop;};// 使用示例intmain(){ThreadPoolpool(4);std::vector<std::future<int>>results;// 提交任务到线程池for(inti=0;i<8;++i){results.emplace_back(pool.enqueue([i]{std::cout<<"任务 "<<i<<" 在线程 "<<std::this_thread::get_id()<<" 执行"<<std::endl;std::this_thread::sleep_for(std::chrono::seconds(1));returni*i;}));}// 获取结果for(auto&result:results){std::cout<<"结果: "<<result.get()<<std::endl;}return0;}5.2 读写锁(C++14及以上)
#include<iostream>#include<thread>#include<shared_mutex>#include<vector>#include<chrono>classThreadSafeData{private:mutablestd::shared_mutex mutex_;intdata_;public:ThreadSafeData(intinit=0):data_(init){}// 读取操作 - 共享锁intread()const{std::shared_lock<std::shared_mutex>lock(mutex_);std::cout<<"读取: "<<data_<<" 线程ID: "<<std::this_thread::get_id()<<std::endl;returndata_;}// 写入操作 - 独占锁voidwrite(intvalue){std::unique_lock<std::shared_mutex>lock(mutex_);std::cout<<"写入: "<<value<<" 线程ID: "<<std::this_thread::get_id()<<std::endl;data_=value;}// 增量操作voidincrement(){std::unique_lock<std::shared_mutex>lock(mutex_);++data_;std::cout<<"增量到: "<<data_<<" 线程ID: "<<std::this_thread::get_id()<<std::endl;}};intmain(){ThreadSafeDatadata(0);std::vector<std::thread>threads;// 创建5个读线程for(inti=0;i<5;++i){threads.emplace_back([&data,i]{for(intj=0;j<3;++j){data.read();std::this_thread::sleep_for(std::chrono::milliseconds(100));}});}// 创建2个写线程for(inti=0;i<2;++i){threads.emplace_back([&data,i]{for(intj=0;j<2;++j){data.write(i*10+j);std::this_thread::sleep_for(std::chrono::milliseconds(200));}});}// 创建1个增量线程threads.emplace_back([&data]{for(inti=0;i<4;++i){data.increment();std::this_thread::sleep_for(std::chrono::milliseconds(150));}});// 等待所有线程完成for(auto&t:threads){t.join();}std::cout<<"最终值: "<<data.read()<<std::endl;return0;}六、C++20新特性:协程和信号量
6.1 信号量(C++20)
#include<iostream>#include<thread>#include<semaphore>#include<vector>// 生产者-消费者模型使用信号量std::counting_semaphore<10>emptySlots(10);// 空槽位信号量std::counting_semaphore<10>fullSlots(0);// 满槽位信号量std::mutex bufferMutex;std::queue<int>buffer;booldone=false;voidproducer(intid){for(inti=0;i<5;++i){emptySlots.acquire();// 等待空槽位{std::lock_guard<std::mutex>lock(bufferMutex);buffer.push(i);std::cout<<"生产者 "<<id<<" 生产: "<<i<<std::endl;}fullSlots.release();// 增加满槽位std::this_thread::sleep_for(std::chrono::milliseconds(100));}}voidconsumer(intid){while(!done||!buffer.empty()){fullSlots.acquire();// 等待满槽位if(done&&buffer.empty()){fullSlots.release();break;}intvalue;{std::lock_guard<std::mutex>lock(bufferMutex);if(!buffer.empty()){value=buffer.front();buffer.pop();std::cout<<"消费者 "<<id<<" 消费: "<<value<<std::endl;}}emptySlots.release();// 增加空槽位}}intmain(){constintnumProducers=3;constintnumConsumers=2;std::vector<std::thread>producers;std::vector<std::thread>consumers;// 启动生产者for(inti=0;i<numProducers;++i){producers.emplace_back(producer,i);}// 启动消费者for(inti=0;i<numConsumers;++i){consumers.emplace_back(consumer,i);}// 等待生产者完成for(auto&p:producers){p.join();}// 设置完成标志done=true;// 释放所有信号量以唤醒等待的消费者for(inti=0;i<numConsumers;++i){fullSlots.release();}// 等待消费者完成for(auto&c:consumers){c.join();}std::cout<<"生产消费完成"<<std::endl;return0;}七、最佳实践与性能考虑
7.1 避免死锁的准则
#include<iostream>#include<thread>#include<mutex>// 死锁示例std::mutex mtx1,mtx2;voiddeadlockThread1(){std::lock_guard<std::mutex>lock1(mtx1);std::this_thread::sleep_for(std::chrono::milliseconds(100));std::lock_guard<std::mutex>lock2(mtx2);// 可能死锁std::cout<<"线程1完成"<<std::endl;}voiddeadlockThread2(){std::lock_guard<std::mutex>lock2(mtx2);std::this_thread::sleep_for(std::chrono::milliseconds(100));std::lock_guard<std::mutex>lock1(mtx1);// 可能死锁std::cout<<"线程2完成"<<std::endl;}// 避免死锁的方法1:按固定顺序加锁voidsafeThread1(){std::lock(mtx1,mtx2);// 同时锁定多个互斥量std::lock_guard<std::mutex>lock1(mtx1,std::adopt_lock);std::lock_guard<std::mutex>lock2(mtx2,std::adopt_lock);std::cout<<"安全线程1完成"<<std::endl;}voidsafeThread2(){std::lock(mtx1,mtx2);// 相同的锁定顺序std::lock_guard<std::mutex>lock1(mtx1,std::adopt_lock);std::lock_guard<std::mutex>lock2(mtx2,std::adopt_lock);std::cout<<"安全线程2完成"<<std::endl;}// 避免死锁的方法2:使用std::scoped_lock(C++17)voidscopedLockExample(){std::scoped_locklock(mtx1,mtx2);// 自动管理多个锁std::cout<<"使用scoped_lock安全加锁"<<std::endl;}intmain(){// 可能产生死锁// std::thread t1(deadlockThread1);// std::thread t2(deadlockThread2);// t1.join();// t2.join();// 安全版本std::threadt3(safeThread1);std::threadt4(safeThread2);t3.join();t4.join();// C++17 scoped_lockstd::threadt5(scopedLockExample);t5.join();return0;}7.2 性能优化技巧
#include<iostream>#include<thread>#include<atomic>#include<vector>#include<chrono>// 伪共享问题示例structBadAlignment{inta;// 可能和b在同一个缓存行intb;};// 解决伪共享structalignas(64)GoodAlignment{// 64字节对齐,通常是缓存行大小inta;// 独占一个缓存行};structalignas(64)GoodAlignment2{intb;// 独占另一个缓存行};voidfalseSharingTest(){constintiterations=100000000;// 伪共享情况BadAlignment bad;autostart=std::chrono::high_resolution_clock::now();std::threadt1([&bad,iterations]{for(inti=0;i<iterations;++i){bad.a++;}});std::threadt2([&bad,iterations]{for(inti=0;i<iterations;++i){bad.b++;}});t1.join();t2.join();autoend=std::chrono::high_resolution_clock::now();autoduration=std::chrono::duration_cast<std::chrono::milliseconds>(end-start);std::cout<<"伪共享耗时: "<<duration.count()<<"ms"<<std::endl;// 避免伪共享GoodAlignment good1;GoodAlignment2 good2;start=std::chrono::high_resolution_clock::now();std::threadt3([&good1,iterations]{for(inti=0;i<iterations;++i){good1.a++;}});std::threadt4([&good2,iterations]{for(inti=0;i<iterations;++i){good2.b++;}});t3.join();t4.join();end=std::chrono::high_resolution_clock::now();duration=std::chrono::duration_cast<std::chrono::milliseconds>(end-start);std::cout<<"避免伪共享耗时: "<<duration.count()<<"ms"<<std::endl;}intmain(){falseSharingTest();return0;}八、调试与测试
8.1 线程安全的单元测试
#include<iostream>#include<thread>#include<vector>#include<cassert>#include<future>classThreadSafeCounter{private:std::mutex mtx;intcount;public:ThreadSafeCounter():count(0){}voidincrement(){std::lock_guard<std::mutex>lock(mtx);++count;}intget()const{std::lock_guard<std::mutex>lock(mtx);returncount;}};voidtestThreadSafety(){ThreadSafeCounter counter;constintnumThreads=10;constintincrementsPerThread=1000;std::vector<std::future<void>>futures;// 启动多个线程同时增加计数器for(inti=0;i<numThreads;++i){futures.emplace_back(std::async(std::launch::async,[&counter,incrementsPerThread]{for(intj=0;j<incrementsPerThread;++j){counter.increment();}}));}// 等待所有线程完成for(auto&future:futures){future.wait();}// 验证结果intexpected=numThreads*incrementsPerThread;intactual=counter.get();std::cout<<"期望值: "<<expected<<std::endl;std::cout<<"实际值: "<<actual<<std::endl;assert(actual==expected);std::cout<<"测试通过!"<<std::endl;}intmain(){try{testThreadSafety();}catch(conststd::exception&e){std::cerr<<"测试失败: "<<e.what()<<std::endl;return1;}return0;}九、总结与进阶学习
9.1 关键点总结
- 线程创建与管理:使用
std::thread创建线程,理解join和detach的区别 - 数据同步:掌握互斥锁、条件变量、原子操作的使用场景
- 避免常见问题:识别和避免死锁、竞态条件、伪共享
- 性能优化:合理选择同步机制,减少锁竞争
- 现代C++特性:利用C++14/17/20的新特性简化多线程编程
9.2 进阶学习方向
- 并行算法:C++17的并行STL算法
- GPU编程:CUDA、OpenCL、SYCL
- 分布式计算:MPI、gRPC、ZeroMQ
- 异步编程模型:Promise/Future、反应式编程
- 并发数据结构:无锁队列、并发哈希表
9.3 推荐工具
- 调试工具:ThreadSanitizer、Helgrind、Intel Inspector
- 性能分析:Perf、VTune、AMD uProf
- 可视化:Chrome Tracing、Vampir
多线程编程是C++高级开发的核心技能之一。从基础同步机制到高级并发模式,需要不断实践和积累经验。记住:过早优化是万恶之源,在确保正确性的前提下进行性能优化,使用工具验证线程安全性,编写可维护的并发代码。