Browse Source

完善event_v2的wait_all功能

tags/v2.9.7
tearshark 4 years ago
parent
commit
d09152280c
4 changed files with 294 additions and 79 deletions
  1. 58
    3
      librf/src/event_v2.cpp
  2. 27
    42
      librf/src/event_v2.h
  3. 207
    32
      librf/src/event_v2.inl
  4. 2
    2
      tutorial/test_async_event.cpp

+ 58
- 3
librf/src/event_v2.cpp View File

@@ -4,7 +4,7 @@ RESUMEF_NS
{
namespace detail
{
void state_event_t::resume()
void state_event_base_t::resume()
{
coroutine_handle<> handler = _coro;
if (handler)
@@ -15,7 +15,7 @@ RESUMEF_NS
}
}

bool state_event_t::has_handler() const noexcept
bool state_event_base_t::has_handler() const noexcept
{
return (bool)_coro;
}
@@ -66,6 +66,61 @@ RESUMEF_NS
return false;
}




void state_event_all_t::on_cancel() noexcept
{
intptr_t oldValue = _counter.load(std::memory_order_acquire);
if (oldValue >= 0 && _counter.compare_exchange_strong(oldValue, -1, std::memory_order_acq_rel))
{
*_value = false;
_thandler.stop();

this->_coro = nullptr;
}
}

bool state_event_all_t::on_notify(event_v2_impl*)
{
intptr_t oldValue = _counter.load(std::memory_order_acquire);
if (oldValue <= 0) return false;

oldValue = _counter.fetch_add(-1, std::memory_order_acq_rel);
if (oldValue == 1)
{
*_value = true;
_thandler.stop();

assert(this->_scheduler != nullptr);
if (this->_coro)
this->_scheduler->add_generator(this);

return true;
}

return oldValue >= 1;
}

bool state_event_all_t::on_timeout()
{
intptr_t oldValue = _counter.load(std::memory_order_acquire);
if (oldValue >= 0 && _counter.compare_exchange_strong(oldValue, -1, std::memory_order_acq_rel))
{
*_value = false;
_thandler.reset();

assert(this->_scheduler != nullptr);
if (this->_coro)
this->_scheduler->add_generator(this);

return true;
}
return false;
}



event_v2_impl::event_v2_impl(bool initially) noexcept
: _counter(initially ? 1 : 0)
{
@@ -148,7 +203,7 @@ RESUMEF_NS

event_t::timeout_awaiter event_t::wait_until_(const clock_type::time_point& tp) const noexcept
{
return { _event.get(), tp };
return { tp, _event.get() };
}
}
}

+ 27
- 42
librf/src/event_v2.h View File

@@ -27,6 +27,9 @@ RESUMEF_NS
awaiter operator co_await() const noexcept;
awaiter wait() const noexcept;

template<class _Btype>
struct timeout_awaitor_impl;

struct [[nodiscard]] timeout_awaiter;

template<class _Rep, class _Period>
@@ -47,79 +50,61 @@ RESUMEF_NS
template<class _Iter
COMMA_RESUMEF_ENABLE_IF(traits::is_iterator_of_v<_Iter, event_t>)
> RESUMEF_REQUIRES(_IteratorOfT<_Iter, event_t>)
static auto wait_any(_Iter begin_, _Iter end_)->any_awaiter<_Iter>;
static auto wait_any(_Iter begin_, _Iter end_)
->any_awaiter<_Iter>;

template<class _Cont
COMMA_RESUMEF_ENABLE_IF(traits::is_container_of_v<_Cont, event_t>)
> RESUMEF_REQUIRES(_ContainerOfT<_Cont, event_t>)
static auto wait_any(_Cont& cnt_)->any_awaiter<decltype(std::begin(cnt_))>;
static auto wait_any(_Cont& cnt_)
->any_awaiter<decltype(std::begin(cnt_))>;

template<class _Iter>
struct [[nodiscard]] timeout_any_awaiter;

