多线程

多线程主要是
线程的函数,创建、传参
互斥锁
条件变量
生产消费者模型
源自变量
死锁
*线程异步操作

std::thread

c++提供的线程类std::thread基于这个类创建一个新的线程非常简单,只需要提供线程函数或者函数对象即可,并且可以同时指定线程函数的参数。

// ①
thread() noexcept;
// ②
thread( thread&& other ) noexcept;
// ③
template< class Function, class... Args >
explicit thread( Function&& f, Args&&... args );
// ④
thread( const thread& ) = delete;
  • 构造函数1:默认构造函数,构造一个线程对象,在这个线程中不执行任何处理动作
  • 构造函数2:移动构造函数,将other的线程所有权转移给新的thread对象。之后other不在表示执行线程。
  • 执行函数3:创建线程对象,并在该线程中之形函数f中的业务逻辑,args是要传递给函数f的参数
void func(int num, string str)
{
  for (size_t i = 0; i < 10; i++)
  {
    cout << "子线程 " << i << " num :" << num << " str " << str << endl;
  }
}

void do_some_work()
{
  for (size_t i = 0; i < 5; i++)
  {
    cout << "work " << i << endl;
  }
}

void test0()
{
    //创建线程1,传入参数
  thread t(func, 520, "i love you");
  //创建线程2,无参
  std::thread my_tread(do_some_work);
    //获取线程id
  cout << " t " << t.get_id() << endl;
  cout << " t1 " << my_tread.get_id() << endl;
    //等待线程终止
  t.join();
  my_tread.join();
}

detach

线程分离函数detach,在线程分离之后,主线程退出也会一并销毁创建出的所有子线程,在主线程退出之前,它可以脱离主线程继续独立的运行,任务执行完毕之后,这个子线程会自动释放自己占用的系统资源。

void detach();

joinable

判断主线程和子线程是否处于关联状态,一般情况下两者处于关联状态,该函数返回一个布尔类型,但是调用detach后线程就不关联了

operator =

线程中的资源是不能被复制的,因此通过=操作符进行赋值操作最终并不会得到两个完全相同的对象。

// move (1)	
thread& operator= (thread&& other) noexcept;
// copy [deleted] (2)	
thread& operator= (const other&) = delete;

通过上面=操作符重载声明可以得知:

  • 如果 other 是一个右值,会进行资源所有权的转移
  • 如果 other 不是右值,禁止拷贝,该函数被显示删除(=delete),不可用

静态函数

thread线程类还提供了一个静态方法,计算当前计算机的cpu核心数,根据这个可以在程序中创建出数量相等的线程,每个线程独占一个cpu核心,这些线程就不用分时复用cpu时间片,此时程序并发效率是最高的

#include <iostream>
#include <thread>
using namespace std;

int main()
{
    int num = thread::hardware_concurrency();
    cout << "CPU number: " << num << endl;
}

线程同步

进行多线程编程,如果需要多个线程对一块内存操作,比如:同时读、同时写、同时读写对于后两种情况来说,如果不做任何的认为干涉就会出现各种各样的错误数据(比如变量自加一,首先从内存中取出变量放入寄存器,寄存器自加一,再从寄存器放回内存)
为了解决多线程数据混乱的方案就是进行线程同步,最常用的就是互斥锁,在c++11中提供了四种互斥锁

  • std::mutex独占的互斥锁,不能递归使用
  • std::timed_mutex带超时的独占互斥锁,不能递归使用
  • std::recursive_mutex递归互斥锁,不带超时功能
  • std::recursive_timed_mutex带超时的递归锁

也有称作为互斥量,两者是一个东西

成员函数

lock

用于给临界区加锁,并且只能有一个线程获得锁的所有权。他有阻塞线程的作用

void lock();
try_lock

