tearshark пре 2 година
родитељ
комит
cfcd612ecd

+ 1
- 3
include/librf/src/channel_v2.inl Прегледај датотеку

{ {
template<class _Ty, class _Chty> template<class _Ty, class _Chty>
struct state_channel_t : public state_base_t struct state_channel_t : public state_base_t
, public intrusive_link_node<state_channel_t<_Ty, _Chty>>
{ {
using value_type = _Ty; using value_type = _Ty;


} }


friend _Chty; friend _Chty;
public:
//为浸入式单向链表提供的next指针
state_channel_t* _next = nullptr;
protected: protected:
//co_await产生的临时awaitor会引用state,管理state的生命周期 //co_await产生的临时awaitor会引用state,管理state的生命周期
//state再引用channel //state再引用channel

+ 4
- 0
include/librf/src/counted_ptr.h Прегледај датотеку

{ {
return _p; return _p;
} }
operator T* () const noexcept
{
return _p;
}


/** /**
* @brief 获得管理的state指针。 * @brief 获得管理的state指针。

+ 44
- 7
include/librf/src/event_v2.h Прегледај датотеку

{ {
struct event_v2_impl; struct event_v2_impl;
} }

#endif //DOXYGEN_SKIP_PROPERTY #endif //DOXYGEN_SKIP_PROPERTY


/** /**
* @attention 只能在协程中调用。 * @attention 只能在协程中调用。
*/ */
template<class _Rep, class _Period> template<class _Rep, class _Period>
timeout_awaiter wait_for(const std::chrono::duration<_Rep, _Period>& dt) const noexcept;
timeout_awaiter wait_for(const std::chrono::duration<_Rep, _Period>& dt) const noexcept; //test OK


/** /**
* @brief 在协程中等待信号触发,直到超时。 * @brief 在协程中等待信号触发,直到超时。
* @attention 只能在协程中调用。 * @attention 只能在协程中调用。
*/ */
template<class _Clock, class _Duration> template<class _Clock, class _Duration>
timeout_awaiter wait_until(const std::chrono::time_point<_Clock, _Duration>& tp) const noexcept;
timeout_awaiter wait_until(const std::chrono::time_point<_Clock, _Duration>& tp) const noexcept; //test OK




template<class _Iter> template<class _Iter>
struct [[nodiscard]] any_awaiter; struct [[nodiscard]] any_awaiter;


/**
* @brief 在协程中等待任意一个信号触发。
* @details 如果已经有信号触发,则立即返回第一个触发信号的索引。\n
* 否则,当前协程被阻塞,直到信号被触发后唤醒。
* 至少消耗一次信号触发次数。
* @param begin_ 容纳信号的容器的首迭代器
* @param end_ 容纳信号的容器的尾迭代器(不包含有效信号)
* @retval intptr_t [co_await] 返回第一个触发信号的索引
* @attention 只能在协程中调用。
*/
template<class _Iter> template<class _Iter>
requires(_IteratorOfT<_Iter, event_t>) requires(_IteratorOfT<_Iter, event_t>)
static auto wait_any(_Iter begin_, _Iter end_)
->any_awaiter<_Iter>;
static auto wait_any(_Iter begin_, _Iter end_)->any_awaiter<_Iter>;


/**
* @brief 在协程中等待任意一个信号触发。
* @details 如果已经有信号触发,则立即返回第一个触发信号的索引。\n
* 否则,当前协程被阻塞,直到信号被触发后唤醒。
* 至少消耗一次信号触发次数。
* @param cnt_ 容纳信号的容器,需要支持std::begin(cnt_)和std::end(cnt_)
* @retval intptr_t [co_await] 返回第一个触发信号的索引
* @attention 只能在协程中调用。
*/
template<class _Cont> template<class _Cont>
requires(_ContainerOfT<_Cont, event_t>) requires(_ContainerOfT<_Cont, event_t>)
static auto wait_any(const _Cont& cnt_)
->any_awaiter<decltype(std::begin(cnt_))>;
static auto wait_any(const _Cont& cnt_)->any_awaiter<decltype(std::begin(cnt_))>;


template<class _Iter> template<class _Iter>
struct [[nodiscard]] timeout_any_awaiter; struct [[nodiscard]] timeout_any_awaiter;


/**
* @brief 在协程中等待任意一个信号触发,直到超时。
* @details 如果已经有信号触发,则立即返回第一个触发信号的索引。\n
* 否则,当前协程被阻塞,直到信号被触发后,或者超时后唤醒。
* 如果等到了信号,则至少消耗一次信号触发次数。
* @param dt 超时时长
* @param begin_ 容纳信号的容器的首迭代器
* @param end_ 容纳信号的容器的尾迭代器(不包含有效信号)
* @retval intptr_t [co_await] 如果等到了任意一个信号,返回其索引(相对于begin_的距离);否则,返回-1
* @attention 只能在协程中调用。
*/
template<class _Rep, class _Period, class _Iter> template<class _Rep, class _Period, class _Iter>
requires(_IteratorOfT<_Iter, event_t>) requires(_IteratorOfT<_Iter, event_t>)
static auto wait_any_for(const std::chrono::duration<_Rep, _Period>& dt, _Iter begin_, _Iter end_) static auto wait_any_for(const std::chrono::duration<_Rep, _Period>& dt, _Iter begin_, _Iter end_)
->timeout_any_awaiter<_Iter>; ->timeout_any_awaiter<_Iter>;


/**
* @brief 在协程中等待任意一个信号触发,直到超时。
* @details 如果已经有信号触发,则立即返回第一个触发信号的索引。\n
* 否则,当前协程被阻塞,直到信号被触发后,或者超时后唤醒。
* 如果等到了信号,则至少消耗一次信号触发次数。
* @param dt 超时时长
* @param cnt_ 容纳信号的容器,需要支持std::begin(cnt_)和std::end(cnt_)
* @retval intptr_t [co_await] 如果等到了任意一个信号,返回其索引(相对于begin_的距离);否则,返回-1
* @attention 只能在协程中调用。
*/
template<class _Rep, class _Period, class _Cont> template<class _Rep, class _Period, class _Cont>
requires(_ContainerOfT<_Cont, event_t>) requires(_ContainerOfT<_Cont, event_t>)
static auto wait_any_for(const std::chrono::duration<_Rep, _Period>& dt, const _Cont& cnt_) static auto wait_any_for(const std::chrono::duration<_Rep, _Period>& dt, const _Cont& cnt_)

+ 101
- 42
include/librf/src/event_v2.inl Прегледај датотеку

{ {
return _counter.load(std::memory_order_acquire) > 0; return _counter.load(std::memory_order_acquire) > 0;
} }
void reset() noexcept
{
_counter.store(0, std::memory_order_release);
}
LIBRF_API void signal_all() noexcept; LIBRF_API void signal_all() noexcept;
LIBRF_API void signal() noexcept; LIBRF_API void signal() noexcept;
LIBRF_API void reset() noexcept;


LIBRF_API void add_wait_list(state_event_base_t* state);
LIBRF_API void remove_wait_list(state_event_base_t* state);
public: public:
static constexpr bool USE_SPINLOCK = true; static constexpr bool USE_SPINLOCK = true;
static constexpr bool USE_LINK_QUEUE = false;


using lock_type = std::conditional_t<USE_SPINLOCK, spinlock, std::recursive_mutex>; using lock_type = std::conditional_t<USE_SPINLOCK, spinlock, std::recursive_mutex>;
using state_event_ptr = counted_ptr<state_event_base_t>; using state_event_ptr = counted_ptr<state_event_base_t>;
using link_state_queue = intrusive_link_queue<state_event_base_t, state_event_ptr>;
using wait_queue_type = std::conditional_t<USE_LINK_QUEUE, link_state_queue, std::list<state_event_ptr>>;
using wait_queue_type = intrusive_link_queue<state_event_base_t, state_event_ptr>;


bool try_wait_one() noexcept bool try_wait_one() noexcept
{ {
return false; return false;
} }


void add_wait_list(state_event_base_t* state) noexcept
{
assert(state != nullptr);
_wait_awakes.push_back(state);
}

lock_type _lock; //保证访问本对象是线程安全的 lock_type _lock; //保证访问本对象是线程安全的
private: private:
std::atomic<intptr_t> _counter; std::atomic<intptr_t> _counter;
}; };


