Browse Source

完成event的wait_all_for功能

3.0.0
tearshark 2 years ago
parent
commit
dfa3811619

+ 40
- 0
include/librf/src/event_v2.h View File

template<class _Iter> template<class _Iter>
struct [[nodiscard]] any_awaiter; struct [[nodiscard]] any_awaiter;


/**
* @brief 在协程中等待任意一个信号触发。
* @details 如果已经有信号触发,则立即返回第一个触发信号的索引。\n
* 否则,当前协程被阻塞,直到信号被触发后唤醒。
* 至少消耗一次信号触发次数。
* @param begin_ 容纳信号的容器的首迭代器
* @param end_ 容纳信号的容器的尾迭代器(不包含有效信号)
* @retval intptr_t [co_await] 返回第一个触发信号的索引
* @attention 只能在协程中调用。
*/
template<class _Iter> template<class _Iter>
requires(_IteratorOfT<_Iter, event_t>) requires(_IteratorOfT<_Iter, event_t>)
static auto wait_any(_Iter begin_, _Iter end_)->any_awaiter<_Iter>; static auto wait_any(_Iter begin_, _Iter end_)->any_awaiter<_Iter>;


/**
* @brief 在协程中等待任意一个信号触发。
* @details 如果已经有信号触发,则立即返回第一个触发信号的索引。\n
* 否则,当前协程被阻塞,直到信号被触发后唤醒。
* 至少消耗一次信号触发次数。
* @param cnt_ 容纳信号的容器,需要支持std::begin(cnt_)和std::end(cnt_)
* @retval intptr_t [co_await] 返回第一个触发信号的索引
* @attention 只能在协程中调用。
*/
template<class _Cont> template<class _Cont>
requires(_ContainerOfT<_Cont, event_t>) requires(_ContainerOfT<_Cont, event_t>)
static auto wait_any(const _Cont& cnt_)->any_awaiter<decltype(std::begin(cnt_))>; static auto wait_any(const _Cont& cnt_)->any_awaiter<decltype(std::begin(cnt_))>;
template<class _Iter> template<class _Iter>
struct [[nodiscard]] timeout_any_awaiter; struct [[nodiscard]] timeout_any_awaiter;


/**
* @brief 在协程中等待任意一个信号触发,直到超时。
* @details 如果已经有信号触发,则立即返回第一个触发信号的索引。\n
* 否则,当前协程被阻塞,直到信号被触发后,或者超时后唤醒。
* 如果等到了信号,则至少消耗一次信号触发次数。
* @param dt 超时时长
* @param begin_ 容纳信号的容器的首迭代器
* @param end_ 容纳信号的容器的尾迭代器(不包含有效信号)
* @retval intptr_t [co_await] 如果等到了任意一个信号,返回其索引(相对于begin_的距离);否则,返回-1
* @attention 只能在协程中调用。
*/
template<class _Rep, class _Period, class _Iter> template<class _Rep, class _Period, class _Iter>
requires(_IteratorOfT<_Iter, event_t>) requires(_IteratorOfT<_Iter, event_t>)
static auto wait_any_for(const std::chrono::duration<_Rep, _Period>& dt, _Iter begin_, _Iter end_) static auto wait_any_for(const std::chrono::duration<_Rep, _Period>& dt, _Iter begin_, _Iter end_)
->timeout_any_awaiter<_Iter>; ->timeout_any_awaiter<_Iter>;


/**
* @brief 在协程中等待任意一个信号触发,直到超时。
* @details 如果已经有信号触发,则立即返回第一个触发信号的索引。\n
* 否则,当前协程被阻塞,直到信号被触发后,或者超时后唤醒。
* 如果等到了信号,则至少消耗一次信号触发次数。
* @param dt 超时时长
* @param cnt_ 容纳信号的容器,需要支持std::begin(cnt_)和std::end(cnt_)
* @retval intptr_t [co_await] 如果等到了任意一个信号,返回其索引(相对于begin_的距离);否则,返回-1
* @attention 只能在协程中调用。
*/
template<class _Rep, class _Period, class _Cont> template<class _Rep, class _Period, class _Cont>
requires(_ContainerOfT<_Cont, event_t>) requires(_ContainerOfT<_Cont, event_t>)
static auto wait_any_for(const std::chrono::duration<_Rep, _Period>& dt, const _Cont& cnt_) static auto wait_any_for(const std::chrono::duration<_Rep, _Period>& dt, const _Cont& cnt_)