也可以对临界区进行加锁
区别是
二者的区别在于 try_lock() 不会阻塞线程,lock() 会阻塞线程:

  • 如果互斥锁是未锁定状态,得到了互斥锁所有权并加锁成功,函数返回 true
  • 如果互斥锁是锁定状态,无法得到互斥锁所有权加锁失败,函数返回 false
unlock

对被锁定的锁解锁,但是-只有拥有互斥锁所有权的线程,也就是对互斥锁上锁的线程才能对其解锁,其他线程是没有权限做这件事情的

代码
int g_num = 0;
mutex g_num_mutex;

void slow_increment(int id)
{
  for (size_t i = 0; i < 3; i++)
  {
    g_num_mutex.lock();
    g_num++;
    cout << id << " - >" << g_num << endl;
    g_num_mutex.unlock();
    this_thread::sleep_for(chrono::seconds(1));
  }
}

/*
1.在所有线程的任务函数执行完毕之前互斥锁对象不能被析构,一定要在程序中保证这个对象的可用性
2.互斥锁的个数和共享资源的个数相等,也即是说每一个共享资源都应该对应一个互斥锁对象。互斥锁对象的个数和线程的个数没有关系
*/
void test0()
{
  thread t1(slow_increment, 0);
  thread t2(slow_increment, 1);
  t1.join();
  t2.join();
}

std::lock_guard

lock_guard是c++11新增的一个模板类,使用这个类可以简化互斥锁lock和unlock的写法,同时也更加安全。这个模板类的定义和常用的构造函数原型如下:

// 类的定义,定义于头文件 <mutex>
template< class Mutex >
class lock_guard;

// 常用构造函数
explicit lock_guard( mutex_type& m );

lock_guard 在使用上面提供的这个构造函数构造对象时,会自动锁定互斥量,而在退出作用域后进行析构时就会自动解锁,从而保证了互斥量的正确操作,避免忘记 unlock() 操作而导致线程死锁。lock_guard 使用了 RAII 技术,就是在类构造函数中分配资源,在析构函数中释放资源,保证资源出了作用域就释放。

//由于lock_guard是独占锁,所以递归调用会导致死锁
struct Calculate
{
  Calculate() : m_i(6){}
  void mul(int x)
  {
    cout << "mul" << endl;
    lock_guard<mutex> locker(m_mutex);
    m_i *= x;
      /*
1.使用递归锁的场景往往都是可以简化的,使用递归互斥锁很容易放纵复杂逻辑的产生,从而导致bug的产生
2. 递归互斥锁比非递归互斥锁效率低一点
3. 递归互斥锁虽然允许同一线程获得同一互斥锁的所有权,但是最大次数并未具体说明,一旦超过一定次数,就会抛出std::system错误
*/
void test2()
{
  //Calculate cal;
  Calculate1 cal;
  cal.both(6, 3);
}
  }
  void div(int x){
    cout << "div" << endl;
    lock_guard<mutex> locker(m_mutex);
    m_i /= x;
  }

  void both(int x, int y)
  {
    cout << "both" << endl;
    lock_guard<mutex> locker(m_mutex);
    mul(x);
    div(y);
  }

  int m_i;
  mutex m_mutex;
};

void test2()
{
  Calculate cal;
  cal.both(6, 3);
}

std::recursive_mutex

递归互斥锁允许统一鲜橙多次获得互斥锁,可以用来解决同一线程多次获取互斥量时死锁的问题

//std::recursive_mutex允许统一线程多次获得互斥锁
struct Calculate1
{
  Calculate1() : m_i(6){}
  void mul(int x)
  {
    cout << "mul" << endl;
    lock_guard<recursive_mutex> locker(m_mutex);
    m_i *= x;
  }
  void div(int x){
    cout << "div" << endl;
    lock_guard<recursive_mutex> locker(m_mutex);
    m_i /= x;
  }

  void both(int x, int y)
  {
    cout << "both" << endl;
    lock_guard<recursive_mutex> locker(m_mutex);
    mul(x);
    div(y);
  }

  int m_i;
  recursive_mutex m_mutex;
};