struct state_event_base_t : public state_base_t struct state_event_base_t : public state_base_t
, public intrusive_link_node<state_event_base_t, counted_ptr<state_event_base_t>>
{ {
LIBRF_API virtual void resume() override;
LIBRF_API virtual bool has_handler() const noexcept override;

virtual void on_cancel() noexcept = 0; virtual void on_cancel() noexcept = 0;
virtual bool on_notify(event_v2_impl* eptr) = 0; virtual bool on_notify(event_v2_impl* eptr) = 0;
virtual bool on_timeout() = 0; virtual bool on_timeout() = 0;
{ {
this->_thandler = this->_scheduler->timer()->add_handler(tp, this->_thandler = this->_scheduler->timer()->add_handler(tp,
[st = counted_ptr<state_event_base_t>{ this }](bool canceld) [st = counted_ptr<state_event_base_t>{ this }](bool canceld)
{
if (!canceld)
st->on_timeout();
});
{
if (!canceld)
st->on_timeout();
});
} }


//为侵入式单向链表提供的next指针
//counted_ptr<state_event_base_t> _next = nullptr;
protected:
timer_handler _thandler; timer_handler _thandler;
}; };



struct state_event_t : public state_event_base_t struct state_event_t : public state_event_base_t
{ {
state_event_t(event_v2_impl*& val) state_event_t(event_v2_impl*& val)
std::atomic<event_v2_impl**> _value; std::atomic<event_v2_impl**> _value;
}; };