template<class _Rep, class _Period, class _Iter
COMMA_RESUMEF_ENABLE_IF(traits::is_iterator_of_v<_Iter, event_t>)
> RESUMEF_REQUIRES(_IteratorOfT<_Iter, event_t>)
static future_t<intptr_t>
wait_any_for(const std::chrono::duration<_Rep, _Period>& dt, _Iter begin_, _Iter end_)
{
auto tidx = co_await when_any(sleep_for(dt), when_any(begin_, end_));
if (tidx.first == 0) co_return -1;

when_any_pair idx = any_cast<when_any_pair>(tidx.second);
co_return idx.first;
}
static auto wait_any_for(const std::chrono::duration<_Rep, _Period>& dt, _Iter begin_, _Iter end_)
->timeout_any_awaiter<_Iter>;

template<class _Rep, class _Period, class _Cont
COMMA_RESUMEF_ENABLE_IF(traits::is_container_of_v<_Cont, event_t>)
> RESUMEF_REQUIRES(_ContainerOfT<_Cont, event_t>)
static future_t<intptr_t>
wait_any_for(const std::chrono::duration<_Rep, _Period>& dt, _Cont& cont)
{
return wait_any_for(dt, std::begin(cont), std::end(cont));
}
static auto wait_any_for(const std::chrono::duration<_Rep, _Period>& dt, _Cont& cnt_)
->timeout_any_awaiter<decltype(std::begin(cnt_))>;



template<class _Iter>
struct [[nodiscard]] all_awaiter;

template<class _Iter
COMMA_RESUMEF_ENABLE_IF(traits::is_iterator_of_v<_Iter, event_t>)
> RESUMEF_REQUIRES(_IteratorOfT<_Iter, event_t>)
static future_t<bool>
wait_all(_Iter begin_, _Iter end_)
{
auto vb = co_await when_all(begin_, end_);
co_return is_all_succeeded(vb);
}
static auto wait_all(_Iter begin_, _Iter end_)
->all_awaiter<_Iter>;

template<class _Cont
COMMA_RESUMEF_ENABLE_IF(traits::is_container_of_v<_Cont, event_t>)
> RESUMEF_REQUIRES(_ContainerOfT<_Cont, event_t>)
static future_t<bool>
wait_all(_Cont& cnt_)
{
auto vb = co_await when_all(std::begin(cnt_), std::end(cnt_));
co_return is_all_succeeded(vb);
}
static auto wait_all(_Cont& cnt_)
->all_awaiter<decltype(std::begin(cnt_))>;

template<class _Iter>
struct [[nodiscard]] timeout_all_awaiter;

template<class _Rep, class _Period, class _Iter
COMMA_RESUMEF_ENABLE_IF(traits::is_iterator_of_v<_Iter, event_t>)
> RESUMEF_REQUIRES(_IteratorOfT<_Iter, event_t>)
static future_t<bool>
wait_all_for(const std::chrono::duration<_Rep, _Period>& dt, _Iter begin_, _Iter end_)
{
auto tidx = co_await when_any(sleep_for(dt), when_all(begin_, end_));
if (tidx.first == 0) co_return false;

std::vector<bool>& vb = any_cast<std::vector<bool>&>(tidx.second);
co_return is_all_succeeded(vb);
}
static auto wait_all_for(const std::chrono::duration<_Rep, _Period>& dt, _Iter begin_, _Iter end_)
->timeout_all_awaiter<_Iter>;

template<class _Rep, class _Period, class _Cont
COMMA_RESUMEF_ENABLE_IF(traits::is_container_of_v<_Cont, event_t>)
> RESUMEF_REQUIRES(_ContainerOfT<_Cont, event_t>)
static future_t<bool>
wait_all_for(const std::chrono::duration<_Rep, _Period>& dt, _Cont& cont)
{
return wait_all_for(dt, std::begin(cont), std::end(cont));
}
static auto wait_all_for(const std::chrono::duration<_Rep, _Period>& dt, _Cont& cnt_)
->timeout_all_awaiter<decltype(std::begin(cnt_))>;