/*
1.使用递归锁的场景往往都是可以简化的,使用递归互斥锁很容易放纵复杂逻辑的产生,从而导致bug的产生
2. 递归互斥锁比非递归互斥锁效率低一点
3. 递归互斥锁虽然允许同一线程获得同一互斥锁的所有权,但是最大次数并未具体说明,一旦超过一定次数,就会抛出std::system错误
*/
void test2()
{
  //Calculate cal;
  Calculate1 cal;
  cal.both(6, 3);
}
  • 使用递归互斥锁的场景往往都是可以简化的,使用递归互斥锁很容易放纵复杂逻辑的产生,从而导致bug的产生
  • 递归互斥锁比非递归互斥锁效率要低一些。
  • 递归互斥锁虽然允许同一个线程多次获得同一个互斥锁的所有权,但最大次数并未具体说明,一旦超过一定的次数,就会抛出std::system错误。

std::timed_mutex

超时独占锁,主要是在获取互斥锁资源时增加了超时等待功能,因为不知道获取锁资源需要等待多长时间,为了保证不一直等待下去,设置了一个超时时长,超时后线程就可以解除阻塞去做其他事情了。
std::timed_mutex 比 std::_mutex 多了两个成员函数:try_lock_for() 和try_lock_until():

/*
std::timed_mutex超时独占互斥锁,主要是在获取互斥锁资源时增加了超时等待功能,因为不知道获取锁资源需要等待多长时间
,为了保证不一直等待下去,设置了一个超时时长,超时后线程就可以解除阻塞去做其他的事情了
多两个成员函数 try_lock_for()和try_lock_until()
try_lock_for()是当线程获取不到互斥锁资源时,让线程阻塞一定时间
try_lock_until()是当线程获取不到互斥锁资源时,让线程阻塞到某一个指定的时间点
关于两个函数的返回值,当得到互斥锁的所有权之后,函数会马上解除阻塞,返回true,如果阻塞的时长用完或达到指定时间点之后
函数会解除阻塞,返回false
*/
timed_mutex g_mutex;
void work()
{
  chrono::seconds timeout(1);
  while (true)
  {
    if (g_mutex.try_lock_for(timeout))
    {
      cout << "当前线程id " << this_thread::get_id() << " 得到互斥锁所有权 " << endl;
      //模拟处理任务用了一定时长
      this_thread::sleep_for(chrono::seconds(10));
      g_mutex.unlock();
      break;
    }else
    {
      cout << "当前线程id " << this_thread::get_id() << " 没有得到互斥锁所有权 " << endl;
      //模拟其他任务
      this_thread::sleep_for(chrono::seconds(1));
    }
  }
}

void test3()
{
  thread t1(work);
  thread t2(work);
  t1.join();
  t2.join();
}

读写锁

读可以共享,写必须独占,且写和读不能共存
shared_mutex

class MTVVector
{
std::vecotr m_arr;
mutable std::shared_mutex m_mtx;
public:
    void push_back(int val)
    {
        //写锁
        m_mtx.lock();
        m_arr.push_back(cal);
        m_mtx.unlock();
    }
    size_t size() const{
        //读锁
        m_mtx.lock_shared();
        m_arr.size();
        m_mtx.unlock();
    }
};

raii思想的

class MTVVector
{
std::vecotr m_arr;
mutable std::shared_mutex m_mtx;
public:
    void push_back(int val)
    {
        //写锁
        std::unique_lock qrd(m_mtx);
        m_arr.push_back(cal);
    }
    size_t size() const{
        //读锁
        std::shared_lock qrd(m_mtx);
        m_arr.size();
    }
};

条件变量