+ 80
- 21
include/librf/src/event_v2.inl View File

{ {
return _counter.load(std::memory_order_acquire) > 0; return _counter.load(std::memory_order_acquire) > 0;
} }
void reset() noexcept
{
_counter.store(0, std::memory_order_release);
}
LIBRF_API void signal_all() noexcept; LIBRF_API void signal_all() noexcept;
LIBRF_API void signal() noexcept; LIBRF_API void signal() noexcept;
LIBRF_API void reset() noexcept;


LIBRF_API void add_wait_list(state_event_base_t* state); LIBRF_API void add_wait_list(state_event_base_t* state);
LIBRF_API void remove_wait_list(state_event_base_t* state); LIBRF_API void remove_wait_list(state_event_base_t* state);
struct state_event_base_t : public state_base_t struct state_event_base_t : public state_base_t
, public intrusive_link_node<state_event_base_t, counted_ptr<state_event_base_t>> , public intrusive_link_node<state_event_base_t, counted_ptr<state_event_base_t>>
{ {
LIBRF_API virtual void resume() override;
LIBRF_API virtual bool has_handler() const noexcept override;

virtual void on_cancel() noexcept = 0; virtual void on_cancel() noexcept = 0;
virtual bool on_notify(event_v2_impl* eptr) = 0; virtual bool on_notify(event_v2_impl* eptr) = 0;
virtual bool on_timeout() = 0; virtual bool on_timeout() = 0;
}); });
} }


protected:
timer_handler _thandler; timer_handler _thandler;
}; };



struct state_event_t : public state_event_base_t struct state_event_t : public state_event_base_t
{ {
state_event_t(event_v2_impl*& val) state_event_t(event_v2_impl*& val)
std::atomic<event_v2_impl**> _value; std::atomic<event_v2_impl**> _value;
}; };


struct state_event_all_t : public state_event_base_t
struct state_event_all_t : public state_base_t
{ {
using sub_state_t = std::pair<counted_ptr<state_event_base_t>, detail::event_v2_impl*>;

state_event_all_t(intptr_t count, bool& val) state_event_all_t(intptr_t count, bool& val)
: _counter(count) : _counter(count)
, _value(&val)
{}
, _result(&val)
{
_values.resize(count, sub_state_t{nullptr, nullptr});
}


LIBRF_API virtual void on_cancel() noexcept override;
LIBRF_API virtual bool on_notify(event_v2_impl* eptr) override;
LIBRF_API virtual bool on_timeout() override;
LIBRF_API void on_cancel(intptr_t idx);
LIBRF_API bool on_notify(event_v2_impl* eptr, intptr_t idx);
LIBRF_API bool on_timeout();


std::atomic<intptr_t> _counter;
template<class _PromiseT, typename = std::enable_if_t<traits::is_promise_v<_PromiseT>>>
scheduler_t* on_await_suspend(coroutine_handle<_PromiseT> handler) noexcept
{
_PromiseT& promise = handler.promise();
auto* parent = promise.get_state();
scheduler_t* sch = parent->get_scheduler();

this->_scheduler = sch;
this->_coro = handler;

return sch;
}

inline void add_timeout_timer(std::chrono::system_clock::time_point tp)
{
this->_thandler = this->_scheduler->timer()->add_handler(tp,
[st = counted_ptr<state_event_all_t>{ this }](bool canceld)
{
if (!canceld)
st->on_timeout();
});
}

std::vector<sub_state_t> _values;
intptr_t _counter;
event_v2_impl::lock_type _lock;
protected: protected:
bool* _value;
timer_handler _thandler;
bool* _result;
};

template<class _StateT>
struct state_event_proxy_t : public state_event_base_t
{
state_event_proxy_t(_StateT* sta, intptr_t idx)
: _state(sta)
, _index(idx)
{
assert(sta != nullptr);
assert(idx >= 0);
}

void on_cancel() noexcept override { return _state->on_cancel(_index); }
bool on_notify(event_v2_impl* eptr) override { return _state->on_notify(eptr, _index); }
bool on_timeout() override { assert(false); return false; }
private:
_StateT* _state;
intptr_t _index;
}; };
} }


(void)_state->on_await_suspend(handler); (void)_state->on_await_suspend(handler);