struct state_event_all_t : public state_event_base_t
struct state_event_all_t : public state_base_t
{ {
using sub_state_t = std::pair<counted_ptr<state_event_base_t>, detail::event_v2_impl*>;

state_event_all_t(intptr_t count, bool& val) state_event_all_t(intptr_t count, bool& val)
: _counter(count) : _counter(count)
, _value(&val)
{}
, _result(&val)
{
_values.resize(count, sub_state_t{nullptr, nullptr});
}


LIBRF_API virtual void on_cancel() noexcept override;
LIBRF_API virtual bool on_notify(event_v2_impl* eptr) override;
LIBRF_API virtual bool on_timeout() override;
LIBRF_API void on_cancel(intptr_t idx);
LIBRF_API bool on_notify(event_v2_impl* eptr, intptr_t idx);
LIBRF_API bool on_timeout();


std::atomic<intptr_t> _counter;
template<class _PromiseT, typename = std::enable_if_t<traits::is_promise_v<_PromiseT>>>
scheduler_t* on_await_suspend(coroutine_handle<_PromiseT> handler) noexcept
{
_PromiseT& promise = handler.promise();
auto* parent = promise.get_state();
scheduler_t* sch = parent->get_scheduler();

this->_scheduler = sch;
this->_coro = handler;

return sch;
}

inline void add_timeout_timer(std::chrono::system_clock::time_point tp)
{
this->_thandler = this->_scheduler->timer()->add_handler(tp,
[st = counted_ptr<state_event_all_t>{ this }](bool canceld)
{
if (!canceld)
st->on_timeout();
});
}

std::vector<sub_state_t> _values;
intptr_t _counter;
event_v2_impl::lock_type _lock;
protected: protected:
bool* _value;
timer_handler _thandler;
bool* _result;
};

template<class _StateT>
struct state_event_proxy_t : public state_event_base_t
{
state_event_proxy_t(_StateT* sta, intptr_t idx)
: _state(sta)
, _index(idx)
{
assert(sta != nullptr);
assert(idx >= 0);
}

void on_cancel() noexcept override { return _state->on_cancel(_index); }
bool on_notify(event_v2_impl* eptr) override { return _state->on_notify(eptr, _index); }
bool on_timeout() override { assert(false); return false; }
private:
_StateT* _state;
intptr_t _index;
}; };
} }


: _event(evt) : _event(evt)
{ {
} }
~awaiter()
{
if (_event != nullptr && _state != nullptr)
_event->remove_wait_list(_state.get());
}


