- 信号量原理
- 信号量的概念
- 信号量函数
- 二元信号量模拟实现互斥功能
- 基于环形队列的生产消费模型
- 空间资源和消费数据资源
- 生产者和消费者申请和释放资源
- 必须遵守两个规则
- 代码模拟
- 单生产单消费
- 多生产多消费
- 为什么用信号量
信号量原理
- 通常我们将被多个执行流访问的资源叫做临界资源,临界资源需要进行保护,否则会出现数据不一致的问题
- 当我们用互斥锁进行保护这块临界资源时,相当于我们将这块临界资源看成了整体,同一时刻只允许一个执行流访问。
- 但实际我们可以将这块临界资源再次分割成多个区域,当多个执行流需要访问临界资源时,如果这些执行流访问的是临界资源的不同区域,那么我可以让这些执行流同时访问临界资源的不同区域,此时不会出现数据不一致等问题。
信号量的概念
信号量(信号塔)本质就是一个计数器,是描述临界资源中资源的数目的,信号量能够更细粒度的对临界资源进行管理。
每个执行流进入临界区时,都会提前申请信号量,申请成功后就有了操作特定临界资源的权限,当操作完毕后,就会释放对应的信号量,同样,如果申请失败也会被挂起。
信号量PV操作:
- P操作:我们将申请信号量的方式称为P操作,本质就是申请获得临界资源中某块资源的使用权限,申请成功后,临界资源中的资源数目减1
- V操作:我们将释放信号量的方式称为V操作,本质就是归还临界资源中某块资源的使用权限,归还成功后,临界资源的资源数目加1.
PV操作必须是原子操作
多个执行流为了访问临界资源会竞争式的申请信号量,因此信号量是会被多个执行流同时访问的,也就是说信号量本质也是临界资源。
但信号量本质就是用于保护临界资源的,我们不可能再用信号量去保护信号量,所以信号量的PV操作必须是原子操作。
申请信号量失败被挂起等待
当执行流就行申请信号量失败时,也就是说临界资源已经全部申请空了,此时该执行流就会在对应的信号量的等待队列中当中进行等待,直到有信号量被释放时再被唤醒。
注意: 信号量的本质是计数器,但不意味着只有计数器,信号量还包括一个等待队列。
信号量函数
初始化信号量
初始化信号量的函数叫做sem_init,该函数的函数原型如下:
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数说明:
- sem:需要初始化的信号量
- pshared:传入0值表示线程间共享,传入非零值表示进程间共享
- value:信号量的初始值(计数器的初始值)
返回值说明:
- 初始化信号量成功返回0,失败返回-1
注意:POSIX信号量和System V信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的,但POSIX信号量可以用于线程间同步。
销毁信号量
int sem_destroy(sem_t *sem);
参数说明:
- sem:需要销毁的信号量。
返回值说明:
- 销毁信号量成功返回0,失败返回-1.
等待信号量
int sem_wait(sem_t *sem);
参数说明:
- sem:需要等待的信号量
返回值说明:
- 等待信号量成功返回0,信号量的值减一。
- 等待信号量失败返回-1,信号量的值保持不变。
发布信号量(释放信号量)
int sem_post(sem_t *sem);
参数说明:
- sem:需要发布的信号量。
返回值说明:
- 发布信号量成功返回0,信号量的值加一。
- 发布信号量失败返回-1,信号量的值保持不变。
二元信号量模拟实现互斥功能
信号量本质是一个计数器,如果将信号量的初始值设置为 1 ,那么此时该信号量叫做二元信号量。
信号量的初始值为1,说明信号量所描述的临界资源只有一份,此时信号量的作用基本等价于互斥锁。
例如,下面我们还是用以前的多线程抢票系统,其中我们用二元信号量模拟实现多线程互斥。
#include <iostream>
#include <string>
#include <unistd.h>
#include <pthread.h>
int tickets = 2000;
void* TicketGrabbing(void* arg)
{
std::string name = (char*)arg;
while (true){
if (tickets > 0){
usleep(1000);
std::cout << name << " get a ticket, tickets left: " << --tickets << std::endl;
}
else{
break;
}
}
std::cout << name << " quit..." << std::endl;
pthread_exit((void*)0);
}
int main()
{
pthread_t tid1, tid2, tid3, tid4;
pthread_create(&tid1, nullptr, TicketGrabbing, (void*)"thread 1");
pthread_create(&tid2, nullptr, TicketGrabbing, (void*)"thread 2");
pthread_create(&tid3, nullptr, TicketGrabbing, (void*)"thread 3");
pthread_create(&tid4, nullptr, TicketGrabbing, (void*)"thread 4");
pthread_join(tid1, nullptr);
pthread_join(tid2, nullptr);
pthread_join(tid3, nullptr);
pthread_join(tid4, nullptr);
return 0;
}
运行代码后可以看到,线程打印输出剩余票数时出现了票数剩余为负数的情况,这是不符合我们预期的。
此时,我们在代码中加入二元信号量,让每一个线程在进入临界资源的时候都进行申请信号量。此时因为信号量只有 1 个,所以自燃就形成了互斥效果
#include <iostream>
#include <string>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>
class Sem{
public:
Sem(int num)
{
sem_init(&_sem, 0, num);
}
~Sem()
{
sem_destroy(&_sem);
}
void P()
{
sem_wait(&_sem);
}
void V()
{
sem_post(&_sem);
}
private:
sem_t _sem;
};
Sem sem(1); //二元信号量
int tickets = 2000;
void* TicketGrabbing(void* arg)
{
std::string name = (char*)arg;
while (true){
sem.P();
if (tickets > 0){
usleep(1000);
std::cout << name << " get a ticket, tickets left: " << --tickets << std::endl;
sem.V();
usleep(2000);
}
else{
sem.V();
break;
}
}
std::cout << name << " quit..." << std::endl;
pthread_exit((void*)0);
}
int main()
{
pthread_t tid1, tid2, tid3, tid4;
pthread_create(&tid1, nullptr, TicketGrabbing, (void*)"thread 1");
pthread_create(&tid2, nullptr, TicketGrabbing, (void*)"thread 2");
pthread_create(&tid3, nullptr, TicketGrabbing, (void*)"thread 3");
pthread_create(&tid4, nullptr, TicketGrabbing, (void*)"thread 4");
pthread_join(tid1, nullptr);
pthread_join(tid2, nullptr);
pthread_join(tid3, nullptr);
pthread_join(tid4, nullptr);
return 0;
}
基于环形队列的生产消费模型
空间资源和消费数据资源
在这里,生产者关注的是空间资源,而消费者关注的是数据资源
因为他们关注的资源不同,所以我们可以使用两种不同的信号量来代表他们
- 生产者关注的是环形队列中是否有空间(blank),只要有空间,生产者就可以进行生产
- 消费者关注的是环形队列当中是否有数据(data),只要有数据消费者就可以进行消费。
blank_sem(生产者)和data_sum(消费者)的初始值设置
对于生产者来说,生产者每次生产数据前都需要申请blank_sem:
- 如果blank_sem的值不为0,则信号量申请成功,此时生产者可以进行生产操作。
- 如果blank_sem的值为0,说明信号量空间不足,此时会被进行阻塞等待,知道环形队列当中有新的空间后再被唤醒。
对于消费者来说,我们每次申请的为data_sem
- data_sem的初始值我们应该设置为0,因为刚开始时环形队列当中没有数据。
生产者和消费者申请和释放资源
当生产者生产完数据后,应该释放的为data_sem:
- 这里要注意,当对blank_sem进行了p操作后,进行释放时,应该为用户的信号量data_sem
- 因为当生产者生产完数据后,意味着环形队列当中多了一个data位置
消费者申请数据资源,释放空间资源
- 这里要注意,当对data_sem进行了p操作后,进行释放时,应该为用户的信号量blank_sem
- 因为当消费者用完数据后,意味着环形队列当中多了一个生产者位置、
必须遵守两个规则
第一个规则:生产者和消费者不能对同一个位置进行访问。
生产者和消费者在访问环形队列时:
- 如果生产者和消费者访问的是环形队列当中的同一个位置,那么此时生产者和消费者就相当于同时对这一块临界资源进行了访问,这当然是不允许的。
- 而如果生产者和消费者访问的是环形队列当中的不同位置,那么此时生产者和消费者是可以同时进行生产和消费的,此时不会出现数据不一致等问题。
第二个规则:无论是生产者还是消费者,都不应该将对方套一个圈以上。
- 生产者从消费者的位置开始一直按顺时针方向进行生产,如果生产者生产的速度比消费者消费的速度快,那么当生产者绕着消费者生产了一圈数据后再次遇到消费者,此时生产者就不应该再继续生产了,因为再生产就会覆盖还未被消费者消费的数据。
- 同理,消费者从生产者的位置开始一直按顺时针方向进行消费,如果消费者消费的速度比生产者生产的速度快,那么当消费者绕着生产者消费了一圈数据后再次遇到生产者,此时消费者就不应该再继续消费了,因为再消费就会消费到缓冲区中保存的废弃数据。
代码模拟
逻辑实现块RingQueue
单生产单消费
#pragma once
#include <iostream>
#include <vector>
#include <semaphore.h>
#include <pthread.h>
#define NUM 8
template <class T>
class RingQueue
{
private:
void P(sem_t &s)
{
sem_wait(&s);
}
void V(sem_t &s)
{
sem_post(&s);
}
public:
RingQueue(int cap = NUM) : _cap(cap), _p_pos(0), _c_pos(0)
{
_q.resize(_cap);
sem_init(&_blank_sem, 0, _cap); // blank_sem初始值设置为环形队列的容量
sem_init(&_data_sem, 0, 0); // data_sem初始值设置为0
}
// 向环形队列插入数据(生产者调用)
void push(const T &data)
{
// 1. 可以不用在临界区内部做判断,就可以知道临界资源的使用情况
// 2. 什么时候用锁,什么时候用sem?
// 一定要先申请座位,然后再去考虑上锁,别有了锁没有资源,此时会存在死锁
P(_blank_sem); // P()
// 一定有对应的空间资源给我!不用做判断,是哪一个呢?
_q[_p_pos++] = data;
_p_pos %= _cap;
std::cout << "给用户信号量" << std::endl;
V(_data_sem);
}
// 从环形队列获取数据(消费者调用)
void pop(T *out)
{
P(_data_sem);
*out = _q[_c_pos++];
_c_pos %= _cap;
std::cout << "给生产商信号量" << std::endl;
V(_blank_sem);
}
~RingQueue()
{
sem_destroy(&_blank_sem);
sem_destroy(&_data_sem);
}
private:
std::vector<T> _q; // 环形队列
int _cap; // 环形队列的容量上限
int _p_pos; // 生产者位置
int _c_pos; // 消费者位置
sem_t _blank_sem; // 描述空间资源
sem_t _data_sem; // 描述消费数据资源
};
计算块Task
#pragma once
#include <iostream>
#include <string>
#include <unistd.h>
class Task
{
public:
Task()
{
}
Task(int x, int y, char op) : _x(x), _y(y), _op(op), _result(0), _exitCode(0)
{
}
void operator()()
{
switch (_op)
{
case '+':
_result = _x + _y;
break;
case '-':
_result = _x - _y;
break;
case '*':
_result = _x * _y;
break;
case '/':
{
if (_y == 0)
_exitCode = -1;
else
_result = _x / _y;
}
break;
case '%':
{
if (_y == 0)
_exitCode = -2;
else
_result = _x % _y;
}
break;
default:
break;
}
usleep(100000);
}
std::string formatArg()
{
return std::to_string(_x) + _op + std::to_string(_y) + "= ?";
}
std::string formatRes()
{
return std::to_string(_result) + "(" + std::to_string(_exitCode) + ")";
}
~Task()
{
}
private:
int _x;
int _y;
char _op;
int _result;
int _exitCode;
};
主函数 main
#include "RingQueue.hpp"
#include "Task.hpp"
#include <ctime>
#include <pthread.h>
#include <memory>
#include <sys/types.h>
#include <unistd.h>
#include <cstring>
using namespace std;
const char *ops = "+-*/%";
void *consumerRoutine(void *args)
{
RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
while (true)
{
sleep(1);
Task t;
rq->pop(&t);
t();
cout << "consumer done, 处理完成的任务是: " << t.formatRes() << endl;
}
}
void *productorRoutine(void *args)
{
RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
while (true)
{
sleep(1);
int x = rand() % 100;
int y = rand() % 100;
char op = ops[(x + y) % strlen(ops)];
Task t(x, y, op);
rq->push(t);
cout << "productor done, 生产的任务是: " << t.formatArg() << endl;
}
cout << "程序结束 " << endl;
}
int main()
{
srand(time(nullptr) ^ getpid());
RingQueue<Task> *rq = new RingQueue<Task>();
// 单生产单消费
pthread_t c, p;
pthread_create(&c, nullptr, consumerRoutine, rq);
pthread_create(&p, nullptr, productorRoutine, rq);
pthread_join(c, nullptr);
pthread_join(p, nullptr);
delete rq;
return 0;
}
多生产多消费
当我们进行多生产,多消费的时候,我们这时就需要上锁了,因为这时有多个线程进行同时消费和生产,所以这时我们需要对我们的代码进行追加两把锁
我们要先进性申请资源,后申请锁,因为如果先申请锁的画,如果这时的信号处于满的状态,那么此时我们的锁就会一直卡的,当其他线程进行申请时,就会一直处于堆积的状态
逻辑实现块
#pragma once
#include <iostream>
#include <vector>
#include <semaphore.h>
#include <pthread.h>
#define NUM 8
template <class T>
class RingQueue
{
private:
void P(sem_t &s)
{
sem_wait(&s);
}
void V(sem_t &s)
{
sem_post(&s);
}
void Lock(pthread_mutex_t &m)
{
pthread_mutex_lock(&m);
}
void Unlock(pthread_mutex_t &m)
{
pthread_mutex_unlock(&m);
}
public:
RingQueue(int cap = NUM) : _cap(cap), _p_pos(0), _c_pos(0)
{
_q.resize(_cap);
sem_init(&_blank_sem, 0, _cap); // blank_sem初始值设置为环形队列的容量
sem_init(&_data_sem, 0, 0); // data_sem初始值设置为0
pthread_mutex_init(&_c_mutex, nullptr);
pthread_mutex_init(&_p_mutex, nullptr);
}
// 向环形队列插入数据(生产者调用)
void push(const T &data)
{
// 1. 可以不用在临界区内部做判断,就可以知道临界资源的使用情况
// 2. 什么时候用锁,什么时候用sem?
// 一定要先申请座位,然后再去考虑上锁,别有了锁没有资源,此时会存在死锁
P(_blank_sem); // P()
Lock(_p_mutex); //? 1
// 一定有对应的空间资源给我!不用做判断,是哪一个呢?
_q[_p_pos++] = data;
_p_pos %= _cap;
std::cout << "给用户信号量" << std::endl;
Unlock(_p_mutex);
V(_data_sem);
}
// 从环形队列获取数据(消费者调用)
void pop(T *out)
{
P(_data_sem);
Lock(_c_mutex); //?
*out = _q[_c_pos++];
_c_pos %= _cap;
std::cout << "给生产商信号量" << std::endl;
Unlock(_c_mutex);
V(_blank_sem);
}
~RingQueue()
{
sem_destroy(&_blank_sem);
sem_destroy(&_data_sem);
pthread_mutex_destroy(&_c_mutex);
pthread_mutex_destroy(&_p_mutex);
}
private:
std::vector<T> _q; // 环形队列
int _cap; // 环形队列的容量上限
int _p_pos; // 生产者位置
int _c_pos; // 消费者位置
sem_t _blank_sem; // 描述空间资源
sem_t _data_sem; // 描述消费数据资源
pthread_mutex_t _c_mutex;
pthread_mutex_t _p_mutex;
};
主函数文章来源:https://uudwc.com/A/PmvBb
int main()
{
srand(time(nullptr) ^ getpid());
RingQueue<Task> *rq = new RingQueue<Task>();
// 单生产单消费
// pthread_t c, p;
// pthread_create(&c, nullptr, consumerRoutine, rq);
// pthread_create(&p, nullptr, productorRoutine, rq);
// pthread_join(c, nullptr);
// pthread_join(p, nullptr);
//多生产多消费
pthread_t c[3],p[2];
for(int i = 0;i < 3;i++)
{
pthread_create(c + i,nullptr,consumerRoutine,rq);
}
for(int i = 0;i < 2;i++)
{
pthread_create(p + i,nullptr,productorRoutine,rq);
}
for(int i = 0;i < 3;i++)
{
pthread_join(c[i],nullptr);
}
for(int i = 2;i < 2;i++)
{
pthread_join(p[i],nullptr);
}
delete rq;
return 0;
}
文章来源地址https://uudwc.com/A/PmvBb
为什么用信号量
- 可以不用在临界区内部做判断,就可以知道临界资源的使用情况
- 社么时候用锁,什么时候用信号量
- 当临界资源被多个线程整体使用的时候就需要用到锁
- 当只有单个线程时,这时就不需要在加锁了