diff --git a/librf/src/event_v2.cpp b/librf/src/event_v2.cpp index b57e51c..8a48b69 100644 --- a/librf/src/event_v2.cpp +++ b/librf/src/event_v2.cpp @@ -4,7 +4,7 @@ RESUMEF_NS { namespace detail { - void state_event_t::resume() + void state_event_base_t::resume() { coroutine_handle<> handler = _coro; if (handler) @@ -15,7 +15,7 @@ RESUMEF_NS } } - bool state_event_t::has_handler() const noexcept + bool state_event_base_t::has_handler() const noexcept { return (bool)_coro; } @@ -66,6 +66,61 @@ RESUMEF_NS return false; } + + + + void state_event_all_t::on_cancel() noexcept + { + intptr_t oldValue = _counter.load(std::memory_order_acquire); + if (oldValue >= 0 && _counter.compare_exchange_strong(oldValue, -1, std::memory_order_acq_rel)) + { + *_value = false; + _thandler.stop(); + + this->_coro = nullptr; + } + } + + bool state_event_all_t::on_notify(event_v2_impl*) + { + intptr_t oldValue = _counter.load(std::memory_order_acquire); + if (oldValue <= 0) return false; + + oldValue = _counter.fetch_add(-1, std::memory_order_acq_rel); + if (oldValue == 1) + { + *_value = true; + _thandler.stop(); + + assert(this->_scheduler != nullptr); + if (this->_coro) + this->_scheduler->add_generator(this); + + return true; + } + + return oldValue >= 1; + } + + bool state_event_all_t::on_timeout() + { + intptr_t oldValue = _counter.load(std::memory_order_acquire); + if (oldValue >= 0 && _counter.compare_exchange_strong(oldValue, -1, std::memory_order_acq_rel)) + { + *_value = false; + _thandler.reset(); + + assert(this->_scheduler != nullptr); + if (this->_coro) + this->_scheduler->add_generator(this); + + return true; + } + return false; + } + + + event_v2_impl::event_v2_impl(bool initially) noexcept : _counter(initially ? 1 : 0) { @@ -148,7 +203,7 @@ RESUMEF_NS event_t::timeout_awaiter event_t::wait_until_(const clock_type::time_point& tp) const noexcept { - return { _event.get(), tp }; + return { tp, _event.get() }; } } } diff --git a/librf/src/event_v2.h b/librf/src/event_v2.h index 00a66c3..dff0250 100644 --- a/librf/src/event_v2.h +++ b/librf/src/event_v2.h @@ -27,6 +27,9 @@ RESUMEF_NS awaiter operator co_await() const noexcept; awaiter wait() const noexcept; + template + struct timeout_awaitor_impl; + struct [[nodiscard]] timeout_awaiter; template @@ -47,79 +50,61 @@ RESUMEF_NS template) > RESUMEF_REQUIRES(_IteratorOfT<_Iter, event_t>) - static auto wait_any(_Iter begin_, _Iter end_)->any_awaiter<_Iter>; + static auto wait_any(_Iter begin_, _Iter end_) + ->any_awaiter<_Iter>; template) > RESUMEF_REQUIRES(_ContainerOfT<_Cont, event_t>) - static auto wait_any(_Cont& cnt_)->any_awaiter; + static auto wait_any(_Cont& cnt_) + ->any_awaiter; + + template + struct [[nodiscard]] timeout_any_awaiter; template) > RESUMEF_REQUIRES(_IteratorOfT<_Iter, event_t>) - static future_t - wait_any_for(const std::chrono::duration<_Rep, _Period>& dt, _Iter begin_, _Iter end_) - { - auto tidx = co_await when_any(sleep_for(dt), when_any(begin_, end_)); - if (tidx.first == 0) co_return -1; - - when_any_pair idx = any_cast(tidx.second); - co_return idx.first; - } + static auto wait_any_for(const std::chrono::duration<_Rep, _Period>& dt, _Iter begin_, _Iter end_) + ->timeout_any_awaiter<_Iter>; template) > RESUMEF_REQUIRES(_ContainerOfT<_Cont, event_t>) - static future_t - wait_any_for(const std::chrono::duration<_Rep, _Period>& dt, _Cont& cont) - { - return wait_any_for(dt, std::begin(cont), std::end(cont)); - } + static auto wait_any_for(const std::chrono::duration<_Rep, _Period>& dt, _Cont& cnt_) + ->timeout_any_awaiter; + template + struct [[nodiscard]] all_awaiter; template) > RESUMEF_REQUIRES(_IteratorOfT<_Iter, event_t>) - static future_t - wait_all(_Iter begin_, _Iter end_) - { - auto vb = co_await when_all(begin_, end_); - co_return is_all_succeeded(vb); - } + static auto wait_all(_Iter begin_, _Iter end_) + ->all_awaiter<_Iter>; template) > RESUMEF_REQUIRES(_ContainerOfT<_Cont, event_t>) - static future_t - wait_all(_Cont& cnt_) - { - auto vb = co_await when_all(std::begin(cnt_), std::end(cnt_)); - co_return is_all_succeeded(vb); - } + static auto wait_all(_Cont& cnt_) + ->all_awaiter; + + template + struct [[nodiscard]] timeout_all_awaiter; template) > RESUMEF_REQUIRES(_IteratorOfT<_Iter, event_t>) - static future_t - wait_all_for(const std::chrono::duration<_Rep, _Period>& dt, _Iter begin_, _Iter end_) - { - auto tidx = co_await when_any(sleep_for(dt), when_all(begin_, end_)); - if (tidx.first == 0) co_return false; - - std::vector& vb = any_cast&>(tidx.second); - co_return is_all_succeeded(vb); - } + static auto wait_all_for(const std::chrono::duration<_Rep, _Period>& dt, _Iter begin_, _Iter end_) + ->timeout_all_awaiter<_Iter>; template) > RESUMEF_REQUIRES(_ContainerOfT<_Cont, event_t>) - static future_t - wait_all_for(const std::chrono::duration<_Rep, _Period>& dt, _Cont& cont) - { - return wait_all_for(dt, std::begin(cont), std::end(cont)); - } + static auto wait_all_for(const std::chrono::duration<_Rep, _Period>& dt, _Cont& cnt_) + ->timeout_all_awaiter; event_t(const event_t&) = default; event_t(event_t&&) = default; diff --git a/librf/src/event_v2.inl b/librf/src/event_v2.inl index a30b325..4449172 100644 --- a/librf/src/event_v2.inl +++ b/librf/src/event_v2.inl @@ -4,7 +4,7 @@ RESUMEF_NS { namespace detail { - struct state_event_t; + struct state_event_base_t; struct event_v2_impl : public std::enable_shared_from_this { @@ -27,12 +27,10 @@ RESUMEF_NS static constexpr bool USE_LINK_QUEUE = false; using lock_type = std::conditional_t; - using state_event_ptr = counted_ptr; - using link_state_queue = intrusive_link_queue; + using state_event_ptr = counted_ptr; + using link_state_queue = intrusive_link_queue; using wait_queue_type = std::conditional_t>; - friend struct state_event_t; - bool try_wait_one() noexcept { if (_counter.load(std::memory_order_acquire) > 0) @@ -44,7 +42,7 @@ RESUMEF_NS return false; } - void add_wait_list(state_event_t* state) noexcept + void add_wait_list(state_event_base_t* state) noexcept { assert(state != nullptr); _wait_awakes.push_back(state); @@ -62,18 +60,14 @@ RESUMEF_NS event_v2_impl& operator=(event_v2_impl&&) = delete; }; - struct state_event_t : public state_base_t + struct state_event_base_t : public state_base_t { - state_event_t(event_v2_impl*& val) - : _value(&val) - {} - virtual void resume() override; virtual bool has_handler() const noexcept override; - void on_cancel() noexcept; - bool on_notify(event_v2_impl* eptr); - bool on_timeout(); + virtual void on_cancel() noexcept = 0; + virtual bool on_notify(event_v2_impl* eptr) = 0; + virtual bool on_timeout() = 0; //将自己加入到通知链表里 template>> @@ -89,19 +83,51 @@ RESUMEF_NS return sch; } - public: - typedef spinlock lock_type; - //为浸入式单向链表提供的next指针 - //counted_ptr _next = nullptr; + //counted_ptr _next = nullptr; + }; + + struct state_event_t : public state_event_base_t + { + state_event_t(event_v2_impl*& val) + : _value(&val) + {} + + virtual void on_cancel() noexcept override; + virtual bool on_notify(event_v2_impl* eptr) override; + virtual bool on_timeout() override; + public: + //typedef spinlock lock_type; + timer_handler _thandler; - private: + protected: //_value引用awaitor保存的值,这样可以尽可能减少创建state的可能。而不必进入没有state就没有value实体被用于返回。 //在调用on_notify()或on_timeout()任意之一后,置为nullptr。 //这样来保证要么超时了,要么响应了signal的通知了。 //这个指针在on_notify()和on_timeout()里,当作一个互斥的锁来防止同时进入两个函数 std::atomic _value; }; + + struct state_event_all_t : public state_event_base_t + { + state_event_all_t(intptr_t count, bool& val) + : _counter(count) + , _value(&val) + {} + + virtual void on_cancel() noexcept override; + virtual bool on_notify(event_v2_impl* eptr) override; + virtual bool on_timeout() override; + public: + //typedef spinlock lock_type; + + //为浸入式单向链表提供的next指针 + //counted_ptr _next = nullptr; + timer_handler _thandler; + std::atomic _counter; + protected: + bool* _value; + }; } inline namespace event_v2 @@ -171,36 +197,44 @@ RESUMEF_NS return { _event.get() }; } - struct [[nodiscard]] event_t::timeout_awaiter : public event_t::awaiter + template + struct event_t::timeout_awaitor_impl : public _Btype { - timeout_awaiter(detail::event_v2_impl* evt, clock_type::time_point tp) noexcept - : awaiter(evt) + template + timeout_awaitor_impl(clock_type::time_point tp, Args... args) + : _Btype(std::forward(args)...) , _tp(tp) - { - } - + {} template>> bool await_suspend(coroutine_handle<_PromiseT> handler) { - if (!awaiter::await_suspend(handler)) + if (!_Btype::await_suspend(handler)) return false; _PromiseT& promise = handler.promise(); auto* parent_state = promise.get_state(); scheduler_t* sch = parent_state->get_scheduler(); - _state->_thandler = sch->timer()->add_handler(_tp, [_state=_state](bool canceld) - { - if (!canceld) - _state->on_timeout(); - }); + _state->_thandler = sch->timer()->add_handler(_tp, [_state = _state](bool canceld) + { + if (!canceld) + _state->on_timeout(); + }); return true; } - private: + protected: clock_type::time_point _tp; }; + struct [[nodiscard]] event_t::timeout_awaiter : event_t::timeout_awaitor_impl + { + timeout_awaiter(clock_type::time_point tp, detail::event_v2_impl* evt) noexcept + : timeout_awaitor_impl(tp, evt) + { + } + }; + template inline event_t::timeout_awaiter event_t::wait_for(const std::chrono::duration<_Rep, _Period>& dt) const noexcept { @@ -294,7 +328,7 @@ RESUMEF_NS return -1; } - private: + protected: detail::event_v2_impl* _event = nullptr; counted_ptr _state; _Iter _begin; @@ -314,5 +348,146 @@ RESUMEF_NS { return { std::begin(cnt_), std::end(cnt_) }; } + + + + template + struct [[nodiscard]] event_t::timeout_any_awaiter : timeout_awaitor_impl> + { + timeout_any_awaiter(clock_type::time_point tp, _Iter begin, _Iter end) noexcept + : timeout_awaitor_impl(tp, begin, end) + {} + }; + + template + RESUMEF_REQUIRES(_IteratorOfT<_Iter, event_t>) + auto event_t::wait_any_for(const std::chrono::duration<_Rep, _Period>& dt, _Iter begin_, _Iter end_) + ->event_t::timeout_any_awaiter<_Iter> + { + clock_type::time_point tp = clock_type::now() + std::chrono::duration_cast(dt); + return { tp, begin_, end_ }; + } + + template + RESUMEF_REQUIRES(_ContainerOfT<_Cont, event_t>) + auto event_t::wait_any_for(const std::chrono::duration<_Rep, _Period>& dt, _Cont& cnt_) + ->event_t::timeout_any_awaiter + { + clock_type::time_point tp = clock_type::now() + std::chrono::duration_cast(dt); + return { tp, std::begin(cnt_), std::end(cnt_) }; + } + + + + template + struct [[nodiscard]] event_t::all_awaiter + { + all_awaiter(_Iter begin, _Iter end) noexcept + : _begin(begin) + , _end(end) + { + } + + bool await_ready() noexcept + { + _value = _begin == _end; + return _value; + } + + template>> + bool await_suspend(coroutine_handle<_PromiseT> handler) + { + intptr_t count = std::distance(_begin, _end); + + std::vector lockes; + lockes.reserve(count); + + for (auto iter = _begin; iter != _end; ++iter) + { + detail::event_v2_impl* evt = (*iter)._event.get(); + lockes.push_back(evt->_lock); + } + + _state = new detail::state_event_all_t(count, _value); + (void)_state->on_await_suspend(handler); + + scoped_lock_range lock_(lockes); + + for (auto iter = _begin; iter != _end; ++iter) + { + detail::event_v2_impl* evt = (*iter)._event.get(); + if (evt->try_wait_one()) + { + _state->_counter.fetch_sub(1, std::memory_order_acq_rel); + } + else + { + evt->add_wait_list(_state.get()); + } + } + + if (_state->_counter.load(std::memory_order_relaxed) == 0) + { + _state = nullptr; + _value = true; + + return false; + } + + return true; + } + + bool await_resume() noexcept + { + return _value; + } + protected: + _Iter _begin; + _Iter _end; + counted_ptr _state; + bool _value = false; + }; + + template + RESUMEF_REQUIRES(_IteratorOfT<_Iter, event_t>) + auto event_t::wait_all(_Iter begin_, _Iter end_) ->event_t::all_awaiter<_Iter> + { + return { begin_, end_ }; + } + + template + RESUMEF_REQUIRES(_ContainerOfT<_Cont, event_t>) + auto event_t::wait_all(_Cont& cnt_) ->event_t::all_awaiter + { + return { std::begin(cnt_), std::end(cnt_) }; + } + + + + template + struct [[nodiscard]] event_t::timeout_all_awaiter : timeout_awaitor_impl> + { + timeout_all_awaiter(clock_type::time_point tp, _Iter begin, _Iter end) noexcept + : timeout_awaitor_impl(tp, begin, end) + {} + }; + + template + RESUMEF_REQUIRES(_IteratorOfT<_Iter, event_t>) + auto event_t::wait_all_for(const std::chrono::duration<_Rep, _Period>& dt, _Iter begin_, _Iter end_) + ->event_t::timeout_all_awaiter<_Iter> + { + clock_type::time_point tp = clock_type::now() + std::chrono::duration_cast(dt); + return { tp, begin_, end_ }; + } + + template + RESUMEF_REQUIRES(_ContainerOfT<_Cont, event_t>) + auto event_t::wait_all_for(const std::chrono::duration<_Rep, _Period>& dt, _Cont& cnt_) + ->event_t::timeout_all_awaiter + { + clock_type::time_point tp = clock_type::now() + std::chrono::duration_cast(dt); + return { tp, std::begin(cnt_), std::end(cnt_) }; + } } } diff --git a/tutorial/test_async_event.cpp b/tutorial/test_async_event.cpp index 37f3bfa..19605db 100644 --- a/tutorial/test_async_event.cpp +++ b/tutorial/test_async_event.cpp @@ -150,8 +150,8 @@ static void test_wait_all_timeout() void resumable_main_event() { - //test_wait_one(); - //std::cout << std::endl; + test_wait_one(); + std::cout << std::endl; test_wait_any(); std::cout << std::endl;