bool await_ready() noexcept bool await_ready() noexcept
{ {
scoped_lock<detail::event_v2_impl::lock_type> lock_(evt->_lock); scoped_lock<detail::event_v2_impl::lock_type> lock_(evt->_lock);


if (evt->try_wait_one()) if (evt->try_wait_one())
{
return false; return false;
}


_state = new detail::state_event_t(_event); _state = new detail::state_event_t(_event);
_event = nullptr;
(void)_state->on_await_suspend(handler); (void)_state->on_await_suspend(handler);


if constexpr (!std::is_same_v<std::remove_reference_t<_Timeout>, std::nullptr_t>) if constexpr (!std::is_same_v<std::remove_reference_t<_Timeout>, std::nullptr_t>)
{
cb(); cb();
}


evt->add_wait_list(_state.get()); evt->add_wait_list(_state.get());


template<class _PromiseT, typename = std::enable_if_t<traits::is_promise_v<_PromiseT>>> template<class _PromiseT, typename = std::enable_if_t<traits::is_promise_v<_PromiseT>>>
bool await_suspend(coroutine_handle<_PromiseT> handler) bool await_suspend(coroutine_handle<_PromiseT> handler)
{ {
if (!_Btype::await_suspend2(handler, [this]
{
this->_state->add_timeout_timer(_tp);
}))
if (!_Btype::await_suspend2(handler, [this]{ this->_state->add_timeout_timer(_tp);}))
{
return false; return false;
return true;
}
return true;
} }
protected: protected:
clock_type::time_point _tp; clock_type::time_point _tp;
(void)_state->on_await_suspend(handler); (void)_state->on_await_suspend(handler);


if constexpr (!std::is_same_v<std::remove_reference_t<_Timeout>, std::nullptr_t>) if constexpr (!std::is_same_v<std::remove_reference_t<_Timeout>, std::nullptr_t>)
{
cb(); cb();
}


for (auto iter = _begin; iter != _end; ++iter) for (auto iter = _begin; iter != _end; ++iter)
{ {
requires(_IteratorOfT<_Iter, event_t>) requires(_IteratorOfT<_Iter, event_t>)
auto event_t::wait_any(_Iter begin_, _Iter end_) ->event_t::any_awaiter<_Iter> auto event_t::wait_any(_Iter begin_, _Iter end_) ->event_t::any_awaiter<_Iter>
{ {
assert(false && "Function is flawed!");
return { begin_, end_ }; return { begin_, end_ };
} }


requires(_ContainerOfT<_Cont, event_t>) requires(_ContainerOfT<_Cont, event_t>)
auto event_t::wait_any(const _Cont& cnt_) ->event_t::any_awaiter<decltype(std::begin(cnt_))> auto event_t::wait_any(const _Cont& cnt_) ->event_t::any_awaiter<decltype(std::begin(cnt_))>
{ {
assert(false && "Function is flawed!");
return { std::begin(cnt_), std::end(cnt_) }; return { std::begin(cnt_), std::end(cnt_) };
} }


auto event_t::wait_any_for(const std::chrono::duration<_Rep, _Period>& dt, _Iter begin_, _Iter end_) auto event_t::wait_any_for(const std::chrono::duration<_Rep, _Period>& dt, _Iter begin_, _Iter end_)
->event_t::timeout_any_awaiter<_Iter> ->event_t::timeout_any_awaiter<_Iter>
{ {
assert(false && "Function is flawed!");
clock_type::time_point tp = clock_type::now() + std::chrono::duration_cast<clock_type::duration>(dt); clock_type::time_point tp = clock_type::now() + std::chrono::duration_cast<clock_type::duration>(dt);
return { tp, begin_, end_ }; return { tp, begin_, end_ };
} }
auto event_t::wait_any_for(const std::chrono::duration<_Rep, _Period>& dt, const _Cont& cnt_) auto event_t::wait_any_for(const std::chrono::duration<_Rep, _Period>& dt, const _Cont& cnt_)
->event_t::timeout_any_awaiter<decltype(std::begin(cnt_))> ->event_t::timeout_any_awaiter<decltype(std::begin(cnt_))>
{ {
assert(false && "Function is flawed!");
clock_type::time_point tp = clock_type::now() + std::chrono::duration_cast<clock_type::duration>(dt); clock_type::time_point tp = clock_type::now() + std::chrono::duration_cast<clock_type::duration>(dt);
return { tp, std::begin(cnt_), std::end(cnt_) }; return { tp, std::begin(cnt_), std::end(cnt_) };
} }
bool await_suspend2(coroutine_handle<_PromiseT> handler, const _Timeout& cb) bool await_suspend2(coroutine_handle<_PromiseT> handler, const _Timeout& cb)
{ {
(void)cb; (void)cb;
intptr_t count = std::distance(_begin, _end);
const intptr_t count = std::distance(_begin, _end);


using ref_lock_type = std::reference_wrapper<detail::event_v2_impl::lock_type>; using ref_lock_type = std::reference_wrapper<detail::event_v2_impl::lock_type>;
std::vector<ref_lock_type> lockes; std::vector<ref_lock_type> lockes;
lockes.reserve(count);
lockes.reserve(count + 1);


for (auto iter = _begin; iter != _end; ++iter) for (auto iter = _begin; iter != _end; ++iter)
{ {
} }


_state = new detail::state_event_all_t(count, _value); _state = new detail::state_event_all_t(count, _value);
lockes.push_back(_state->_lock);
(void)_state->on_await_suspend(handler); (void)_state->on_await_suspend(handler);


if constexpr (!std::is_same_v<std::remove_reference_t<_Timeout>, std::nullptr_t>) if constexpr (!std::is_same_v<std::remove_reference_t<_Timeout>, std::nullptr_t>)
{
cb(); cb();
}


batch_lock_t<ref_lock_type> lock_(lockes); batch_lock_t<ref_lock_type> lock_(lockes);


for (auto iter = _begin; iter != _end; ++iter)
intptr_t idx = 0;
for (auto iter = _begin; iter != _end; ++iter, ++idx)
{ {
detail::event_v2_impl* evt = (*iter)._event.get(); detail::event_v2_impl* evt = (*iter)._event.get();
if (evt->try_wait_one()) if (evt->try_wait_one())
{ {
_state->_counter.fetch_sub(1, std::memory_order_acq_rel);
--_state->_counter;
_state->_values[idx].second = evt;
} }
else else
{ {
evt->add_wait_list(_state.get());
auto* proxy = new detail::state_event_proxy_t<detail::state_event_all_t>(_state.get(), idx);
_state->_values[idx] = detail::state_event_all_t::sub_state_t{ proxy, evt };

evt->add_wait_list(proxy);
} }
} }


if (_state->_counter.load(std::memory_order_relaxed) == 0)
if (_state->_counter == 0)
{ {
_state = nullptr; _state = nullptr;
_value = true; _value = true;

+ 113
- 54
include/librf/src/intrusive_link_queue.h Прегледај датотеку



namespace librf namespace librf
{ {
template<class _Node, class _Nodeptr = _Node*, class _Sty = uint32_t>
template<class _Node, class _Nextptr = _Node*>
struct intrusive_link_node
{
private:
_Node* _prev;
_Nextptr _next;

template<class _Node2, class _Nodeptr2>
friend struct intrusive_link_queue;
};

#define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE 1

template<class _Node, class _Nodeptr = _Node*>
struct intrusive_link_queue struct intrusive_link_queue
{ {
using node_type = _Node; using node_type = _Node;
using node_ptr_type = _Nodeptr; using node_ptr_type = _Nodeptr;
using size_type = _Sty;
public: public:
intrusive_link_queue(); intrusive_link_queue();


intrusive_link_queue& operator =(const intrusive_link_queue&) = delete; intrusive_link_queue& operator =(const intrusive_link_queue&) = delete;
intrusive_link_queue& operator =(intrusive_link_queue&&) = default; intrusive_link_queue& operator =(intrusive_link_queue&&) = default;


auto size() const noexcept->size_type;
std::size_t size() const noexcept;
bool empty() const noexcept; bool empty() const noexcept;
void push_back(node_ptr_type node) noexcept; void push_back(node_ptr_type node) noexcept;
void push_front(node_ptr_type node) noexcept;
void erase(node_ptr_type node) noexcept;
auto try_pop() noexcept->node_ptr_type; auto try_pop() noexcept->node_ptr_type;
private: private:
node_ptr_type _head;
node_ptr_type _tail;

#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
std::atomic<size_type> m_count;
#endif
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
std::atomic<std::size_t> _count;
#endif
node_ptr_type _header;
node_ptr_type _tailer;
}; };


template<class _Node, class _Nodeptr, class _Sty>
intrusive_link_queue<_Node, _Nodeptr, _Sty>::intrusive_link_queue()
: _head(nullptr)
, _tail(nullptr)
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
, m_count(0)
#endif
template<class _Node, class _Nodeptr>
intrusive_link_queue<_Node, _Nodeptr>::intrusive_link_queue()
: _header(nullptr)
, _tailer(nullptr)
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
, _count(0)
#endif
{ {
} }


template<class _Node, class _Nodeptr, class _Sty>
auto intrusive_link_queue<_Node, _Nodeptr, _Sty>::size() const noexcept->size_type
template<class _Node, class _Nodeptr>
std::size_t intrusive_link_queue<_Node, _Nodeptr>::size() const noexcept
{ {
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
return m_count.load(std::memory_order_acquire);
#else
size_type count = 0;
for (node_type* node = _head; node != nullptr; node = node->next)
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
return _count.load(std::memory_order_acquire);
#else
std::size_t count = 0;
for (node_type* node = _header; node != nullptr; node = node->next)
++count; ++count;
return count; return count;
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
} }


template<class _Node, class _Nodeptr, class _Sty>
bool intrusive_link_queue<_Node, _Nodeptr, _Sty>::empty() const noexcept
template<class _Node, class _Nodeptr>
bool intrusive_link_queue<_Node, _Nodeptr>::empty() const noexcept
{ {
return _head == nullptr;
return _header == nullptr;
} }


template<class _Node, class _Nodeptr, class _Sty>
void intrusive_link_queue<_Node, _Nodeptr, _Sty>::push_back(node_ptr_type node) noexcept
template<class _Node, class _Nodeptr>
void intrusive_link_queue<_Node, _Nodeptr>::push_back(node_ptr_type e) noexcept
{ {
assert(node != nullptr);
assert(e != nullptr);


node->_next = nullptr;
if (_head == nullptr)
{
_head = _tail = node;
}
else
{
_tail->_next = node;
_tail = node;
}
e->_prev = _tailer;
e->_next = nullptr;


#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
m_count.fetch_add(1, std::memory_order_acq_rel);
#endif
if (!_header)
_header = e;

if (_tailer)
_tailer->_next = e;
_tailer = e;

#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
_count.fetch_add(1, std::memory_order_acq_rel);
#endif
} }


template<class _Node, class _Nodeptr, class _Sty>
auto intrusive_link_queue<_Node, _Nodeptr, _Sty>::try_pop() noexcept->node_ptr_type
template<class _Node, class _Nodeptr>
void intrusive_link_queue<_Node, _Nodeptr>::push_front(node_ptr_type e) noexcept
{ {
if (_head == nullptr)
assert(e != nullptr);

e->_prev = nullptr;
e->_next = _header;

if (_header)
_header->_prev = e;
_header = e;

if (!_tailer)
_tailer = e;

#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
_count.fetch_add(1, std::memory_order_acq_rel);
#endif
}

template<class _Node, class _Nodeptr>
void intrusive_link_queue<_Node, _Nodeptr>::erase(node_ptr_type e) noexcept
{
assert(e != nullptr);

if (_header == e)
_header = e->_next;
if (_tailer == e)
_tailer = e->_prev;

if (e->_next)
e->_next->_prev = e->_prev;
if (e->_prev)
e->_prev->_next = e->_next;

e->_prev = nullptr;
e->_next = nullptr;

#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
_count.fetch_sub(1, std::memory_order_acq_rel);
#endif
}

template<class _Node, class _Nodeptr>
auto intrusive_link_queue<_Node, _Nodeptr>::try_pop() noexcept->node_ptr_type
{
if (_header == nullptr)
return nullptr; return nullptr;


node_ptr_type node = _head;
_head = node->_next;
node->_next = nullptr;
node_ptr_type node = _header;
_header = node->_next;
if (_header != nullptr)
_header->_prev = nullptr;


if (_tail == node)
if (_tailer == node)
{ {
assert(node->_next == nullptr); assert(node->_next == nullptr);
_tail = nullptr;
_tailer = nullptr;
} }


#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
m_count.fetch_sub(1, std::memory_order_acq_rel);
#endif
node->_prev = nullptr;
node->_next = nullptr;

#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
_count.fetch_sub(1, std::memory_order_acq_rel);
#endif


return node; return node;
} }

+ 2
- 2
include/librf/src/state.h Прегледај датотеку

private: private:
LIBRF_API virtual void destroy_deallocate(); LIBRF_API virtual void destroy_deallocate();
public: public:
virtual void resume() = 0;
virtual bool has_handler() const noexcept = 0;
LIBRF_API virtual void resume();
LIBRF_API virtual bool has_handler() const noexcept;
LIBRF_API virtual state_base_t* get_parent() const noexcept; LIBRF_API virtual state_base_t* get_parent() const noexcept;


void set_scheduler(scheduler_t* sch) noexcept void set_scheduler(scheduler_t* sch) noexcept

+ 76
- 41
source/event_v2.cpp Прегледај датотеку

{ {
namespace detail namespace detail
{ {
LIBRF_API void state_event_base_t::resume()
{
coroutine_handle<> handler = _coro;
if (handler)
{
_coro = nullptr;
_scheduler->del_final(this);
handler.resume();
}
}

LIBRF_API bool state_event_base_t::has_handler() const noexcept
{
return (bool)_coro;
}


LIBRF_API void state_event_t::on_cancel() noexcept LIBRF_API void state_event_t::on_cancel() noexcept
{ {
event_v2_impl** oldValue = _value.load(std::memory_order_acquire); event_v2_impl** oldValue = _value.load(std::memory_order_acquire);
if (oldValue != nullptr && _value.compare_exchange_strong(oldValue, nullptr, std::memory_order_acq_rel)) if (oldValue != nullptr && _value.compare_exchange_strong(oldValue, nullptr, std::memory_order_acq_rel))
{ {
event_v2_impl* evt = *oldValue;
if (evt != nullptr)
evt->remove_wait_list(this);
*oldValue = nullptr; *oldValue = nullptr;
_thandler.reset(); _thandler.reset();








LIBRF_API void state_event_all_t::on_cancel() noexcept
LIBRF_API void state_event_all_t::on_cancel(intptr_t idx)
{ {
intptr_t oldValue = _counter.load(std::memory_order_acquire);
if (oldValue >= 0 && _counter.compare_exchange_strong(oldValue, -1, std::memory_order_acq_rel))
scoped_lock<event_v2_impl::lock_type> lock_(_lock);
if (_counter <= 0) return ;
assert(idx < static_cast<intptr_t>(_values.size()));

_values[idx] = sub_state_t{ nullptr, nullptr };
if (--_counter == 0)
{ {
*_value = false;
*_result = false;
_thandler.stop(); _thandler.stop();


this->_coro = nullptr;
assert(this->_scheduler != nullptr);
if (this->_coro)
this->_scheduler->add_generator(this);
} }
} }


LIBRF_API bool state_event_all_t::on_notify(event_v2_impl*)
LIBRF_API bool state_event_all_t::on_notify(event_v2_impl*, intptr_t idx)
{ {
intptr_t oldValue = _counter.load(std::memory_order_acquire);
if (oldValue <= 0) return false;
scoped_lock<event_v2_impl::lock_type> lock_(_lock);

if (_counter <= 0) return false;
assert(idx < static_cast<intptr_t>(_values.size()));

_values[idx].first = nullptr;


oldValue = _counter.fetch_add(-1, std::memory_order_acq_rel);
if (oldValue == 1)
if (--_counter == 0)
{ {
*_value = true;
bool result = true;
for (sub_state_t& sub : _values)
{
if (sub.second == nullptr)
{
result = false;
break;
}
}

*_result = result;
_thandler.stop(); _thandler.stop();


assert(this->_scheduler != nullptr); assert(this->_scheduler != nullptr);
if (this->_coro) if (this->_coro)
this->_scheduler->add_generator(this); this->_scheduler->add_generator(this);

return true;
} }


return oldValue >= 1;
return true;
} }


LIBRF_API bool state_event_all_t::on_timeout() LIBRF_API bool state_event_all_t::on_timeout()
{ {
intptr_t oldValue = _counter.load(std::memory_order_acquire);
if (oldValue >= 0 && _counter.compare_exchange_strong(oldValue, -1, std::memory_order_acq_rel))
{
*_value = false;
_thandler.reset();
scoped_lock<event_v2_impl::lock_type> lock_(_lock);


assert(this->_scheduler != nullptr);
if (this->_coro)
this->_scheduler->add_generator(this);
if (_counter <= 0) return false;


return true;
_counter = 0;
*_result = false;
_thandler.reset();

for (sub_state_t& sub : _values)
{
if (sub.first != nullptr)
{
event_v2_impl* evt = sub.second;
sub.second = nullptr;

if (evt != nullptr)
evt->remove_wait_list(sub.first);
}
} }
return false;

assert(this->_scheduler != nullptr);
if (this->_coro)
this->_scheduler->add_generator(this);

return true;
} }




{ {
scoped_lock<lock_type> lock_(_lock); scoped_lock<lock_type> lock_(_lock);


_counter.store(0, std::memory_order_release);

state_event_ptr state; state_event_ptr state;
for (; (state = try_pop_list(_wait_awakes)) != nullptr;) for (; (state = try_pop_list(_wait_awakes)) != nullptr;)
{ {


_counter.fetch_add(1, std::memory_order_acq_rel); _counter.fetch_add(1, std::memory_order_acq_rel);
} }

LIBRF_API void event_v2_impl::reset() noexcept
{
_counter.store(0, std::memory_order_release);
}

LIBRF_API void event_v2_impl::add_wait_list(state_event_base_t* state)
{
assert(state != nullptr);
_wait_awakes.push_back(state);
}

LIBRF_API void event_v2_impl::remove_wait_list(state_event_base_t* state)
{
assert(state != nullptr);

scoped_lock<lock_type> lock_(_lock);
_wait_awakes.erase(state);
}
} }


LIBRF_API event_t::event_t(bool initially) LIBRF_API event_t::event_t(bool initially)

+ 17
- 1
source/state.cpp Прегледај датотеку

{ {
delete this; delete this;
} }

LIBRF_API void state_base_t::resume()
{
if (likely(_coro))
{
coroutine_handle<> handler = _coro;
_coro = nullptr;
_scheduler->del_final(this);
handler.resume();
}
}

LIBRF_API bool state_base_t::has_handler() const noexcept
{
return (bool)_coro;
}

LIBRF_API state_base_t* state_base_t::get_parent() const noexcept LIBRF_API state_base_t* state_base_t::get_parent() const noexcept
{ {
return nullptr; return nullptr;

+ 2
- 2
tutorial/test_async_event.cpp Прегледај датотеку

test_wait_three(); test_wait_three();
std::cout << std::endl; std::cout << std::endl;
test_wait_any();
std::cout << std::endl;
//test_wait_any();
//std::cout << std::endl;
test_wait_all(); test_wait_all();
std::cout << std::endl; std::cout << std::endl;

+ 64
- 0
tutorial/test_memory_leak.cpp Прегледај датотеку


#include <chrono>
#include <iostream>
#include <string>
#include <thread>

#include "librf/librf.h"

using namespace librf;
using namespace std::chrono;

void test_memory_leak_event_wait_for()
{
go[]()->future_t<void>
{
event_t e;
for (;;)
{
bool val = co_await e.wait_for(1ms);
assert(val == false);
co_await yield();
}
};

for (;;)
{
auto have = this_scheduler()->run_one_batch();
if (!have) {
std::this_thread::sleep_for(1ms);
}
}
}

void test_memory_leak_event_wait_all_for()
{
go[]()->future_t<void>
{
event_t e[4];
for (;;)
{
bool val = co_await event_t::wait_all_for(1ms, e);
assert(val == false);
co_await yield();
}
};

for (;;)
{
auto have = this_scheduler()->run_one_batch();
if (!have) {
std::this_thread::sleep_for(1ms);
}
}
}

#if LIBRF_TUTORIAL_STAND_ALONE
int main()
{
//test_memory_leak_event_wait_for();
test_memory_leak_event_wait_all_for();

return 0;
}
#endif

Loading…
Откажи
Сачувај