@@ -6,6 +6,7 @@ namespace detail | |||
{ | |||
template<class _Ty, class _Chty> | |||
struct state_channel_t : public state_base_t | |||
, public intrusive_link_node<state_channel_t<_Ty, _Chty>> | |||
{ | |||
using value_type = _Ty; | |||
@@ -55,9 +56,6 @@ namespace detail | |||
} | |||
friend _Chty; | |||
public: | |||
//为浸入式单向链表提供的next指针 | |||
state_channel_t* _next = nullptr; | |||
protected: | |||
//co_await产生的临时awaitor会引用state,管理state的生命周期 | |||
//state再引用channel |
@@ -82,6 +82,10 @@ namespace librf | |||
{ | |||
return _p; | |||
} | |||
operator T* () const noexcept | |||
{ | |||
return _p; | |||
} | |||
/** | |||
* @brief 获得管理的state指针。 |
@@ -7,7 +7,6 @@ namespace librf | |||
{ | |||
struct event_v2_impl; | |||
} | |||
#endif //DOXYGEN_SKIP_PROPERTY | |||
/** | |||
@@ -90,7 +89,7 @@ namespace librf | |||
* @attention 只能在协程中调用。 | |||
*/ | |||
template<class _Rep, class _Period> | |||
timeout_awaiter wait_for(const std::chrono::duration<_Rep, _Period>& dt) const noexcept; | |||
timeout_awaiter wait_for(const std::chrono::duration<_Rep, _Period>& dt) const noexcept; //test OK | |||
/** | |||
* @brief 在协程中等待信号触发,直到超时。 | |||
@@ -102,7 +101,7 @@ namespace librf | |||
* @attention 只能在协程中调用。 | |||
*/ | |||
template<class _Clock, class _Duration> | |||
timeout_awaiter wait_until(const std::chrono::time_point<_Clock, _Duration>& tp) const noexcept; | |||
timeout_awaiter wait_until(const std::chrono::time_point<_Clock, _Duration>& tp) const noexcept; //test OK | |||
template<class _Iter> | |||
@@ -110,13 +109,11 @@ namespace librf | |||
template<class _Iter> | |||
requires(_IteratorOfT<_Iter, event_t>) | |||
static auto wait_any(_Iter begin_, _Iter end_) | |||
->any_awaiter<_Iter>; | |||
static auto wait_any(_Iter begin_, _Iter end_)->any_awaiter<_Iter>; | |||
template<class _Cont> | |||
requires(_ContainerOfT<_Cont, event_t>) | |||
static auto wait_any(const _Cont& cnt_) | |||
->any_awaiter<decltype(std::begin(cnt_))>; | |||
static auto wait_any(const _Cont& cnt_)->any_awaiter<decltype(std::begin(cnt_))>; | |||
template<class _Iter> | |||
struct [[nodiscard]] timeout_any_awaiter; |
@@ -22,14 +22,14 @@ namespace librf | |||
LIBRF_API void signal_all() noexcept; | |||
LIBRF_API void signal() noexcept; | |||
LIBRF_API void add_wait_list(state_event_base_t* state); | |||
LIBRF_API void remove_wait_list(state_event_base_t* state); | |||
public: | |||
static constexpr bool USE_SPINLOCK = true; | |||
static constexpr bool USE_LINK_QUEUE = false; | |||
using lock_type = std::conditional_t<USE_SPINLOCK, spinlock, std::recursive_mutex>; | |||
using state_event_ptr = counted_ptr<state_event_base_t>; | |||
using link_state_queue = intrusive_link_queue<state_event_base_t, state_event_ptr>; | |||
using wait_queue_type = std::conditional_t<USE_LINK_QUEUE, link_state_queue, std::list<state_event_ptr>>; | |||
using wait_queue_type = intrusive_link_queue<state_event_base_t, state_event_ptr>; | |||
bool try_wait_one() noexcept | |||
{ | |||
@@ -42,12 +42,6 @@ namespace librf | |||
return false; | |||
} | |||
void add_wait_list(state_event_base_t* state) noexcept | |||
{ | |||
assert(state != nullptr); | |||
_wait_awakes.push_back(state); | |||
} | |||
lock_type _lock; //保证访问本对象是线程安全的 | |||
private: | |||
std::atomic<intptr_t> _counter; | |||
@@ -61,6 +55,7 @@ namespace librf | |||
}; | |||
struct state_event_base_t : public state_base_t | |||
, public intrusive_link_node<state_event_base_t, counted_ptr<state_event_base_t>> | |||
{ | |||
LIBRF_API virtual void resume() override; | |||
LIBRF_API virtual bool has_handler() const noexcept override; | |||
@@ -86,14 +81,12 @@ namespace librf | |||
{ | |||
this->_thandler = this->_scheduler->timer()->add_handler(tp, | |||
[st = counted_ptr<state_event_base_t>{ this }](bool canceld) | |||
{ | |||
if (!canceld) | |||
st->on_timeout(); | |||
}); | |||
{ | |||
if (!canceld) | |||
st->on_timeout(); | |||
}); | |||
} | |||
//为侵入式单向链表提供的next指针 | |||
//counted_ptr<state_event_base_t> _next = nullptr; | |||
timer_handler _thandler; | |||
}; | |||
@@ -152,6 +145,11 @@ namespace librf | |||
: _event(evt) | |||
{ | |||
} | |||
~awaiter() | |||
{ | |||
if (_event != nullptr && _state != nullptr) | |||
_event->remove_wait_list(_state.get()); | |||
} | |||
bool await_ready() noexcept | |||
{ | |||
@@ -167,14 +165,17 @@ namespace librf | |||
scoped_lock<detail::event_v2_impl::lock_type> lock_(evt->_lock); | |||
if (evt->try_wait_one()) | |||
{ | |||
return false; | |||
} | |||
_state = new detail::state_event_t(_event); | |||
_event = nullptr; | |||
(void)_state->on_await_suspend(handler); | |||
if constexpr (!std::is_same_v<std::remove_reference_t<_Timeout>, std::nullptr_t>) | |||
{ | |||
cb(); | |||
} | |||
evt->add_wait_list(_state.get()); | |||
@@ -218,12 +219,11 @@ namespace librf | |||
template<class _PromiseT, typename = std::enable_if_t<traits::is_promise_v<_PromiseT>>> | |||
bool await_suspend(coroutine_handle<_PromiseT> handler) | |||
{ | |||
if (!_Btype::await_suspend2(handler, [this] | |||
{ | |||
this->_state->add_timeout_timer(_tp); | |||
})) | |||
if (!_Btype::await_suspend2(handler, [this]{ this->_state->add_timeout_timer(_tp);})) | |||
{ | |||
return false; | |||
return true; | |||
} | |||
return true; | |||
} | |||
protected: | |||
clock_type::time_point _tp; |
@@ -2,12 +2,24 @@ | |||
namespace librf | |||
{ | |||
template<class _Node, class _Nodeptr = _Node*, class _Sty = uint32_t> | |||
template<class _Node, class _Nextptr = _Node*> | |||
struct intrusive_link_node | |||
{ | |||
private: | |||
_Node* _prev; | |||
_Nextptr _next; | |||
template<class _Node, class _Nodeptr> | |||
friend struct intrusive_link_queue; | |||
}; | |||
#define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE 1 | |||
template<class _Node, class _Nodeptr = _Node*> | |||
struct intrusive_link_queue | |||
{ | |||
using node_type = _Node; | |||
using node_ptr_type = _Nodeptr; | |||
using size_type = _Sty; | |||
public: | |||
intrusive_link_queue(); | |||
@@ -16,88 +28,135 @@ namespace librf | |||
intrusive_link_queue& operator =(const intrusive_link_queue&) = delete; | |||
intrusive_link_queue& operator =(intrusive_link_queue&&) = default; | |||
auto size() const noexcept->size_type; | |||
std::size_t size() const noexcept; | |||
bool empty() const noexcept; | |||
void push_back(node_ptr_type node) noexcept; | |||
void push_front(node_ptr_type node) noexcept; | |||
void erase(node_ptr_type node) noexcept; | |||
auto try_pop() noexcept->node_ptr_type; | |||
private: | |||
node_ptr_type _head; | |||
node_ptr_type _tail; | |||
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE | |||
std::atomic<size_type> m_count; | |||
#endif | |||
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE | |||
std::atomic<std::size_t> _count; | |||
#endif | |||
node_ptr_type _header; | |||
node_ptr_type _tailer; | |||
}; | |||
template<class _Node, class _Nodeptr, class _Sty> | |||
intrusive_link_queue<_Node, _Nodeptr, _Sty>::intrusive_link_queue() | |||
: _head(nullptr) | |||
, _tail(nullptr) | |||
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE | |||
, m_count(0) | |||
#endif | |||
template<class _Node, class _Nodeptr> | |||
intrusive_link_queue<_Node, _Nodeptr>::intrusive_link_queue() | |||
: _header(nullptr) | |||
, _tailer(nullptr) | |||
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE | |||
, _count(0) | |||
#endif | |||
{ | |||
} | |||
template<class _Node, class _Nodeptr, class _Sty> | |||
auto intrusive_link_queue<_Node, _Nodeptr, _Sty>::size() const noexcept->size_type | |||
template<class _Node, class _Nodeptr> | |||
std::size_t intrusive_link_queue<_Node, _Nodeptr>::size() const noexcept | |||
{ | |||
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE | |||
return m_count.load(std::memory_order_acquire); | |||
#else | |||
size_type count = 0; | |||
for (node_type* node = _head; node != nullptr; node = node->next) | |||
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE | |||
return _count.load(std::memory_order_acquire); | |||
#else | |||
std::size_t count = 0; | |||
for (node_type* node = _header; node != nullptr; node = node->next) | |||
++count; | |||
return count; | |||
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE | |||
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE | |||
} | |||
template<class _Node, class _Nodeptr, class _Sty> | |||
bool intrusive_link_queue<_Node, _Nodeptr, _Sty>::empty() const noexcept | |||
template<class _Node, class _Nodeptr> | |||
bool intrusive_link_queue<_Node, _Nodeptr>::empty() const noexcept | |||
{ | |||
return _head == nullptr; | |||
return _header == nullptr; | |||
} | |||
template<class _Node, class _Nodeptr, class _Sty> | |||
void intrusive_link_queue<_Node, _Nodeptr, _Sty>::push_back(node_ptr_type node) noexcept | |||
template<class _Node, class _Nodeptr> | |||
void intrusive_link_queue<_Node, _Nodeptr>::push_back(node_ptr_type e) noexcept | |||
{ | |||
assert(node != nullptr); | |||
assert(e != nullptr); | |||
node->_next = nullptr; | |||
if (_head == nullptr) | |||
{ | |||
_head = _tail = node; | |||
} | |||
else | |||
{ | |||
_tail->_next = node; | |||
_tail = node; | |||
} | |||
e->_prev = _tailer; | |||
e->_next = nullptr; | |||
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE | |||
m_count.fetch_add(1, std::memory_order_acq_rel); | |||
#endif | |||
if (!_header) | |||
_header = e; | |||
if (_tailer) | |||
_tailer->_next = e; | |||
_tailer = e; | |||
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE | |||
_count.fetch_add(1, std::memory_order_acq_rel); | |||
#endif | |||
} | |||
template<class _Node, class _Nodeptr, class _Sty> | |||
auto intrusive_link_queue<_Node, _Nodeptr, _Sty>::try_pop() noexcept->node_ptr_type | |||
template<class _Node, class _Nodeptr> | |||
void intrusive_link_queue<_Node, _Nodeptr>::push_front(node_ptr_type e) noexcept | |||
{ | |||
if (_head == nullptr) | |||
assert(e != nullptr); | |||
e->_prev = nullptr; | |||
e->_next = _header; | |||
if (_header) | |||
_header->_prev = e; | |||
_header = e; | |||
if (!_tailer) | |||
_tailer = e; | |||
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE | |||
_count.fetch_add(1, std::memory_order_acq_rel); | |||
#endif | |||
} | |||
template<class _Node, class _Nodeptr> | |||
void intrusive_link_queue<_Node, _Nodeptr>::erase(node_ptr_type e) noexcept | |||
{ | |||
assert(e != nullptr); | |||
if (_header == e) | |||
_header = e->_next; | |||
if (_tailer == e) | |||
_tailer = e->_prev; | |||
if (e->_next) | |||
e->_next->_prev = e->_prev; | |||
if (e->_prev) | |||
e->_prev->_next = e->_next; | |||
e->_prev = nullptr; | |||
e->_next = nullptr; | |||
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE | |||
_count.fetch_sub(1, std::memory_order_acq_rel); | |||
#endif | |||
} | |||
template<class _Node, class _Nodeptr> | |||
auto intrusive_link_queue<_Node, _Nodeptr>::try_pop() noexcept->node_ptr_type | |||
{ | |||
if (_header == nullptr) | |||
return nullptr; | |||
node_ptr_type node = _head; | |||
_head = node->_next; | |||
node->_next = nullptr; | |||
node_ptr_type node = _header; | |||
_header = node->_next; | |||
if (_header != nullptr) | |||
_header->_prev = nullptr; | |||
if (_tail == node) | |||
if (_tailer == node) | |||
{ | |||
assert(node->_next == nullptr); | |||
_tail = nullptr; | |||
_tailer = nullptr; | |||
} | |||
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE | |||
m_count.fetch_sub(1, std::memory_order_acq_rel); | |||
#endif | |||
node->_prev = nullptr; | |||
node->_next = nullptr; | |||
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE | |||
_count.fetch_sub(1, std::memory_order_acq_rel); | |||
#endif | |||
return node; | |||
} |
@@ -54,6 +54,9 @@ namespace librf | |||
event_v2_impl** oldValue = _value.load(std::memory_order_acquire); | |||
if (oldValue != nullptr && _value.compare_exchange_strong(oldValue, nullptr, std::memory_order_acq_rel)) | |||
{ | |||
event_v2_impl* evt = *oldValue; | |||
if (evt != nullptr) | |||
evt->remove_wait_list(this); | |||
*oldValue = nullptr; | |||
_thandler.reset(); | |||
@@ -184,6 +187,20 @@ namespace librf | |||
_counter.fetch_add(1, std::memory_order_acq_rel); | |||
} | |||
LIBRF_API void event_v2_impl::add_wait_list(state_event_base_t* state) | |||
{ | |||
assert(state != nullptr); | |||
_wait_awakes.push_back(state); | |||
} | |||
LIBRF_API void event_v2_impl::remove_wait_list(state_event_base_t* state) | |||
{ | |||
assert(state != nullptr); | |||
scoped_lock<lock_type> lock_(_lock); | |||
_wait_awakes.erase(state); | |||
} | |||
} | |||
LIBRF_API event_t::event_t(bool initially) |
@@ -0,0 +1,40 @@ | |||
#include <chrono> | |||
#include <iostream> | |||
#include <string> | |||
#include <thread> | |||
#include "librf/librf.h" | |||
using namespace librf; | |||
using namespace std::chrono; | |||
void test_memory_leak() | |||
{ | |||
go[]()->future_t<void> | |||
{ | |||
event_t e; | |||
for (;;) | |||
{ | |||
bool val = co_await e.wait_for(1ms); | |||
assert(val == false); | |||
co_await yield(); | |||
} | |||
}; | |||
for (;;) | |||
{ | |||
auto have = this_scheduler()->run_one_batch(); | |||
if (!have) { | |||
std::this_thread::sleep_for(1ms); | |||
} | |||
} | |||
} | |||
#if LIBRF_TUTORIAL_STAND_ALONE | |||
int main() | |||
{ | |||
test_memory_leak(); | |||
return 0; | |||
} | |||
#endif |