pthread(POXIS thread)
互斥锁和自旋锁
Mutex 想要使用 lock 去获得一个锁的时候,如果此时这个锁正被其他线程持有,那么使用 lock 的线程就会被阻塞,进行上下文切换,将线程放置到等待队列中。此时 CPU 可以被安排去做其他事情,等待锁的所有权空出来后再再次执行该线程。
Spin 想要使用 lock 获得一个锁的时候,如果此时无法获取到锁,会一直忙等待,直到获取该锁为止。
具体来说,一般获取锁的等待时间较长的时候使用 Mutex,获取锁的等待时间较短的时候使用 Spin。
条件变量
std::condition_variable
使用函数:
-
pthread_cond_init/destroy 类似于互斥锁和自旋锁
-
pthread_cond_wait
函数定义:
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex)
cond:指向条件变量的指针。
mutex:指向互斥锁的指针。
调用该函数之前,必须先获得互斥锁 mutex,在等待期间,互斥锁会自动释放,并使当前线程处于阻塞状态,直到条件变量被唤醒并且重新获得互斥锁为止。 -
pthread_cond_signal/broadcast
用于唤醒等待条件变量的一个线程。signal 如果有多个线程在等待条件变量,则会唤醒其中一个线程;broadcast 则是唤醒所有等待的线程。
如果没有线程在等待条件变量,则该函数不会产生任何效果。
阻塞式循环队列
上网抄个代码:
#include <unistd.h>
#include <cstdlib>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
static const int kItemRepositorySize = 10; // Item buffer size.
static const int kItemsToProduce = 1000; // How many items we plan to produce.
struct ItemRepository {
int item_buffer[kItemRepositorySize]; // 产品缓冲区,配合 read_position 和 write_position 模型环形队列.
size_t read_position; // 消费者读取产品位置.
size_t write_position; // 生产者写入产品位置.
std::mutex mtx; // 互斥量,保护产品缓冲区
std::condition_variable repo_not_full; // 条件变量,指示产品缓冲区不为满.
std::condition_variable repo_not_empty; // 条件变量,指示产品缓冲区不为空.
} gItemRepository; // 产品库全局变量,生产者和消费者操作该变量.
typedef struct ItemRepository ItemRepository;
void ProduceItem(ItemRepository *ir, int item)
{
std::unique_lock<std::mutex> lock(ir->mtx);
while(((ir->write_position + 1) % kItemRepositorySize)
== ir->read_position) { // item buffer is full, just wait here.
std::cout << "Producer is waiting for an empty slot...\n";
(ir->repo_not_full).wait(lock); // 生产者等待"产品库缓冲区不为满"这一条件发生.
}
(ir->item_buffer)[ir->write_position] = item; // 写入产品.
(ir->write_position)++; // 写入位置后移.
if (ir->write_position == kItemRepositorySize) // 写入位置若是在队列最后则重新设置为初始位置.
ir->write_position = 0;
(ir->repo_not_empty).notify_all(); // 通知消费者产品库不为空.
lock.unlock(); // 解锁.
}
int ConsumeItem(ItemRepository *ir)
{
int data;
std::unique_lock<std::mutex> lock(ir->mtx);
// item buffer is empty, just wait here.
while(ir->write_position == ir->read_position) {
std::cout << "Consumer is waiting for items...\n";
(ir->repo_not_empty).wait(lock); // 消费者等待"产品库缓冲区不为空"这一条件发生.
}
data = (ir->item_buffer)[ir->read_position]; // 读取某一产品
(ir->read_position)++; // 读取位置后移
if (ir->read_position >= kItemRepositorySize) // 读取位置若移到最后,则重新置位.
ir->read_position = 0;
(ir->repo_not_full).notify_all(); // 通知消费者产品库不为满.
lock.unlock(); // 解锁.
return data; // 返回产品.
}
void ProducerTask() // 生产者任务
{
for (int i = 1; i <= kItemsToProduce; ++i) {
// sleep(1);
std::cout << "Produce the " << i << "^th item..." << std::endl;
ProduceItem(&gItemRepository, i); // 循环生产 kItemsToProduce 个产品.
}
}
void ConsumerTask() // 消费者任务
{
static int cnt = 0;
while(1) {
sleep(1);
int item = ConsumeItem(&gItemRepository); // 消费一个产品.
std::cout << "Consume the " << item << "^th item" << std::endl;
if (++cnt == kItemsToProduce) break; // 如果产品消费个数为 kItemsToProduce, 则退出.
}
}
void InitItemRepository(ItemRepository *ir)
{
ir->write_position = 0; // 初始化产品写入位置.
ir->read_position = 0; // 初始化产品读取位置.
}
int main()
{
InitItemRepository(&gItemRepository);
std::thread producer(ProducerTask); // 创建生产者线程.
std::thread consumer(ConsumerTask); // 创建消费之线程.
producer.join();
consumer.join();
}
上述代码是单生产者和单消费者模型,多生产者多消费者模型只需要增加两个计数器用来计算当前已经生产/消费了多少个物品。
无锁循环队列
内核实现了一个无锁循环队列 kfifo
粘贴一下代码:
/**
* __kfifo_put - puts some data into the FIFO, no locking version
* @fifo: the fifo to be used.
* @buffer: the data to be added.
* @len: the length of the data to be added.
*
* This function copies at most @len bytes from the @buffer into
* the FIFO depending on the free space, and returns the number of
* bytes copied.
*
* Note that with only one concurrent reader and one concurrent
* writer, you don't need extra locking to use these functions.
*/
unsigned int __kfifo_put(struct kfifo *fifo,
unsigned char *buffer, unsigned int len)
{
unsigned int l;
//计算写入空间:队列大小 - 写入位置 + 读取位置
len = min(len, fifo->size - fifo->in + fifo->out);
/*
* Ensure that we sample the fifo->out index -before- we
* start putting bytes into the kfifo.
*
* 内存屏障(全屏障)
*/
smp_mb();
/* first put the data starting from fifo->in to buffer end */
/* 从队列写入位置 (mod 队列大小) 写入到队列结尾处 */
l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));
memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);
/* then put the rest (if any) at the beginning of the buffer */
/* 从队列起始位置向后写入剩余数据 */
memcpy(fifo->buffer, buffer + l, len - l);
/*
* Ensure that we add the bytes to the kfifo -before-
* we update the fifo->in index.
*
* 内存屏障(写屏障)
*/
smp_wmb();
//单调递增写入位置
fifo->in += len;
return len;
}
/**
* __kfifo_get - gets some data from the FIFO, no locking version
* @fifo: the fifo to be used.
* @buffer: where the data must be copied.
* @len: the size of the destination buffer.
*
* This function copies at most @len bytes from the FIFO into the
* @buffer and returns the number of copied bytes.
*
* Note that with only one concurrent reader and one concurrent
* writer, you don't need extra locking to use these functions.
*/
unsigned int __kfifo_get(struct kfifo *fifo,
unsigned char *buffer, unsigned int len)
{
unsigned int l;
//计算读取空间:写入位置 - 读取位置
len = min(len, fifo->in - fifo->out);
/*
* Ensure that we sample the fifo->in index -before- we
* start removing bytes from the kfifo.
*
* 内存屏障(读屏障)
*/
smp_rmb();
/* first get the data from fifo->out until the end of the buffer */
/* 从缓存读取位置 (mod 队列大小) 读取到缓存结尾处 */
l = min(len, fifo->size - (fifo->out & (fifo->size - 1)));
memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l);
/* then get the rest (if any) from the beginning of the buffer */
/* 从缓存起始位置读取剩余数据 */
memcpy(buffer + l, fifo->buffer, len - l);
/*
* Ensure that we remove the bytes from the kfifo -before-
* we update the fifo->out index.
*
* 内存屏障(全屏障)
*/
smp_mb();
//单调递增读取位置
fifo->out += len;
return len;
}
shared_ptr
shared_ptr
本身不是一个线程安全的 STL,因此并发读写对应内存区域是不安全的。- 由于赋值操作涉及原内存释放、修改指针指向等多个修改操作,其过程不是原子操作,因此对
shared_ptr
进行并发赋值不是线程安全的。 - 对
shared_ptr
进行并发拷贝,对数据指针和控制块指针仅进行读取并复制,然后对引用计数进行递增,而引用计数增加是原子操作。因此是线程安全的。
#include<iostream>
#include<atomic>
using namespace std;
class Counter
{
public:
Counter()
{
count = 1;
}
void add()
{
count++;
}
void sub()
{
count--;
}
int get() const
{
return count;
}
private:
std::atomic<int> count;
};
template <typename T>
class Sp
{
public:
Sp(); //默认构造函数
Sp(T *ptr); //参数构造函数
Sp(const Sp &obj); //复制构造函数
~Sp(); //析构函数
Sp &operator=(const Sp &obj); //重载=
T *get(); //得到共享指针指向的类
int getcount(); //得到引用计数器
private:
T *my_ptr; //共享指针所指向的对象
Counter* counter; //引用计数器
void clear(); //清理函数
};
//默认构造函数,参数为空,构造一个引用计数器
template<typename T>
Sp<T>::Sp()
{
my_ptr = nullptr;
counter = new Counter();
counter->add();
}
//复制构造函数,新的共享指针指向旧的共享指针所指对象
template<typename T>
Sp<T>::Sp(const Sp &obj)
{
//将所指对象也变为目标所指的对象
my_ptr = obj.my_ptr;
//获取引用计数器,使得两个共享指针用一个引用计数器
counter = obj.counter;
//使这个对象的引用计数器 +1
counter->add();
};
//重载=
template<typename T>
Sp<T> &Sp<T>::operator=(const Sp&obj)
{
//清理当前所引用对象和引用计数器
clear();
//指向新的对象,并获取目标对象的引用计数器
my_ptr = obj.my_ptr;
counter = obj.counter;
//引用计数器 +1
counter->add();
//返回自己
return *this;
}
//创建一个共享指针指向目标类,构造一个新的引用计数器
template<typename T>
Sp<T>::Sp(T *ptr)
{
my_ptr = ptr;
counter = new Counter();
}
//析构函数,出作用域的时候,调用清理函数
template<typename T>
Sp<T>:: ~Sp()
{
clear();
}
//清理函数,调用时将引用计数器的值减 1,若减为 0,清理指向的对象内存区域
template<typename T>
void Sp<T>::clear()
{
//引用计数器 -1
counter->sub();
//如果引用计数器变为 0,清理对象
if(0 == counter->get())
{
if(my_ptr)
{
delete my_ptr;
}
delete counter;
}
}
//当前共享指针指向的对象,被几个共享指针所引用
template<typename T>
int Sp<T>::getcount()
{
return counter->get();
}
template<typename T>
T * sp<T>::get()
{
return my_ptr;
}