if constexpr (!std::is_same_v<std::remove_reference_t<_Timeout>, std::nullptr_t>) if constexpr (!std::is_same_v<std::remove_reference_t<_Timeout>, std::nullptr_t>)
{
cb(); cb();
}


for (auto iter = _begin; iter != _end; ++iter) for (auto iter = _begin; iter != _end; ++iter)
{ {
requires(_IteratorOfT<_Iter, event_t>) requires(_IteratorOfT<_Iter, event_t>)
auto event_t::wait_any(_Iter begin_, _Iter end_) ->event_t::any_awaiter<_Iter> auto event_t::wait_any(_Iter begin_, _Iter end_) ->event_t::any_awaiter<_Iter>
{ {
assert(false && "Function is flawed!");
return { begin_, end_ }; return { begin_, end_ };
} }


requires(_ContainerOfT<_Cont, event_t>) requires(_ContainerOfT<_Cont, event_t>)
auto event_t::wait_any(const _Cont& cnt_) ->event_t::any_awaiter<decltype(std::begin(cnt_))> auto event_t::wait_any(const _Cont& cnt_) ->event_t::any_awaiter<decltype(std::begin(cnt_))>
{ {
assert(false && "Function is flawed!");
return { std::begin(cnt_), std::end(cnt_) }; return { std::begin(cnt_), std::end(cnt_) };
} }


auto event_t::wait_any_for(const std::chrono::duration<_Rep, _Period>& dt, _Iter begin_, _Iter end_) auto event_t::wait_any_for(const std::chrono::duration<_Rep, _Period>& dt, _Iter begin_, _Iter end_)
->event_t::timeout_any_awaiter<_Iter> ->event_t::timeout_any_awaiter<_Iter>
{ {
assert(false && "Function is flawed!");
clock_type::time_point tp = clock_type::now() + std::chrono::duration_cast<clock_type::duration>(dt); clock_type::time_point tp = clock_type::now() + std::chrono::duration_cast<clock_type::duration>(dt);
return { tp, begin_, end_ }; return { tp, begin_, end_ };
} }
auto event_t::wait_any_for(const std::chrono::duration<_Rep, _Period>& dt, const _Cont& cnt_) auto event_t::wait_any_for(const std::chrono::duration<_Rep, _Period>& dt, const _Cont& cnt_)
->event_t::timeout_any_awaiter<decltype(std::begin(cnt_))> ->event_t::timeout_any_awaiter<decltype(std::begin(cnt_))>
{ {
assert(false && "Function is flawed!");
clock_type::time_point tp = clock_type::now() + std::chrono::duration_cast<clock_type::duration>(dt); clock_type::time_point tp = clock_type::now() + std::chrono::duration_cast<clock_type::duration>(dt);
return { tp, std::begin(cnt_), std::end(cnt_) }; return { tp, std::begin(cnt_), std::end(cnt_) };
} }
bool await_suspend2(coroutine_handle<_PromiseT> handler, const _Timeout& cb) bool await_suspend2(coroutine_handle<_PromiseT> handler, const _Timeout& cb)
{ {
(void)cb; (void)cb;
intptr_t count = std::distance(_begin, _end);
const intptr_t count = std::distance(_begin, _end);


using ref_lock_type = std::reference_wrapper<detail::event_v2_impl::lock_type>; using ref_lock_type = std::reference_wrapper<detail::event_v2_impl::lock_type>;
std::vector<ref_lock_type> lockes; std::vector<ref_lock_type> lockes;
lockes.reserve(count);
lockes.reserve(count + 1);


for (auto iter = _begin; iter != _end; ++iter) for (auto iter = _begin; iter != _end; ++iter)
{ {
} }


_state = new detail::state_event_all_t(count, _value); _state = new detail::state_event_all_t(count, _value);
lockes.push_back(_state->_lock);
(void)_state->on_await_suspend(handler); (void)_state->on_await_suspend(handler);


if constexpr (!std::is_same_v<std::remove_reference_t<_Timeout>, std::nullptr_t>) if constexpr (!std::is_same_v<std::remove_reference_t<_Timeout>, std::nullptr_t>)
{
cb(); cb();
}


batch_lock_t<ref_lock_type> lock_(lockes); batch_lock_t<ref_lock_type> lock_(lockes);


for (auto iter = _begin; iter != _end; ++iter)
intptr_t idx = 0;
for (auto iter = _begin; iter != _end; ++iter, ++idx)
{ {
detail::event_v2_impl* evt = (*iter)._event.get(); detail::event_v2_impl* evt = (*iter)._event.get();
if (evt->try_wait_one()) if (evt->try_wait_one())
{ {
_state->_counter.fetch_sub(1, std::memory_order_acq_rel);
--_state->_counter;
_state->_values[idx].second = evt;
} }
else else
{ {
evt->add_wait_list(_state.get());
auto* proxy = new detail::state_event_proxy_t<detail::state_event_all_t>(_state.get(), idx);
_state->_values[idx] = detail::state_event_all_t::sub_state_t{ proxy, evt };

evt->add_wait_list(proxy);
} }
} }


if (_state->_counter.load(std::memory_order_relaxed) == 0)
if (_state->_counter == 0)
{ {
_state = nullptr; _state = nullptr;
_value = true; _value = true;

+ 1
- 1
include/librf/src/intrusive_link_queue.h View File

_Node* _prev; _Node* _prev;
_Nextptr _next; _Nextptr _next;


template<class _Node, class _Nodeptr>
template<class _Node2, class _Nodeptr2>
friend struct intrusive_link_queue; friend struct intrusive_link_queue;
}; };



+ 2
- 2
include/librf/src/state.h View File

private: private:
LIBRF_API virtual void destroy_deallocate(); LIBRF_API virtual void destroy_deallocate();
public: public:
virtual void resume() = 0;
virtual bool has_handler() const noexcept = 0;
LIBRF_API virtual void resume();
LIBRF_API virtual bool has_handler() const noexcept;
LIBRF_API virtual state_base_t* get_parent() const noexcept; LIBRF_API virtual state_base_t* get_parent() const noexcept;


void set_scheduler(scheduler_t* sch) noexcept void set_scheduler(scheduler_t* sch) noexcept

+ 59
- 41
source/event_v2.cpp View File

{ {
namespace detail namespace detail
{ {
LIBRF_API void state_event_base_t::resume()
{
coroutine_handle<> handler = _coro;
if (handler)
{
_coro = nullptr;
_scheduler->del_final(this);
handler.resume();
}
}

LIBRF_API bool state_event_base_t::has_handler() const noexcept
{
return (bool)_coro;
}


LIBRF_API void state_event_t::on_cancel() noexcept LIBRF_API void state_event_t::on_cancel() noexcept
{ {






LIBRF_API void state_event_all_t::on_cancel() noexcept
LIBRF_API void state_event_all_t::on_cancel(intptr_t idx)
{ {
intptr_t oldValue = _counter.load(std::memory_order_acquire);
if (oldValue >= 0 && _counter.compare_exchange_strong(oldValue, -1, std::memory_order_acq_rel))
scoped_lock<event_v2_impl::lock_type> lock_(_lock);
if (_counter <= 0) return ;
assert(idx < static_cast<intptr_t>(_values.size()));

_values[idx] = sub_state_t{ nullptr, nullptr };
if (--_counter == 0)
{ {
*_value = false;
*_result = false;
_thandler.stop(); _thandler.stop();


this->_coro = nullptr;
assert(this->_scheduler != nullptr);
if (this->_coro)
this->_scheduler->add_generator(this);
} }
} }


LIBRF_API bool state_event_all_t::on_notify(event_v2_impl*)
LIBRF_API bool state_event_all_t::on_notify(event_v2_impl*, intptr_t idx)
{ {
intptr_t oldValue = _counter.load(std::memory_order_acquire);
if (oldValue <= 0) return false;
scoped_lock<event_v2_impl::lock_type> lock_(_lock);

if (_counter <= 0) return false;
assert(idx < static_cast<intptr_t>(_values.size()));


oldValue = _counter.fetch_add(-1, std::memory_order_acq_rel);
if (oldValue == 1)
_values[idx].first = nullptr;

if (--_counter == 0)
{ {
*_value = true;
bool result = true;
for (sub_state_t& sub : _values)
{
if (sub.second == nullptr)
{
result = false;
break;
}
}

*_result = result;
_thandler.stop(); _thandler.stop();


assert(this->_scheduler != nullptr); assert(this->_scheduler != nullptr);
if (this->_coro) if (this->_coro)
this->_scheduler->add_generator(this); this->_scheduler->add_generator(this);

return true;
} }


return oldValue >= 1;
return true;
} }


LIBRF_API bool state_event_all_t::on_timeout() LIBRF_API bool state_event_all_t::on_timeout()
{ {
intptr_t oldValue = _counter.load(std::memory_order_acquire);
if (oldValue >= 0 && _counter.compare_exchange_strong(oldValue, -1, std::memory_order_acq_rel))
{
*_value = false;
_thandler.reset();
scoped_lock<event_v2_impl::lock_type> lock_(_lock);


assert(this->_scheduler != nullptr);
if (this->_coro)
this->_scheduler->add_generator(this);
if (_counter <= 0) return false;


return true;
_counter = 0;
*_result = false;
_thandler.reset();

for (sub_state_t& sub : _values)
{
if (sub.first != nullptr)
{
event_v2_impl* evt = sub.second;
sub.second = nullptr;

if (evt != nullptr)
evt->remove_wait_list(sub.first);
}
} }
return false;

assert(this->_scheduler != nullptr);
if (this->_coro)
this->_scheduler->add_generator(this);

return true;
} }




{ {
scoped_lock<lock_type> lock_(_lock); scoped_lock<lock_type> lock_(_lock);


_counter.store(0, std::memory_order_release);

state_event_ptr state; state_event_ptr state;
for (; (state = try_pop_list(_wait_awakes)) != nullptr;) for (; (state = try_pop_list(_wait_awakes)) != nullptr;)
{ {
_counter.fetch_add(1, std::memory_order_acq_rel); _counter.fetch_add(1, std::memory_order_acq_rel);
} }


LIBRF_API void event_v2_impl::reset() noexcept
{
_counter.store(0, std::memory_order_release);
}

LIBRF_API void event_v2_impl::add_wait_list(state_event_base_t* state) LIBRF_API void event_v2_impl::add_wait_list(state_event_base_t* state)
{ {
assert(state != nullptr); assert(state != nullptr);

+ 17
- 1
source/state.cpp View File

{ {
delete this; delete this;
} }

LIBRF_API void state_base_t::resume()
{
if (likely(_coro))
{
coroutine_handle<> handler = _coro;
_coro = nullptr;
_scheduler->del_final(this);
handler.resume();
}
}

LIBRF_API bool state_base_t::has_handler() const noexcept
{
return (bool)_coro;
}

LIBRF_API state_base_t* state_base_t::get_parent() const noexcept LIBRF_API state_base_t* state_base_t::get_parent() const noexcept
{ {
return nullptr; return nullptr;

+ 2
- 2
tutorial/test_async_event.cpp View File

test_wait_three(); test_wait_three();
std::cout << std::endl; std::cout << std::endl;
test_wait_any();
std::cout << std::endl;
//test_wait_any();
//std::cout << std::endl;
test_wait_all(); test_wait_all();
std::cout << std::endl; std::cout << std::endl;

+ 45
- 21
tutorial/test_memory_leak.cpp View File

using namespace librf; using namespace librf;
using namespace std::chrono; using namespace std::chrono;


void test_memory_leak()
void test_memory_leak_event_wait_for()
{ {
go[]()->future_t<void>
{
event_t e;
for (;;)
{
bool val = co_await e.wait_for(1ms);
assert(val == false);
co_await yield();
}
};

for (;;)
{
auto have = this_scheduler()->run_one_batch();
if (!have) {
std::this_thread::sleep_for(1ms);
}
}
go[]()->future_t<void>
{
event_t e;
for (;;)
{
bool val = co_await e.wait_for(1ms);
assert(val == false);
co_await yield();
}
};

for (;;)
{
auto have = this_scheduler()->run_one_batch();
if (!have) {
std::this_thread::sleep_for(1ms);
}
}
}

void test_memory_leak_event_wait_all_for()
{
go[]()->future_t<void>
{
event_t e[4];
for (;;)
{
bool val = co_await event_t::wait_all_for(1ms, e);
assert(val == false);
co_await yield();
}
};

for (;;)
{
auto have = this_scheduler()->run_one_batch();
if (!have) {
std::this_thread::sleep_for(1ms);
}
}
} }


#if LIBRF_TUTORIAL_STAND_ALONE #if LIBRF_TUTORIAL_STAND_ALONE
int main() int main()
{ {
test_memory_leak();
return 0;
//test_memory_leak_event_wait_for();
test_memory_leak_event_wait_all_for();

return 0;
} }
#endif #endif

Loading…
Cancel
Save