条件变量是c++11提供的另外一种用于等待的同步机制,他能阻塞一个或多个线程,直到收到另外一个线程发出的通知或者超时时,才会唤醒当前阻塞的线程,条件变量需要和互斥量配合起来使用,c++11提供了两种条件变量

  • condition_variable:需要配合 std::unique_lockstd::mutex 进行 wait 操作,也就是阻塞线程的操作。
  • condition_variable_any:可以和任意带有 lock()、unlock() 语义的 mutex 搭配使用,也就是说有四种:
    • std::mutex:独占的非递归互斥锁
    • std::timed_mutex:带超时的独占非递归互斥锁
    • std::recursive_mutex:不带超时功能的递归互斥锁
    • std::recursive_timed_mutex:带超时的递归互斥锁

条件变量通常用于生产者消费者模型,大致使用过程如下

  1. 拥有条件变量的线程获取互斥量
  2. 循环检查某个条件,如果条件不满足阻塞当前线程,否则继续乡下执行
    1. 产品数量达到上限,生产者阻塞,否则一直生产
    2. 产品的数量为0,消费者阻塞,否则消费者一直消费
  3. 条件满足之后调用notify_one()或者notify_all()唤醒一个或者所有被阻塞的线程

生产消费者模型

/*
条件变量
*/
#include <iostream>
#include <thread>
#include <chrono>
#include <mutex>
#include <functional>
#include <condition_variable>
#include <list>

/*
condition_variable需要配合std::unique_lock<std::mutex>进行
wait操作,也就是阻塞线程的操作

condition_variable_any:可以和任意带有lock(),unlock()语义的mutex搭配使用
也就说有四种
std::mutex 独占的非递归互斥锁
std::timed_mutex 带超时的独占非递归互斥锁
std::recursive_mutex 不带超时功能的递归互斥锁
std::recursive_timed_mutex 带超时的递归互斥锁
*/

using namespace std;

class SyncQueue
{
private:
//存储队列数据
    list<int> m_queue;
    //互斥锁
    mutex m_mutex;
    //不为空的条件变量
    condition_variable m_notEmpty;
    //没有满的条件变量
    condition_variable m_notFull;
    //任务队列的最大任务数
    int m_max_size;

public:
    SyncQueue(int max_size) : m_max_size(max_size){}
    //入队
    void put(const int& x)
    {
        unique_lock<mutex> locker(m_mutex);
        //判断任务队列是否已经满了
        while (m_queue.size() == m_max_size)
        {
            cout << "任务队列已满,请耐心等候 ..." << endl;
            //阻塞线程
            m_notFull.wait(locker);
        }
        //将任务放入到任务队列中
        m_queue.push_back(x);
        cout << x << " 被生产" << endl;
        //通知消费者消费
        m_notEmpty.notify_one();
    }

    int take()
    {
        unique_lock<mutex> locker(m_mutex);
        while (m_queue.empty())
        {
            cout << "任务队列已空,请耐心等候..." << endl;
            m_notEmpty.wait(locker);
        }
        //从任务队列中取出任务(消费)
        int x = m_queue.front();
        m_queue.pop_front();
        cout << x << " 被消费" << endl;
        //通知生产者进行生产
        m_notFull.notify_one();
        return x;
    }

    bool empty()
    {
        lock_guard<mutex> locker(m_mutex);
        return m_queue.empty();
    }

    bool full()
    {
        lock_guard<mutex> locker(m_mutex);
        return m_queue.size() == m_max_size;
    }

    int size()
    {
        lock_guard<mutex> locker(m_mutex);
        return m_queue.size();
    }
    ~SyncQueue(){m_queue.clear();}
};

void test0()
{
    SyncQueue taskQ(50);
    //生产者
    auto produce = bind(&SyncQueue::put, &taskQ, placeholders::_1);
    //消费者
    auto consume = bind(&SyncQueue::take, &taskQ);

    //生产者消费者各三个线程
    thread t1[3];
    thread t2[3];
    for (size_t i = 0; i < 3; i++)
    {
        t1[i] = thread(produce, i+100);
        t2[i] = thread(consume);
    }

    for (size_t i = 0; i < 3; i++)
    {
        t1[i].join();
        t2[i].join();
    }
    return;
}

