Browse Source

解决支持timeout的event/mutex可能的线程不安全的问题

tags/v2.9.7
tearshark 4 years ago
parent
commit
2a296c3be0
2 changed files with 44 additions and 10 deletions
  1. 31
    8
      librf/src/event_v2.inl
  2. 13
    2
      librf/src/mutex_v2.inl

+ 31
- 8
librf/src/event_v2.inl View File

return _event->try_wait_one(); return _event->try_wait_one();
} }
template<class _PromiseT, typename = std::enable_if_t<traits::is_promise_v<_PromiseT>>>
bool await_suspend(coroutine_handle<_PromiseT> handler)
template<class _PromiseT, class _Timeout, typename = std::enable_if_t<traits::is_promise_v<_PromiseT>>>
bool await_suspend2(coroutine_handle<_PromiseT> handler, const _Timeout& cb)
{ {
detail::event_v2_impl* evt = _event; detail::event_v2_impl* evt = _event;
scoped_lock<detail::event_v2_impl::lock_type> lock_(evt->_lock); scoped_lock<detail::event_v2_impl::lock_type> lock_(evt->_lock);
_state = new detail::state_event_t(_event); _state = new detail::state_event_t(_event);
_event = nullptr; _event = nullptr;
(void)_state->on_await_suspend(handler); (void)_state->on_await_suspend(handler);
cb();
evt->add_wait_list(_state.get()); evt->add_wait_list(_state.get());
return true; return true;
} }
template<class _PromiseT, typename = std::enable_if_t<traits::is_promise_v<_PromiseT>>>
bool await_suspend(coroutine_handle<_PromiseT> handler)
{
return await_suspend2(handler, []{});
}
bool await_resume() noexcept bool await_resume() noexcept
{ {
return _event != nullptr; return _event != nullptr;
template<class _PromiseT, typename = std::enable_if_t<traits::is_promise_v<_PromiseT>>> template<class _PromiseT, typename = std::enable_if_t<traits::is_promise_v<_PromiseT>>>
bool await_suspend(coroutine_handle<_PromiseT> handler) bool await_suspend(coroutine_handle<_PromiseT> handler)
{ {
if (!_Btype::await_suspend(handler))
if (!_Btype::await_suspend2(handler, [this]
{
this->_state->add_timeout_timer(_tp);
}))
return false; return false;
this->_state->add_timeout_timer(_tp);
return true; return true;
} }
protected: protected:
return _begin == _end; return _begin == _end;
} }
template<class _PromiseT, typename = std::enable_if_t<traits::is_promise_v<_PromiseT>>>
bool await_suspend(coroutine_handle<_PromiseT> handler)
template<class _PromiseT, class _Timeout, typename = std::enable_if_t<traits::is_promise_v<_PromiseT>>>
bool await_suspend2(coroutine_handle<_PromiseT> handler, const _Timeout& cb)
{ {
using ref_lock_type = std::reference_wrapper<detail::event_v2_impl::lock_type>; using ref_lock_type = std::reference_wrapper<detail::event_v2_impl::lock_type>;
std::vector<ref_lock_type> lockes; std::vector<ref_lock_type> lockes;
_state = new detail::state_event_t(_event); _state = new detail::state_event_t(_event);
(void)_state->on_await_suspend(handler); (void)_state->on_await_suspend(handler);
cb();
for (auto iter = _begin; iter != _end; ++iter) for (auto iter = _begin; iter != _end; ++iter)
{ {
return true; return true;
} }
template<class _PromiseT, typename = std::enable_if_t<traits::is_promise_v<_PromiseT>>>
bool await_suspend(coroutine_handle<_PromiseT> handler)
{
return await_suspend2(handler, []{});
}
intptr_t await_resume() noexcept intptr_t await_resume() noexcept
{ {
if (_begin == _end) if (_begin == _end)
return _value; return _value;
} }
template<class _PromiseT, typename = std::enable_if_t<traits::is_promise_v<_PromiseT>>>
bool await_suspend(coroutine_handle<_PromiseT> handler)
template<class _PromiseT, class _Timeout, typename = std::enable_if_t<traits::is_promise_v<_PromiseT>>>
bool await_suspend2(coroutine_handle<_PromiseT> handler, const _Timeout& cb)
{ {
intptr_t count = std::distance(_begin, _end); intptr_t count = std::distance(_begin, _end);
_state = new detail::state_event_all_t(count, _value); _state = new detail::state_event_all_t(count, _value);
(void)_state->on_await_suspend(handler); (void)_state->on_await_suspend(handler);
cb();
scoped_lock_range<ref_lock_type> lock_(lockes); scoped_lock_range<ref_lock_type> lock_(lockes);
return true; return true;
} }
template<class _PromiseT, typename = std::enable_if_t<traits::is_promise_v<_PromiseT>>>
bool await_suspend(coroutine_handle<_PromiseT> handler)
{
return await_suspend2(handler, []{});
}
bool await_resume() noexcept bool await_resume() noexcept
{ {
return _value; return _value;

+ 13
- 2
librf/src/mutex_v2.inl View File

return false; return false;
} }


template<class _PromiseT, typename = std::enable_if_t<traits::is_promise_v<_PromiseT>>>
bool await_suspend(coroutine_handle<_PromiseT> handler)
template<class _PromiseT, class _Timeout, typename = std::enable_if_t<traits::is_promise_v<_PromiseT>>>
bool await_suspend2(coroutine_handle<_PromiseT> handler, const _Timeout& cb)
{ {
_PromiseT& promise = handler.promise(); _PromiseT& promise = handler.promise();
auto* parent = promise.get_state(); auto* parent = promise.get_state();


_state = new detail::state_mutex_t(_mutex); _state = new detail::state_mutex_t(_mutex);
_state->on_await_suspend(handler, parent->get_scheduler(), _root); _state->on_await_suspend(handler, parent->get_scheduler(), _root);
cb();


_mutex->add_wait_list_lockless(_state.get()); _mutex->add_wait_list_lockless(_state.get());


{ {
using lock_awaiter::lock_awaiter; using lock_awaiter::lock_awaiter;


template<class _PromiseT, typename = std::enable_if_t<traits::is_promise_v<_PromiseT>>>
bool await_suspend(coroutine_handle<_PromiseT> handler)
{
return await_suspend2(handler, []{});
}
scoped_unlock_t<mutex_t> await_resume() noexcept scoped_unlock_t<mutex_t> await_resume() noexcept
{ {
mutex_impl_ptr mtx = _mutex ? _mutex->shared_from_this() : nullptr; mutex_impl_ptr mtx = _mutex ? _mutex->shared_from_this() : nullptr;
struct mutex_t::manual_awaiter : public lock_awaiter struct mutex_t::manual_awaiter : public lock_awaiter
{ {
using lock_awaiter::lock_awaiter; using lock_awaiter::lock_awaiter;
template<class _PromiseT, typename = std::enable_if_t<traits::is_promise_v<_PromiseT>>>
bool await_suspend(coroutine_handle<_PromiseT> handler)
{
return await_suspend2(handler, []{});
}
void await_resume() noexcept void await_resume() noexcept
{ {
_mutex = nullptr; _mutex = nullptr;

Loading…
Cancel
Save