event_t(const event_t&) = default;
event_t(event_t&&) = default;

+ 207
- 32
librf/src/event_v2.inl View File

@@ -4,7 +4,7 @@ RESUMEF_NS
{
namespace detail
{
struct state_event_t;
struct state_event_base_t;
struct event_v2_impl : public std::enable_shared_from_this<event_v2_impl>
{
@@ -27,12 +27,10 @@ RESUMEF_NS
static constexpr bool USE_LINK_QUEUE = false;
using lock_type = std::conditional_t<USE_SPINLOCK, spinlock, std::recursive_mutex>;
using state_event_ptr = counted_ptr<state_event_t>;
using link_state_queue = intrusive_link_queue<state_event_t, state_event_ptr>;
using state_event_ptr = counted_ptr<state_event_base_t>;
using link_state_queue = intrusive_link_queue<state_event_base_t, state_event_ptr>;
using wait_queue_type = std::conditional_t<USE_LINK_QUEUE, link_state_queue, std::list<state_event_ptr>>;
friend struct state_event_t;
bool try_wait_one() noexcept
{
if (_counter.load(std::memory_order_acquire) > 0)
@@ -44,7 +42,7 @@ RESUMEF_NS
return false;
}
void add_wait_list(state_event_t* state) noexcept
void add_wait_list(state_event_base_t* state) noexcept
{
assert(state != nullptr);
_wait_awakes.push_back(state);
@@ -62,18 +60,14 @@ RESUMEF_NS
event_v2_impl& operator=(event_v2_impl&&) = delete;
};
struct state_event_t : public state_base_t
struct state_event_base_t : public state_base_t
{
state_event_t(event_v2_impl*& val)
: _value(&val)
{}
virtual void resume() override;
virtual bool has_handler() const noexcept override;
void on_cancel() noexcept;
bool on_notify(event_v2_impl* eptr);
bool on_timeout();
virtual void on_cancel() noexcept = 0;
virtual bool on_notify(event_v2_impl* eptr) = 0;
virtual bool on_timeout() = 0;
//将自己加入到通知链表里
template<class _PromiseT, typename = std::enable_if_t<traits::is_promise_v<_PromiseT>>>
@@ -89,19 +83,51 @@ RESUMEF_NS
return sch;
}
//为浸入式单向链表提供的next指针
//counted_ptr<state_event_base_t> _next = nullptr;
};
struct state_event_t : public state_event_base_t
{
state_event_t(event_v2_impl*& val)
: _value(&val)
{}
virtual void on_cancel() noexcept override;
virtual bool on_notify(event_v2_impl* eptr) override;
virtual bool on_timeout() override;
public:
typedef spinlock lock_type;
//typedef spinlock lock_type;
//为浸入式单向链表提供的next指针
//counted_ptr<state_event_t> _next = nullptr;
timer_handler _thandler;
private:
protected:
//_value引用awaitor保存的值,这样可以尽可能减少创建state的可能。而不必进入没有state就没有value实体被用于返回。
//在调用on_notify()或on_timeout()任意之一后,置为nullptr。
//这样来保证要么超时了,要么响应了signal的通知了。
//这个指针在on_notify()和on_timeout()里,当作一个互斥的锁来防止同时进入两个函数
std::atomic<event_v2_impl**> _value;
};
struct state_event_all_t : public state_event_base_t
{
state_event_all_t(intptr_t count, bool& val)
: _counter(count)
, _value(&val)
{}
virtual void on_cancel() noexcept override;
virtual bool on_notify(event_v2_impl* eptr) override;
virtual bool on_timeout() override;
public:
//typedef spinlock lock_type;
//为浸入式单向链表提供的next指针
//counted_ptr<state_event_t> _next = nullptr;
timer_handler _thandler;
std::atomic<intptr_t> _counter;
protected:
bool* _value;
};
}
inline namespace event_v2
@@ -171,36 +197,44 @@ RESUMEF_NS
return { _event.get() };
}
struct [[nodiscard]] event_t::timeout_awaiter : public event_t::awaiter
template<class _Btype>
struct event_t::timeout_awaitor_impl : public _Btype
{
timeout_awaiter(detail::event_v2_impl* evt, clock_type::time_point tp) noexcept
: awaiter(evt)
template<class... Args>
timeout_awaitor_impl(clock_type::time_point tp, Args... args)
: _Btype(std::forward<Args>(args)...)
, _tp(tp)
{
}
{}
template<class _PromiseT, typename = std::enable_if_t<traits::is_promise_v<_PromiseT>>>
bool await_suspend(coroutine_handle<_PromiseT> handler)
{
if (!awaiter::await_suspend(handler))
if (!_Btype::await_suspend(handler))
return false;
_PromiseT& promise = handler.promise();
auto* parent_state = promise.get_state();
scheduler_t* sch = parent_state->get_scheduler();
_state->_thandler = sch->timer()->add_handler(_tp, [_state=_state](bool canceld)
{
if (!canceld)
_state->on_timeout();
});
_state->_thandler = sch->timer()->add_handler(_tp, [_state = _state](bool canceld)
{
if (!canceld)
_state->on_timeout();
});
return true;
}
private:
protected:
clock_type::time_point _tp;
};
struct [[nodiscard]] event_t::timeout_awaiter : event_t::timeout_awaitor_impl<event_t::awaiter>
{
timeout_awaiter(clock_type::time_point tp, detail::event_v2_impl* evt) noexcept
: timeout_awaitor_impl(tp, evt)
{
}
};
template<class _Rep, class _Period>
inline event_t::timeout_awaiter event_t::wait_for(const std::chrono::duration<_Rep, _Period>& dt) const noexcept
{
@@ -294,7 +328,7 @@ RESUMEF_NS
return -1;
}
private:
protected:
detail::event_v2_impl* _event = nullptr;
counted_ptr<detail::state_event_t> _state;
_Iter _begin;
@@ -314,5 +348,146 @@ RESUMEF_NS
{
return { std::begin(cnt_), std::end(cnt_) };
}
template<class _Iter>
struct [[nodiscard]] event_t::timeout_any_awaiter : timeout_awaitor_impl<event_t::any_awaiter<_Iter>>
{
timeout_any_awaiter(clock_type::time_point tp, _Iter begin, _Iter end) noexcept
: timeout_awaitor_impl(tp, begin, end)
{}
};
template<class _Rep, class _Period, class _Iter COMMA_RESUMEF_ENABLE_IF_TYPENAME()>
RESUMEF_REQUIRES(_IteratorOfT<_Iter, event_t>)
auto event_t::wait_any_for(const std::chrono::duration<_Rep, _Period>& dt, _Iter begin_, _Iter end_)
->event_t::timeout_any_awaiter<_Iter>
{
clock_type::time_point tp = clock_type::now() + std::chrono::duration_cast<clock_type::duration>(dt);
return { tp, begin_, end_ };
}
template<class _Rep, class _Period, class _Cont COMMA_RESUMEF_ENABLE_IF_TYPENAME()>
RESUMEF_REQUIRES(_ContainerOfT<_Cont, event_t>)
auto event_t::wait_any_for(const std::chrono::duration<_Rep, _Period>& dt, _Cont& cnt_)
->event_t::timeout_any_awaiter<decltype(std::begin(cnt_))>
{
clock_type::time_point tp = clock_type::now() + std::chrono::duration_cast<clock_type::duration>(dt);
return { tp, std::begin(cnt_), std::end(cnt_) };
}
template<class _Iter>
struct [[nodiscard]] event_t::all_awaiter
{
all_awaiter(_Iter begin, _Iter end) noexcept
: _begin(begin)
, _end(end)
{
}
bool await_ready() noexcept
{
_value = _begin == _end;
return _value;
}
template<class _PromiseT, typename = std::enable_if_t<traits::is_promise_v<_PromiseT>>>
bool await_suspend(coroutine_handle<_PromiseT> handler)
{
intptr_t count = std::distance(_begin, _end);
std::vector<detail::event_v2_impl::lock_type> lockes;
lockes.reserve(count);
for (auto iter = _begin; iter != _end; ++iter)
{
detail::event_v2_impl* evt = (*iter)._event.get();
lockes.push_back(evt->_lock);
}
_state = new detail::state_event_all_t(count, _value);
(void)_state->on_await_suspend(handler);
scoped_lock_range<detail::event_v2_impl::lock_type> lock_(lockes);
for (auto iter = _begin; iter != _end; ++iter)
{
detail::event_v2_impl* evt = (*iter)._event.get();
if (evt->try_wait_one())
{
_state->_counter.fetch_sub(1, std::memory_order_acq_rel);
}
else
{
evt->add_wait_list(_state.get());
}
}
if (_state->_counter.load(std::memory_order_relaxed) == 0)
{
_state = nullptr;
_value = true;
return false;
}
return true;
}
bool await_resume() noexcept
{
return _value;
}
protected:
_Iter _begin;
_Iter _end;
counted_ptr<detail::state_event_all_t> _state;
bool _value = false;
};
template<class _Iter COMMA_RESUMEF_ENABLE_IF_TYPENAME()>
RESUMEF_REQUIRES(_IteratorOfT<_Iter, event_t>)
auto event_t::wait_all(_Iter begin_, _Iter end_) ->event_t::all_awaiter<_Iter>
{
return { begin_, end_ };
}
template<class _Cont COMMA_RESUMEF_ENABLE_IF_TYPENAME()>
RESUMEF_REQUIRES(_ContainerOfT<_Cont, event_t>)
auto event_t::wait_all(_Cont& cnt_) ->event_t::all_awaiter<decltype(std::begin(cnt_))>
{
return { std::begin(cnt_), std::end(cnt_) };
}
template<class _Iter>
struct [[nodiscard]] event_t::timeout_all_awaiter : timeout_awaitor_impl<event_t::all_awaiter<_Iter>>
{
timeout_all_awaiter(clock_type::time_point tp, _Iter begin, _Iter end) noexcept
: timeout_awaitor_impl(tp, begin, end)
{}
};
template<class _Rep, class _Period, class _Iter COMMA_RESUMEF_ENABLE_IF_TYPENAME()>
RESUMEF_REQUIRES(_IteratorOfT<_Iter, event_t>)
auto event_t::wait_all_for(const std::chrono::duration<_Rep, _Period>& dt, _Iter begin_, _Iter end_)
->event_t::timeout_all_awaiter<_Iter>
{
clock_type::time_point tp = clock_type::now() + std::chrono::duration_cast<clock_type::duration>(dt);
return { tp, begin_, end_ };
}
template<class _Rep, class _Period, class _Cont COMMA_RESUMEF_ENABLE_IF_TYPENAME()>
RESUMEF_REQUIRES(_ContainerOfT<_Cont, event_t>)
auto event_t::wait_all_for(const std::chrono::duration<_Rep, _Period>& dt, _Cont& cnt_)
->event_t::timeout_all_awaiter<decltype(std::begin(cnt_))>
{
clock_type::time_point tp = clock_type::now() + std::chrono::duration_cast<clock_type::duration>(dt);
return { tp, std::begin(cnt_), std::end(cnt_) };
}
}
}

+ 2
- 2
tutorial/test_async_event.cpp View File

@@ -150,8 +150,8 @@ static void test_wait_all_timeout()
void resumable_main_event()
{
//test_wait_one();
//std::cout << std::endl;
test_wait_one();
std::cout << std::endl;
test_wait_any();
std::cout << std::endl;

Loading…
Cancel
Save