class SyncQueueT
{
private:
    //存储队列数据
    list<int> m_queue;
    //互斥锁
    mutex m_mutex;
    //不为空的条件变量
    condition_variable_any m_not_empty;
    //没有满的条件变量
    condition_variable_any m_not_full;
    //任务队列的最大任务个数
    int m_max_size;
public:
    SyncQueueT(int maxsize): m_max_size(maxsize){}
    void put(const int& x){
        lock_guard<mutex> locker(m_mutex);
        //根据条件阻塞线程
        m_not_full.wait(m_mutex, [this](){
            return m_queue.size() != m_max_size;
        });
        //将任务放入到任务队列中去
        m_queue.push_back(x);
        cout << x << " +++" << endl;
        this_thread::sleep_for(chrono::seconds(1));
        //通知消费者消费
        m_not_empty.notify_one();
    }

    int take(){
        lock_guard<mutex> locker(m_mutex);
        m_not_empty.wait(m_mutex, [this](){
            return !m_queue.empty();
        });
        //从任务队列中取出任务去消费
        int x = m_queue.front();
        m_queue.pop_front();
        //通知生产者去生产
        m_not_full.notify_one();
        cout << x << " ---" << endl;
        return x;
    }
    bool empty(){}
    bool full(){}
    int size(){}
};

void test1(){
    SyncQueueT taskQ(50);
    //生产者
    auto produce = bind(&SyncQueueT::put, &taskQ, placeholders::_1);
    //消费者
    auto consume = bind(&SyncQueueT::take, &taskQ);

    //生产者消费者各三个线程
    thread t1[100];
    thread t2[100];
    for (size_t i = 0; i < 100; i++)
    {
        t1[i] = thread(produce, i+100);
        t2[i] = thread(consume);
    }

    for (size_t i = 0; i < 100; i++)
    {
        t1[i].join();
        t2[i].join();
    }
    return;

}


int main()
{
    test0();
    cout << "--------------------------- " << endl;
    test1();
    return 0;
}

原子变量

c++提供了原子类型std::atomic,通过这个原子类型管理的内部变量就可以称之为源自变量,我们可以给原子类型指定bool/char/int/long/指针等类型作为模板参数(不支持浮点型和复合型)
原子指的是一系列不可被cpu上下文交换的机器指令,这些指令组合在一起就形成了原子操作,在多核cpu下,当某个cpu核心开始运行原子操作时,会暂停其他cpu内核对内存的操作,以保证原子操作不会被其他cpu内核做干扰
原子操作是通过指令提供的,所以性能比锁和消息传递好很多。

#include <iostream>
#include <thread>
#include <chrono>
#include <atomic>
#include <functional>

using namespace std;

struct Couter
{
  // Couter():m_value(0){}
  void increment(){
    for (size_t i = 0; i < 10000; i++)
    {
      m_value++;
      // cout << "increment number " << m_value
      //      << ", thread id " << this_thread::get_id() << endl;
      this_thread::sleep_for(chrono::milliseconds(1));
    }
  }
  
  void decrement()
  {
    for (size_t i = 0; i < 10000; i++)
    {
      m_value --;
      // cout << "decrement number " << m_value << ", thread id"
      //      << this_thread::get_id() << endl;
      this_thread::sleep_for(chrono::milliseconds(1));
    }
  }

  // int m_value;
  atomic_int m_value;
  // atomic<int> m_value = 0;
};

void test0()
{
  Couter c;
  c.m_value = 0;
  auto increment = bind(&Couter::increment, &c);
  auto decrement = bind(&Couter::decrement, &c);
  thread t1(increment);
  thread t2(decrement);
  t1.join();
  t2.join();
  std::cout << " value " << c.m_value << std::endl;
  return;
}

int main()
{
  test0();
  return 0;
}

死锁

永远不要同时持有两把锁

保证双方上锁顺序一致

用std::lock同时对多个上锁

