The asynchronous facilities of the C++11 standard library include at least the following:
- std::promise
- std::future
- std::packaged_task
- std::async
Their purpose is easiest to see by starting from a std::thread example:
int a = 1;
std::thread thread([a](int b) {
return a + b;
}, 2);
thread.join();
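The code above creates a new thread to compute 1 + 2, but there is no way to obtain the value the thread computed: std::thread discards the return value of its function. The asynchronous library wraps up both steps, running a task on a new thread and retrieving the task's return value. For example, with std::promise and std::future: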
int a = 1;
std::promise<int> res;
std::future<int> future = res.get_future();
std::thread thread([a](int b, std::promise<int>& res) {
res.set_value(a + b);
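}, 2, std::ref(res)); // std::thread copies its arguments, so a reference must be wrapped in std::ref
std::cout << future.get() << std::endl; // 3
thread.join(); // a joinable thread must be joined (or detached) before it is destroyed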
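The promise can also be moved into the thread instead of being passed by reference, and it can carry an exception as well as a value. The following is a minimal sketch of that idea; the helper name add_on_thread and its fail flag are hypothetical and exist only for illustration.

```cpp
#include <cassert>
#include <exception>
#include <future>
#include <stdexcept>
#include <thread>
#include <utility>

// Hypothetical helper: computes a + b on a new thread and reports the result,
// or any exception, through a std::promise.
int add_on_thread(int a, int b, bool fail) {
    std::promise<int> res;
    std::future<int> future = res.get_future();
    assert(future.valid());

    // The promise is moved into the thread, so no reference to a local is shared.
    std::thread worker([a, fail](int rhs, std::promise<int> p) {
        try {
            if (fail) throw std::runtime_error("task failed");
            p.set_value(a + rhs);
        } catch (...) {
            // Forward the exception to whoever calls future.get().
            p.set_exception(std::current_exception());
        }
    }, b, std::move(res));

    worker.join();        // the result is already stored in the shared state
    return future.get();  // returns the value or rethrows the stored exception
}
```

Joining before calling get() keeps the thread object from being destroyed while still joinable if get() rethrows.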
Alternatively, std::packaged_task and std::future can be used:
int a = 1;
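// The constructor is explicit, so direct-initialization is required.
std::packaged_task<int(int)> task([a](int b) {
return a + b;
});
std::future<int> future = task.get_future();
task(2); // runs the task on the calling thread
std::cout << future.get() << std::endl; // 3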
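A std::packaged_task only stores its result in the shared state; it does not create a thread by itself. To run it asynchronously, it can be moved into a std::thread. A minimal sketch, where the helper name add_with_packaged_task is hypothetical:

```cpp
#include <cassert>
#include <future>
#include <thread>
#include <utility>

// Hypothetical helper: runs a + b on a separate thread via std::packaged_task.
int add_with_packaged_task(int a, int b) {
    std::packaged_task<int(int)> task([a](int rhs) { return a + rhs; });
    std::future<int> future = task.get_future();
    assert(future.valid());

    // packaged_task is move-only, so it is moved into the thread.
    std::thread worker(std::move(task), b);
    int result = future.get();  // blocks until the task has stored its result
    worker.join();
    return result;
}
```

The future must be obtained before the task is moved, because get_future() cannot be called on a moved-from task.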
std::async greatly simplifies the work above:
std::future<int> future3 = std::async([a](int b) {
return a + b;
}, 2);
std::future<int> future4 = std::async([a](int b) {
return a + b;
}, 3);
std::cout << future3.get() + future4.get() << std::endl; // 7
The source code of std::async is at https://gcc.gnu.org/onlinedocs/gcc-7.5.0/libstdc++/api/a00074_source.html; its implementation is excerpted below:
template<typename _Fn, typename... _Args>
std::future<__async_result_of<_Fn, _Args...>> async(launch __policy, _Fn&& __fn, _Args&&... __args) {
std::shared_ptr<__future_base::_State_base> __state;
auto caller = std::thread::__make_invoker(std::forward<_Fn>(__fn), std::forward<_Args>(__args)...);
if ((__policy & launch::async) == launch::async) {
__try {
__state = std::make_shared<__future_base::_Async_state_impl>(std::move(caller));
}
catch(const system_error& __e) {
if (__e.code() != errc::resource_unavailable_try_again || (__policy & launch::deferred) != launch::deferred)
throw;
}
}
if (!__state) {
__state = std::make_shared<__future_base::_Deferred_state>(std::move(caller));
}
return future<__async_result_of<_Fn, _Args...>>(__state);
}
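The effect of the two branches above can be observed from user code: a future created with std::launch::deferred reports std::future_status::deferred from wait_for, while one created with std::launch::async never does. The sketch below assumes nothing beyond the standard API; the names is_deferred, PolicyReport and compare_policies are hypothetical.

```cpp
#include <cassert>
#include <chrono>
#include <future>

// Hypothetical helper: true if the future holds a deferred (not yet started) task.
template <typename T>
bool is_deferred(const std::future<T>& f) {
    return f.wait_for(std::chrono::seconds(0)) == std::future_status::deferred;
}

struct PolicyReport {
    bool async_is_deferred;
    bool deferred_is_deferred;
    int sum;
};

// Launches the same task under both explicit policies and records
// which of the two futures is deferred.
PolicyReport compare_policies() {
    auto add = [](int a, int b) { return a + b; };
    std::future<int> eager = std::async(std::launch::async, add, 1, 2);
    std::future<int> lazy = std::async(std::launch::deferred, add, 1, 3);
    assert(eager.valid() && lazy.valid());

    PolicyReport report;
    report.async_is_deferred = is_deferred(eager);
    report.deferred_is_deferred = is_deferred(lazy);
    report.sum = eager.get() + lazy.get();
    return report;
}
```

When no policy is passed, std::async behaves as if called with std::launch::async | std::launch::deferred, so in the excerpt above the asynchronous branch is tried first and the deferred state is only the fallback.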
If the launch policy includes std::launch::async, the thread is started in the constructor of the following class:
template<typename _BoundFn, typename _Res>
class __future_base::_Async_state_impl final : public __future_base::_Async_state_commonV2 {
public:
explicit _Async_state_impl(_BoundFn&& __fn)
: _M_result(new _Result<_Res>()), _M_fn(std::move(__fn))
{
_M_thread = std::thread{ [this] {
__try {
_M_set_result(_S_task_setter(_M_result, _M_fn));
}
__catch (const __cxxabiv1::__forced_unwind&) {
// make the shared state ready on thread cancellation
if (static_cast<bool>(_M_result))
this->_M_break_promise(std::move(_M_result));
__throw_exception_again;
}
}
};
}
~_Async_state_impl() { if (_M_thread.joinable()) _M_thread.join(); }
private:
_Ptr_type _M_result;
_BoundFn _M_fn;
};
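Because the destructor above joins the thread, destroying the last future that refers to a std::launch::async task blocks until the task finishes. The following sketch illustrates this; the function name finished_when_future_destroyed and the 50 ms delay are arbitrary choices made for the example.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <future>
#include <thread>

// Hypothetical demo: reports whether the task had finished by the time the
// future returned by std::async was destroyed.
bool finished_when_future_destroyed() {
    std::atomic<bool> done(false);
    {
        std::future<void> f = std::async(std::launch::async, [&done] {
            std::this_thread::sleep_for(std::chrono::milliseconds(50));
            done = true;
        });
        assert(f.valid());
        // No get() or wait() here: leaving the scope destroys the future,
        // which releases the shared state and joins the worker thread.
    }
    return done.load();
}
```

One practical consequence is that discarding the return value of std::async(std::launch::async, ...) makes the call effectively synchronous, since the temporary future is destroyed at the end of the statement.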
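If the launch policy does not include std::launch::async, or if creating the thread fails with resource_unavailable_try_again while the policy also allows std::launch::deferred, no thread is started; the caller is simply stored for later use: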
template<typename _BoundFn, typename _Res>
class __future_base::_Deferred_state final : public __future_base::_State_base
{
public:
explicit _Deferred_state(_BoundFn&& __fn) : _M_result(new _Result<_Res>()), _M_fn(std::move(__fn))
{ }
private:
typedef __future_base::_Ptr<_Result<_Res>> _Ptr_type;
_Ptr_type _M_result;
_BoundFn _M_fn;
virtual void _M_complete_async() {
_M_set_result(_S_task_setter(_M_result, _M_fn), true);
}
virtual bool _M_is_deferred_future() const { return true; }
};
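In the excerpt above the stored function is only invoked from _M_complete_async, which suggests that a deferred task runs lazily, on the thread that waits for the result. The sketch below checks this behavior through the public API only; the names DeferredReport and observe_deferred are hypothetical.

```cpp
#include <cassert>
#include <future>
#include <thread>

// Hypothetical report type for the demo below.
struct DeferredReport {
    bool ran_before_get;  // had the task run before get() was called?
    bool ran_on_caller;   // did the task run on the thread that called get()?
};

DeferredReport observe_deferred() {
    bool ran = false;
    std::thread::id task_thread;

    // No synchronization is needed: a deferred task never runs concurrently
    // with the code that created it.
    std::future<int> f = std::async(std::launch::deferred, [&]() -> int {
        ran = true;
        task_thread = std::this_thread::get_id();
        return 1 + 2;
    });
    assert(f.valid());

    DeferredReport report;
    report.ran_before_get = ran;  // nothing has executed yet
    int value = f.get();          // the task runs here, synchronously
    assert(value == 3);
    report.ran_on_caller = (task_thread == std::this_thread::get_id());
    return report;
}
```

If get() or wait() is never called on a deferred future, the task does not run at all.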
[To be continued]