现代c++并发深入

文章时效性提示

这是一篇发布于 71 天前的文章,部分信息可能已发生改变,请注意甄别。

更加深入的探讨并发.

std::thread

多线程的构造,使用std::thread时我们需要注意哪些?

可以接受哪些参数

std::thread接受一个可调用对象和其参数.

可调用\ *(Callable)* 类型是可应用INVOKE和INVOKE操作(例如用于 std::function、std::bind和 std::thread::thread)的类型.

如果满足下列条件,那么类型 T可调用 (Callable) 的:

给定

  • T 类型的对象 f
  • 适合的实参类型列表 ArgTypes
  • 适合的返回类型 R

那么下列表达式必须合法:

表达式要求
INVOKE(f, [std::declval()…) 该表达式在不求值语境中良构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class Task_2 {
public:
void operator()() { std::puts("operator()()const\n"); }
operator bool() { return true; }
}; // 函数对象
struct X{
void task_run(int) const;
}
void f(int); // 函数
X x;
std::thread t{&X::task_run,&x,3}; //成员指针必须和对象一起使用,这是唯一标准用法,成员指针不可以转换到函数指针单独使用,即使是非静态成员函数没有使用任何数据成员.
std::thread t{ std::bind(&X::task_run, &x ,3) };
std::thread t{[]{std::puts("Hi!")}}; // lambda函数
Task_2 task_2{};
std::thread t{task_2}; // 左值 左值引用
std::thread ta{Task_2{}}; // 临时对象
std::thread ta1{[] { std::puts("Hi"); }}; // 临时对象
std::thread ta2{h}; // 左值

重要的构造函数如下(MSVC实现)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public:
template <class _Fn, class... _Args, enable_if_t<!is_same_v<_Remove_cvref_t<_Fn>, thread>, int> = 0>
_NODISCARD_CTOR_THREAD explicit thread(_Fn&& _Fx, _Args&&... _Ax) {
_Start(_STD forward<_Fn>(_Fx), _STD forward<_Args>(_Ax)...);
}
template <class _Fn, class... _Args>
void _Start(_Fn&& _Fx, _Args&&... _Ax) {
using _Tuple = tuple<decay_t<_Fn>, decay_t<_Args>...>;
auto _Decay_copied = _STD make_unique<_Tuple>(_STD forward<_Fn>(_Fx), _STD forward<_Args>(_Ax)...);
constexpr auto _Invoker_proc = _Get_invoke<_Tuple>(make_index_sequence<1 + sizeof...(_Args)>{});

_Thr._Hnd =
reinterpret_cast<void*>(_CSTD _beginthreadex(nullptr, 0, _Invoker_proc, _Decay_copied.get(), 0, &_Thr._Id));

if (_Thr._Hnd) { // ownership transferred to the thread
(void) _Decay_copied.release();
} else { // failed to start thread
_Thr._Id = 0;
_Throw_Cpp_error(_RESOURCE_UNAVAILABLE_TRY_AGAIN);
}
}

首先根据传入的可调用对象和参数使用转发引用,并确保传入的可调用对象去掉const/volatile引用后不等于std::thread. 如果传入左值,则为左值引用,调用_Start时传入左引用,否则传入右值. forward内部根据传入的是左值还是右值转为对应的引用

1
2
3
4
5
6
7
8
9
10
_EXPORT_STD template <class _Ty>
_NODISCARD _MSVC_INTRINSIC constexpr _Ty&& forward(remove_reference_t<_Ty>& _Arg) noexcept {
return static_cast<_Ty&&>(_Arg);
}

_EXPORT_STD template <class _Ty>
_NODISCARD _MSVC_INTRINSIC constexpr _Ty&& forward(remove_reference_t<_Ty>&& _Arg) noexcept {
static_assert(!is_lvalue_reference_v<_Ty>, "bad forward call");
return static_cast<_Ty&&>(_Arg);
}

在_Start中使用tuple保存函数和参数类型,使用make_unique得到指向_tuple的指针,使用传入的左值或者右值,比如如果参数如果传递一个左值num(即使是引用),会使用拷贝构造,如果传入一个右值会调用对应移动构造,

这里需要介绍一些左值和右值,以及模板编程中template<T> void( std::remove_reference_t<T>&)template<T> void( std::remove_reference_t<T>&&)

值类别 - cppreference.com

变量的类型是右值引用,由它的名字构成的表达式仍是左值表达式

转换到左值引用类型的转型表达式以及转换到函数的右值引用类型的转型表达式是左值;而转换到对象的右值引用类型的类型转换表达式是亡值,也就是右值,通过static_cast这种方式返回了右值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
template <class _Tuple, size_t... _Indices>
_NODISCARD static constexpr auto _Get_invoke(index_sequence<_Indices...>) noexcept {
return &_Invoke<_Tuple, _Indices...>;
}

template <class _Tuple, size_t... _Indices>
static unsigned int __stdcall _Invoke(void* _RawVals) noexcept /* terminates */ {
// adapt invoke of user's callable object to _beginthreadex's thread procedure
const unique_ptr<_Tuple> _FnVals(static_cast<_Tuple*>(_RawVals));
_Tuple& _Tup = *_FnVals.get(); // avoid ADL, handle incomplete types
_STD invoke(_STD move(_STD get<_Indices>(_Tup))...);
_Cnd_do_broadcast_at_thread_exit(); // TRANSITION, ABI
return 0;
}

_beginthreadex是windows上创建多线程的API,传入一个函数指针(也就是_Invoke)和指向一个tuple(包含函数和参数)的指针,在_Invoke中, _STD invoke(_STD move(_STD get<_Indices>(_Tup))…);`相当于根据可调用对象及其参数进行了调用

传递的参数

根据源码实现,传递的参数会在thread对象中创建新值.

auto _Decay_copied = _STD make_unique<_Tuple>(_STD forward<_Fn>(_Fx), _STD forward<_Args>(_Ax)...);创建,而_Tuple中的类型就是左值,using _Tuple = tuple<decay_t<_Fn>, decay_t<_Args>...>;,所以创建时会调用拷贝或移动构造创建新的值,而在实际调用对应可调用对象时,又会使用右值_STD invoke(_STD move(_STD get<_Indices>(_Tup))...);,这意味着,即使函数参数是引用,

1
2
3
4
5
void f(const int& n){};
int main() {
auto num{5};
std::jthread(f,num);
}

上面的num首先会在std::jthread中创建变量,auto _Decaycopied = std::make_unique<_Tuple>(std::forward\(__Fn),std::forward\(args)…);

使用invoke调用,std::invoke(std::move(std::get<_Indices>(_Tup))…);调用函数时使用了std::move作为右值传递,所以有const int& n = std::move(_num),如果不加上const会发生运行编译错误,此外这样也支持了只能移动的对象.

如何使得传递引用呢,

1
2
3
4
5
void f(const int& n){};
int main() {
auto num{5};
std::jthread(f,std::ref(num));
}

实现中将类型先经过 decay 处理,如果要传递引用,则必须用类包装,使用 std::ref 函数会返回一个包装对象

使用可调用对象的方式是利用_Invoke,其参数是上面的_Decay_copied,将其转为unique_ptr,再通过智能指针转为_Tup&,然后使用 std::invoke 进行调用

std::ref是一个对象,但是使用时隐式转为引用,所以在构造时直接拷贝赋值std::reference_wrapper\.

1
2
3
4
5
6
  int num2{3};
auto n2 = std::ref(num2);
auto ttc = n2;
std::cout << ttc << std::endl;
ttc += 12;
std::cout << n2 << std::endl;

join与detach

detach() 是线程分离,线程对象放弃了线程资源的所有权,此时thread根本没有关联任何线程.调用 join() 是:“阻塞当前线程直至 *this 所标识的线程结束其执行”,线程对象都没有线程,就不需要阻塞了.

不能拷贝构造/赋值与转移所有权

传入可调用对象以及参数,构造 std::thread 对象,启动线程,而线程对象拥有了线程的所有权,线程是一种系统资源,所以可称作“线程资源”.

std::thread 不可复制.两个 std::thread 对象不可表示一个线程,std::thread 对线程资源是独占所有权.而移动操作可以将一个 std::thread 对象的线程资源所有权转移给另一个 std::thread 对象.

1
2
3
4
5
6
7
8
9
10
int main() {
std::thread t{ [] {
std::cout << std::this_thread::get_id() << '\n';
} };
std::cout << t.joinable() << '\n'; // 线程对象 t 当前关联了活跃线程 打印 1
std::thread t2{ std::move(t) }; // 将 t 的线程资源的所有权移交给 t2
std::cout << t.joinable() << '\n'; // 线程对象 t 当前没有关联活跃线程 打印 0
//t.join(); // Error! t 没有线程资源
t2.join(); // t2 当前持有线程资源
}
1
2
3
4
5
6
7
8
9
std::thread f(){
std::thread t{ [] {} };
return t;
}

int main(){
std::thread rt = f();
rt.join();
}

return t 重载决议[1]选择到了移动构造,将 t 线程资源的所有权转移给函数调用 f() 返回的临时 std::thread 对象中,然后这个临时对象再用来初始化 rt ,临时对象是右值表达式,这里一样选择到移动构造,将临时对象的线程资源所有权移交给 rt.此时 rt 具有线程资源的所有权,由它调用 join() 正常析构

1
2
3
4
5
6
7
8
9
void f(std::thread t){
t.join();
}

int main(){
std::thread t{ [] {} };
f(std::move(t));
f(std::thread{ [] {} });
}

std::move 将 t 转换为了一个右值表达式,初始化函数f 形参 t,选择到了移动构造转移线程资源的所有权,在函数中调用 t.join() 后正常析构.std::thread{ [] {} } 构造了一个临时对象,本身就是右值表达式,初始化函数f 形参 t,移动构造转移线程资源的所有权到 t,t.join() 后正常析构.

std::this_thread

  • get_id

  • sleep_for

  • yield

  • sleep_until

数据竞争

当某个表达式的求值写入某个内存位置,而另一求值读或修改同一内存位置时,称这些表达式冲突.拥有两个冲突的求值的程序就有数据竞争,除非

  • 两个求值都在同一线程上,或者在同一信号处理函数中执行,或
  • 两个冲突的求值都是原子操作(见 std::atomic),或
  • 一个冲突的求值发生早于 另一个(见 std::memory_order)

如果出现数据竞争,那么程序的行为未定义.

互斥量

互斥量用于保护多线程下的共享数据的读写

互斥量(Mutex),又常被称为互斥锁、互斥体(或者直接被称作“锁”),是一种用来保护临界区的特殊对象,其相当于实现了一个公共的“标志位”.它可以处于锁定(locked)状态,也可以处于解锁(unlocked)状态:

  1. 如果互斥量是锁定的,通常说某个特定的线程正持有这个锁.
  2. 如果没有线程持有这个互斥量,那么这个互斥量就处于解锁状态.
1
2
3
4
std::mutex m{};
m.lock();
// do something...
m.unlock()

如果多个线程中,其中一个线程在执行互斥区操作,其他线程执行到了m.lock()时会阻塞直到m.unlock释放.

1
2
3
4
std::mutex m{};
m.try_lock();
// do something...
m.unlock()

try_lock不会阻塞,而是会返回一个bool值,如果失败了就返回false,上锁成功返回true

如何管理互斥量

使用std::lock_guardstd::scoped_lock

1
2
3
4
5
6
7
8
9
10
11
12
13
class MyLockGuard {
public:
using mutext_type = T;
explicit MyLockGuard(T t_mutex) : _MyMutex(t_mutex) { _MyMutex.lock(); }
MyLockGuard(T& _Mtx, std::adopt_lock_t) noexcept : _MyMutex(_Mtx) {}
MyLockGuard(const MyLockGuard&) = delete;
MyLockGuard& operator=(const MyLockGuard&) = delete;

~MyLockGuard() noexcept { _MyMutex.unlock(); }

private:
T& _MyMutex;
};

提供便利 RAII 风格机制的互斥包装器,它在作用域块的存在期间占有一或多个互斥体.不可复制、移动.当创建对象时,它尝试取得给定互斥体的所有权.当控制离开创建对象的作用域时,析构并释放互斥体

1
2
3
4
5
6
7
8
9
10
11
12
13
void foor_guard() {
std::lock_guard<std::mutex> lock{m};
std::cout << "foor" << std::endl;
}
void add_to_list(int n, std::list<int>& list) {
std::vector<int> numbers(n + 1);
std::iota(numbers.begin(), numbers.end(), 0);
int sum = std::accumulate(numbers.begin(), numbers.end(), 0);
{
std::lock_guard<std::mutex> lc{m};
list.push_back(sum);
}
}

scoped_lock 类类似,它在作用域块的存在期间占有一或多个互斥体.

互斥量保护数据的问题

当使用lock_guard时,如果将指针或者引用传递给外部值,这样就脱离mutex管理了.

简而言之:切勿将受保护数据的指针或引用传递到互斥量作用域之外,不然保护将形同虚设共享数据 | 现代C++并发编程教程 (mq-b.github.io)

互斥可能导致的死锁

当有多个互斥量时可能遇到死锁.避免死锁的一般建议是让两个互斥量以相同的顺序上锁,总在互斥量 B 之前锁住互斥量 A,就通常不会死锁

1
2
3
4
5
6
7
8
9
10
11
12
13
std::mutex m1,m2;
std::size_t n{};

void f(){
std::lock_guard<std::mutex> lc1{ m1 };
std::lock_guard<std::mutex> lc2{ m2 };
++n;
}
void f2() {
std::lock_guard<std::mutex> lc1{ m2 };
std::lock_guard<std::mutex> lc2{ m1 };
++n;
}

上面代码就有可能死锁. 修改上锁顺序即可.

但是即使上锁顺序相同,也有可能导致死锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
struct X{
X(const std::string& str) :object{ str } {}

friend void swap(X& lhs, X& rhs);
private:
std::string object;
std::mutex m;
};

void swap(X& lhs, X& rhs) {
if (&lhs == &rhs) return;
std::lock_guard<std::mutex> lock1{ lhs.m };
std::lock_guard<std::mutex> lock2{ rhs.m };
swap(lhs.object, rhs.object);
}
1
2
3
X a{"a"},b{"b"};
std::thread t{[&]{swap(a,b);}};
std::thread t2{[&]{swap(b,a);}};

解决方法是是使用std::lock,可以同时对多个互斥量上锁,如果已经上锁会抛出异常并unlock解锁这些互斥量,或者使用刚才的std::scoped_lock,提供与lock_guard同样的RAII包装.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void swap(Xa& lhs, Xa& rhs) {
if (&lhs == &rhs) return;
std::lock(lhs.m, rhs.m);
std::lock_guard<std::mutex> lock1{lhs.m,std::adopt_lock};
std::lock_guard<std::mutex> lock2{rhs.m,std::adopt_lock};
swap(lhs.object, rhs.object);
}


void swap(Xa& lhs, Xa& rhs) {
if (&lhs == &rhs) return;
std::scoped_lock(lhs.m, rhs.m);
swap(lhs.object, rhs.object);
}

使用 std::scoped_lock 可以将所有 std::lock 替换掉,减少错误发生

Tips for avoiding dead lock

避免嵌套锁

线程获取一个锁时,就别再获取第二个锁.每个线程只持有一个锁,自然不会产生死锁.如果必须要获取多个锁,使用 std::lockstd::scoped_lock

避免在持有锁时调用外部代码

因为代码是外部提供的,所以没办法确定外部要做什么.外部程序可能做任何事情,包括获取锁.在持有锁的情况下,如果用外部代码要获取一个锁,就会违反第一个指导意见,并造成死锁(有时这是无法避免的)

使用固定顺序获取锁

避免死锁

std::unique_lock

unique_lock更加灵活,它不能拷贝,内部有一个_Owns变量表明是否有锁的拥有权(或者说是否已经上锁),默认构造函数调用时owns为false并上锁

1
2
3
4
5
6
_NODISCARD_CTOR_LOCK explicit unique_lock(_Mutex& _Mtx)
: _Pmtx(_STD addressof(_Mtx)), _Owns(false) { // construct and lock
_Pmtx->lock();
_Owns = true;
}

此外还有std::defer_lockstd::adopt_lock分别表示没有上锁(_Owns为false),构造函数中不会上锁和已经上锁(_Owns为true),,构造函数中不会 上锁

1
2
3
4
5
unique_lock(_Mutex& _Mtx, defer_lock_t) noexcept
: _Pmtx(_STD addressof(_Mtx)), _Owns(false) {} // construct but don't lock
lock_guard(_Mutex& _Mtx, adopt_lock_t) noexcept // strengthened
: _MyMutex(_Mtx) {} // construct but don't lock

unique_lock类中也有lock和unlock方法,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void lock() { // lock the mutex
_Validate();
_Pmtx->lock();
_Owns = true;
}

void unlock() {
if (!_Pmtx || !_Owns) {
_Throw_system_error(errc::operation_not_permitted);
}

_Pmtx->unlock();
_Owns = false;
}

简而言之:

  • 使用 std::defer_lock 构造函数不上锁,要求构造之后上锁
  • 使用 std::adopt_lock 构造函数不上锁,要求在构造之前互斥量上锁
  • 默认构造会上锁,要求构造函数之前和构造函数之后都不能再次上锁

通常建议优先 std::lock_guard,当无法满足你的需求或者显得代码非常繁琐,那么可以考虑使用 std::unique_lock

std::mutex是不能拷贝也不能移动的量,在unique_lock中保存了指向它的指针,而unique_lock是可以移动的,所以可以利用unique_lock转移互斥量(准确地说需要互斥量在这些作用域内存活并通过移动构造、赋值进行转移)

保护共享数据初始化

一些数据在多线程环境下进行初始化时可能会导致多次初始化,也是数据竞争的行为. 可以采用其他方式进行保护.

1
2
3
4
5
6
7
8
9
10
11
struct Data{

};
std::once_flag flag;
std::shared_ptr<Data> data;
void init_resouce() {
data.reset(new Data);
}
void foo() {
std::call_once(flag,init_resouce);
}

std::call_once可以接收可调用对象,传入flag表明之及逆行一次初始化,使得线程安全. 静态局部变量初始化在 C++11 是线程安全

读写锁保护不常更新的数据结构

有时多线程环境下,一个线程基本只用来写,其他线程用来读. 可以使用std::shared_mutex保证写线程独占权和读线程的访问权.

shared_mutex 类是一个同步原语,可用于保护共享数据不被多个线程同时访问.与便于独占访问的其他互斥体类型不同,shared_mutex 拥有两个访问级别:

  • 共享 - 多个线程能共享同一互斥体的所有权.

  • 独占 - 仅一个线程能占有互斥.

若一个线程已获取独占 锁(通过 lock、try_lock,则无其他线程能获取该锁(包括共享的).

若一个线程已获取共享 锁(通过 lock_shared、try_lock_shared),则无其他线程能获取独占 锁,但可以获取共享 锁.

仅当任何线程均未获取独占 锁时,共享 锁能被多个线程获取.

在一个线程内,同一时刻只能获取一个锁(共享独占

std::shared_lock主要是提供了lock_shared的作用,区分了共享锁和独占锁,如果有独占锁,多个线程不管使用共享锁还是独占锁都需要阻塞,反之如果全是共享锁则不会阻塞(因为全是读操作)

1
2
3
4
5
6
7
8
9
10
11
std::map<std::string, std::string> data_;
std::shared_mutex mtx;
void writeData() {
std::lock_guard lg{mtx};
data_["fa"] = "af";
}
void readData() {
std::shared_lock sl{mtx};
auto it = data_.find("aa");
std::cout << (it == data_.end() ? it->second : "");
}

std::recursive_mutex

在同一线程多次lock一个普通mutex,是未定义行为. 如果在一个线程多次lock,另一个线程就一直无法拿到锁了. 使用std::recursive_mutex使得同一线程在lock和unlock次数一样的情况下才会真正释放锁.

它允许同一线程多次锁定同一个互斥量,而不会造成死锁.当同一线程多次对同一个 std::recursive_mutex 进行锁定时,只有在解锁与锁定次数相匹配时,互斥量才会真正释放.但它并不影响不同线程对同一个互斥量进行锁定的情况.不同线程对同一个互斥量进行锁定时,会按照互斥量的规则进行阻塞

在使用迭代函数中使用锁时可以使用这个互斥量

通常不直接调用 unlock(),而是使用std::unique_lock与std::lock_guard管理排他性锁定.

调用lock时所有权层数+1,调用unlock时,如果所有权层数为1,解锁互斥量,否则-1.

new、delete的线程安全性

如果标准达到 C++11,要求下列函数是线程安全的:

  • newdelete 运算符的版本
  • 全局 new 运算符和 delete 运算符的用户替换版本
  • std::calloc、std::malloc、std::realloc、std::aligned_alloc](C++17 起)、std::free

内存分配、释放操作是线程安全,构造和析构不涉及共享资源.而局部对象 p 对于每个线程来说是独立的.换句话说,每个线程都有其自己的 p 对象实例,因此它们不会共享同一个对象,自然没有数据竞争

1
2
3
4
5
T* p = nullptr;
void f(){
p = new T{}; // 存在数据竞争
delete p;
}

如果 p 是全局对象(或者外部的,只要可被多个线程读写),多个线程同时对其进行访问和修改时,就可能会导致数据竞争和未定义行为.因此,确保全局对象的线程安全访问通常需要额外的同步措施,比如互斥量或原子操作.

1
2
3
4
5
6
7
8
9
10
11
12
int n = 1;

struct X{
X(int v){
::n += v;
}
};

void f(){
X* p = new X{ 1 }; // 存在数据竞争
delete p;
}

C++ 只保证了 operator newoperator delete 这两个方面的线程安全

new 表达式线程安全要考虑三方面:operator new、构造函数、修改指针.

delete 表达式线程安全考虑两方面:operator delete、析构函数

线程存储期

线程存储期的对象在线程开始时分配,并在线程结束时释放. 使用thread_local声明变量,声明线程存储期的对象,每一个线程都有独立的 thread_local 对象

1
2
3
4
5
6
7
8
9
10
int global_counter = 0; // 静态存储期
thread_local int thread_local_counter=0; // 线程存储期
void print_counters() {
std::cout<<"global: "<<global_counters++<<'\n';
std::cout<<"thread_local"<<thread_local_counter<<'\n';
}
int main(){
std::thread{print_counters}.join();
std::thread{print_counters}.join();
}

同步操作

条件变量

条件变量有std::condition_variablestd::condition_variable_any,

std::condition_variable是与std::mutex 一起使用的同步原语,它能用于阻塞一个线程,或同时阻塞多个线程,直至另一线程修改共享变量(条件)并通知std::condition_variable

条件变量用于同步,可以阻塞线程并使用notify_one让解除相关线程阻塞

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void wait_for_flag() {
std::unique_lock ul{cv_mutex};
// ul.unlock(); and block the thread
cv.wait(ul, [] { return flag; });
// when get notified,relive the thread and lock the mutex
std::cout << "arrived\n";
}

void set_flag_true() {
{
std::lock_guard lg{cv_mutex};
std::cout << "set_flag_true\n";
std::this_thread::sleep_for(1s);
flag = true;
}
cv.notify_all();
}
int main() {
std::jthread t1{wait_for_flag}, t2{wait_for_flag}, t3{set_flag_true};
return 0;
}

此外还有std::condition_variable_any,相对于只在 std::unique_lock上工作的 std::condition_variable,condition_variable_any 能在任何满足可基本锁定要求的锁上工作(只需要lock和unlock方法)

future获得线程结果

如果要获得一个线程处理后的结果,可以通过使用condition_variable同步,cv.notify(),cv.wait(). 但是更好的方式是通过future获取返回值

类模板 std::future 提供访问异步操作结果的机制:

  • (通过std::async,std::packaged_task或 std::promise创建的)异步操作能提供一个 std::future 对象给该异步操作的创建者.

  • 然后,异步操作的创建者可以使用多个方法查询、等待或从 std::future 提取值.若异步操作尚未提供值,则这些方法可能阻塞.

  • 当异步操作准备好发送结果给创建者时,它可以修改与创建者的 std::future 相链接的共享状态

std::thread 没提供直接从线程获取返回值的机制.所以可以使用 std::async 函数模板,使用async与thread类似,默认按值赋值,内部将参数副本转换为右值. future的get和wait方法也是用于同步的,

1
2
3
4
5
6
void f(const int& p) {}
void f2(int& p ){}

int n = 0;
std::async(f, n); // OK! 可以通过编译,不过引用的并非是局部的n
std::async(f2, n); // Error! 无法通过编译

async接受所有可调用对象(函数,类成员方法,仿函数类,lambda),与thread类似,其有不同的异步执行策略,std::launch::defered与std::launch::async

  1. std::launch::async 在不同线程上执行异步任务.
  2. std::launch::deferred 惰性求值,不创建线程,等待 future 对象调用 waitget 成员函数的时候执行任务.

如果从 std::async 获得的 std::future没有被移动或绑定到引用,那么在完整表达式结尾, std::future析构函数将阻塞,直到到异步任务完成.因为临时对象的生存期就在这一行,而对象生存期结束就会调用调用析构函数.

被移动的 std::future 没有所有权,失去共享状态,不能调用 getwait 成员函数. 此外还有valid检查 future 当前是否关联共享状态,即是否当前关联任务.还未关联,或者任务已经执行完(调用了 get()、set()),都会返回 false

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
struct X{
int operator()(int n)const{
return n * n;
}
};
struct Y{
int f(int n)const{
return n * n;
}
};
void f(int& p) { std::cout << &p << '\n'; }

int main(){
Y y;
int n = 0;
auto t1 = std::async(X{}, 10);
auto t2 = std::async(&Y::f,&y,10);
auto t3 = std::async([] {});
auto t4 = std::async(f, std::ref(n));
std::cout << &n << '\n';
}

得到的future使用wait同步等待处理,或者使用get获得结果.

packaged_task

类模板 std::packaged_task 包装任何可调用目标(函数、lambda 表达式、bind 表达式或其他函数对象),使得能异步调用它.其返回值或所抛异常被存储于能通过 std::future对象访问的共享状态中.

其重载了()操作符,所以本身也是一个可调用目标,可以传给一个线程. 如果不使用多线程

1
2
3
4
5
6
7
8
void f(int){};

int main() {
auto task_2{std::packaged_task<int(int)>{f}};
auto fut_2{task_2.get_future()};
task_2(10);
std::cout << fut_2.get() << '\n';
}

上面任务并不会在线程中执行,所以并没有并发、异步. 所以需要结合多线程,但注意packaged_task不能拷贝,只能移动.

1
2
3
4
5
6
7
auto task_2{std::packaged_task<int(int)>{foo}};
// std::packaged_task<int(int)> task_2{f};
auto fut_2{task_2.get_future()};
std::thread fut_t{std::move(task_2), 10};
// task_2(10);
fut_t.join();
std::cout << fut_2.get() << '\n';

std::packaged_task 也可以在线程中传递,在需要的时候获取返回值,而非将它自己作为可调用对象. 也就是说thread启动一个可调用对象,这个可调用对象中会调用这个packaged_task,可以通过future获得值.

使用std::promise设置值

如果要设置一个值,可以传递reference_wrapper\然后join线程即可获取值.

1
2
3
4
void set_val(int &n) { n = 20; }
int tn{0};
std::jthread jd{set_val, std::ref(tn)};
jd.join();

但是考虑到这样也许并不好,不仅必须传入通过传入引用、指针,控制颗粒度也不够.

类模板 std::promise 提供用以存储一个值或一个异常,之后通过 std::promise 对象所创建的std::future对象异步获得.注意 std::promise 只应当使用一次.

每个promise都与一个共享状态 关联,其中含有一些状态信息和一个结果,它可能尚未求值、已求值为一个值(可能为 void),或者求值为一个异常.promise可以对共享状态做三件事:

  • 使就绪:promise存储结果或异常于共享状态.标记共享状态为就绪,并除阻在该共享状态所关联的未来体上等待的任何线程.
  • 释放:promise放弃其对共享状态的引用.若这是最后一个这种引用,则销毁共享状态.除非这是 std::async所创建的未就绪的共享状态,否则此操作不阻塞.
  • 抛弃:promise存储以 std::future_errc::broken_promise 为错误码的 std::future_error 类型的异常,令共享状态为就绪,然后释放
1
2
3
4
5
6
7
void f(std::promise<int> obj,int num){
obj.set_value(num*num);
}
std::promise<int> p;
auto fut = p.get_future();
std::jthread t{f,std::move(p),3};
int result = p.get();

std::promise 只能移动,不可复制,所以需要使用std::move.

在主线程中通过与其关联的 future 对象的 get() 成员函数获取这个值,如果promise的值还没有被设置,那么将阻塞当前线程

除了返回一般值外还可以设置异常,但一个promise只能要么设置异常要么设置值,如果设置异常,则通过在promise所在函数中使用try与get进行捕获.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void calculate_square(std::promise<int> promiseObj, int num) {
std::this_thread::sleep_for(100ms);
if (!num) {
promiseObj.set_value(num * num);
} else {
promiseObj.set_exception(std::current_exception());
}
}
std::thread t1t(calculate_square, std::move(promise), num);
try {
std::cout << "等待线程执行...\n";
int result = fut_3.get(); // 获取结果或等待异常
std::cout << "Result is " << result << std::endl;
} catch (std::exception &e) {
std::cerr << "来自线程的异常" << e.what() << '\n';
}

t1t.join();

future的状态变化

future保有共享状态,只能移动,调用 get 函数后,future对象会失去共享状态,std::future 所引用的共享状态不与另一异步返回对象共享

  • 移动语义:因为移动操作标志着所有权的转移,意味着 future 不再拥有共享状态.getwait 函数要求 future 对象拥有共享状态,否则会抛出异常.
  • 共享状态失效:调用 get 成员函数时,future 对象必须拥有共享状态,但调用完成后,它就会失去共享状态,不能再次调用 get.

future 是一次性的,它的结果只能被一个线程获取.get() 成员函数只能调用一次,当结果被某个线程获取后,std::future 就无法再用于其他线程.

使用shared_future共享状态

目前shared_xx学习到的有,shared_ptr,shared_mutex,shared_lock,现在又有了shared_future. unique_ptr与unique_lock,future都表示独占所有权(只能移动),而shared_xx本身可以复制,并且可以共享.

主要用于在不同线程中共享一个任务/线程中的数据,它也通过wait和get获取数据.

通过future.share或直接通过future移动构造shared_future

具体使用通过传入shared_future的拷贝

1
2
3
4
5
6
7
8
9
10
11
12
13
14
int task(int num){ return num*num;}

int main() {
auto fut = std::async(task,10);
std::shared_future<int> fut_shared = fut.share();
std::jthread thread1{[fut_shared]{
int result = fut_shared.get();
return result*2;
}}
std::jthread thread2{[fut_shared]{
int result = fut_shared.get();
return result*2;
}}
}

按复制捕获 std::shared_future 对象,每个线程都有一个 shared_future 的副本,这样不会出现数据竞争问题.

限时等待

使用wait_forwait_until进行限时等待,可以通过future或者条件变量等, 限时等待用于在while循环中等待,可以判断结果,与std::future_status,std::cv_status

Concurrency in C++20

信号量

C++ 提供了两个信号量类型:std::counting_semaphorestd::binary_semaphore

信号量是更轻量的同步原语.

mutex,条件变量都是同步原语. 但mutex常用于互斥解决数据竞争,而条件

提供releaseacquire两种方法,分别增加内部计数器并解除获得者以及减少内部计数器或阻塞到直至能如此

信号量常用于发信/提醒而非互斥,通过初始化该信号量为 0 从而阻塞尝试 acquire() 的接收者,直至提醒者通过调用 release(n) “发信”.在此方面可把信号量当作条件变量的替代品,通常它有更好的性能

counting_semaphore能设置最大信号量值,binary_semaphore最大值就是1,相当于控制了同时访问者.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
std::counting_semaphore<3> semaphore{3};

void handle_request(int request_id) {
std::cout << "进入handle_request尝试获取信号量\n";
semaphore.acquire(); // similar to cv.wait(lk)??
std::cout << "成功获取信号量\n";
// do something
std::this_thread::sleep_for(1s);

std::random_device rd;
std::mt19937 gen{rd()};
std::uniform_int_distribution dis{1, 10};
int process_time = dis(gen);
std::this_thread::sleep_for(std::chrono::seconds{process_time});
std::cout << std::format("请求 {} 已被处理\n", request_id);
semaphore.release();
}

std::latch

信号量方便同步,与条件变量类似. 而latch与barrier是线程协调机制,阻塞已知大小的线程组直至该组中的所有线程到达该屏障.

允许任何数量的线程阻塞直至期待数量的线程到达. latch是单次使用的线程屏障,latch不能重复使用,它会等到需要的线程的数量.

latch 类维护着一个 std::ptrdiff_t 类型的计数,且只能减少计数,无法增加计数.在创建对象的时候初始化计数器的值.线程可以阻塞,直到 latch 对象的计数减少到零.由于无法增加计数,这使得 latch 成为一种**单次使用的屏障

1
2
3
4
5
6
7
8
9
std::latch work_start{3};
void work() {
std::cout << "等待其他线程执行\n";
work_start.wait();
std::cout << "任务开始执行\n";
}
std::jthread thread{work};
std::this_thread::sleep_for(1s);
work_start.count_down();

count_down默认将值-1,直到为0时,wait解除阻塞. 此外也有arrive_and_wait相当于count_down(n);wait();,这样在可调用对象内部直接使用

std::barrier

可复用的线程屏障,可以在阶段完成之后将计数重置为构造时传递的值.

不同于 std::latch,屏障是可重用的:一旦到达的线程组被解除阻塞,即可重用同一屏障.与 std::latch 不同,会在线程解除阻塞前执行一个可能为空的可调用对象.

barrier也有wait与arrive,它能够多次使用,也就是说如果创建10个线程,每个线程的可调用对象使用barrier值设置为3,那么就会有阻塞,因为9个线程结束,最后一个线程减少barrier设置的值,但依旧不为0. 如果是latch,当内部计数值为0时还调用count_down是未定义行为

arrive_and_wait() 会在期待计数减少至 0 时调用我们构造 barrier 对象时传入的 lambda 表达式,并解除所有在阶段同步点上阻塞的线程.之后重置期待计数为构造中指定的值.屏障的一个阶段就完成了. 还有arrive_and_drop会将当前与最大的计数均-1.

std::barrier 要求其函数对象类型必须是不抛出异常的.

内存模型与原子操作

  • 内存模型定义了多线程程序中,读写操作如何在不同线程之间可见,以及这些操作在何种顺序下执行.内存模型确保程序的行为在并发环境下是可预测的.
  • 原子操作即不可分割的操作.系统的所有线程,不可能观察到原子操作完成了一半

std::atomic

c++标准定义了原子类型,这些类型的操作都是原子的,语言定义中只有这些类型的操作是原子的,虽然也可以用互斥量来模拟原子操作.

每个 std::atomic 模板的实例化和全特化均定义一个原子类型.如果一个线程写入原子对象,同时另一线程从它读取,那么行为有良好定义(使用load和store),std::atomic 既不可复制也不可移动.

标准原子类型的实现通常包括一个 is_lock_free() 成员函数,允许用户查询特定原子类型的操作是否是通过直接的原子指令实现(返回 true),还是通过锁来实现(返回 false)

也可以通过is_always_lock_free和一些宏来检查

1
2
3
4
std::atomic<int> aint = 10;
aint.is_lock_free(); // 成员函数
aint.is_always_lock_free; //编译器常量 constexpr
std::cout<<ATOMIC_INT_LOCK_FREE;

always_lock_free意味着一定无锁,ATOMIC_INT_LOCK_FREE的值若为0则一定有锁,为1则有时无锁,为2则一定无锁.

在实际应用中,如果一个类型的原子操作总是无锁的,可以更放心地在性能关键的代码路径中使用它.

如果发现某些原子类型在目标平台上是有锁的,我们可以考虑以下优化策略:

  1. 使用不同的数据结构:有时可以通过改变数据结构来避免对原子操作的依赖.
  2. 减少原子操作的频率:通过批处理等技术,减少对原子操作的调用次数.
  3. 使用更高效的同步机制:在一些情况下,其它同步机制(如读写锁)可能比原子操作更高效.

其实很多时候根本没这种性能的担忧,很多时候使用原子对象只是为了简单方便,比如 std::atomic<bool> 表示状态、std::atomic<int> 进行计数等.即使它们是用了锁,那也是封装好了的,起码用着方便,而不需要在代码中引入额外的互斥量来保护,更加简洁.这也是很正常的需求,各位不但要考虑程序的性能,同时也要考虑代码的简洁性、易用性.即使使用原子类型无法带来效率的提升,那也没有负提升.

常用的atomic特化有int,bool,flag,等

1
2
3
4
5
6
7
8
9
10
11
using atomic_char   = atomic<char>;
using atomic_schar = atomic<signed char>;
using atomic_uchar = atomic<unsigned char>;
using atomic_short = atomic<short>;
using atomic_ushort = atomic<unsigned short>;
using atomic_int = atomic<int>;
using atomic_uint = atomic<unsigned int>;
using atomic_long = atomic<long>;
using atomic_ulong = atomic<unsigned long>;
using atomic_llong = atomic<long long>;
using atomic_ullong = atomic<unsigned long long>;

原子类型常用方法包括load,store,exchange等,不同特化也有不同方法. 可以为自定义类型创建atomic,需要满足可复制构造,可复制赋值以及可平凡复制

1
2
3
4
5
static_assert(std::is_trivially_copyable<trivial_type>::value, "");
static_assert(std::is_copy_constructible<trivial_type>::value, "");
static_assert(std::is_move_constructible<trivial_type>::value, "");
static_assert(std::is_copy_assignable<trivial_type>::value, "");
static_assert(std::is_move_assignable<trivial_type>::value, "");

原子类型的操作函数有一个内存序参数,对原子对象的访问可以建立线程间同步,并按std::memory_order对非原子内存访问定序. 任何 std::atomic类型,初始化不是原子操作,其他方法是原子操作.与大多数赋值运算符不同,原子类型的赋值运算不返回到它的左侧参数的引用.它们会返回存储值的副本

std::atomic_flag

std::atomic_flag 是一种原子布尔类型.与所有std::atomic的特化不同,它保证是无锁的.与std::atomic\ 不同,std::atomic_flag 不提供加载或存储操作.

1
2
std::atomic_flag flag{};
bool r = flag.test_and_set();

当标志对象已初始化,它只能做三件事情:销毁、清除、设置.这些操作对应的函数分别是:

  1. clear() (清除):将标志对象的状态原子地更改为清除(false)
  2. test_and_set(测试并设置):将标志对象的状态原子地更改为设置(true),并返回它先前保有的值.
  3. 销毁:对象的生命周期结束时,自动调用析构函数进行销毁操作.

适合使用atomic_flag做一个自旋锁,也就是通过while忙等

1
2
3
4
5
6
7
8
9
10
11
12
class spinlock_mutex {
std::atomic_flag flag{};
public:
spinlock_mutex()noexcept = default;
void lock()noexcept {
while (flag.test_and_set(std::memory_order_acquire));
}

void unlock()noexcept {
flag.clear(std::memory_order_release);
}
};

std::atomic\

布尔原子类型,但比atomic_flag多load,store等方法,

1
2
3
4
std::atomic<bool> b{true};
b.load(true);
b = false;//表达式值为false
auto value = (b = false);

exchange以 desired 原子地替换底层值.操作为读-修改-写操作.根据 order 的值影响内存

1
2
3
4
std::atomic<bool> b{true};
bool x = b.load();
b.store(true);
x = b.exchange(false); // x->true

compare_exchange_weakcompare_exchange_strong

原子地比较 this 和 expected 的对象表示(C++20 前),值表示 (C++20 起).如果它们逐位相等,那么以 desired 替换前者(进行读修改写操作).否则,将 this 中的实际值加载进 expected(进行加载操作).

也就是当前值与预期一致时,存储新值否则得到当前值存储在expected中

compare_exchange_weak:尝试将原子对象的当前值与预期值进行比较,如果相等则将其更新为新值并返回 true;否则,将原子对象的值加载进 expected(进行加载操作)并返回 false.此操作可能会由于某些硬件的特性而出现假失败,需要在循环中重试

1
2
3
4
std::atomic<bool> flag{ false };
bool expected = false;

while (!flag.compare_exchange_weak(expected, true));

compare_exchange_strong:类似于 compare_exchange_weak,但不会出现假失败,因此不需要重试.适用于需要确保操作成功的场合.

1
2
3
4
5
6
7
8
9
10
11
12
std::atomic<bool> flag{ false };
bool expected = false;

void try_set_flag() {
// 尝试将 flag 设置为 true,如果当前值为 false
if (flag.compare_exchange_strong(expected, true)) {
std::cout << "flag 为 false,设为 true.\n";
}
else {
std::cout << "flag 为 true, expected 设为 true.\n";
}
}

compare_exchange_weakcompare_exchange_strong 允许指定成功和失败情况下的内存序.这意味着你可以根据成功或失败的情况,为原子操作指定不同的内存序

std::atmoc

std::atomic<T*> 是一个原子指针类型,T 是指针所指向的对象类型.操作是针对 T 类型的指针进行的.虽然 std::atomic<T*> 不能被拷贝和移动,但它可以通过符合类型的指针进行构造和赋值.

除了常见的load,store,exchange等,还有fetch_add,fetch_sub等,确保多线程下的指针操作.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
struct Foo {};

Foo array[5]{};
std::atomic<Foo*> p{ array };

// p 加 2,并返回原始值
Foo* x = p.fetch_add(2);
assert(x == array);
assert(p.load() == &array[2]);

// p 减 1,并返回原始值
x = (p -= 1);
assert(x == &array[1]);
assert(p.load() == &array[1]);

// 函数也允许内存序作为给定函数的参数
p.fetch_add(3, std::memory_order_release);

std::atomic\

若多个执行线程不同步地同时访问同一 std::shared_ptr 对象,且任何这些访问使用了 shared_ptr 的非 const 成员函数,则将出现数据竞争,除非通过 std::atomic<std::shared_ptr> 的实例进行所有访问

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
std::atomic<std::shared> data{};
void writer() {
for(int i=0;i<10;++i) {
std::shared_ptr<Data> new_data = std::make_shared<Data>(i);
data.store(new_data);
}
}
void reader() {
for (int i = 0; i < 10; ++i) {
if (auto sp = data.load()) {
std::cout << "读取线程值: " << sp->get_value() << std::endl;
}
else {
std::cout << "没有读取到数据" << std::endl;
}
std::this_thread::sleep_for(10ms);
}
}

最后原子类型还提供了waitnotify_xx的方法,wait进行原子等待操作,如果值与this->load()值表示相同,则阻塞直到 this 被 notify_one() 或 notify_all() 提醒,如果不同直接返回. notif_xx进行原子提醒操作,如果有线程被 this 上的原子等待操作(即 wait())阻塞,那么解除锁定这种线程;否则不做任何事

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
std::atomic<std::shared_ptr<int>> ptr = std::make_shared<int>();

void wait_for_wake_up(){
std::osyncstream{ std::cout }
<< "线程 "
<< std::this_thread::get_id()
<< " 阻塞,等待更新唤醒\n";

// 等待 ptr 变为其它值
ptr.wait(ptr.load());

std::osyncstream{ std::cout }
<< "线程 "
<< std::this_thread::get_id()
<< " 已被唤醒\n";
}

void wake_up(){
std::this_thread::sleep_for(5s);

// 更新值并唤醒
ptr.store(std::make_shared<int>(10));
ptr.notify_one();
}

std::atomic_ref

std::atomic_ref 类模板对它引用的对象应用原子操作.在 std::atomic_ref 对象的生存期中,认为它引用的对象是原子对象.如果一个线程写入原子对象,同时另一线程从它读取,那么其行为有良好定义.另外,对原子对象的访问可以建立线程间同步,和按 std::memory_order 所指定定序非原子内存访问.

对象的生存期必须超出所有引用该对象的 std::atomic_ref 的生存期.任何 std::atomic_ref 实例所引用的对象仍存在时,必须只通过这些 std::atomic_ref 实例排他地访问该对象.std::atomic_ref 对象所引用对象的任何子对象均不可同时被任何其他 std::atomic_ref 对象引用.

通过 std::atomic_ref 应用到对象的原子操作,相对于通过任何其他引用同一对象的 std::atomic_ref 应用的操作来说都是原子的.

1
2
3
std::atomic<std::shared_ptr<int>> ptr = std::make_shared<int>(10);
std::atomic_ref<int> ref{ *ptr.load() };
ref = 100; // 原子地赋 100 给被引用的对象

std::memory_order

std::memory_order 指定内存访问,包括常规的非原子内存访问,如何围绕原子操作排序.在没有任何约束的多处理器系统上,多个线程同时读或写数个变量时,一个线程能观测到变量值更改的顺序不同于另一个线程写它们的顺序.实际上,更改的顺序甚至能在多个读取线程间相异.一些类似的效果还能在单处理器系统上出现,因为内存模型允许编译器进行变换.

库中所有原子操作的默认行为提供序列一致定序.该默认行为可能有损性能,不过可以给予库的原子操作额外的 std::memory_order 实参,以指定确切的约束,在原子性外,编译器和处理器还必须强制该操作.

std::memory_order 是一个枚举类型,用来指定原子操作的内存顺序,影响这些操作的行为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
typedef enum memory_order {
memory_order_relaxed,
memory_order_consume,
memory_order_acquire,
memory_order_release,
memory_order_acq_rel,
memory_order_seq_cst
} memory_order;

// C++20 起则为:

enum class memory_order : /* 未指明 */ {
relaxed, consume, acquire, release, acq_rel, seq_cst
};
inline constexpr memory_order memory_order_relaxed = memory_order::relaxed;
inline constexpr memory_order memory_order_consume = memory_order::consume;
inline constexpr memory_order memory_order_acquire = memory_order::acquire;
inline constexpr memory_order memory_order_release = memory_order::release;
inline constexpr memory_order memory_order_acq_rel = memory_order::acq_rel;
inline constexpr memory_order memory_order_seq_cst = memory_order::seq_cst;

这 6 个常量,每一个常量都表示不同的内存次序

大体来说可以将它们分为三类.

  1. memory_order_relaxed 宽松定序:不是定序约束,仅对此操作要求原子性.

  2. memory_order_seq_cst 序列一致定序,这是库中所有原子操作的默认行为,也是最严格的内存次序,是绝对安全的.

参考资料

  1. 使用线程 | 现代C++并发编程教程 (mq-b.github.io)
  2. C++ Concurrency in Action
-------------本文结束感谢您的阅读-------------
感谢阅读.

欢迎关注我的其它发布渠道