目录
一. 条件变量
1.1 条件变量
1.2 条件变量的优点
二. 主要函数
2.1 pthread_cond_init
2.2 pthread_cond_destroy
2.3 pthread_cond_wait
2.4 pthread_cond_timedwait
相对时间与绝对时间:
2.5 pthread_cond_signal
2.6 pthread_cond_broadcast
三. 生产者消费者条件变量模型
3.1 什么是生产者消费者模式
3.2 为什么使用生产者消费者模式
3.3 代码实现
情况一:单个消费者
情况二:多个消费者
一. 条件变量
1.1 条件变量
1、条件变量本身不是锁,但它也可以造成线程阻塞。通常与互斥锁配合使用。给多线程提供一个会合的场所。
2、条件变量是利用线程间共享的全局变量进行同步的一种机制。主要包括两个动作:一个线程等待”条件变量的条件成立”而挂起;另一个线程使”条件成立”(并且给出条件成立的信号)。
3、为了防止竞争,条件变量的使用通常配合互斥锁。
4、条件变量类型为 pthread_cond_t。
1.2 条件变量的优点
1、相较于mutex而言,条件变量可以减少竞争。
2、如直接使用mutex,除了生产者、消费者之间要竞争互斥量以外,消费者之间也需要竞争
互斥量,但如果汇聚(链表)中没有数据,消费者之间竞争互斥锁是无意义的。有了条件
变量机制以后,只有生产者完成生产,才会引起消费者之间的竞争。提高了程序效率。
二. 主要函数
以下函数的返回值都是:成功返回0, 失败直接返回错误号。
| 函数 | 功能 |
| pthread_cond_init | 初始化条件变量 |
| pthread_cond_destroy | 销毁条件变量 |
| pthread_cond_wait | 阻塞等待条件满足 |
| pthread_cond_timedwait | 设置等待超时(超时不阻塞) |
| pthread_cond_signal | 通知函数,一次通知一个线程 |
| pthread_cond_broadcast | 通知函数,一次通知多个线程 |
pthread_cond_t类型:用于定义条件变量:pthread_cond_t cond;
2.1 pthread_cond_init
作用:
1、pthread_cond_init() 函数将使用由 attr 参数所指定的属性来初始化由 cond 参数所引用的条件变量。如果 attr 为 NULL,则将使用默认的条件变量属性;其效果相当于传递一个默认条件变量属性对象的地址。在成功初始化后,该条件变量的状态将被设置为已初始化状态。
2、试图初始化一个已经初始化好的条件变量会导致未定义的行为。
3、在默认条件变量属性适用的情况下,可以使用宏 PTHREAD_COND_INITIALIZER 来初始化条件变量。其效果应与通过调用 pthread_cond_init() 并将参数 attr 设为 NULL 来进行动态初始化相同,只是不会执行任何错误检查。
静态初始化:pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
动态初始化:pthread_cond_init(&cond, NULL);
4、如果 pthread_cond_init() 函数中的 attr 参数所指定的值并非指向已初始化的条件变量属性,则该行为未定义。
2.2 pthread_cond_destroy
作用:
1、pthread_cond_destroy() 函数将销毁由 cond 指定的条件变量;该对象实际上将处于未初始化状态。
2、已销毁的条件变量对象可以通过 pthread_cond_init() 重新初始化;在条件变量已被销毁之后再对其进行其他引用的结果是未定义的。
2.3 pthread_cond_wait
作用:
1、pthread_cond_timedwait()和pthread_cond_wait()这两个函数需要在条件变量上进行阻塞操作。应用程序必须确保在调用这些函数时,调用线程已锁定相应的互斥锁;否则,将出现错误或未定义的行为。
2、:a、阻塞等待条件变量(cond)的满足
b、释放已掌握的互斥锁(解锁),相当于pthread_mutex_unlock()(ab两步为原子操作)
c、当被唤醒,pthread_cond_wait返回时,解除阻塞并重新申请获取互斥锁,
pthread_mutex_lock();
问题1:这个函数的解锁和阻塞等待为什么要保证原子操作?
如果不是原子操作有可能会造成 "信号丢失" 这个问题:
2.4 pthread_cond_timedwait
自带超时机制
作用:
1、pthread_cond_timedwait()和pthread_cond_wait()这两个函数需要在条件变量上进行阻塞操作。应用程序必须确保在调用这些函数时,调用线程已锁定相应的互斥锁;否则,将出现错误或未定义的行为。
2、pthread_cond_timedwait() 函数的功能与 pthread_cond_wait() 函数类似,不同之处在于:如果由abstime指定的绝对时间在条件 cond 被触发或广播之前已经过去(即系统时间等于或超过 abstime),或者在调用时该绝对时间已经过去,那么将返回错误(ETIMEDOUT)。当出现此类超时情况时,pthread_cond_timedwait() 仍会释放并重新获取由 mutex 指定的互斥锁,并且可能消费同时发往条件变量的条件信号。
相对时间与绝对时间:
此函数的参数abstime代表的是一个绝对时间,struct timespec这个结构体类型声明如下:
struct timespec { time_t tv_sec; /* Seconds 秒数*/ long tv_nsec; /* Nanoseconds [0 .. 999999999] 纳秒数*/ };相对时间(relative):指从"当前时刻"开始计算的时长
比如sleep(),usleep()等都使用相对时间来计时
绝对时间(absolute):指的具体的某个时间点,Linux中使用的是Unix Epoch(1970年1月1日
00:00:00)
比如pthread_cond_timedwait()与sem_timedwait()等使用绝对时间来计时。
如何获取当前的绝对时间:使用函数clock_gettime()
作用:获取当前系统时间
参数:
clk_id:时钟类型:
| 时钟类型 | 说明 | 典型用途 |
|---|---|---|
| CLOCK_REALTIME | 真实世界时间(墙上时间) | 记录日志、文件时间戳、超时(timedwait) |
| CLOCK_MONOTONIC | 单调递增时间 | 测量时间间隔、性能计时(推荐) |
| CLOCK_MONOTONIC_RAW | 更纯粹的硬件单调时间 | 需要极高精度,不受 NTP 调整影响 |
| CLOCK_PROCESS_CPUTIME_ID | 当前进程的 CPU 时间 | 统计进程 CPU 使用率 |
| CLOCK_THREAD_CPUTIME_ID | 当前线程的 CPU 时间 | 统计线程 CPU 使用率 |
| CLOCK_BOOTTIME | 系统启动后的时间(含睡眠时间) | 适合需要考虑系统休眠的场景 |
*tp:传出参数
一般使用方法:
struct timespec ts;//定义结构体来接收时间 // 获取当前绝对时间 clock_gettime(CLOCK_REALTIME, &ts); // 在当前时间基础上增加相对时长 → 得到绝对超时时间 ts.tv_sec += 5; // 再过5秒 ts.tv_nsec += 300000000; // 再加0.3秒 // 处理纳秒进位 if (ts.tv_nsec >= 1000000000) { ts.tv_sec++; ts.tv_nsec -= 1000000000; } int ret = pthread_cond_timedwait(&cond, &mutex, &ts);2.5 pthread_cond_signal
作用:
1、这些函数将解除那些因等待条件变量而被阻塞的线程。
2、pthread_cond_signal() 函数应当解除至少一个正在指定条件变量 cond 上被阻塞的线程(如果存在任何线程被阻塞在该条件变量上的话)
2.6 pthread_cond_broadcast
作用:
1、这些函数将解除那些因等待条件变量而被阻塞的线程。
2、pthread_cond_broadcast() 函数的作用是解除当前所有在指定条件变量 cond 上被阻塞的线程的阻塞状态。
三. 生产者消费者条件变量模型
3.1 什么是生产者消费者模式
比如有两个线程A和B,它们共享一个固定大小的缓冲区,A线程产生数据放入缓冲区,B线程从缓冲区中取出数据进行计算,那么这里其实就是一个生产者和消费者的模式,A相当于生产者,B相当于消费者
3.2 为什么使用生产者消费者模式
在多线程开发中,如果生产者生产数据的速度很快,而消费者消费数据的速度很慢,导致缓冲区满,那么生产者就必须等待消费者将数据消费完成之后才能继续生产数据;同理若消费者的速度大于生产者,那么消费者就会经常处于等待状态,为了达到生产者和消费者在生产数据和消费数据之间的平衡,那么就需要一个缓冲区用来存储生产者生产的数据,所以就引入了生产者-消费者模式
这里的缓冲区的作用就是为了平衡生产者和消费者的处理能力,起到一个数据缓存的作用,同时也达到了一个解耦的作用
3.3 代码实现
线程同步典型的案例即为生产者消费者模型,而借助条件变量来实现这一模型,是比较常见的一种方法。假定有两个线程,一个模拟生产者行为,一个模拟消费者行为。两个线程同时操作一个共享资源(一般称之为汇聚),生产向其中添加产品,消费者从中消费掉产品。
这里实现了一个简单的生产者消费者模型来说明条件变量的使用
情况一:单个消费者
typedef struct msg{ int data; struct msg *pnext; }msg; msg *head = NULL; //共享资源 void sys_err(int ret, char *str) { fprintf(stderr,"%s error:%s\n",str,strerror(ret)); exit(1); } pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;//全局互斥锁 pthread_cond_t has_product = PTHREAD_COND_INITIALIZER;//全局条件变量 void *fconsumer(void *arg) { int ret; msg *msg_consumer; while(1){ ret = pthread_mutex_lock(&mutex);//先加锁 if(ret != 0){ sys_err(ret,"consumer lock"); } //head为空,说明没有共享资源 //如果有资源则线程直接读即可,没有资源才进行阻塞等待条件变量 //if(head == NULL){ while(head == NULL){ ret = pthread_cond_wait(&has_product, &mutex); if(ret != 0){ sys_err(ret, "consumer wait"); } } msg_consumer = head; head = head->pnext;//模拟消费一个产品 ret = pthread_mutex_unlock(&mutex);//解锁 互斥量 if(ret != 0){ sys_err(ret, "consumer unlock"); } printf("--------consumer:msg_consumer read :%d\n",msg_consumer->data); free(msg_consumer); usleep(rand() % 500000 + 100000); } } void *fproducter(void *arg) { int ret; while(1){ //生产时,无需加锁 msg *msg_producter = malloc(sizeof(msg)); msg_producter->data = rand() % 100 + 1;//模拟生产一个产品 msg_producter->pnext = NULL; //向共享资源写,需要加锁 ret = pthread_mutex_lock(&mutex); if(ret != 0){ sys_err(ret, "producter lock"); } msg_producter->pnext = head; head = msg_producter; ret = pthread_mutex_unlock(&mutex); if(ret != 0){ sys_err(ret, "producter unlock"); } printf("====producter:msg_producter write = %d\n",msg_producter->data); //通知阻塞在条件变量上的线程(至少一个),将其唤醒 ret = pthread_cond_signal(&has_product); if(ret != 0){ sys_err(ret, "producter signal"); } usleep(rand() % 500000 + 100000); } } int main() { srand(time(NULL)); pthread_t tid_producter, tid_consumer; int ret = 0; //创建生产者线程 ret = pthread_create(&tid_producter, NULL, fproducter, NULL); if(ret != 0){ sys_err(ret, "pthread_create producter"); } //创建消费者线程 ret = pthread_create(&tid_consumer, NULL, fconsumer, NULL); if(ret != 0){ sys_err(ret, "pthread_create consumer"); } ret = pthread_join(tid_producter,NULL); if(ret != 0){ sys_err(ret, "pthread_join producter"); } ret = pthread_join(tid_consumer,NULL); if(ret != 0){ sys_err(ret, "pthread_join conmuser"); } return 0; }结果:
情况二:多个消费者
注意:当有多个消费者时,在消费者逻辑中使用if(head == NULL)判断会出现逻辑错误,如下:
这里就是一个虚假唤醒的错误,产生虚假唤醒的一般两种情况:
1、情况一:就是这里所提到的情况:生产者只生产了一个产品,但是却唤醒了多个消费者。
2、情况二:线程在没有收到任何pthread_cond_signal()或pthread_cond_broadcast()
信号的情况下,被操作系统自动唤醒的现象。
代码:
typedef struct msg{ int data; struct msg *pnext; }msg; msg *head = NULL; //共享资源 void sys_err(int ret, char *str) { fprintf(stderr,"%s error:%s\n",str,strerror(ret)); exit(1); } pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;//全局互斥锁 pthread_cond_t has_product = PTHREAD_COND_INITIALIZER;//全局条件变量 void *fconsumer(void *arg) { int ret; msg *msg_consumer; while(1){ ret = pthread_mutex_lock(&mutex);//先加锁 if(ret != 0){ sys_err(ret,"consumer lock"); } //head为空,说明没有共享资源 //如果有资源则线程直接读即可,没有资源才进行阻塞等待条件变量 //如果有多个消费者时,使用if判断会出现逻辑错误 if(head == NULL){ //while(head == NULL){ ret = pthread_cond_wait(&has_product, &mutex); if(ret != 0){ sys_err(ret, "consumer wait"); } } msg_consumer = head; head = head->pnext;//模拟消费一个产品 ret = pthread_mutex_unlock(&mutex);//解锁 互斥量 if(ret != 0){ sys_err(ret, "consumer unlock"); } printf("--------consumer:tid is %lu,msg_consumer read :%d\n",pthread_self(),msg_consumer->data); free(msg_consumer); usleep(rand() % 500000 + 100000); } } void *fproducter(void *arg) { int ret; while(1){ //生产时,无需加锁 msg *msg_producter = malloc(sizeof(msg)); msg_producter->data = rand() % 100 + 1;//模拟生产一个产品 msg_producter->pnext = NULL; //向共享资源写,需要加锁 ret = pthread_mutex_lock(&mutex); if(ret != 0){ sys_err(ret, "producter lock"); } msg_producter->pnext = head; head = msg_producter; ret = pthread_mutex_unlock(&mutex); if(ret != 0){ sys_err(ret, "producter unlock"); } printf("====producter:msg_producter write = %d\n",msg_producter->data); //通知阻塞在条件变量上的线程(至少一个),将其唤醒 //ret = pthread_cond_signal(&has_product); ret = pthread_cond_broadcast(&has_product); if(ret != 0){ sys_err(ret, "producter signal"); } usleep(rand() % 500000 + 100000); } } int main() { srand(time(NULL)); pthread_t tid_producter, tid_consumer1, tid_consumer2; int ret = 0; //创建生产者线程 ret = pthread_create(&tid_producter, NULL, fproducter, NULL); if(ret != 0){ sys_err(ret, "pthread_create producter"); } //创建消费者线程1 ret = pthread_create(&tid_consumer1, NULL, fconsumer, NULL); if(ret != 0){ sys_err(ret, "pthread_create consumer"); } //创建消费者线程2 ret = pthread_create(&tid_consumer2, NULL, fconsumer, NULL); if(ret != 0){ sys_err(ret, "pthread_create consumer"); } ret = pthread_join(tid_producter,NULL); if(ret != 0){ sys_err(ret, "pthread_join producter"); } ret = pthread_join(tid_consumer1,NULL); if(ret != 0){ sys_err(ret, "pthread_join conmuser"); } ret = pthread_join(tid_consumer2,NULL); if(ret != 0){ sys_err(ret, "pthread_join conmuser"); } return 0; }正确结果:
使用if(head == NULL)判断的错误情况结果:
这里如果使用if判断,则在模拟消费一个产品:head = head->pnext时,会对空指针head进行解引用操作,导致段错误(segmentation fault):