1.实验目的
1.掌握线程的概念及创建方法
2.掌握线程互斥锁及条件变量同步机制
3.掌握POSIX信号量机制
4.掌握生产者消费者原理
5.设计程序,两种方法实现多线程生产者消费者同步问题
2.实验截图及结果分析
(1)实验截图
(2)实验结果分析
本次实验通过使用条件变量和互斥锁以及Posix信号量两种方式实现生产者消费者同步问题,实验结果展示了程序在多线程环境下数据的正确生产与消费,体现了线程同步机制的有效性。
①条件变量和互斥锁实现方式
数据生产与消费顺序:从实验截图的输出结果来看,生产者1和生产者2交替向缓冲区写入数据,且写入位置和值符合预期。消费者1和消费者2也能按顺序从缓冲区读取数据,每次读取的数据值与生产者写入的值一致,这表明在条件变量和互斥锁的同步机制下,生产者和消费者线程之间的协作正常,数据的读写顺序正确,没有出现数据竞争或混乱的情况。
缓冲区满与空的处理:当缓冲区满时,生产者线程会根据条件判断进入阻塞等待状态,直到消费者从缓冲区取出数据,通过条件变量notfull唤醒生产者线程继续写入数据;当缓冲区空时,消费者线程会通过条件变量notempty等待,直到生产者写入数据后被唤醒读取数据。这说明条件变量和互斥锁有效地控制了缓冲区的状态,避免了缓冲区溢出和下溢的问题,保证了数据的安全性和完整性。
线程执行频率:在生产者1和生产者2以及消费者1和消费者2的代码中,均设置了每次操作后休眠1秒的机制,这使得线程的执行频率得到了有效控制,输出结果清晰有序,便于观察和分析线程的执行过程。
② Posix信号量实现方式
奇偶数据生产控制:生产者1按照预期只生产奇数,生产者2只生产偶数,这表明通过对n的奇偶判断以及信号量的同步机制,成功实现了对生产者生产数据的约束,保证了数据生产的正确性和特定性。
信号量同步效果:Posix信号量mutex、empty和full在控制线程同步方面发挥了重要作用。empty信号量控制了生产者向缓冲区写入数据的时机,只有当empty信号量的值大于0时,生产者才能写入数据,从而避免了缓冲区满时的写入操作;full信号量控制了消费者从缓冲区读取数据的时机,当full信号量的值大于0时,消费者才能读取数据,防止了缓冲区空时的读取操作;mutex信号量则保证了对缓冲区的访问是线程安全的,避免了多个线程同时访问缓冲区导致的数据错误。
线程协作稳定性:与条件变量和互斥锁实现方式类似,通过设置线程每次操作后的休眠时间,使得线程的执行过程清晰可见。在整个实验过程中,生产者和消费者线程能够稳定协作,没有出现线程死锁或数据错误的情况,证明了Posix信号量在解决生产者消费者同步问题上的可靠性。
③ 两种实现方式的对比
条件变量和互斥锁以及Posix信号量都能有效地解决生产者消费者同步问题,但在实现细节和适用场景上存在一定差异。条件变量和互斥锁的组合使用,通过对条件的判断和等待机制,更侧重于线程之间的条件同步;而Posix信号量则通过对信号量值的操作来控制线程的执行,更侧重于资源的计数和控制。在实际应用中,可根据具体需求和场景选择合适的同步机制。
3.实验程序
(1)采用条件变量和互斥锁解决生产者消费者问题
prodcons.c
#include <stdio.h> #include <pthread.h> #include <unistd.h> // 为了使用 sleep 函数 #define BUFSIZE 8 int n = 1, m = 1; struct prodcons { int buffer[BUFSIZE]; pthread_mutex_t lock; // 互斥LOCK int readpos, writepos; pthread_cond_t notempty; // 缓冲区非空条件判断 pthread_cond_t notfull; // 缓冲区未满条件判断 }; void init(struct prodcons *b) { pthread_mutex_init(&b->lock, NULL); pthread_cond_init(&b->notempty, NULL); pthread_cond_init(&b->notfull, NULL); b->readpos = 0; b->writepos = 0; } struct prodcons buffer; void *producer1(void *data) { while (1) { if (buffer.writepos % 2 == 0) { pthread_mutex_lock(&(buffer.lock)); // 等待缓冲区未满 if ((buffer.writepos + 1) % BUFSIZE == buffer.readpos) { // 缓冲区满,生产者将被挂起,直至重新被唤醒 pthread_cond_wait(&(buffer.notfull), &(buffer.lock)); } // 写数据,并移动指针 printf(" "); printf("producer1 start putting: put position is %d, put value is %d\n", buffer.writepos, n); buffer.buffer[buffer.writepos] = n; buffer.writepos++; if (buffer.writepos >= BUFSIZE) buffer.writepos = 0; // 设置缓冲区非空的条件变量 pthread_cond_signal(&(buffer.notempty)); pthread_mutex_unlock(&(buffer.lock)); n++; } sleep(1); // 生产者1每次操作后休眠1秒 } } void *producer2(void *data) { while (1) { if (buffer.writepos % 2 != 0) { pthread_mutex_lock(&(buffer.lock)); // 等待缓冲区未满 if ((buffer.writepos + 1) % BUFSIZE == buffer.readpos) { // 缓冲区满,生产者将被挂起,直至重新被唤醒 pthread_cond_wait(&(buffer.notfull), &(buffer.lock)); } // 写数据,并移动指针 printf(" "); printf("producer2 start putting: put position is %d, put value is %d\n", buffer.writepos, n); buffer.buffer[buffer.writepos] = n; buffer.writepos++; if (buffer.writepos >= BUFSIZE) buffer.writepos = 0; // 设置缓冲区非空的条件变量 pthread_cond_signal(&(buffer.notempty)); pthread_mutex_unlock(&(buffer.lock)); n++; } sleep(1); // 生产者2每次操作后休眠1秒 } } void *consumer1(void *data) { int d; while (1) { pthread_mutex_lock(&(buffer.lock)); if (buffer.writepos == buffer.readpos) { // 等待缓冲区非空 pthread_cond_wait(&(buffer.notempty), &(buffer.lock)); } // 读数据,移动读指针 d = buffer.buffer[buffer.readpos]; printf("first consumer: get position is %d, get value is %d\n", buffer.readpos, d); buffer.readpos++; if (buffer.readpos >= BUFSIZE) buffer.readpos = 0; // 设置缓冲区未满的条件变量 pthread_cond_signal(&(buffer.notfull)); pthread_mutex_unlock(&(buffer.lock)); sleep(1); // 消费者1每次操作后休眠1秒 } return NULL; } void *consumer2(void *data) { int d; while (1) { pthread_mutex_lock(&(buffer.lock)); if (buffer.writepos == buffer.readpos) { // 等待缓冲区非空 pthread_cond_wait(&(buffer.notempty), &(buffer.lock)); } // 读数据,移动读指针 d = buffer.buffer[buffer.readpos]; printf("second consumer: get position is %d, get value is %d\n", buffer.readpos, d); buffer.readpos++; if (buffer.readpos >= BUFSIZE) buffer.readpos = 0; // 设置缓冲区未满的条件变量 pthread_cond_signal(&(buffer.notfull)); pthread_mutex_unlock(&(buffer.lock)); sleep(1); // 消费者2每次操作后休眠1秒 } return NULL; } int main(void) { pthread_t th_a, th_b, th_c, th_d; void *retval; init(&buffer); pthread_create(&th_a, NULL, producer1, 0); pthread_create(&th_b, NULL, producer2, 0); pthread_create(&th_c, NULL, consumer1, 0); pthread_create(&th_d, NULL, consumer2, 0); pthread_join(th_a, &retval); pthread_join(th_b, &retval); pthread_join(th_c, &retval); pthread_join(th_d, &retval); return 0; }(2)使用Posix信号量解决生产者消费者问题
prodcons-semwait.c
#include <stdio.h> #include <stdlib.h> #include <time.h> #include <sys/types.h> #include <pthread.h> #include <semaphore.h> #include <string.h> #include <unistd.h> #define BUFSIZE 8 sem_t mutex, empty, full; int n = 1, m = 1; struct prodcons { int buffer[BUFSIZE]; int readpos, writepos; }; void init(struct prodcons *b) { b->readpos = 0; b->writepos = 0; } struct prodcons buffer; // producer1 只生产奇数 void *producer1(void *arg) { while (1) { if (n % 2 != 0) { sem_wait(&empty); sem_wait(&mutex); printf("producer1 is putting: put pos is %d, put %d\n", buffer.writepos, n); buffer.buffer[buffer.writepos] = n; buffer.writepos++; if (buffer.writepos >= BUFSIZE) { buffer.writepos = 0; } n++; sem_post(&mutex); sem_post(&full); } sleep(1); // 控制生产速度 } return NULL; } // producer2 只生产偶数 void *producer2(void *arg) { while (1) { if (n % 2 == 0) { sem_wait(&empty); sem_wait(&mutex); printf("producer2 is putting: put pos is %d, put %d\n", buffer.writepos, n); buffer.buffer[buffer.writepos] = n; buffer.writepos++; if (buffer.writepos >= BUFSIZE) { buffer.writepos = 0; } n++; sem_post(&mutex); sem_post(&full); } sleep(1); // 控制生产速度 } return NULL; } void *consumer1(void *arg) { int data; while (1) { sem_wait(&full); sem_wait(&mutex); data = buffer.buffer[buffer.readpos]; printf("consumer1 is getting: get pos is %d, get %d\n", buffer.readpos, data); buffer.readpos++; if (buffer.readpos >= BUFSIZE) { buffer.readpos = 0; } sleep(1); // 控制消费速度 sem_post(&mutex); sem_post(&empty); } return NULL; } void *consumer2(void *arg) { int data; while (1) { sem_wait(&full); sem_wait(&mutex); data = buffer.buffer[buffer.readpos]; printf("consumer2 is getting: get pos is %d, get %d\n", buffer.readpos, data); buffer.readpos++; if (buffer.readpos >= BUFSIZE) { buffer.readpos = 0; } sleep(1); // 控制消费速度 sem_post(&mutex); sem_post(&empty); } return NULL; } int main(void) { pthread_t th_a, th_b, th_c, th_d; void *retval; init(&buffer); sem_init(&mutex, 0, 1); sem_init(&empty, 0, 8); sem_init(&full, 0, 0); pthread_create(&th_a, NULL, producer1, NULL); pthread_create(&th_b, NULL, producer2, NULL); pthread_create(&th_c, NULL, consumer1, NULL); pthread_create(&th_d, NULL, consumer2, NULL); pthread_join(th_a, &retval); pthread_join(th_b, &retval); pthread_join(th_c, &retval); pthread_join(th_d, &retval); sem_destroy(&mutex); sem_destroy(&empty); sem_destroy(&full); return 0; }