std::mutex mtx1;
std::mutex mtx2;

void thread()
{
    std::lock(mtx1, mtx2);
    mtx1.unlock();
    mtx2.unlock();
}
void thread1()
{
    std::lock(mtx1, mtx2);
    mtx1.unlock();
    mtx2.unlock();
}

自旋锁

使用递归锁

线程池

#pragma once
#include <iostream>
#include <thread>
#include <queue>
#include <vector>
#include <atomic>
#include <stdexcept>
#include <functional>
#include <future>
#include <mutex>
#include <condition_variable>
#include <string>

/*
c++11相关新特性

类型(模板)别名
emplace_back 直接在容器尾部创建这个元素,而push_back先构造一个临时对象,然后把临时的copy构造函数拷贝或者移动到容器后面

*/

namespace std
{

//线程池最大容量
#define THREADPOOL_MAX_NUM 16
//#define  THREADPOOL_AUTO_GROW

class ThreadPool
{
private:
  using Task = function<void()>; //定义类型
  //线程池
  vector<thread> _pool;
  //任务队列
  queue<Task> _tasks;
  //同步
  mutex _lock;
  //条件阻塞
  condition_variable _task_cv;
  //线程池是否运行
  atomic<bool> _run{true};
  //空闲线程池数量
  atomic<int> _idlThrNum{0};

public:
  inline ThreadPool(unsigned short size = 4){addThread(size);}
  inline ~ThreadPool()
  {
    _run = false;
    //唤醒所有线程执行
    _task_cv.notify_all();
    for(thread& thread : _pool)
    {
      if (thread.joinable())
      {
        //等待任务结束
        thread.join();
      }
    }
  }

public:
  //提交一个任务
  //调用.get()方法获取返回值会等待任务执行完,获取返回值
  //两种方法可以实现调用类成员
  //一种是 bind
  //另外一种是 mem_fn
  template<class F, class... Args>
  auto commit(F&& f, Args&&... args) ->future<decltype(f(args...))>
  {
    //停止?
    if (!_run)
    {
      throw runtime_error("commit on Thread is stoped.");
    }
    //typename std::result_of<F(Args...)>::type, 函数f的返回值类型
    using RetType = decltype(f(args...));
    auto task = make_shared<packaged_task<RetType()>>(
      bind(forward<F>(f), forward<Args>(args)...)
    );//把函数入口及参数,打包(绑定)
    future<RetType> future = task->get_future();
    {
      //添加任务到队列
      //对当前块的语句加锁
      lock_guard<mutex> lock{_lock};
      _tasks.emplace([task](){
        (*task)();
      });
    }

#ifdef THREADPOOL_AUTO_GROW
    if (_idlThrNum < 1 && _pool.size() < THREADPOOL_MAX_NUM)
    {
      addThread(1);
    }
#endif
    //唤醒一个线程执行
    _task_cv.notify_one();
    return future;
  }

  //空闲线程数量
  int idlCount(){return _idlThrNum;}
  //线程数量
  int hrCount(){return _pool.size();}

#ifdef THREADPOOL_AUTO_GROW
private:
#endif
  void addThread(unsigned short size)
  {
    for (; _pool.size() < THREADPOOL_MAX_NUM && size > 0; --size)
    {
      _pool.emplace_back([this]{
        while (_run)
        {
          //获取一个待执行的task
          Task task;
          {
            unique_lock<mutex> lock{_lock};
            _task_cv.wait(lock, [this]{
              return !_run || !_tasks.empty();
            }); //wait 直到有 task
            if (!_run && _tasks.empty())
            {
              return;
            }
            //按照先入先出从队列取出一个task
            task = move(_tasks.front());
            _tasks.pop();
          }
          _idlThrNum --;
          //执行任务
          task();
          _idlThrNum ++;
        }
      });
      _idlThrNum ++;
    }
  }
};
}


版权声明:本文为qq825255961原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/qq825255961/article/details/124548190