#include "src/future.h" | #include "src/future.h" | ||||
#include "src/promise.h" | #include "src/promise.h" | ||||
#include "src/awaitable.h" | #include "src/awaitable.h" | ||||
#include "src/generator.h" | |||||
#include "src/rf_task.h" | #include "src/rf_task.h" | ||||
#include "src/utils.h" | #include "src/utils.h" | ||||
#include "src/channel.h" | #include "src/channel.h" | ||||
#include "src/event.h" | #include "src/event.h" | ||||
#include "src/generator.h" | |||||
#include "src/sleep.h" | #include "src/sleep.h" | ||||
#include "src/when.h" | #include "src/when.h" |
{ | { | ||||
struct state_event_t; | struct state_event_t; | ||||
//仿照cppcoro的event是行不通的。 | |||||
//虽然cppcoro的event的触发和等待之间是线程安全的,但是并不能实现只触发指定数量。并且多线程触发之间是不安全的。 | |||||
//所以,还得用锁结构来实现(等待实现,今日不空)。 | |||||
struct event_v2_impl : public std::enable_shared_from_this<event_v2_impl> | struct event_v2_impl : public std::enable_shared_from_this<event_v2_impl> | ||||
{ | { | ||||
event_v2_impl(bool initially) noexcept; | event_v2_impl(bool initially) noexcept; | ||||
{ | { | ||||
scoped_lock<detail::event_v2_impl::lock_type> lock_(_event->_lock); | scoped_lock<detail::event_v2_impl::lock_type> lock_(_event->_lock); | ||||
if (_event->try_wait_one()) | |||||
if ((_value = _event->try_wait_one()) != false) | |||||
return false; | return false; | ||||
_state = new detail::state_event_t(_value); | _state = new detail::state_event_t(_value); | ||||
{ | { | ||||
scoped_lock<detail::event_v2_impl::lock_type> lock_(_event->_lock); | scoped_lock<detail::event_v2_impl::lock_type> lock_(_event->_lock); | ||||
if (_event->try_wait_one()) | |||||
if ((_value = _event->try_wait_one()) != false) | |||||
return false; | return false; | ||||
_state = new detail::state_event_t(_value); | _state = new detail::state_event_t(_value); |
//---------------------------------------------------------------------------------------------- | //---------------------------------------------------------------------------------------------- | ||||
template<class _Ty> | |||||
template<class _Ty, class = std::void_t<>> | |||||
struct task_t; | struct task_t; | ||||
template<class _Ty> | template<class _Ty> | ||||
struct task_t<future_t<_Ty>> : public task_base_t | |||||
struct task_t<_Ty, std::void_t<is_future<std::remove_reference_t<_Ty>>>> : public task_base_t | |||||
{ | { | ||||
using value_type = _Ty; | |||||
using future_type = future_t<value_type>; | |||||
using future_type = std::remove_reference_t<_Ty>; | |||||
using value_type = typename future_type::value_type; | |||||
using state_type = state_t<value_type>; | using state_type = state_t<value_type>; | ||||
task_t() = default; | task_t() = default; | ||||
task_t(future_type&& f) | |||||
task_t(future_type& f) | |||||
{ | { | ||||
initialize(std::forward<future_type>(f)); | |||||
initialize(f); | |||||
} | } | ||||
protected: | protected: | ||||
void initialize(future_type&& f) | |||||
void initialize(future_type& f) | |||||
{ | { | ||||
_state = f._state.get(); | _state = f._state.get(); | ||||
} | } | ||||
using state_type = state_generator_t; | using state_type = state_generator_t; | ||||
task_t() = default; | task_t() = default; | ||||
task_t(future_type&& f) | |||||
task_t(future_type& f) | |||||
{ | { | ||||
initialize(std::forward<future_type>(f)); | |||||
initialize(f); | |||||
} | } | ||||
protected: | protected: | ||||
void initialize(future_type&& f) | |||||
void initialize(future_type& f) | |||||
{ | { | ||||
_state = f.detach_state(); | _state = f.detach_state(); | ||||
} | } | ||||
ctx_task_t(context_type ctx) | ctx_task_t(context_type ctx) | ||||
: _context(std::move(ctx)) | : _context(std::move(ctx)) | ||||
{ | { | ||||
this->initialize(_context()); | |||||
decltype(auto) f = _context(); | |||||
this->initialize(f); | |||||
} | } | ||||
}; | }; | ||||
} | } |
inline void operator + (_Ty&& t_) | inline void operator + (_Ty&& t_) | ||||
{ | { | ||||
if constexpr (is_callable_v<_Ty>) | if constexpr (is_callable_v<_Ty>) | ||||
new_task(new ctx_task_t<_Ty>(std::forward<_Ty>(t_))); | |||||
new_task(new ctx_task_t<_Ty>(t_)); | |||||
else | else | ||||
new_task(new task_t<_Ty>(std::forward<_Ty>(t_))); | |||||
new_task(new task_t<_Ty>(t_)); | |||||
} | } | ||||
inline bool empty() const | inline bool empty() const |
RESUMEF_NS | RESUMEF_NS | ||||
{ | { | ||||
struct switch_scheduler_t | |||||
struct switch_scheduler_awaitor | |||||
{ | { | ||||
switch_scheduler_t(scheduler_t* sch) | |||||
switch_scheduler_awaitor(scheduler_t* sch) | |||||
:_scheduler(sch) {} | :_scheduler(sch) {} | ||||
switch_scheduler_t(const switch_scheduler_t&) = default; | |||||
switch_scheduler_t(switch_scheduler_t&&) = default; | |||||
switch_scheduler_awaitor(const switch_scheduler_awaitor&) = default; | |||||
switch_scheduler_awaitor(switch_scheduler_awaitor&&) = default; | |||||
switch_scheduler_t& operator = (const switch_scheduler_t&) = default; | |||||
switch_scheduler_t& operator = (switch_scheduler_t&&) = default; | |||||
switch_scheduler_awaitor& operator = (const switch_scheduler_awaitor&) = default; | |||||
switch_scheduler_awaitor& operator = (switch_scheduler_awaitor&&) = default; | |||||
bool await_ready() noexcept | bool await_ready() noexcept | ||||
{ | { | ||||
scheduler_t* _scheduler; | scheduler_t* _scheduler; | ||||
}; | }; | ||||
inline switch_scheduler_t operator co_await(scheduler_t& sch) | |||||
inline switch_scheduler_awaitor operator co_await(scheduler_t& sch) | |||||
{ | { | ||||
return { &sch }; | return { &sch }; | ||||
} | } |
RESUMEF_NS | RESUMEF_NS | ||||
{ | { | ||||
template<class _Ty> | |||||
struct is_coroutine_handle : std::false_type {}; | |||||
template<class _PromiseT> | template<class _PromiseT> | ||||
struct is_promise : std::false_type {}; | |||||
struct is_coroutine_handle<std::experimental::coroutine_handle<_PromiseT>> : std::true_type {}; | |||||
template<class _Ty> | template<class _Ty> | ||||
struct is_promise<promise_t<_Ty>> : std::true_type {}; | |||||
constexpr bool is_coroutine_handle_v = is_coroutine_handle<remove_cvref_t<_Ty>>::value; | |||||
template<class _Ty> | template<class _Ty> | ||||
constexpr bool is_promise_v = is_promise<remove_cvref_t<_Ty>>::value; | |||||
constexpr bool is_valid_await_suspend_return_v = std::is_void_v<_Ty> || std::is_same_v<_Ty, bool> || is_coroutine_handle_v<_Ty>; | |||||
template<class _PromiseT> | |||||
struct any_type | |||||
{ | |||||
template<class U> | |||||
operator U () const | |||||
{ | |||||
return std::declval<U>(); | |||||
} | |||||
}; | |||||
template<class _Ty, class = std::void_t<>> | |||||
struct is_awaitor : std::false_type {}; | |||||
template<class _Ty> | |||||
struct is_awaitor | |||||
<_Ty, | |||||
std::void_t< | |||||
decltype(std::declval<_Ty>().await_ready()) | |||||
, decltype(std::declval<_Ty>().await_suspend(std::declval<std::experimental::coroutine_handle<promise_t<>>>())) | |||||
, decltype(std::declval<_Ty>().await_resume()) | |||||
> | |||||
> | |||||
: std::bool_constant< | |||||
std::is_constructible_v<bool, decltype(std::declval<_Ty>().await_ready())> | |||||
&& is_valid_await_suspend_return_v< | |||||
decltype(std::declval<_Ty>().await_suspend(std::declval<std::experimental::coroutine_handle<promise_t<>>>())) | |||||
> | |||||
> | |||||
{}; | |||||
template<class _Ty> | |||||
struct is_awaitor<_Ty&> : is_awaitor<_Ty> {}; | |||||
template<class _Ty> | |||||
struct is_awaitor<_Ty&&> : is_awaitor<_Ty> {}; | |||||
template<class _Ty> | |||||
constexpr bool is_awaitor_v = is_awaitor<remove_cvref_t<_Ty>>::value; | |||||
template<class _Ty, class = std::void_t<>> | |||||
struct is_future : std::false_type {}; | struct is_future : std::false_type {}; | ||||
template<class _Ty> | template<class _Ty> | ||||
struct is_future<future_t<_Ty>> : std::true_type {}; | |||||
struct is_future<_Ty, | |||||
std::void_t< | |||||
decltype(std::declval<_Ty>()._state), | |||||
typename _Ty::value_type, | |||||
typename _Ty::state_type, | |||||
typename _Ty::promise_type | |||||
> | |||||
> : std::true_type{}; | |||||
template<class _Ty> | template<class _Ty> | ||||
constexpr bool is_future_v = is_future<remove_cvref_t<_Ty>>::value; | constexpr bool is_future_v = is_future<remove_cvref_t<_Ty>>::value; | ||||
template<class _G> | |||||
template<class _Ty> | |||||
struct is_promise : std::false_type {}; | |||||
template<class _Ty> | |||||
struct is_promise<promise_t<_Ty>> : std::true_type {}; | |||||
template<class _Ty> | |||||
constexpr bool is_promise_v = is_promise<remove_cvref_t<_Ty>>::value; | |||||
template<class _Ty> | |||||
struct is_generator : std::false_type {}; | struct is_generator : std::false_type {}; | ||||
template <typename _Ty, typename _Alloc> | |||||
template <class _Ty, class _Alloc> | |||||
struct is_generator<generator_t<_Ty, _Alloc>> : std::true_type {}; | struct is_generator<generator_t<_Ty, _Alloc>> : std::true_type {}; | ||||
template<class _Ty> | template<class _Ty> | ||||
constexpr bool is_generator_v = is_generator<remove_cvref_t<_Ty>>::value; | constexpr bool is_generator_v = is_generator<remove_cvref_t<_Ty>>::value; | ||||
template<class _PromiseT> | |||||
template<class _Ty> | |||||
struct is_awaitable : std::false_type {}; | struct is_awaitable : std::false_type {}; | ||||
template<class _Ty> | template<class _Ty> | ||||
struct is_awaitable<awaitable_t<_Ty>> : std::true_type {}; | struct is_awaitable<awaitable_t<_Ty>> : std::true_type {}; | ||||
template<class _Ty> | template<class _Ty> | ||||
constexpr bool is_awaitable_v = is_awaitable<remove_cvref_t<_Ty>>::value; | constexpr bool is_awaitable_v = is_awaitable<remove_cvref_t<_Ty>>::value; | ||||
template<class _Ty> | |||||
constexpr bool is_await_suspend_v = is_future_v<_Ty> | |||||
|| is_awaitable_v<_Ty> | |||||
|| std::is_same_v<remove_cvref_t<_Ty>, switch_scheduler_t> | |||||
; | |||||
//copy from cppcoro | |||||
template<class T> | |||||
auto get_awaiter_impl(T&& value, int) noexcept(noexcept(static_cast<T&&>(value).operator co_await())) | |||||
-> decltype(static_cast<T&&>(value).operator co_await()) | |||||
{ | |||||
return static_cast<T&&>(value).operator co_await(); | |||||
} | |||||
template<class T> | |||||
auto get_awaiter_impl(T&& value, long) noexcept(noexcept(operator co_await(static_cast<T&&>(value)))) | |||||
-> decltype(operator co_await(static_cast<T&&>(value))) | |||||
{ | |||||
return operator co_await(static_cast<T&&>(value)); | |||||
} | |||||
template<class T, std::enable_if_t<is_awaitor_v<T&&>, int> = 0> | |||||
T&& get_awaiter_impl(T&& value, std::any) noexcept | |||||
{ | |||||
return static_cast<T&&>(value); | |||||
} | |||||
template<class T> | |||||
auto get_awaiter(T&& value) noexcept(noexcept(detail::get_awaiter_impl(static_cast<T&&>(value), 123))) | |||||
-> decltype(detail::get_awaiter_impl(static_cast<T&&>(value), 123)) | |||||
{ | |||||
return detail::get_awaiter_impl(static_cast<T&&>(value), 123); | |||||
} | |||||
} | } |
#pragma once | #pragma once | ||||
RESUMEF_NS | |||||
{ | |||||
using any_t = std::any; | |||||
using std::any_cast; | |||||
} | |||||
//纠结过when_any的返回值,是选用index + std::any,还是选用std::variant<>。最终选择了std::any。 | |||||
//std::variant<>存在第一个元素不能默认构造的问题,需要使用std::monostate来占位,导致下标不是从0开始。 | |||||
//而且,std::variant<>里面存在类型重复的问题,好几个操作都是病态的 | |||||
//最最重要的,要统一ranged when_any的返回值,还得做一个运行时通过下标设置std::variant<>的东西 | |||||
//std::any除了内存布局不太理想,其他方面几乎没缺点(在此应用下) | |||||
RESUMEF_NS | |||||
{ | |||||
namespace detail | |||||
{ | |||||
struct when_impl; | |||||
typedef _awaker<when_impl> when_awaker; | |||||
typedef std::shared_ptr<when_awaker> when_awaker_ptr; | |||||
struct when_impl : public std::enable_shared_from_this<when_impl> | |||||
{ | |||||
private: | |||||
//typedef spinlock lock_type; | |||||
typedef std::recursive_mutex lock_type; | |||||
when_awaker_ptr _awakes; | |||||
intptr_t _counter; | |||||
lock_type _lock; | |||||
public: | |||||
when_impl(intptr_t initial_counter_); | |||||
void signal(); | |||||
//如果已经触发了awaker,则返回true | |||||
bool wait_(const when_awaker_ptr & awaker); | |||||
template<class callee_t, class dummy_t = std::enable_if<!std::is_same<std::remove_cv_t<callee_t>, when_awaker_ptr>::value>> | |||||
auto wait(callee_t && awaker, dummy_t * dummy_ = nullptr) | |||||
{ | |||||
(void)dummy_; | |||||
return wait_(std::make_shared<when_awaker>(std::forward<callee_t>(awaker))); | |||||
} | |||||
when_impl(const when_impl &) = delete; | |||||
when_impl(when_impl &&) = delete; | |||||
when_impl & operator = (const when_impl &) = delete; | |||||
when_impl & operator = (when_impl &&) = delete; | |||||
}; | |||||
typedef std::shared_ptr<when_impl> when_impl_ptr; | |||||
using ignore_type = decltype(std::ignore); | |||||
template<class _Ty> | |||||
struct remove_future | |||||
{ | |||||
using type = std::remove_reference_t<_Ty>; | |||||
using value_type = type; | |||||
}; | |||||
template<> | |||||
struct remove_future<void> | |||||
{ | |||||
using type = void; | |||||
using value_type = ignore_type; | |||||
}; | |||||
template<class _Ty> | |||||
struct remove_future<future_t<_Ty> > : public remove_future<_Ty> | |||||
{ | |||||
}; | |||||
template<class _Ty> | |||||
struct remove_future<future_t<_Ty>&> : public remove_future<_Ty> | |||||
{ | |||||
}; | |||||
template<class _Ty> | |||||
struct remove_future<future_t<_Ty>&&> : public remove_future<_Ty> | |||||
{ | |||||
}; | |||||
template<class _Ty> | |||||
using remove_future_vt = typename remove_future<_Ty>::value_type; | |||||
template<class _Fty, class _Ty> | |||||
struct when_all_functor | |||||
{ | |||||
using value_type = _Ty; | |||||
using future_type = _Fty; | |||||
when_impl_ptr _e; | |||||
mutable future_type _f; | |||||
mutable std::reference_wrapper<value_type> _val; | |||||
when_all_functor(const detail::when_impl_ptr & e, future_type f, value_type & v) | |||||
: _e(e) | |||||
, _f(std::move(f)) | |||||
, _val(v) | |||||
{} | |||||
when_all_functor(when_all_functor &&) noexcept = default; | |||||
when_all_functor & operator = (const when_all_functor &) = default; | |||||
when_all_functor & operator = (when_all_functor &&) = default; | |||||
inline future_t<> operator ()() const | |||||
{ | |||||
_val.get() = co_await _f; | |||||
_e->signal(); | |||||
} | |||||
}; | |||||
template<class _Ty> | |||||
struct when_all_functor<future_t<>, _Ty> | |||||
{ | |||||
using value_type = _Ty; | |||||
using future_type = future_t<>; | |||||
when_impl_ptr _e; | |||||
mutable future_type _f; | |||||
mutable std::reference_wrapper<value_type> _val; | |||||
when_all_functor(const detail::when_impl_ptr & e, future_type f, value_type & v) | |||||
: _e(e) | |||||
, _f(std::move(f)) | |||||
, _val(v) | |||||
{} | |||||
when_all_functor(when_all_functor &&) noexcept = default; | |||||
when_all_functor & operator = (const when_all_functor &) = default; | |||||
when_all_functor & operator = (when_all_functor &&) = default; | |||||
inline future_t<> operator ()() const | |||||
{ | |||||
co_await _f; | |||||
_val.get() = std::ignore; | |||||
_e->signal(); | |||||
} | |||||
}; | |||||
template<class _Tup, size_t _Idx> | |||||
inline void when_all_one__(scheduler_t & , const when_impl_ptr & , _Tup & ) | |||||
{ | |||||
} | |||||
template<class _Tup, size_t _Idx, class _Fty, class... _Rest> | |||||
inline void when_all_one__(scheduler_t& s, const when_impl_ptr & e, _Tup & t, _Fty f, _Rest&&... rest) | |||||
{ | |||||
s + when_all_functor<_Fty, std::tuple_element_t<_Idx, _Tup> >{e, std::move(f), std::get<_Idx>(t)}; | |||||
when_all_one__<_Tup, _Idx + 1, _Rest...>(s, e, t, std::forward<_Rest>(rest)...); | |||||
} | |||||
template<class _Val, class _Iter, typename _Fty = decltype(*std::declval<_Iter>())> | |||||
inline void when_all_range__(scheduler_t& s, const when_impl_ptr & e, std::vector<_Val> & t, _Iter begin, _Iter end) | |||||
{ | |||||
using future_type = std::remove_reference_t<_Fty>; | |||||
const auto _First = begin; | |||||
for(; begin != end; ++begin) | |||||
s + when_all_functor<future_type, _Val>{e, *begin, t[begin - _First]}; | |||||
} | |||||
template<class _Tup, class... _Fty> | |||||
future_t<_Tup> when_all_count(size_t count, const std::shared_ptr<_Tup> & vals, scheduler_t & s, _Fty&&... f) | |||||
{ | |||||
awaitable_t<_Tup> awaitable; | |||||
when_impl_ptr _event = std::make_shared<when_impl>(count); | |||||
auto awaker = std::make_shared<when_awaker>( | |||||
[st = awaitable._state, vals](when_impl * e) -> bool | |||||
{ | |||||
if (e) | |||||
st->set_value(*vals); | |||||
else | |||||
st->throw_exception(channel_exception{ error_code::not_ready }); | |||||
return true; | |||||
}); | |||||
_event->wait_(awaker); | |||||
when_all_one__<_Tup, 0u, _Fty...>(s, _event, *vals, std::forward<_Fty>(f)...); | |||||
return awaitable.get_future(); | |||||
} | |||||
template<class _Tup, class _Iter> | |||||
future_t<_Tup> when_all_range(size_t count, const std::shared_ptr<_Tup> & vals, scheduler_t& s, _Iter begin, _Iter end) | |||||
{ | |||||
awaitable_t<_Tup> awaitable; | |||||
when_impl_ptr _event = std::make_shared<when_impl>(count); | |||||
auto awaker = std::make_shared<when_awaker>( | |||||
[st = awaitable._state, vals](when_impl * e) -> bool | |||||
{ | |||||
if (e) | |||||
st->set_value(*vals); | |||||
else | |||||
st->throw_exception(channel_exception{ error_code::not_ready }); | |||||
return true; | |||||
}); | |||||
_event->wait_(awaker); | |||||
when_all_range__(s, _event, *vals, begin, end); | |||||
return awaitable.get_future(); | |||||
} | |||||
using when_any_pair = std::pair<intptr_t, any_t>; | |||||
using when_any_result_ptr = std::shared_ptr<when_any_pair>; | |||||
template<class _Fty> | |||||
struct when_any_functor | |||||
{ | |||||
using value_type = when_any_pair; | |||||
using future_type = _Fty; | |||||
when_impl_ptr _e; | |||||
mutable future_type _f; | |||||
mutable when_any_result_ptr _val; | |||||
intptr_t _Idx; | |||||
when_any_functor(const when_impl_ptr & e, future_type f, const when_any_result_ptr & v, intptr_t idx) | |||||
: _e(e) | |||||
, _f(std::move(f)) | |||||
, _val(v) | |||||
, _Idx(idx) | |||||
{ | |||||
assert(idx >= 0); | |||||
} | |||||
when_any_functor(when_any_functor &&) noexcept = default; | |||||
when_any_functor & operator = (const when_any_functor &) = default; | |||||
when_any_functor & operator = (when_any_functor &&) = default; | |||||
inline future_t<> operator ()() const | |||||
{ | |||||
if (_val->first < 0) | |||||
{ | |||||
auto tval = co_await _f; | |||||
if (_val->first < 0) | |||||
{ | |||||
_val->first = _Idx; | |||||
_val->second = std::move(tval); | |||||
_e->signal(); | |||||
} | |||||
} | |||||
else | |||||
{ | |||||
co_await _f; | |||||
} | |||||
} | |||||
}; | |||||
template<> | |||||
struct when_any_functor<future_t<>> | |||||
{ | |||||
using value_type = when_any_pair; | |||||
using future_type = future_t<>; | |||||
when_impl_ptr _e; | |||||
mutable future_type _f; | |||||
mutable when_any_result_ptr _val; | |||||
intptr_t _Idx; | |||||
when_any_functor(const when_impl_ptr & e, future_type f, const when_any_result_ptr & v, intptr_t idx) | |||||
: _e(e) | |||||
, _f(std::move(f)) | |||||
, _val(v) | |||||
, _Idx(idx) | |||||
{ | |||||
assert(idx >= 0); | |||||
} | |||||
when_any_functor(when_any_functor &&) noexcept = default; | |||||
when_any_functor & operator = (const when_any_functor &) = default; | |||||
when_any_functor & operator = (when_any_functor &&) = default; | |||||
inline future_t<> operator ()() const | |||||
{ | |||||
if (_val->first < 0) | |||||
{ | |||||
co_await _f; | |||||
if (_val->first < 0) | |||||
{ | |||||
_val->first = _Idx; | |||||
_e->signal(); | |||||
} | |||||
} | |||||
else | |||||
{ | |||||
co_await _f; | |||||
} | |||||
} | |||||
}; | |||||
template<intptr_t _Idx> | |||||
inline void when_any_one__(scheduler_t & , const when_impl_ptr & , const when_any_result_ptr & ) | |||||
{ | |||||
} | |||||
template<intptr_t _Idx, class _Fty, class... _Rest> | |||||
inline void when_any_one__(scheduler_t & s, const when_impl_ptr & e, const when_any_result_ptr & t, _Fty f, _Rest&&... rest) | |||||
{ | |||||
s + when_any_functor<_Fty>{e, std::move(f), t, _Idx}; | |||||
when_any_one__<_Idx + 1, _Rest...>(s, e, t, std::forward<_Rest>(rest)...); | |||||
} | |||||
template<class... _Fty> | |||||
future_t<when_any_pair> when_any_count(size_t count, const when_any_result_ptr & val_ptr, scheduler_t & s, _Fty&&... f) | |||||
{ | |||||
awaitable_t<when_any_pair> awaitable; | |||||
when_impl_ptr _event = std::make_shared<when_impl>(count); | |||||
auto awaker = std::make_shared<when_awaker>( | |||||
[st = awaitable._state, val_ptr](when_impl * e) -> bool | |||||
{ | |||||
if (e) | |||||
st->set_value(*val_ptr); | |||||
else | |||||
st->throw_exception(channel_exception{ error_code::not_ready }); | |||||
return true; | |||||
}); | |||||
_event->wait_(awaker); | |||||
when_any_one__<0u, _Fty...>(s, _event, val_ptr, std::forward<_Fty>(f)...); | |||||
return awaitable.get_future(); | |||||
} | |||||
template<class _Iter, typename _Fty = decltype(*std::declval<_Iter>())> | |||||
inline void when_any_range__(scheduler_t & s, const when_impl_ptr & e, const when_any_result_ptr & t, _Iter begin, _Iter end) | |||||
{ | |||||
using future_type = std::remove_reference_t<_Fty>; | |||||
const auto _First = begin; | |||||
for (; begin != end; ++begin) | |||||
s + when_any_functor<future_type>{e, *begin, t, begin - _First}; | |||||
} | |||||
template<class _Iter> | |||||
future_t<when_any_pair> when_any_range(size_t count, const when_any_result_ptr & val_ptr, scheduler_t & s, _Iter begin, _Iter end) | |||||
{ | |||||
awaitable_t<when_any_pair> awaitable; | |||||
when_impl_ptr _event = std::make_shared<when_impl>(count); | |||||
auto awaker = std::make_shared<when_awaker>( | |||||
[st = awaitable._state, val_ptr](when_impl * e) -> bool | |||||
{ | |||||
if (e) | |||||
st->set_value(*val_ptr); | |||||
else | |||||
st->throw_exception(channel_exception{ error_code::not_ready }); | |||||
return true; | |||||
}); | |||||
_event->wait_(awaker); | |||||
when_any_range__(s, _event, val_ptr, begin, end); | |||||
return awaitable.get_future(); | |||||
} | |||||
} | |||||
template<class... _Fty> | |||||
auto when_all(scheduler_t & s, _Fty&&... f) -> future_t<std::tuple<detail::remove_future_vt<_Fty>...> > | |||||
{ | |||||
using tuple_type = std::tuple<detail::remove_future_vt<_Fty>...>; | |||||
auto vals = std::make_shared<tuple_type>(); | |||||
return detail::when_all_count(sizeof...(_Fty), vals, s, std::forward<_Fty>(f)...); | |||||
} | |||||
template<class... _Fty> | |||||
auto when_all(_Fty&&... f) -> future_t<std::tuple<detail::remove_future_vt<_Fty>...> > | |||||
{ | |||||
return when_all(*this_scheduler(), std::forward<_Fty>(f)...); | |||||
} | |||||
template<class _Iter, typename _Fty = decltype(*std::declval<_Iter>())> | |||||
auto when_all(scheduler_t & s, _Iter begin, _Iter end) -> future_t<std::vector<detail::remove_future_vt<_Fty> > > | |||||
{ | |||||
using value_type = detail::remove_future_vt<_Fty>; | |||||
using vector_type = std::vector<value_type>; | |||||
auto vals = std::make_shared<vector_type>(std::distance(begin, end)); | |||||
return detail::when_all_range(vals->size(), vals, s, begin, end); | |||||
} | |||||
template<class _Iter, typename _Fty = decltype(*std::declval<_Iter>())> | |||||
auto when_all(_Iter begin, _Iter end) -> future_t<std::vector<detail::remove_future_vt<_Fty> > > | |||||
{ | |||||
return when_all(*this_scheduler(), begin, end); | |||||
} | |||||
template<class... _Fty> | |||||
auto when_any(scheduler_t & s, _Fty&&... f) -> future_t<detail::when_any_pair> | |||||
{ | |||||
auto vals = std::make_shared<detail::when_any_pair>(-1, any_t{}); | |||||
return detail::when_any_count(sizeof...(_Fty) ? 1 : 0, vals, s, std::forward<_Fty>(f)...); | |||||
} | |||||
template<class... _Fty> | |||||
auto when_any(_Fty&&... f) -> future_t<detail::when_any_pair> | |||||
{ | |||||
return when_any(*this_scheduler(), std::forward<_Fty>(f)...); | |||||
} | |||||
template<class _Iter, typename _Fty = decltype(*std::declval<_Iter>())> | |||||
auto when_any(scheduler_t & s, _Iter begin, _Iter end) -> future_t<detail::when_any_pair> | |||||
{ | |||||
auto vals = std::make_shared<detail::when_any_pair>(-1, any_t{}); | |||||
return detail::when_any_range((begin != end) ? 1 : 0, vals, s, begin, end); | |||||
} | |||||
template<class _Iter, typename _Fty = decltype(*std::declval<_Iter>())> | |||||
auto when_any(_Iter begin, _Iter end) -> future_t<detail::when_any_pair> | |||||
{ | |||||
return when_any(*this_scheduler(), begin, end); | |||||
} | |||||
} | |||||
//#include "when_v1.h" | |||||
#include "when_v2.h" |
#include "../librf.h" | |||||
RESUMEF_NS | |||||
{ | |||||
namespace detail | |||||
{ | |||||
when_impl::when_impl(intptr_t initial_counter_) | |||||
: _counter(initial_counter_) | |||||
{ | |||||
} | |||||
void when_impl::signal() | |||||
{ | |||||
scoped_lock<lock_type> lock_(this->_lock); | |||||
if (--this->_counter == 0) | |||||
{ | |||||
_awakes->awake(this, 1); | |||||
} | |||||
} | |||||
bool when_impl::wait_(const when_awaker_ptr & awaker) | |||||
{ | |||||
assert(awaker); | |||||
scoped_lock<lock_type> lock_(this->_lock); | |||||
if (this->_counter == 0) | |||||
{ | |||||
awaker->awake(this, 1); | |||||
return true; | |||||
} | |||||
else | |||||
{ | |||||
this->_awakes = awaker; | |||||
return false; | |||||
} | |||||
} | |||||
} | |||||
} | |||||
#include "../librf.h" | |||||
RESUMEF_NS | |||||
{ | |||||
namespace detail | |||||
{ | |||||
when_impl::when_impl(intptr_t initial_counter_) | |||||
: _counter(initial_counter_) | |||||
{ | |||||
} | |||||
void when_impl::signal() | |||||
{ | |||||
scoped_lock<lock_type> lock_(this->_lock); | |||||
if (--this->_counter == 0) | |||||
{ | |||||
_awakes->awake(this, 1); | |||||
} | |||||
} | |||||
bool when_impl::wait_(const when_awaker_ptr & awaker) | |||||
{ | |||||
assert(awaker); | |||||
scoped_lock<lock_type> lock_(this->_lock); | |||||
if (this->_counter == 0) | |||||
{ | |||||
awaker->awake(this, 1); | |||||
return true; | |||||
} | |||||
else | |||||
{ | |||||
this->_awakes = awaker; | |||||
return false; | |||||
} | |||||
} | |||||
} | |||||
} |
#pragma once | |||||
RESUMEF_NS | |||||
{ | |||||
using any_t = std::any; | |||||
using std::any_cast; | |||||
} | |||||
//纠结过when_any的返回值,是选用index + std::any,还是选用std::variant<>。最终选择了std::any。 | |||||
//std::variant<>存在第一个元素不能默认构造的问题,需要使用std::monostate来占位,导致下标不是从0开始。 | |||||
//而且,std::variant<>里面存在类型重复的问题,好几个操作都是病态的 | |||||
//最最重要的,要统一ranged when_any的返回值,还得做一个运行时通过下标设置std::variant<>的东西 | |||||
//std::any除了内存布局不太理想,其他方面几乎没缺点(在此应用下) | |||||
RESUMEF_NS | |||||
{ | |||||
namespace detail | |||||
{ | |||||
struct when_impl; | |||||
typedef _awaker<when_impl> when_awaker; | |||||
typedef std::shared_ptr<when_awaker> when_awaker_ptr; | |||||
struct when_impl : public std::enable_shared_from_this<when_impl> | |||||
{ | |||||
private: | |||||
//typedef spinlock lock_type; | |||||
typedef std::recursive_mutex lock_type; | |||||
when_awaker_ptr _awakes; | |||||
intptr_t _counter; | |||||
lock_type _lock; | |||||
public: | |||||
when_impl(intptr_t initial_counter_); | |||||
void signal(); | |||||
//如果已经触发了awaker,则返回true | |||||
bool wait_(const when_awaker_ptr & awaker); | |||||
template<class callee_t, class dummy_t = std::enable_if<!std::is_same<std::remove_cv_t<callee_t>, when_awaker_ptr>::value>> | |||||
auto wait(callee_t && awaker, dummy_t * dummy_ = nullptr) | |||||
{ | |||||
(void)dummy_; | |||||
return wait_(std::make_shared<when_awaker>(std::forward<callee_t>(awaker))); | |||||
} | |||||
when_impl(const when_impl &) = delete; | |||||
when_impl(when_impl &&) = delete; | |||||
when_impl & operator = (const when_impl &) = delete; | |||||
when_impl & operator = (when_impl &&) = delete; | |||||
}; | |||||
typedef std::shared_ptr<when_impl> when_impl_ptr; | |||||
template<class _Fty, class _Ty> | |||||
struct when_all_functor | |||||
{ | |||||
using value_type = _Ty; | |||||
using future_type = _Fty; | |||||
when_impl_ptr _e; | |||||
mutable future_type _f; | |||||
mutable std::reference_wrapper<value_type> _val; | |||||
when_all_functor(const detail::when_impl_ptr & e, future_type f, value_type & v) | |||||
: _e(e) | |||||
, _f(std::move(f)) | |||||
, _val(v) | |||||
{} | |||||
when_all_functor(when_all_functor &&) noexcept = default; | |||||
when_all_functor & operator = (const when_all_functor &) = default; | |||||
when_all_functor & operator = (when_all_functor &&) = default; | |||||
inline future_t<> operator ()() const | |||||
{ | |||||
_val.get() = co_await _f; | |||||
_e->signal(); | |||||
} | |||||
}; | |||||
template<class _Ty> | |||||
struct when_all_functor<future_t<>, _Ty> | |||||
{ | |||||
using value_type = _Ty; | |||||
using future_type = future_t<>; | |||||
when_impl_ptr _e; | |||||
mutable future_type _f; | |||||
mutable std::reference_wrapper<value_type> _val; | |||||
when_all_functor(const detail::when_impl_ptr & e, future_type f, value_type & v) | |||||
: _e(e) | |||||
, _f(std::move(f)) | |||||
, _val(v) | |||||
{} | |||||
when_all_functor(when_all_functor &&) noexcept = default; | |||||
when_all_functor & operator = (const when_all_functor &) = default; | |||||
when_all_functor & operator = (when_all_functor &&) = default; | |||||
inline future_t<> operator ()() const | |||||
{ | |||||
co_await _f; | |||||
_val.get() = std::ignore; | |||||
_e->signal(); | |||||
} | |||||
}; | |||||
template<class _Tup, size_t _Idx> | |||||
inline void when_all_one__(scheduler_t & , const when_impl_ptr & , _Tup & ) | |||||
{ | |||||
} | |||||
template<class _Tup, size_t _Idx, class _Fty, class... _Rest> | |||||
inline void when_all_one__(scheduler_t& s, const when_impl_ptr & e, _Tup & t, _Fty f, _Rest&&... rest) | |||||
{ | |||||
s + when_all_functor<_Fty, std::tuple_element_t<_Idx, _Tup> >{e, std::move(f), std::get<_Idx>(t)}; | |||||
when_all_one__<_Tup, _Idx + 1, _Rest...>(s, e, t, std::forward<_Rest>(rest)...); | |||||
} | |||||
template<class _Val, class _Iter, typename _Fty = decltype(*std::declval<_Iter>())> | |||||
inline void when_all_range__(scheduler_t& s, const when_impl_ptr & e, std::vector<_Val> & t, _Iter begin, _Iter end) | |||||
{ | |||||
using future_type = std::remove_reference_t<_Fty>; | |||||
const auto _First = begin; | |||||
for(; begin != end; ++begin) | |||||
s + when_all_functor<future_type, _Val>{e, *begin, t[begin - _First]}; | |||||
} | |||||
template<class _Tup, class... _Fty> | |||||
future_t<_Tup> when_all_count(size_t count, const std::shared_ptr<_Tup> & vals, scheduler_t & s, _Fty&&... f) | |||||
{ | |||||
awaitable_t<_Tup> awaitable; | |||||
when_impl_ptr _event = std::make_shared<when_impl>(count); | |||||
auto awaker = std::make_shared<when_awaker>( | |||||
[st = awaitable._state, vals](when_impl * e) -> bool | |||||
{ | |||||
if (e) | |||||
st->set_value(*vals); | |||||
else | |||||
st->throw_exception(channel_exception{ error_code::not_ready }); | |||||
return true; | |||||
}); | |||||
_event->wait_(awaker); | |||||
when_all_one__<_Tup, 0u, _Fty...>(s, _event, *vals, std::forward<_Fty>(f)...); | |||||
return awaitable.get_future(); | |||||
} | |||||
template<class _Tup, class _Iter> | |||||
future_t<_Tup> when_all_range(size_t count, const std::shared_ptr<_Tup> & vals, scheduler_t& s, _Iter begin, _Iter end) | |||||
{ | |||||
awaitable_t<_Tup> awaitable; | |||||
when_impl_ptr _event = std::make_shared<when_impl>(count); | |||||
auto awaker = std::make_shared<when_awaker>( | |||||
[st = awaitable._state, vals](when_impl * e) -> bool | |||||
{ | |||||
if (e) | |||||
st->set_value(*vals); | |||||
else | |||||
st->throw_exception(channel_exception{ error_code::not_ready }); | |||||
return true; | |||||
}); | |||||
_event->wait_(awaker); | |||||
when_all_range__(s, _event, *vals, begin, end); | |||||
return awaitable.get_future(); | |||||
} | |||||
template<class _Fty> | |||||
struct when_any_functor | |||||
{ | |||||
using value_type = when_any_pair; | |||||
using future_type = _Fty; | |||||
when_impl_ptr _e; | |||||
mutable future_type _f; | |||||
mutable when_any_result_ptr _val; | |||||
intptr_t _Idx; | |||||
when_any_functor(const when_impl_ptr & e, future_type f, const when_any_result_ptr & v, intptr_t idx) | |||||
: _e(e) | |||||
, _f(std::move(f)) | |||||
, _val(v) | |||||
, _Idx(idx) | |||||
{ | |||||
assert(idx >= 0); | |||||
} | |||||
when_any_functor(when_any_functor &&) noexcept = default; | |||||
when_any_functor & operator = (const when_any_functor &) = default; | |||||
when_any_functor & operator = (when_any_functor &&) = default; | |||||
inline future_t<> operator ()() const | |||||
{ | |||||
if (_val->first < 0) | |||||
{ | |||||
auto tval = co_await _f; | |||||
if (_val->first < 0) | |||||
{ | |||||
_val->first = _Idx; | |||||
_val->second = std::move(tval); | |||||
_e->signal(); | |||||
} | |||||
} | |||||
else | |||||
{ | |||||
co_await _f; | |||||
} | |||||
} | |||||
}; | |||||
template<> | |||||
struct when_any_functor<future_t<>> | |||||
{ | |||||
using value_type = when_any_pair; | |||||
using future_type = future_t<>; | |||||
when_impl_ptr _e; | |||||
mutable future_type _f; | |||||
mutable when_any_result_ptr _val; | |||||
intptr_t _Idx; | |||||
when_any_functor(const when_impl_ptr & e, future_type f, const when_any_result_ptr & v, intptr_t idx) | |||||
: _e(e) | |||||
, _f(std::move(f)) | |||||
, _val(v) | |||||
, _Idx(idx) | |||||
{ | |||||
assert(idx >= 0); | |||||
} | |||||
when_any_functor(when_any_functor &&) noexcept = default; | |||||
when_any_functor & operator = (const when_any_functor &) = default; | |||||
when_any_functor & operator = (when_any_functor &&) = default; | |||||
inline future_t<> operator ()() const | |||||
{ | |||||
if (_val->first < 0) | |||||
{ | |||||
co_await _f; | |||||
if (_val->first < 0) | |||||
{ | |||||
_val->first = _Idx; | |||||
_e->signal(); | |||||
} | |||||
} | |||||
else | |||||
{ | |||||
co_await _f; | |||||
} | |||||
} | |||||
}; | |||||
template<intptr_t _Idx> | |||||
inline void when_any_one__(scheduler_t & , const when_impl_ptr & , const when_any_result_ptr & ) | |||||
{ | |||||
} | |||||
template<intptr_t _Idx, class _Fty, class... _Rest> | |||||
inline void when_any_one__(scheduler_t & s, const when_impl_ptr & e, const when_any_result_ptr & t, _Fty f, _Rest&&... rest) | |||||
{ | |||||
s + when_any_functor<_Fty>{e, std::move(f), t, _Idx}; | |||||
when_any_one__<_Idx + 1, _Rest...>(s, e, t, std::forward<_Rest>(rest)...); | |||||
} | |||||
template<class... _Fty> | |||||
future_t<when_any_pair> when_any_count(size_t count, const when_any_result_ptr & val_ptr, scheduler_t & s, _Fty&&... f) | |||||
{ | |||||
awaitable_t<when_any_pair> awaitable; | |||||
when_impl_ptr _event = std::make_shared<when_impl>(count); | |||||
auto awaker = std::make_shared<when_awaker>( | |||||
[st = awaitable._state, val_ptr](when_impl * e) -> bool | |||||
{ | |||||
if (e) | |||||
st->set_value(*val_ptr); | |||||
else | |||||
st->throw_exception(channel_exception{ error_code::not_ready }); | |||||
return true; | |||||
}); | |||||
_event->wait_(awaker); | |||||
when_any_one__<0u, _Fty...>(s, _event, val_ptr, std::forward<_Fty>(f)...); | |||||
return awaitable.get_future(); | |||||
} | |||||
template<class _Iter, typename _Fty = decltype(*std::declval<_Iter>())> | |||||
inline void when_any_range__(scheduler_t & s, const when_impl_ptr & e, const when_any_result_ptr & t, _Iter begin, _Iter end) | |||||
{ | |||||
using future_type = std::remove_reference_t<_Fty>; | |||||
const auto _First = begin; | |||||
for (; begin != end; ++begin) | |||||
s + when_any_functor<future_type>{e, *begin, t, begin - _First}; | |||||
} | |||||
template<class _Iter> | |||||
future_t<when_any_pair> when_any_range(size_t count, const when_any_result_ptr & val_ptr, scheduler_t & s, _Iter begin, _Iter end) | |||||
{ | |||||
awaitable_t<when_any_pair> awaitable; | |||||
when_impl_ptr _event = std::make_shared<when_impl>(count); | |||||
auto awaker = std::make_shared<when_awaker>( | |||||
[st = awaitable._state, val_ptr](when_impl * e) -> bool | |||||
{ | |||||
if (e) | |||||
st->set_value(*val_ptr); | |||||
else | |||||
st->throw_exception(channel_exception{ error_code::not_ready }); | |||||
return true; | |||||
}); | |||||
_event->wait_(awaker); | |||||
when_any_range__(s, _event, val_ptr, begin, end); | |||||
return awaitable.get_future(); | |||||
} | |||||
} | |||||
namespace when_v1 | |||||
{ | |||||
template<class... _Fty> | |||||
auto when_all(scheduler_t& s, _Fty&&... f) -> future_t<std::tuple<detail::remove_future_vt<_Fty>...> > | |||||
{ | |||||
using tuple_type = std::tuple<detail::remove_future_vt<_Fty>...>; | |||||
auto vals = std::make_shared<tuple_type>(); | |||||
return detail::when_all_count(sizeof...(_Fty), vals, s, std::forward<_Fty>(f)...); | |||||
} | |||||
template<class _Iter, typename _Fty = decltype(*std::declval<_Iter>())> | |||||
auto when_all(scheduler_t& s, _Iter begin, _Iter end) -> future_t<std::vector<detail::remove_future_vt<_Fty> > > | |||||
{ | |||||
using value_type = detail::remove_future_vt<_Fty>; | |||||
using vector_type = std::vector<value_type>; | |||||
auto vals = std::make_shared<vector_type>(std::distance(begin, end)); | |||||
return detail::when_all_range(vals->size(), vals, s, begin, end); | |||||
} | |||||
template<class... _Fty> | |||||
auto when_all(_Fty&&... f) -> future_t<std::tuple<detail::remove_future_vt<_Fty>...> > | |||||
{ | |||||
co_return co_await when_all(*current_scheduler(), std::forward<_Fty>(f)...); | |||||
} | |||||
template<class _Iter, typename _Fty = decltype(*std::declval<_Iter>())> | |||||
auto when_all(_Iter begin, _Iter end) -> future_t<std::vector<detail::remove_future_vt<_Fty> > > | |||||
{ | |||||
co_return co_await when_all(*current_scheduler(), begin, end); | |||||
} | |||||
template<class... _Fty> | |||||
auto when_any(scheduler_t& s, _Fty&&... f) -> future_t<detail::when_any_pair> | |||||
{ | |||||
auto vals = std::make_shared<detail::when_any_pair>(-1, any_t{}); | |||||
return detail::when_any_count(sizeof...(_Fty) ? 1 : 0, vals, s, std::forward<_Fty>(f)...); | |||||
} | |||||
template<class _Iter, typename _Fty = decltype(*std::declval<_Iter>())> | |||||
auto when_any(scheduler_t& s, _Iter begin, _Iter end) -> future_t<detail::when_any_pair> | |||||
{ | |||||
auto vals = std::make_shared<detail::when_any_pair>(-1, any_t{}); | |||||
return detail::when_any_range((begin != end) ? 1 : 0, vals, s, begin, end); | |||||
} | |||||
template<class... _Fty> | |||||
auto when_any(_Fty&&... f) -> future_t<detail::when_any_pair> | |||||
{ | |||||
co_return co_await when_any(*current_scheduler(), std::forward<_Fty>(f)...); | |||||
} | |||||
template<class _Iter, typename _Fty = decltype(*std::declval<_Iter>())> | |||||
auto when_any(_Iter begin, _Iter end) -> future_t<detail::when_any_pair> | |||||
{ | |||||
co_return co_await when_any(*current_scheduler(), begin, end); | |||||
} | |||||
} | |||||
} |
#include "../librf.h" | |||||
RESUMEF_NS | |||||
{ | |||||
namespace detail | |||||
{ | |||||
state_when_t::state_when_t(intptr_t counter_) | |||||
:_counter(counter_) | |||||
{ | |||||
} | |||||
void state_when_t::resume() | |||||
{ | |||||
coroutine_handle<> handler = _coro; | |||||
if (handler) | |||||
{ | |||||
_coro = nullptr; | |||||
_scheduler->del_final(this); | |||||
handler.resume(); | |||||
} | |||||
} | |||||
bool state_when_t::has_handler() const noexcept | |||||
{ | |||||
return (bool)_coro; | |||||
} | |||||
void state_when_t::on_cancel() noexcept | |||||
{ | |||||
scoped_lock<lock_type> lock_(_lock); | |||||
_counter.store(0); | |||||
this->_coro = nullptr; | |||||
} | |||||
bool state_when_t::on_notify_one() | |||||
{ | |||||
scoped_lock<lock_type> lock_(_lock); | |||||
if (_counter.fetch_sub(1, std::memory_order_acq_rel) == 1) | |||||
{ | |||||
assert(this->_scheduler != nullptr); | |||||
if (this->_coro) | |||||
this->_scheduler->add_generator(this); | |||||
return true; | |||||
} | |||||
return false; | |||||
} | |||||
bool state_when_t::on_timeout() | |||||
{ | |||||
scoped_lock<lock_type> lock_(_lock); | |||||
return false; | |||||
} | |||||
} | |||||
} |
#pragma once | |||||
RESUMEF_NS | |||||
{ | |||||
using any_t = std::any; | |||||
using std::any_cast; | |||||
} | |||||
//纠结过when_any的返回值,是选用index + std::any,还是选用std::variant<>。最终选择了std::any。 | |||||
//std::variant<>存在第一个元素不能默认构造的问题,需要使用std::monostate来占位,导致下标不是从0开始。 | |||||
//而且,std::variant<>里面存在类型重复的问题,好几个操作都是病态的 | |||||
//最最重要的,要统一ranged when_any的返回值,还得做一个运行时通过下标设置std::variant<>的东西 | |||||
//std::any除了内存布局不太理想,其他方面几乎没缺点(在此应用下) | |||||
RESUMEF_NS | |||||
{ | |||||
namespace detail | |||||
{ | |||||
struct state_when_t : public state_base_t | |||||
{ | |||||
state_when_t(intptr_t counter_); | |||||
virtual void resume() override; | |||||
virtual bool has_handler() const noexcept override; | |||||
void on_cancel() noexcept; | |||||
bool on_notify_one(); | |||||
bool on_timeout(); | |||||
//将自己加入到通知链表里 | |||||
template<class _PromiseT, typename = std::enable_if_t<is_promise_v<_PromiseT>>> | |||||
scheduler_t* on_await_suspend(coroutine_handle<_PromiseT> handler) noexcept | |||||
{ | |||||
_PromiseT& promise = handler.promise(); | |||||
auto* parent_state = promise.get_state(); | |||||
scheduler_t* sch = parent_state->get_scheduler(); | |||||
this->_scheduler = sch; | |||||
this->_coro = handler; | |||||
return sch; | |||||
} | |||||
typedef spinlock lock_type; | |||||
lock_type _lock; | |||||
std::atomic<intptr_t> _counter; | |||||
}; | |||||
template<class _Ty> | |||||
struct [[nodiscard]] when_future_t | |||||
{ | |||||
using value_type = _Ty; | |||||
using state_type = detail::state_when_t; | |||||
using promise_type = promise_t<value_type>; | |||||
using future_type = when_future_t<value_type>; | |||||
using lock_type = typename state_type::lock_type; | |||||
counted_ptr<state_type> _state; | |||||
std::shared_ptr<value_type> _values; | |||||
when_future_t(intptr_t count_) noexcept | |||||
: _state(new state_type(count_)) | |||||
, _values(std::make_shared<value_type>()) | |||||
{ | |||||
} | |||||
bool await_ready() noexcept | |||||
{ | |||||
return _state->_counter.load(std::memory_order_relaxed) == 0; | |||||
} | |||||
template<class _PromiseT, typename = std::enable_if_t<is_promise_v<_PromiseT>>> | |||||
void await_suspend(coroutine_handle<_PromiseT> handler) | |||||
{ | |||||
_state->on_await_suspend(handler); | |||||
} | |||||
value_type await_resume() noexcept(std::is_nothrow_move_constructible_v<value_type>) | |||||
{ | |||||
return std::move(*_values); | |||||
} | |||||
}; | |||||
using ignore_type = std::remove_const_t<decltype(std::ignore)>; | |||||
template<class _Ty> | |||||
struct remove_future | |||||
{ | |||||
using type = std::remove_reference_t<_Ty>; | |||||
using value_type = type; | |||||
}; | |||||
template<> | |||||
struct remove_future<void> | |||||
{ | |||||
using type = void; | |||||
using value_type = ignore_type; | |||||
}; | |||||
template<class _Ty> | |||||
struct remove_future<future_t<_Ty> > : public remove_future<_Ty>{}; | |||||
template<class _Ty> | |||||
struct remove_future<future_t<_Ty>&> : public remove_future<_Ty>{}; | |||||
template<class _Ty> | |||||
struct remove_future<future_t<_Ty>&&> : public remove_future<_Ty>{}; | |||||
template<class _Ty> | |||||
using remove_future_t = typename remove_future<_Ty>::value_type; | |||||
template<class _Awaitable, class _Ty> | |||||
future_t<> when_all_connector(state_when_t* state, _Awaitable awaitor, _Ty& value) | |||||
{ | |||||
static_assert(is_awaitor_v<_Awaitable>); | |||||
if constexpr(std::is_same_v<_Ty, ignore_type>) | |||||
co_await awaitor; | |||||
else | |||||
value = co_await awaitor; | |||||
state->on_notify_one(); | |||||
}; | |||||
template<class _Tup, size_t _Idx> | |||||
inline void when_all_one__(scheduler_t& , state_when_t*, _Tup& ) | |||||
{ | |||||
} | |||||
template<class _Tup, size_t _Idx, class _Awaitable, class... _Rest> | |||||
inline void when_all_one__(scheduler_t& sch, state_when_t* state, _Tup& values, _Awaitable&& awaitable, _Rest&&... rest) | |||||
{ | |||||
sch + when_all_connector(state, std::forward<_Awaitable>(awaitable), std::get<_Idx>(values)); | |||||
when_all_one__<_Tup, _Idx + 1, _Rest...>(sch, state, values, std::forward<_Rest>(rest)...); | |||||
} | |||||
template<class _Val, class _Iter, typename _Awaitable = decltype(*std::declval<_Iter>())> | |||||
inline void when_all_range__(scheduler_t& sch, state_when_t* state, std::vector<_Val> & values, _Iter begin, _Iter end) | |||||
{ | |||||
const auto _First = begin; | |||||
intptr_t _Idx = 0; | |||||
for (; begin != end; ++begin, ++_Idx) | |||||
{ | |||||
sch + when_all_connector(state, std::move(*begin), values[_Idx]); | |||||
} | |||||
} | |||||
//----------------------------------------------------------------------------------------------------------------------------------------- | |||||
using when_any_pair = std::pair<intptr_t, any_t>; | |||||
using when_any_pair_ptr = std::shared_ptr<when_any_pair>; | |||||
template<class _Awaitable> | |||||
future_t<> when_any_connector(counted_ptr<state_when_t> state, _Awaitable awaitor, when_any_pair_ptr value, intptr_t idx) | |||||
{ | |||||
assert(idx >= 0); | |||||
static_assert(is_awaitor_v<_Awaitable>); | |||||
using value_type = remove_future_t<_Awaitable>; | |||||
if constexpr (std::is_same_v<value_type, ignore_type>) | |||||
{ | |||||
co_await awaitor; | |||||
intptr_t oldValue = -1; | |||||
if (reinterpret_cast<std::atomic<intptr_t>&>(value->first).compare_exchange_strong(oldValue, idx)) | |||||
{ | |||||
state->on_notify_one(); | |||||
} | |||||
} | |||||
else | |||||
{ | |||||
decltype(auto) result = co_await awaitor; | |||||
intptr_t oldValue = -1; | |||||
if (reinterpret_cast<std::atomic<intptr_t>&>(value->first).compare_exchange_strong(oldValue, idx)) | |||||
{ | |||||
value->second = std::move(result); | |||||
state->on_notify_one(); | |||||
} | |||||
} | |||||
}; | |||||
inline void when_any_one__(scheduler_t&, state_when_t*, when_any_pair_ptr, intptr_t) | |||||
{ | |||||
} | |||||
template<class _Awaitable, class... _Rest> | |||||
inline void when_any_one__(scheduler_t& sch, state_when_t* state, when_any_pair_ptr value, intptr_t _Idx, _Awaitable&& awaitable, _Rest&&... rest) | |||||
{ | |||||
sch + when_any_connector(state, awaitable, value, _Idx); | |||||
when_any_one__(sch, state, value, _Idx + 1, std::forward<_Rest>(rest)...); | |||||
} | |||||
template<class _Iter> | |||||
inline void when_any_range__(scheduler_t& sch, state_when_t* state, when_any_pair_ptr value, _Iter begin, _Iter end) | |||||
{ | |||||
const auto _First = begin; | |||||
intptr_t _Idx = 0; | |||||
for (; begin != end; ++begin, ++_Idx) | |||||
{ | |||||
sch + when_any_connector(state, *begin, value, static_cast<intptr_t>(_Idx)); | |||||
} | |||||
} | |||||
} | |||||
inline namespace when_v2 | |||||
{ | |||||
template<class... _Awaitable, | |||||
class = std::enable_if_t<std::conjunction_v<is_awaitor<_Awaitable>...>> | |||||
> | |||||
auto when_all(scheduler_t& sch, _Awaitable&&... args) | |||||
-> detail::when_future_t<std::tuple<detail::remove_future_t<_Awaitable>...> > | |||||
{ | |||||
using tuple_type = std::tuple<detail::remove_future_t<_Awaitable>...>; | |||||
detail::when_future_t<tuple_type> awaitor{ sizeof...(_Awaitable) }; | |||||
detail::when_all_one__<tuple_type, 0u, _Awaitable...>(sch, awaitor._state.get(), *awaitor._values, std::forward<_Awaitable>(args)...); | |||||
return awaitor; | |||||
} | |||||
template<class _Iter, | |||||
class _Awaitable = decltype(*std::declval<_Iter>()), | |||||
class = std::enable_if_t<is_awaitor_v<_Awaitable>> | |||||
> | |||||
auto when_all(scheduler_t& sch, _Iter begin, _Iter end) | |||||
-> detail::when_future_t<std::vector<detail::remove_future_t<_Awaitable> > > | |||||
{ | |||||
using value_type = detail::remove_future_t<_Awaitable>; | |||||
using vector_type = std::vector<value_type>; | |||||
detail::when_future_t<vector_type> awaitor{ std::distance(begin, end) }; | |||||
awaitor._values->resize(end - begin); | |||||
when_all_range__(sch, awaitor._state.get(), *awaitor._values, begin, end); | |||||
return awaitor; | |||||
} | |||||
template<class... _Awaitable, | |||||
class = std::enable_if_t<std::conjunction_v<is_awaitor<_Awaitable>...>> | |||||
> | |||||
auto when_all(_Awaitable&&... awaitor) | |||||
-> future_t<std::tuple<detail::remove_future_t<_Awaitable>...>> | |||||
{ | |||||
co_return co_await when_all(*current_scheduler(), std::forward<_Awaitable>(awaitor)...); | |||||
} | |||||
template<class _Iter, | |||||
class _Awaitable = decltype(*std::declval<_Iter>()), | |||||
class = std::enable_if_t<is_awaitor_v<_Awaitable>> | |||||
> | |||||
auto when_all(_Iter begin, _Iter end) | |||||
-> future_t<std::vector<detail::remove_future_t<_Awaitable>>> | |||||
{ | |||||
co_return co_await when_all(*current_scheduler(), begin, end); | |||||
} | |||||
template<class... _Awaitable, | |||||
class = std::enable_if_t<std::conjunction_v<is_awaitor<_Awaitable>...>> | |||||
> | |||||
auto when_any(scheduler_t& sch, _Awaitable&&... args) | |||||
-> detail::when_future_t<detail::when_any_pair> | |||||
{ | |||||
detail::when_future_t<detail::when_any_pair> awaitor{ sizeof...(_Awaitable) > 0 ? 1 : 0 }; | |||||
awaitor._values->first = -1; | |||||
detail::when_any_one__(sch, awaitor._state.get(), awaitor._values, 0, std::forward<_Awaitable>(args)...); | |||||
return awaitor; | |||||
} | |||||
template<class _Iter, | |||||
typename _Awaitable = decltype(*std::declval<_Iter>()), | |||||
class = std::enable_if_t<is_awaitor_v<_Awaitable>> | |||||
> | |||||
auto when_any(scheduler_t& sch, _Iter begin, _Iter end) | |||||
-> detail::when_future_t<detail::when_any_pair> | |||||
{ | |||||
detail::when_future_t<detail::when_any_pair> awaitor{ begin == end ? 0 : 1 }; | |||||
awaitor._values->first = -1; | |||||
detail::when_any_range__<_Iter>(sch, awaitor._state.get(), awaitor._values, begin, end); | |||||
return awaitor; | |||||
} | |||||
template<class... _Awaitable, | |||||
class = std::enable_if_t<std::conjunction_v<is_awaitor<_Awaitable>...>> | |||||
> | |||||
auto when_any(_Awaitable&&... awaitor) | |||||
-> future_t<detail::when_any_pair> | |||||
{ | |||||
co_return co_await when_any(*current_scheduler(), std::forward<_Awaitable>(awaitor)...); | |||||
} | |||||
template<class _Iter, | |||||
typename _Awaitable = decltype(*std::declval<_Iter>()), | |||||
class = std::enable_if_t<is_awaitor_v<_Awaitable>> | |||||
> | |||||
auto when_any(_Iter begin, _Iter end) | |||||
-> future_t<detail::when_any_pair> | |||||
{ | |||||
co_return co_await when_any(*current_scheduler(), begin, end); | |||||
} | |||||
} | |||||
} |
using namespace resumef; | using namespace resumef; | ||||
template<class... _Fty> | |||||
auto when_all2(_Fty&&... f) -> future_t<std::tuple<detail::remove_future_vt<_Fty>...>> | |||||
{ | |||||
using tuple_type = std::tuple<detail::remove_future_vt<_Fty>...>; | |||||
co_return co_await when_all(*current_scheduler(), std::forward<_Fty>(f)...); | |||||
} | |||||
void test_when_any() | void test_when_any() | ||||
{ | { | ||||
using namespace std::chrono; | using namespace std::chrono; | ||||
co_await when_all(); | co_await when_all(); | ||||
std::cout << "when all: zero!" << std::endl << std::endl; | std::cout << "when all: zero!" << std::endl << std::endl; | ||||
auto ab = co_await when_all2(my_sleep("a"), my_sleep_v("b")); | |||||
auto ab = co_await when_all(my_sleep("a"), my_sleep_v("b")); | |||||
//ab.1 is std::ignore | //ab.1 is std::ignore | ||||
std::cout << "when all:" << std::get<0>(ab) << std::endl << std::endl; | std::cout << "when all:" << std::get<0>(ab) << std::endl << std::endl; | ||||
//test_ring_queue<resumef::ring_queue_spinlock<int, false, uint32_t>>(); | //test_ring_queue<resumef::ring_queue_spinlock<int, false, uint32_t>>(); | ||||
//test_ring_queue<resumef::ring_queue_lockfree<int, uint64_t>>(); | //test_ring_queue<resumef::ring_queue_lockfree<int, uint64_t>>(); | ||||
resumable_main_switch_scheduler(); | |||||
//resumable_main_when_all(); | |||||
//resumable_main_switch_scheduler(); | |||||
resumable_main_when_all(); | |||||
//resumable_main_event_v2(); | //resumable_main_event_v2(); | ||||
return 0; | return 0; | ||||
<ItemGroup> | <ItemGroup> | ||||
<ClCompile Include="..\benchmark\benchmark_asio_echo.cpp"> | <ClCompile Include="..\benchmark\benchmark_asio_echo.cpp"> | ||||
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild> | <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild> | ||||
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">true</ExcludedFromBuild> | |||||
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|x64'">true</ExcludedFromBuild> | |||||
</ClCompile> | </ClCompile> | ||||
<ClCompile Include="..\benchmark\benchmark_async_mem.cpp" /> | <ClCompile Include="..\benchmark\benchmark_async_mem.cpp" /> | ||||
<ClCompile Include="..\benchmark\benchmark_channel_passing_next.cpp" /> | <ClCompile Include="..\benchmark\benchmark_channel_passing_next.cpp" /> | ||||
<ClCompile Include="..\librf\src\sleep.cpp" /> | <ClCompile Include="..\librf\src\sleep.cpp" /> | ||||
<ClCompile Include="..\librf\src\state.cpp" /> | <ClCompile Include="..\librf\src\state.cpp" /> | ||||
<ClCompile Include="..\librf\src\timer.cpp" /> | <ClCompile Include="..\librf\src\timer.cpp" /> | ||||
<ClCompile Include="..\librf\src\when.cpp" /> | |||||
<ClCompile Include="..\librf\src\when_v2.cpp" /> | |||||
<ClCompile Include="..\tutorial\test_async_cb.cpp" /> | <ClCompile Include="..\tutorial\test_async_cb.cpp" /> | ||||
<ClCompile Include="..\tutorial\test_async_channel.cpp" /> | <ClCompile Include="..\tutorial\test_async_channel.cpp" /> | ||||
<ClCompile Include="..\tutorial\test_async_channel_mult_thread.cpp" /> | <ClCompile Include="..\tutorial\test_async_channel_mult_thread.cpp" /> | ||||
<ClInclude Include="..\librf\src\unix\coroutine.h" /> | <ClInclude Include="..\librf\src\unix\coroutine.h" /> | ||||
<ClInclude Include="..\librf\src\utils.h" /> | <ClInclude Include="..\librf\src\utils.h" /> | ||||
<ClInclude Include="..\librf\src\when.h" /> | <ClInclude Include="..\librf\src\when.h" /> | ||||
<ClInclude Include="..\librf\src\when_v2.h" /> | |||||
<ClInclude Include="..\librf\src\_awaker.h" /> | <ClInclude Include="..\librf\src\_awaker.h" /> | ||||
<ClInclude Include="..\tutorial\test_ring_queue.h" /> | <ClInclude Include="..\tutorial\test_ring_queue.h" /> | ||||
<ClInclude Include="dcas.h" /> | <ClInclude Include="dcas.h" /> |
<ClCompile Include="..\tutorial\test_async_when_all.cpp"> | <ClCompile Include="..\tutorial\test_async_when_all.cpp"> | ||||
<Filter>tutorial</Filter> | <Filter>tutorial</Filter> | ||||
</ClCompile> | </ClCompile> | ||||
<ClCompile Include="..\librf\src\when.cpp"> | |||||
<Filter>librf\src</Filter> | |||||
</ClCompile> | |||||
<ClCompile Include="..\benchmark\benchmark_asio_echo.cpp"> | <ClCompile Include="..\benchmark\benchmark_asio_echo.cpp"> | ||||
<Filter>benchmark</Filter> | <Filter>benchmark</Filter> | ||||
</ClCompile> | </ClCompile> | ||||
<ClCompile Include="..\tutorial\test_async_event_v2.cpp"> | <ClCompile Include="..\tutorial\test_async_event_v2.cpp"> | ||||
<Filter>tutorial</Filter> | <Filter>tutorial</Filter> | ||||
</ClCompile> | </ClCompile> | ||||
<ClCompile Include="..\librf\src\when_v2.cpp"> | |||||
<Filter>librf\src</Filter> | |||||
</ClCompile> | |||||
</ItemGroup> | </ItemGroup> | ||||
<ItemGroup> | <ItemGroup> | ||||
<ClInclude Include="..\librf\librf.h"> | <ClInclude Include="..\librf\librf.h"> | ||||
<ClInclude Include="..\librf\src\current_scheduler.h"> | <ClInclude Include="..\librf\src\current_scheduler.h"> | ||||
<Filter>librf\src</Filter> | <Filter>librf\src</Filter> | ||||
</ClInclude> | </ClInclude> | ||||
<ClInclude Include="..\librf\src\when_v2.h"> | |||||
<Filter>librf\src</Filter> | |||||
</ClInclude> | |||||
</ItemGroup> | </ItemGroup> | ||||
<ItemGroup> | <ItemGroup> | ||||
<None Include="..\librf\src\asio_task_1.12.0.inl"> | <None Include="..\librf\src\asio_task_1.12.0.inl"> |