Linux线程安全
Linux线程互斥
-
临界资源
多线程执行流共享的资源就叫做临界资源
-
临界区
每个线程内部,访问临界资源的代码块
-
互斥
任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,对临界资源起保护作用
-
原子性
不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成
-
线程安全
-
多个线程同时操作临界资源而不会出现数据二义性
-
在线程中是否对临界资源进行了非原子操作
-
可重入/不可重入函数:多个执行流中是否可以同时进入函数运行而不会出现问题
- 线程安全的实现:
同步:临界资源的合理访问(时序可控)
互斥:临界资源同一时间访问
-
互斥实现
-
互斥锁(特点:保证安全不保证合理)
![](C:\Users\some yuan\Documents\笔记\linux系统\lock.png)
原理:
为了实现互斥锁操作,大多数体系结构都提供了swap和exchange指令,该指令的作用是吧寄存器和内存单元的数据相交换,由于只有一条指令。保证了原子性,即使是多处理器平台,访问内存的总线周期也有先后,一个处理器上的交换指令执行时另一个处理器的交换指令只能等待总线周期。
-
操作步骤:
-
定义互斥锁变量
// 声明 pthread_mutex_t mutex;
-
初始化互斥锁变量
// 静态分配 pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER // 动态分配 int pthread_mutex_init (pthread_mutex_t *__mutex, const pthread_mutexattr_t *__mutexattr) 参数 _mutex: 锁变量 _mutexattr: NULL
-
加锁
int pthread_mutex_lock (pthread_mutex_t *__mutex) 返回值:成功返回0,失败返回错误码
可能遇到的情况:
- 互斥量处于未锁状态,该函数会将互斥量锁定,同时返回成功
- 发起函数调用时,其他线程已经锁定互斥量。或者存在其他线程同时申请互斥量,但没有竞争到互斥量,那么pthread_lock调用会陷入阻塞(执行流被挂起), 等待互斥量解锁
-
解锁
int pthread_mutex_unlock (pthread_mutex_t *__mutex) 返回值:成功返回0,失败返回错误码
-
销毁互斥锁
int pthread_mutex_destroy(pthread_mutex_t *mutex)
注意:
- 使用PTHREAD_MUTEX_INITIALIZER初始化的互斥量不需要销毁
- 不要销毁一个已经加锁的互斥量
- 已经销毁的互斥量,要确保后面不会有线程再尝试加锁
-
// 使用互斥实现抢票的线程安全 #include <stdio.h> #include <unistd.h> #include <pthread.h> int ticket = 100; pthread_mutex_t mutex; void *thr_start(void *arg) { char *id = (char *)arg; while (1) { pthread_mutex_lock(&mutex); if (ticket > 0) { usleep(1000); printf("%s sell ticket %d\n", id, ticket); ticket--; pthread_mutex_unlock(&mutex); } else { pthread_mutex_unlock(&mutex); break; } } return NULL; } int main() { pthread_t t1, t2, t3, t4; pthread_mutex_init(&mutex, NULL); pthread_create(&t1, NULL, thr_start, "thread 1"); pthread_create(&t2, NULL, thr_start, "thread 1"); pthread_create(&t3, NULL, thr_start, "thread 1"); pthread_create(&t4, NULL, thr_start, "thread 1"); pthread_join(t1, NULL); pthread_join(t2, NULL); pthread_join(t3, NULL); pthread_join(t4, NULL); pthread_mutex_destroy(&mutex); return 0; }
-
-
可重入与线程安全
-
线程安全: 多个线程并发同一段代码时,不会出现不同的结果。常见对全局变量或静态变量进行操作,并且没有锁保护的情况下,会出现该问题
-
重入: 同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入。一个函数在重入的情况下,运行结果不会出现任何不同或者问题,则该函数被称为可重入函数,否则,是不可重入函数
-
常见的线程不安全的情况
- 不保护共享变量的函数
- 函数状态随着被调用,状态发送变化的函数
- 返回指向静态变量指针的函数
- 调用线程不安全函数的函数
-
常见线程安全的情况
- 每个线程对全局变量或者静态变量只有读取的权限,而没有写入的权限,一般来说这些线程是安全的
- 类或者接口对于线程来说都是原子操作
- 多个线程之间的切换不会导致该接口的执行结果存在二义性
- 常见不可重入的情况
调用了malloc/free函数,因为malloc函数是用全局链表来管理堆的
调用了标准I/O库函数,标准I/O库的很多实现都以不可重入的方式使用全局数据结构
可重入函数体内使用了静态的数据结构 - 常见可重入的情况
- 不使用全局变量或静态变量
- 不使用用malloc或者new开辟出的空间
- 不调用不可重入函数
- 不返回静态或全局数据,所有数据都有函数的调用者提供
- 使用本地数据,或者通过制作全局数据的本地拷贝来保护全局数据
- 可重入与线程安全联系
- 函数是可重入的,那就是线程安全的
- 函数是不可重入的,那就不能由多个线程使用,有可能引发线程安全问题
- 如果一个函数中有全局变量,那么这个函数既不是线程安全也不是可重入的。
- 可重入与线程安全区别
- 可重入函数是线程安全函数的一种
- 线程安全不一定是可重入的,而可重入函数则一定是线程安全的。
- 如果将对临界资源的访问加上锁,则这个函数是线程安全的,但如果这个重入函数若锁还未释放则会产生死锁,因此是不可重入的。
-
-
死锁
死锁是指在一组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所站用不会释放的资
源而处于的一种永久等待状态产生的必要条件:
- 互斥条件 (一个资源每次只能被一个执行流使用)
- 不可剥夺条件 (一个执行流已获得的资源,在末使用完之前,不能强行剥夺)
- 请求与保持条件 (一个执行流因请求资源而阻塞时,对已获得的资源保持不放)
- 环路等待 (若干执行流之间形成一种头尾相接的循环等待资源的关系)
产生场景:加锁/ 解锁顺序不同
预防死锁:
- 破坏必要条件
- 加锁顺序一致
- 避免锁未释放的场景
- 资源一次性分配
-
Linux线程同步
-
条件变量
- 当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。
- 例如一个线程访问队列时,发现队列为空,它只能等待,只到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量
-
同步与竞态
同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步
竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。在线程场景下,这种问题也不难理解
-
原因:
临界资源的合理性 — 生产者消费者模型
没有资源则等待(死等)生产资源后唤醒等待
-
条件变量的使用
-
声明条件变量
pthread_cond_t cond;
-
初始化条件变量
// 静态分配 pthread_cond_t cond = PTHREAD_COND_INITIALIZER; // 动态分配 int pthread_cond_init (pthread_cond_t *__restrict __cond, const pthread_condattr_t *__restrict __cond_attr) 参数: _restrict_cond: 条件变量 _restrict_cond_attr: NULL
-
等待/唤醒
// 条件变量需要添加互斥锁:因为条件变量本身只提供等待与唤醒的功能,具体什么时候等待 需要用户进行判断,这个临界资源的操作应该受保护,因此搭配互斥锁使用 int pthread_cond_wait(pthread_cond_t *restrict_cond, pthread_mutex_t *restrict_mutex); 参数: restrict_cond: 条件变量 restrict_mutex: 互斥锁 int pthread_cond_signal(pthread_cond_t *cond); 参数: cond: 条件变量 // pthread_cond_signal 唤醒至少一个等待的线程, 导致因为条件的判断是一个if语句 而造成唤醒多个等待线程。所以应该使用while循环判断
-
销毁条件变量
int pthread_cond_destroy(pthread_cond_t *cond); 参数: cond: 条件变量
-
eg:
// 生产者消费者模型 #include <pthread.h> #include <queue> #include <cstdio> class BlockQueue { private: std::queue<int> _queue; int _capacity; pthread_mutex_t _mutex; pthread_cond_t _consumer; pthread_cond_t _product; public: BlockQueue(int capacity = 10) : _capacity(capacity) { pthread_mutex_init(&_mutex, NULL); pthread_cond_init(&_consumer, NULL); pthread_cond_init(&_product, NULL); } ~BlockQueue() { pthread_mutex_destroy(&_mutex); pthread_cond_destroy(&_consumer); pthread_cond_destroy(&_product); } // push bool Push(const int data); // pop bool Pop(int *data); private: bool mutexLock() { pthread_mutex_lock(&_mutex); return true; } bool mutexUnLock() { pthread_mutex_unlock(&_mutex); return true; } bool notifyProduct() { pthread_cond_signal(&_product); return true; } bool waitProduct() { pthread_cond_wait(&_product, &_mutex); return true; } bool notifyConsumer() { pthread_cond_signal(&_consumer); return true; } bool waitConsumer() { pthread_cond_wait(&_consumer, &_mutex); return true; } bool isEmpty() { return _queue.empty(); } bool isFull() { return _queue.size() == _capacity; } }; // push bool BlockQueue::Push(const int data) { mutexLock(); while (isFull()) { waitProduct(); } _queue.push(data); notifyConsumer(); mutexUnLock(); return true; } // pop bool BlockQueue::Pop(int *data) { mutexLock(); while (isEmpty()) { waitConsumer(); } *data = _queue.front(); _queue.pop(); notifyProduct(); mutexUnLock(); return true; } void *productStart(void *arg) { int data = 1; BlockQueue *q = (BlockQueue *)arg; while (1) { q->Push(data++); printf("tid=%lu product: %d\n", pthread_self(), data); } return NULL; } void *consumerStart(void *arg) { BlockQueue *q = (BlockQueue *)arg; int data; while (1) { q->Pop(&data); printf("tid=%lu consumer: %d\n", pthread_self(), data); } } int main() { BlockQueue blockQueue; pthread_t tidProduct[4]; pthread_t tidConsumer[4]; for (int i = 0; i < 4; i++) { pthread_create(&tidProduct[i], NULL, productStart, &blockQueue); } for (int i = 0; i < 4; i++) { pthread_create(&tidConsumer[i], NULL, consumerStart, &blockQueue); } for (int i = 0; i < 4; i++) { pthread_join(tidProduct[i], NULL); } for (int i = 0; i < 4; i++) { pthread_join(tidConsumer[i], NULL); } return 0; }
pthread_cond_wait
需要互斥量的原因:- 条件变量是实现同步的手段, 条件变量的改变,是产生在多线程间的。那么就需要发生对共享变量的更改,来触发条件变量的改变
- 既然修改了共享变量必然会涉及到竞态,所以需要互斥锁
注意:
pthread_cond_wait分为三步 // 就有可能没来得及挂起就已经有人唤醒, 导致该唤醒信号没有在等待线程挂起的时候唤醒。以至于发送死锁。
解锁 --> 休眠 --> 被唤醒后加锁
-
-
POSIX信号量
POSIX信号量和System V 信号量作用相同,都是用于实现同步,达到合理访问共享资源,但是POSIX是可以用于线程间同步的
根据信号量取值(代表可用资源的数目)的不同, POSIX 信号量还可以分为:
- 二值信号量 :信号量的值只有 0 和 1 ,这和互斥量很类型,若资源被锁住,信号量的值为 0 ,若资源可用,则信号量的值为 1 ;
- 计数信号量 :信号量的值在 0 到一个大于 1 的限制值( POSIX 指出系统的最大限制值至少要为 32767 )。该计数表示可用的资源的个数。
-
声明信号量
sem_t sem;
-
初始化信号量
int sem_init (sem_t *__sem, int __pshared, unsigned int __value) _sem: 信号量 _pshared: 0表示线程间共享,非零表示进程间共享 value: 信号量初始值
-
销毁信号量
int sem-destroy(sem_t* sem)
-
等待信号量
int sem_wait(sem_t* sem)
该操作会检查信号量的值,如果其值小于或等于0 ,那就阻塞,知道该值变成大于 0 ,然后等待进程将信号量的值减 1 ,进程获得共享资源的访问权限。这整个操作必须是一个原子操作。该操作还经常被称为 P 操作(荷兰语 Proberen ,意为:尝试)。
-
发布信号量
int sem_post(sem_t *sem)
该操作将信号量的值加 1 ,如果有进程阻塞着等待该信号量,那么其中一个进程将被唤醒。该操作也必须是一个原子操作。该操作还经常被称为 V 操作(荷兰语 Verhogen ,意为:增加)
-
eg
// 使用信号量完成同步 #include <cstdio> #include <vector> #include <semaphore.h> #include <iostream> using std::vector; class RingQueue { private: vector<int> _queue; int _capacity; int _head; int _tail; sem_t _size; // 表示可读资源的数量 sem_t _residue; // 表示可写资源的数量 sem_t _lock; public: RingQueue(int capacity = 10) : _capacity(capacity), _queue(capacity), _head(0), _tail(0) { sem_init(&_size, 0, 0); sem_init(&_residue, 0, capacity); sem_init(&_lock, 0, 1); } ~RingQueue() { sem_destroy(&_size); sem_destroy(&_residue); _queue.~vector(); } // 写入 void Push(const int &data); // 读取 void Pop(int *data); }; void RingQueue::Push(const int &data) { // 判断是否有资源能够写入 sem_wait(&_residue); // 加锁 sem_wait(&_lock); _queue[_tail] = data; _tail = (_tail + 1) % _capacity; // 解锁 sem_post(&_lock); sem_post(&_size); } void RingQueue::Pop(int *data) { // 判断是否有资源读取 sem_wait(&_size); // 加锁 sem_wait(&_lock); *data = _queue[_head]; _head = (_head + 1) % _capacity; // 解锁 sem_post(&_lock); sem_post(&_residue); } void *thr_consumer(void *arg) { RingQueue *q = (RingQueue*)arg; while(1) { int data; q->Pop(&data); std::cout<<"consumer thread get data:"<<data<<std::endl; } return NULL; } void *thr_productor(void *arg) { RingQueue *q = (RingQueue*)arg; int i = 0; while(1) { q->Push(i); std::cout<<"productor thread put data:"<<i<<std::endl; i++; } return NULL; } int main (int argc, char *argv[]) { pthread_t ctid[4], ptid[4]; int ret, i; RingQueue q; for (i = 0; i < 4; i++) { ret = pthread_create(&ctid[i], NULL, thr_consumer, (void*)&q); if (ret != 0) { std::cout<<"thread create error\n"; return -1; } } for (i = 0; i < 4; i++) { ret = pthread_create(&ptid[i], NULL, thr_productor, (void*)&q); if (ret != 0) { std::cout<<"thread create error\n"; return -1; } } for (i = 0; i < 4; i++) { pthread_join(ctid[i], NULL); } for (i = 0; i < 4; i++) { pthread_join(ptid[i], NULL); } return 0; }
上一篇: 工作中常用SQL代码整理
下一篇: 自定义loading加载