From 6fef43340a8c993e05567d1b9e5b69c41e7b30cc Mon Sep 17 00:00:00 2001 From: tearshark Date: Sun, 22 Mar 2020 14:19:10 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=A1=E6=9F=A5mutex=5Fv2=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- librf/src/channel_v2.inl | 1 + librf/src/event_v2.inl | 18 +++----- librf/src/mutex_v2.cpp | 80 +++++++++-------------------------- librf/src/mutex_v2.inl | 73 +++++++++----------------------- librf/src/sleep.cpp | 2 +- librf/src/sleep.h | 12 +++--- librf/src/when_v2.h | 2 + tutorial/test_async_mutex.cpp | 4 +- vs_proj/librf.cpp | 4 +- vs_proj/librf.vcxproj | 5 +-- 10 files changed, 61 insertions(+), 140 deletions(-) diff --git a/librf/src/channel_v2.inl b/librf/src/channel_v2.inl index 790bc15..3e50040 100644 --- a/librf/src/channel_v2.inl +++ b/librf/src/channel_v2.inl @@ -358,6 +358,7 @@ inline namespace channel_v2 read_awaiter(channel_type* ch) noexcept : _channel(ch) + , _value() {} ~read_awaiter() diff --git a/librf/src/event_v2.inl b/librf/src/event_v2.inl index 194ddaa..0533b79 100644 --- a/librf/src/event_v2.inl +++ b/librf/src/event_v2.inl @@ -219,11 +219,9 @@ RESUMEF_NS clock_type::time_point _tp; }; - struct [[nodiscard]] event_t::timeout_awaiter : timeout_awaitor_impl + struct [[nodiscard]] event_t::timeout_awaiter : timeout_awaitor_impl { - timeout_awaiter(clock_type::time_point tp, detail::event_v2_impl* evt) noexcept - : timeout_awaitor_impl(tp, evt) - {} + using timeout_awaitor_impl::timeout_awaitor_impl; }; template @@ -331,11 +329,9 @@ RESUMEF_NS template - struct [[nodiscard]] event_t::timeout_any_awaiter : timeout_awaitor_impl> + struct [[nodiscard]] event_t::timeout_any_awaiter : timeout_awaitor_impl> { - timeout_any_awaiter(clock_type::time_point tp, _Iter begin, _Iter end) noexcept - : timeout_awaitor_impl>(tp, begin, end) - {} + using timeout_awaitor_impl>::timeout_awaitor_impl; }; template @@ -445,11 +441,9 @@ RESUMEF_NS template - struct [[nodiscard]] event_t::timeout_all_awaiter : timeout_awaitor_impl> + struct [[nodiscard]] event_t::timeout_all_awaiter : timeout_awaitor_impl> { - timeout_all_awaiter(clock_type::time_point tp, _Iter begin, _Iter end) noexcept - : timeout_awaitor_impl>(tp, begin, end) - {} + using timeout_awaitor_impl>::timeout_awaitor_impl; }; template diff --git a/librf/src/mutex_v2.cpp b/librf/src/mutex_v2.cpp index b281cf3..6377e06 100644 --- a/librf/src/mutex_v2.cpp +++ b/librf/src/mutex_v2.cpp @@ -4,7 +4,7 @@ RESUMEF_NS { namespace detail { - void state_mutex_base_t::resume() + void state_mutex_t::resume() { coroutine_handle<> handler = _coro; if (handler) @@ -15,12 +15,12 @@ RESUMEF_NS } } - bool state_mutex_base_t::has_handler() const noexcept + bool state_mutex_t::has_handler() const noexcept { return (bool)_coro; } - state_base_t* state_mutex_base_t::get_parent() const noexcept + state_base_t* state_mutex_t::get_parent() const noexcept { return _root; } @@ -74,59 +74,18 @@ RESUMEF_NS return false; } - - void state_mutex_all_t::on_cancel() noexcept + void state_mutex_t::add_timeout_timer(std::chrono::system_clock::time_point tp) { - intptr_t oldValue = _counter.load(std::memory_order_acquire); - if (oldValue >= 0 && _counter.compare_exchange_strong(oldValue, -1, std::memory_order_acq_rel)) + this->_thandler = this->_scheduler->timer()->add_handler(tp, + [st = counted_ptr{ this }](bool canceld) { - *_value = false; - _thandler.stop(); - - this->_coro = nullptr; - } - } - - bool state_mutex_all_t::on_notify(event_v2_impl*) - { - intptr_t oldValue = _counter.load(std::memory_order_acquire); - if (oldValue <= 0) return false; - - oldValue = _counter.fetch_add(-1, std::memory_order_acq_rel); - if (oldValue == 1) - { - *_value = true; - _thandler.stop(); - - assert(this->_scheduler != nullptr); - if (this->_coro) - this->_scheduler->add_generator(this); - - return true; - } - - return oldValue >= 1; - } - - bool state_mutex_all_t::on_timeout() - { - intptr_t oldValue = _counter.load(std::memory_order_acquire); - if (oldValue >= 0 && _counter.compare_exchange_strong(oldValue, -1, std::memory_order_acq_rel)) - { - *_value = false; - _thandler.reset(); - - assert(this->_scheduler != nullptr); - if (this->_coro) - this->_scheduler->add_generator(this); - - return true; - } - return false; + if (!canceld) + st->on_timeout(); + }); } - + void mutex_v2_impl::lock_until_succeed(void* sch) { assert(sch != nullptr); @@ -164,16 +123,16 @@ RESUMEF_NS { assert(sch != nullptr); - void* oldValue = _owner.load(std::memory_order_acquire); + void* oldValue = _owner.load(std::memory_order_relaxed); if (oldValue == nullptr) { - _owner.store(sch, std::memory_order_release); - _counter.fetch_add(1, std::memory_order_acq_rel); + _owner.store(sch, std::memory_order_relaxed); + _counter.fetch_add(1, std::memory_order_relaxed); return true; } if (oldValue == sch) { - _counter.fetch_add(1, std::memory_order_acq_rel); + _counter.fetch_add(1, std::memory_order_relaxed); return true; } return false; @@ -185,12 +144,13 @@ RESUMEF_NS scoped_lock lock_(_lock); - void* oldValue = _owner.load(std::memory_order_acquire); + void* oldValue = _owner.load(std::memory_order_relaxed); if (oldValue == sch) { - if (_counter.fetch_sub(1, std::memory_order_acquire) == 1) + if (_counter.fetch_sub(1, std::memory_order_relaxed) == 1) { - _owner.store(nullptr, std::memory_order_release); + _owner.store(nullptr, std::memory_order_relaxed); + while (!_wait_awakes.empty()) { state_mutex_ptr state = _wait_awakes.front(); @@ -204,8 +164,8 @@ RESUMEF_NS break; //转移状态失败,恢复成空 - _owner.store(nullptr, std::memory_order_release); - _counter.fetch_sub(1, std::memory_order_acq_rel); + _owner.store(nullptr, std::memory_order_relaxed); + _counter.fetch_sub(1, std::memory_order_relaxed); } } diff --git a/librf/src/mutex_v2.inl b/librf/src/mutex_v2.inl index 3be62de..4eb8bc2 100644 --- a/librf/src/mutex_v2.inl +++ b/librf/src/mutex_v2.inl @@ -4,16 +4,22 @@ RESUMEF_NS { namespace detail { - struct state_mutex_base_t : public state_base_t + struct state_mutex_t : public state_base_t { + state_mutex_t(mutex_v2_impl*& val) + : _value(&val) + {} + virtual void resume() override; virtual bool has_handler() const noexcept override; virtual state_base_t* get_parent() const noexcept override; - virtual void on_cancel() noexcept = 0; - virtual bool on_notify(mutex_v2_impl* eptr) = 0; - virtual bool on_timeout() = 0; - + void on_cancel() noexcept; + bool on_notify(mutex_v2_impl* eptr); + bool on_timeout(); + + void add_timeout_timer(std::chrono::system_clock::time_point tp); + inline scheduler_t* get_scheduler() const noexcept { return _scheduler; @@ -26,62 +32,22 @@ RESUMEF_NS this->_root = root; } - inline void add_timeout_timer(std::chrono::system_clock::time_point tp) - { - this->_thandler = this->_scheduler->timer()->add_handler(tp, - [st = counted_ptr{ this }](bool canceld) - { - if (!canceld) - st->on_timeout(); - }); - } - timer_handler _thandler; protected: state_base_t* _root; - }; - - struct state_mutex_t : public state_mutex_base_t - { - state_mutex_t(mutex_v2_impl*& val) - : _value(&val) - {} - - virtual void on_cancel() noexcept override; - virtual bool on_notify(mutex_v2_impl* eptr) override; - virtual bool on_timeout() override; - public: - timer_handler _thandler; - protected: std::atomic _value; }; - struct state_mutex_all_t : public state_event_base_t - { - state_mutex_all_t(intptr_t count, bool& val) - : _counter(count) - , _value(&val) - {} - - virtual void on_cancel() noexcept override; - virtual bool on_notify(event_v2_impl* eptr) override; - virtual bool on_timeout() override; - public: - timer_handler _thandler; - std::atomic _counter; - protected: - bool* _value; - }; - struct mutex_v2_impl : public std::enable_shared_from_this { using clock_type = std::chrono::system_clock; mutex_v2_impl() {} - inline void* owner() const noexcept + inline void* owner() noexcept { - return _owner.load(std::memory_order_acquire); + scoped_lock lock_(_lock); + return _owner.load(std::memory_order_relaxed); } bool try_lock(void* sch); //内部加锁 @@ -171,7 +137,9 @@ RESUMEF_NS { typedef std::shared_ptr mutex_impl_ptr; - scoped_unlock_t() {} + scoped_unlock_t() + : _owner(nullptr) + {} //此函数,应该在try_lock()获得锁后使用 //或者在协程里,由awaiter使用 @@ -405,12 +373,9 @@ RESUMEF_NS - struct [[nodiscard]] mutex_t::timeout_awaiter : public event_t::timeout_awaitor_impl + struct [[nodiscard]] mutex_t::timeout_awaiter : public event_t::timeout_awaitor_impl { - timeout_awaiter(clock_type::time_point tp, detail::mutex_v2_impl * mtx) noexcept - : event_t::timeout_awaitor_impl(tp, mtx) - {} - + using event_t::timeout_awaitor_impl::timeout_awaitor_impl; bool await_resume() noexcept { detail::mutex_v2_impl* mtx = this->_mutex; diff --git a/librf/src/sleep.cpp b/librf/src/sleep.cpp index ae196e5..953a33f 100644 --- a/librf/src/sleep.cpp +++ b/librf/src/sleep.cpp @@ -2,7 +2,7 @@ RESUMEF_NS { - future_t<> sleep_until_(const std::chrono::system_clock::time_point& tp_, scheduler_t& scheduler_) + future_t<> sleep_until_(std::chrono::system_clock::time_point tp_, scheduler_t& scheduler_) { awaitable_t<> awaitable; diff --git a/librf/src/sleep.h b/librf/src/sleep.h index 73582e3..f094ecb 100644 --- a/librf/src/sleep.h +++ b/librf/src/sleep.h @@ -8,32 +8,32 @@ RESUMEF_NS { struct scheduler_t; - future_t<> sleep_until_(const std::chrono::system_clock::time_point& tp_, scheduler_t& scheduler_); + future_t<> sleep_until_(std::chrono::system_clock::time_point tp_, scheduler_t& scheduler_); - inline future_t<> sleep_for_(const std::chrono::system_clock::duration& dt_, scheduler_t& scheduler_) + inline future_t<> sleep_for_(std::chrono::system_clock::duration dt_, scheduler_t& scheduler_) { return sleep_until_(std::chrono::system_clock::now() + dt_, scheduler_); } template - inline future_t<> sleep_for(const std::chrono::duration<_Rep, _Period>& dt_, scheduler_t& scheduler_) + inline future_t<> sleep_for(std::chrono::duration<_Rep, _Period> dt_, scheduler_t& scheduler_) { return sleep_for_(std::chrono::duration_cast(dt_), scheduler_); } template - inline future_t<> sleep_until(const std::chrono::time_point<_Clock, _Duration>& tp_, scheduler_t& scheduler_) + inline future_t<> sleep_until(std::chrono::time_point<_Clock, _Duration> tp_, scheduler_t& scheduler_) { return sleep_until_(std::chrono::time_point_cast(tp_), scheduler_); } template - inline future_t<> sleep_for(const std::chrono::duration<_Rep, _Period>& dt_) + inline future_t<> sleep_for(std::chrono::duration<_Rep, _Period> dt_) { co_await sleep_for_(std::chrono::duration_cast(dt_), *current_scheduler()); } template - inline future_t<> sleep_until(const std::chrono::time_point<_Clock, _Duration>& tp_) + inline future_t<> sleep_until(std::chrono::time_point<_Clock, _Duration> tp_) { co_await sleep_until_(std::chrono::time_point_cast(tp_), *current_scheduler()); } diff --git a/librf/src/when_v2.h b/librf/src/when_v2.h index d0811fc..2280545 100644 --- a/librf/src/when_v2.h +++ b/librf/src/when_v2.h @@ -323,7 +323,9 @@ inline namespace when_v2 auto when_any(scheduler_t& sch, _Awaitable&&... args) -> detail::when_future_t { +#pragma warning(disable : 6326) //warning C6326: Potential comparison of a constant with another constant. detail::when_future_t awaitor{ sizeof...(_Awaitable) > 0 ? 1 : 0 }; +#pragma warning(default : 6326) awaitor._values->first = -1; detail::when_any_one__(sch, awaitor._state.get(), awaitor._values, 0, std::forward<_Awaitable>(args)...); diff --git a/tutorial/test_async_mutex.cpp b/tutorial/test_async_mutex.cpp index bc30fe2..d3a682c 100644 --- a/tutorial/test_async_mutex.cpp +++ b/tutorial/test_async_mutex.cpp @@ -143,7 +143,7 @@ static void resumable_mutex_async() static future_t<> resumable_mutex_range_push(size_t idx, mutex_t a, mutex_t b, mutex_t c) { - for (int i = 0; i < 1000; ++i) + for (int i = 0; i < 100000; ++i) { scoped_unlock_t __lockers = co_await mutex_t::lock(a, b, c); assert(a.is_locked()); @@ -159,7 +159,7 @@ static future_t<> resumable_mutex_range_push(size_t idx, mutex_t a, mutex_t b, m static future_t<> resumable_mutex_range_pop(size_t idx, mutex_t a, mutex_t b, mutex_t c) { - for (int i = 0; i < 1000; ++i) + for (int i = 0; i < 100000; ++i) { scoped_unlock_t __lockers = co_await mutex_t::lock(a, b, c); assert(a.is_locked()); diff --git a/vs_proj/librf.cpp b/vs_proj/librf.cpp index 4e98181..27d709b 100644 --- a/vs_proj/librf.cpp +++ b/vs_proj/librf.cpp @@ -43,8 +43,8 @@ int main(int argc, const char* argv[]) //test_ring_queue>(); //test_ring_queue>(); - resumable_main_mutex(); - return 0; + //resumable_main_mutex(); + //return 0; //if (argc > 1) // resumable_main_benchmark_asio_client(atoi(argv[1])); diff --git a/vs_proj/librf.vcxproj b/vs_proj/librf.vcxproj index cd918c8..a877bb1 100644 --- a/vs_proj/librf.vcxproj +++ b/vs_proj/librf.vcxproj @@ -46,7 +46,7 @@ Application false - v142 + ClangCL true NotSet @@ -77,7 +77,6 @@ NativeRecommendedRules.ruleset - true @@ -158,7 +157,7 @@ true - false + true AnySuitable true true