文章目录
多线程
多线程主要是
线程的函数,创建、传参
互斥锁
条件变量
生产消费者模型
源自变量
死锁
*线程异步操作
std::thread
c++提供的线程类std::thread
基于这个类创建一个新的线程非常简单,只需要提供线程函数或者函数对象即可,并且可以同时指定线程函数的参数。
// ①
thread() noexcept;
// ②
thread( thread&& other ) noexcept;
// ③
template< class Function, class... Args >
explicit thread( Function&& f, Args&&... args );
// ④
thread( const thread& ) = delete;
- 构造函数1:默认构造函数,构造一个线程对象,在这个线程中不执行任何处理动作
- 构造函数2:移动构造函数,将other的线程所有权转移给新的thread对象。之后other不在表示执行线程。
- 执行函数3:创建线程对象,并在该线程中之形函数f中的业务逻辑,args是要传递给函数f的参数
void func(int num, string str)
{
for (size_t i = 0; i < 10; i++)
{
cout << "子线程 " << i << " num :" << num << " str " << str << endl;
}
}
void do_some_work()
{
for (size_t i = 0; i < 5; i++)
{
cout << "work " << i << endl;
}
}
void test0()
{
//创建线程1,传入参数
thread t(func, 520, "i love you");
//创建线程2,无参
std::thread my_tread(do_some_work);
//获取线程id
cout << " t " << t.get_id() << endl;
cout << " t1 " << my_tread.get_id() << endl;
//等待线程终止
t.join();
my_tread.join();
}
detach
线程分离函数detach
,在线程分离之后,主线程退出也会一并销毁创建出的所有子线程,在主线程退出之前,它可以脱离主线程继续独立的运行,任务执行完毕之后,这个子线程会自动释放自己占用的系统资源。
void detach();
joinable
判断主线程和子线程是否处于关联状态,一般情况下两者处于关联状态,该函数返回一个布尔类型,但是调用detach后线程就不关联了
operator =
线程中的资源是不能被复制的,因此通过=操作符进行赋值操作最终并不会得到两个完全相同的对象。
// move (1)
thread& operator= (thread&& other) noexcept;
// copy [deleted] (2)
thread& operator= (const other&) = delete;
通过上面=操作符重载声明可以得知:
- 如果 other 是一个右值,会进行资源所有权的转移
- 如果 other 不是右值,禁止拷贝,该函数被显示删除(=delete),不可用
静态函数
thread线程类还提供了一个静态方法,计算当前计算机的cpu核心数,根据这个可以在程序中创建出数量相等的线程,每个线程独占一个cpu核心,这些线程就不用分时复用cpu时间片,此时程序并发效率是最高的
#include <iostream>
#include <thread>
using namespace std;
int main()
{
int num = thread::hardware_concurrency();
cout << "CPU number: " << num << endl;
}
线程同步
进行多线程编程,如果需要多个线程对一块内存操作,比如:同时读、同时写、同时读写对于后两种情况来说,如果不做任何的认为干涉就会出现各种各样的错误数据(比如变量自加一,首先从内存中取出变量放入寄存器,寄存器自加一,再从寄存器放回内存)
为了解决多线程数据混乱的方案就是进行线程同步,最常用的就是互斥锁,在c++11中提供了四种互斥锁
std::mutex
独占的互斥锁,不能递归使用std::timed_mutex
带超时的独占互斥锁,不能递归使用std::recursive_mutex
递归互斥锁,不带超时功能std::recursive_timed_mutex
带超时的递归锁
也有称作为互斥量,两者是一个东西
成员函数
lock
用于给临界区加锁,并且只能有一个线程获得锁的所有权。他有阻塞线程的作用
void lock();
try_lock
也可以对临界区进行加锁
区别是
二者的区别在于 try_lock() 不会阻塞线程,lock() 会阻塞线程:
- 如果互斥锁是未锁定状态,得到了互斥锁所有权并加锁成功,函数返回 true
- 如果互斥锁是锁定状态,无法得到互斥锁所有权加锁失败,函数返回 false
unlock
对被锁定的锁解锁,但是-只有拥有互斥锁所有权的线程,也就是对互斥锁上锁的线程才能对其解锁,其他线程是没有权限做这件事情的
代码
int g_num = 0;
mutex g_num_mutex;
void slow_increment(int id)
{
for (size_t i = 0; i < 3; i++)
{
g_num_mutex.lock();
g_num++;
cout << id << " - >" << g_num << endl;
g_num_mutex.unlock();
this_thread::sleep_for(chrono::seconds(1));
}
}
/*
1.在所有线程的任务函数执行完毕之前互斥锁对象不能被析构,一定要在程序中保证这个对象的可用性
2.互斥锁的个数和共享资源的个数相等,也即是说每一个共享资源都应该对应一个互斥锁对象。互斥锁对象的个数和线程的个数没有关系
*/
void test0()
{
thread t1(slow_increment, 0);
thread t2(slow_increment, 1);
t1.join();
t2.join();
}
std::lock_guard
lock_guard是c++11新增的一个模板类,使用这个类可以简化互斥锁lock和unlock的写法,同时也更加安全。这个模板类的定义和常用的构造函数原型如下:
// 类的定义,定义于头文件 <mutex>
template< class Mutex >
class lock_guard;
// 常用构造函数
explicit lock_guard( mutex_type& m );
lock_guard
在使用上面提供的这个构造函数构造对象时,会自动锁定互斥量,而在退出作用域后进行析构时就会自动解锁,从而保证了互斥量的正确操作,避免忘记 unlock() 操作而导致线程死锁。lock_guard 使用了 RAII 技术,就是在类构造函数中分配资源,在析构函数中释放资源,保证资源出了作用域就释放。
//由于lock_guard是独占锁,所以递归调用会导致死锁
struct Calculate
{
Calculate() : m_i(6){}
void mul(int x)
{
cout << "mul" << endl;
lock_guard<mutex> locker(m_mutex);
m_i *= x;
/*
1.使用递归锁的场景往往都是可以简化的,使用递归互斥锁很容易放纵复杂逻辑的产生,从而导致bug的产生
2. 递归互斥锁比非递归互斥锁效率低一点
3. 递归互斥锁虽然允许同一线程获得同一互斥锁的所有权,但是最大次数并未具体说明,一旦超过一定次数,就会抛出std::system错误
*/
void test2()
{
//Calculate cal;
Calculate1 cal;
cal.both(6, 3);
}
}
void div(int x){
cout << "div" << endl;
lock_guard<mutex> locker(m_mutex);
m_i /= x;
}
void both(int x, int y)
{
cout << "both" << endl;
lock_guard<mutex> locker(m_mutex);
mul(x);
div(y);
}
int m_i;
mutex m_mutex;
};
void test2()
{
Calculate cal;
cal.both(6, 3);
}
std::recursive_mutex
递归互斥锁允许统一鲜橙多次获得互斥锁,可以用来解决同一线程多次获取互斥量时死锁的问题
//std::recursive_mutex允许统一线程多次获得互斥锁
struct Calculate1
{
Calculate1() : m_i(6){}
void mul(int x)
{
cout << "mul" << endl;
lock_guard<recursive_mutex> locker(m_mutex);
m_i *= x;
}
void div(int x){
cout << "div" << endl;
lock_guard<recursive_mutex> locker(m_mutex);
m_i /= x;
}
void both(int x, int y)
{
cout << "both" << endl;
lock_guard<recursive_mutex> locker(m_mutex);
mul(x);
div(y);
}
int m_i;
recursive_mutex m_mutex;
};
/*
1.使用递归锁的场景往往都是可以简化的,使用递归互斥锁很容易放纵复杂逻辑的产生,从而导致bug的产生
2. 递归互斥锁比非递归互斥锁效率低一点
3. 递归互斥锁虽然允许同一线程获得同一互斥锁的所有权,但是最大次数并未具体说明,一旦超过一定次数,就会抛出std::system错误
*/
void test2()
{
//Calculate cal;
Calculate1 cal;
cal.both(6, 3);
}
- 使用递归互斥锁的场景往往都是可以简化的,使用递归互斥锁很容易放纵复杂逻辑的产生,从而导致bug的产生
- 递归互斥锁比非递归互斥锁效率要低一些。
- 递归互斥锁虽然允许同一个线程多次获得同一个互斥锁的所有权,但最大次数并未具体说明,一旦超过一定的次数,就会抛出std::system错误。
std::timed_mutex
超时独占锁,主要是在获取互斥锁资源时增加了超时等待功能,因为不知道获取锁资源需要等待多长时间,为了保证不一直等待下去,设置了一个超时时长,超时后线程就可以解除阻塞去做其他事情了。
std::timed_mutex 比 std::_mutex 多了两个成员函数:try_lock_for() 和try_lock_until():
/*
std::timed_mutex超时独占互斥锁,主要是在获取互斥锁资源时增加了超时等待功能,因为不知道获取锁资源需要等待多长时间
,为了保证不一直等待下去,设置了一个超时时长,超时后线程就可以解除阻塞去做其他的事情了
多两个成员函数 try_lock_for()和try_lock_until()
try_lock_for()是当线程获取不到互斥锁资源时,让线程阻塞一定时间
try_lock_until()是当线程获取不到互斥锁资源时,让线程阻塞到某一个指定的时间点
关于两个函数的返回值,当得到互斥锁的所有权之后,函数会马上解除阻塞,返回true,如果阻塞的时长用完或达到指定时间点之后
函数会解除阻塞,返回false
*/
timed_mutex g_mutex;
void work()
{
chrono::seconds timeout(1);
while (true)
{
if (g_mutex.try_lock_for(timeout))
{
cout << "当前线程id " << this_thread::get_id() << " 得到互斥锁所有权 " << endl;
//模拟处理任务用了一定时长
this_thread::sleep_for(chrono::seconds(10));
g_mutex.unlock();
break;
}else
{
cout << "当前线程id " << this_thread::get_id() << " 没有得到互斥锁所有权 " << endl;
//模拟其他任务
this_thread::sleep_for(chrono::seconds(1));
}
}
}
void test3()
{
thread t1(work);
thread t2(work);
t1.join();
t2.join();
}
读写锁
读可以共享,写必须独占,且写和读不能共存shared_mutex
class MTVVector
{
std::vecotr m_arr;
mutable std::shared_mutex m_mtx;
public:
void push_back(int val)
{
//写锁
m_mtx.lock();
m_arr.push_back(cal);
m_mtx.unlock();
}
size_t size() const{
//读锁
m_mtx.lock_shared();
m_arr.size();
m_mtx.unlock();
}
};
raii思想的
class MTVVector
{
std::vecotr m_arr;
mutable std::shared_mutex m_mtx;
public:
void push_back(int val)
{
//写锁
std::unique_lock qrd(m_mtx);
m_arr.push_back(cal);
}
size_t size() const{
//读锁
std::shared_lock qrd(m_mtx);
m_arr.size();
}
};
条件变量
条件变量是c++11提供的另外一种用于等待的同步机制,他能阻塞一个或多个线程,直到收到另外一个线程发出的通知或者超时时,才会唤醒当前阻塞的线程,条件变量需要和互斥量配合起来使用,c++11提供了两种条件变量
- condition_variable:需要配合 std::unique_lockstd::mutex 进行 wait 操作,也就是阻塞线程的操作。
- condition_variable_any:可以和任意带有 lock()、unlock() 语义的 mutex 搭配使用,也就是说有四种:
- std::mutex:独占的非递归互斥锁
- std::timed_mutex:带超时的独占非递归互斥锁
- std::recursive_mutex:不带超时功能的递归互斥锁
- std::recursive_timed_mutex:带超时的递归互斥锁
条件变量通常用于生产者消费者模型,大致使用过程如下
- 拥有条件变量的线程获取互斥量
- 循环检查某个条件,如果条件不满足阻塞当前线程,否则继续乡下执行
- 产品数量达到上限,生产者阻塞,否则一直生产
- 产品的数量为0,消费者阻塞,否则消费者一直消费
- 条件满足之后调用notify_one()或者notify_all()唤醒一个或者所有被阻塞的线程
生产消费者模型
/*
条件变量
*/
#include <iostream>
#include <thread>
#include <chrono>
#include <mutex>
#include <functional>
#include <condition_variable>
#include <list>
/*
condition_variable需要配合std::unique_lock<std::mutex>进行
wait操作,也就是阻塞线程的操作
condition_variable_any:可以和任意带有lock(),unlock()语义的mutex搭配使用
也就说有四种
std::mutex 独占的非递归互斥锁
std::timed_mutex 带超时的独占非递归互斥锁
std::recursive_mutex 不带超时功能的递归互斥锁
std::recursive_timed_mutex 带超时的递归互斥锁
*/
using namespace std;
class SyncQueue
{
private:
//存储队列数据
list<int> m_queue;
//互斥锁
mutex m_mutex;
//不为空的条件变量
condition_variable m_notEmpty;
//没有满的条件变量
condition_variable m_notFull;
//任务队列的最大任务数
int m_max_size;
public:
SyncQueue(int max_size) : m_max_size(max_size){}
//入队
void put(const int& x)
{
unique_lock<mutex> locker(m_mutex);
//判断任务队列是否已经满了
while (m_queue.size() == m_max_size)
{
cout << "任务队列已满,请耐心等候 ..." << endl;
//阻塞线程
m_notFull.wait(locker);
}
//将任务放入到任务队列中
m_queue.push_back(x);
cout << x << " 被生产" << endl;
//通知消费者消费
m_notEmpty.notify_one();
}
int take()
{
unique_lock<mutex> locker(m_mutex);
while (m_queue.empty())
{
cout << "任务队列已空,请耐心等候..." << endl;
m_notEmpty.wait(locker);
}
//从任务队列中取出任务(消费)
int x = m_queue.front();
m_queue.pop_front();
cout << x << " 被消费" << endl;
//通知生产者进行生产
m_notFull.notify_one();
return x;
}
bool empty()
{
lock_guard<mutex> locker(m_mutex);
return m_queue.empty();
}
bool full()
{
lock_guard<mutex> locker(m_mutex);
return m_queue.size() == m_max_size;
}
int size()
{
lock_guard<mutex> locker(m_mutex);
return m_queue.size();
}
~SyncQueue(){m_queue.clear();}
};
void test0()
{
SyncQueue taskQ(50);
//生产者
auto produce = bind(&SyncQueue::put, &taskQ, placeholders::_1);
//消费者
auto consume = bind(&SyncQueue::take, &taskQ);
//生产者消费者各三个线程
thread t1[3];
thread t2[3];
for (size_t i = 0; i < 3; i++)
{
t1[i] = thread(produce, i+100);
t2[i] = thread(consume);
}
for (size_t i = 0; i < 3; i++)
{
t1[i].join();
t2[i].join();
}
return;
}
class SyncQueueT
{
private:
//存储队列数据
list<int> m_queue;
//互斥锁
mutex m_mutex;
//不为空的条件变量
condition_variable_any m_not_empty;
//没有满的条件变量
condition_variable_any m_not_full;
//任务队列的最大任务个数
int m_max_size;
public:
SyncQueueT(int maxsize): m_max_size(maxsize){}
void put(const int& x){
lock_guard<mutex> locker(m_mutex);
//根据条件阻塞线程
m_not_full.wait(m_mutex, [this](){
return m_queue.size() != m_max_size;
});
//将任务放入到任务队列中去
m_queue.push_back(x);
cout << x << " +++" << endl;
this_thread::sleep_for(chrono::seconds(1));
//通知消费者消费
m_not_empty.notify_one();
}
int take(){
lock_guard<mutex> locker(m_mutex);
m_not_empty.wait(m_mutex, [this](){
return !m_queue.empty();
});
//从任务队列中取出任务去消费
int x = m_queue.front();
m_queue.pop_front();
//通知生产者去生产
m_not_full.notify_one();
cout << x << " ---" << endl;
return x;
}
bool empty(){}
bool full(){}
int size(){}
};
void test1(){
SyncQueueT taskQ(50);
//生产者
auto produce = bind(&SyncQueueT::put, &taskQ, placeholders::_1);
//消费者
auto consume = bind(&SyncQueueT::take, &taskQ);
//生产者消费者各三个线程
thread t1[100];
thread t2[100];
for (size_t i = 0; i < 100; i++)
{
t1[i] = thread(produce, i+100);
t2[i] = thread(consume);
}
for (size_t i = 0; i < 100; i++)
{
t1[i].join();
t2[i].join();
}
return;
}
int main()
{
test0();
cout << "--------------------------- " << endl;
test1();
return 0;
}
原子变量
c++提供了原子类型std::atomic,通过这个原子类型管理的内部变量就可以称之为源自变量,我们可以给原子类型指定bool/char/int/long/指针等类型作为模板参数(不支持浮点型和复合型)
原子指的是一系列不可被cpu上下文交换的机器指令,这些指令组合在一起就形成了原子操作,在多核cpu下,当某个cpu核心开始运行原子操作时,会暂停其他cpu内核对内存的操作,以保证原子操作不会被其他cpu内核做干扰
原子操作是通过指令提供的,所以性能比锁和消息传递好很多。
#include <iostream>
#include <thread>
#include <chrono>
#include <atomic>
#include <functional>
using namespace std;
struct Couter
{
// Couter():m_value(0){}
void increment(){
for (size_t i = 0; i < 10000; i++)
{
m_value++;
// cout << "increment number " << m_value
// << ", thread id " << this_thread::get_id() << endl;
this_thread::sleep_for(chrono::milliseconds(1));
}
}
void decrement()
{
for (size_t i = 0; i < 10000; i++)
{
m_value --;
// cout << "decrement number " << m_value << ", thread id"
// << this_thread::get_id() << endl;
this_thread::sleep_for(chrono::milliseconds(1));
}
}
// int m_value;
atomic_int m_value;
// atomic<int> m_value = 0;
};
void test0()
{
Couter c;
c.m_value = 0;
auto increment = bind(&Couter::increment, &c);
auto decrement = bind(&Couter::decrement, &c);
thread t1(increment);
thread t2(decrement);
t1.join();
t2.join();
std::cout << " value " << c.m_value << std::endl;
return;
}
int main()
{
test0();
return 0;
}
死锁
永远不要同时持有两把锁
保证双方上锁顺序一致
用std::lock同时对多个上锁
std::mutex mtx1;
std::mutex mtx2;
void thread()
{
std::lock(mtx1, mtx2);
mtx1.unlock();
mtx2.unlock();
}
void thread1()
{
std::lock(mtx1, mtx2);
mtx1.unlock();
mtx2.unlock();
}
自旋锁
使用递归锁
线程池
#pragma once
#include <iostream>
#include <thread>
#include <queue>
#include <vector>
#include <atomic>
#include <stdexcept>
#include <functional>
#include <future>
#include <mutex>
#include <condition_variable>
#include <string>
/*
c++11相关新特性
类型(模板)别名
emplace_back 直接在容器尾部创建这个元素,而push_back先构造一个临时对象,然后把临时的copy构造函数拷贝或者移动到容器后面
*/
namespace std
{
//线程池最大容量
#define THREADPOOL_MAX_NUM 16
//#define THREADPOOL_AUTO_GROW
class ThreadPool
{
private:
using Task = function<void()>; //定义类型
//线程池
vector<thread> _pool;
//任务队列
queue<Task> _tasks;
//同步
mutex _lock;
//条件阻塞
condition_variable _task_cv;
//线程池是否运行
atomic<bool> _run{true};
//空闲线程池数量
atomic<int> _idlThrNum{0};
public:
inline ThreadPool(unsigned short size = 4){addThread(size);}
inline ~ThreadPool()
{
_run = false;
//唤醒所有线程执行
_task_cv.notify_all();
for(thread& thread : _pool)
{
if (thread.joinable())
{
//等待任务结束
thread.join();
}
}
}
public:
//提交一个任务
//调用.get()方法获取返回值会等待任务执行完,获取返回值
//两种方法可以实现调用类成员
//一种是 bind
//另外一种是 mem_fn
template<class F, class... Args>
auto commit(F&& f, Args&&... args) ->future<decltype(f(args...))>
{
//停止?
if (!_run)
{
throw runtime_error("commit on Thread is stoped.");
}
//typename std::result_of<F(Args...)>::type, 函数f的返回值类型
using RetType = decltype(f(args...));
auto task = make_shared<packaged_task<RetType()>>(
bind(forward<F>(f), forward<Args>(args)...)
);//把函数入口及参数,打包(绑定)
future<RetType> future = task->get_future();
{
//添加任务到队列
//对当前块的语句加锁
lock_guard<mutex> lock{_lock};
_tasks.emplace([task](){
(*task)();
});
}
#ifdef THREADPOOL_AUTO_GROW
if (_idlThrNum < 1 && _pool.size() < THREADPOOL_MAX_NUM)
{
addThread(1);
}
#endif
//唤醒一个线程执行
_task_cv.notify_one();
return future;
}
//空闲线程数量
int idlCount(){return _idlThrNum;}
//线程数量
int hrCount(){return _pool.size();}
#ifdef THREADPOOL_AUTO_GROW
private:
#endif
void addThread(unsigned short size)
{
for (; _pool.size() < THREADPOOL_MAX_NUM && size > 0; --size)
{
_pool.emplace_back([this]{
while (_run)
{
//获取一个待执行的task
Task task;
{
unique_lock<mutex> lock{_lock};
_task_cv.wait(lock, [this]{
return !_run || !_tasks.empty();
}); //wait 直到有 task
if (!_run && _tasks.empty())
{
return;
}
//按照先入先出从队列取出一个task
task = move(_tasks.front());
_tasks.pop();
}
_idlThrNum --;
//执行任务
task();
_idlThrNum ++;
}
});
_idlThrNum ++;
}
}
};
}