Browse Source

event支持超时

tags/v2.9.7
tearshark 4 years ago
parent
commit
1b344cc651

+ 26
- 0
librf/src/counted_ptr.h View File

@@ -87,5 +87,31 @@ RESUMEF_NS
{
return new T{std::forward<Args>(args)...};
}
template <typename T, typename U>
inline bool operator == (const counted_ptr<T>& _Left, const counted_ptr<U>& _Right)
{
return _Left.get() == _Right.get();
}
template <typename T>
inline bool operator == (const counted_ptr<T>& _Left, std::nullptr_t)
{
return _Left.get() == nullptr;
}
template <typename T>
inline bool operator == (std::nullptr_t, const counted_ptr<T>& _Left)
{
return _Left.get() == nullptr;
}
template <typename T>
inline bool operator != (const counted_ptr<T>& _Left, std::nullptr_t)
{
return _Left.get() != nullptr;
}
template <typename T>
inline bool operator != (std::nullptr_t, const counted_ptr<T>& _Left)
{
return _Left.get() != nullptr;
}
}

+ 66
- 15
librf/src/event_v2.cpp View File

@@ -20,19 +20,62 @@ RESUMEF_NS
return (bool)_coro;
}

void state_event_t::on_notify()
void state_event_t::on_cancel() noexcept
{
*_value = true;
scoped_lock<lock_type> lock_(_mtx);

assert(this->_scheduler != nullptr);
if (this->_coro)
this->_scheduler->add_generator(this);
if (_value != nullptr)
{
*_value = false;
_value = nullptr;
}
this->_coro = nullptr;
}

void state_event_t::on_cancel() noexcept
bool state_event_t::on_notify()
{
*_value = false;
this->_coro = nullptr;
scoped_lock<lock_type> lock_(_mtx);

if (_value != nullptr)
{
*_value = true;
_value = nullptr;

assert(this->_scheduler != nullptr);
if (this->_coro)
this->_scheduler->add_generator(this);

return true;
}
return false;
}

bool state_event_t::on_timeout()
{
scoped_lock<lock_type> lock_(_mtx);

if (_value != nullptr)
{
*_value = false;
_value = nullptr;

assert(this->_scheduler != nullptr);
if (this->_coro)
this->_scheduler->add_generator(this);

return true;
}
return false;
}

event_v2_impl::event_v2_impl(bool initially) noexcept
: _counter(initially ? 1 : 0)
{
}

event_v2_impl::~event_v2_impl()
{
for (; _wait_awakes.try_pop() != nullptr;);
}

void event_v2_impl::signal_all() noexcept
@@ -41,10 +84,10 @@ RESUMEF_NS

_counter.store(0, std::memory_order_release);

state_event_t* state;
counted_ptr<state_event_t> state;
for (; (state = _wait_awakes.try_pop()) != nullptr;)
{
state->on_notify();
(void)state->on_notify();
}
}

@@ -52,11 +95,14 @@ RESUMEF_NS
{
scoped_lock<lock_type> lock_(_lock);

state_event_t* state = _wait_awakes.try_pop();
if (state == nullptr)
_counter.fetch_add(1, std::memory_order_acq_rel);
else
state->on_notify();
counted_ptr<state_event_t> state;
for (; (state = _wait_awakes.try_pop()) != nullptr;)
{
if (state->on_notify())
return;
}

_counter.fetch_add(1, std::memory_order_acq_rel);
}
}

@@ -70,5 +116,10 @@ RESUMEF_NS
event_t::~event_t()
{
}

event_t::timeout_awaiter event_t::wait_until_(const clock_type::time_point& tp) const noexcept
{
return { _event, tp };
}
}
}

+ 10
- 0
librf/src/event_v2.h View File

@@ -12,6 +12,7 @@ RESUMEF_NS
struct event_t
{
using event_impl_ptr = std::shared_ptr<detail::event_v2_impl>;
using clock_type = std::chrono::system_clock;

event_t(bool initially = false);
~event_t();
@@ -23,8 +24,17 @@ RESUMEF_NS
struct [[nodiscard]] awaiter;
awaiter operator co_await() const noexcept;
awaiter wait() const noexcept;

struct [[nodiscard]] timeout_awaiter;

template<class _Rep, class _Period>
timeout_awaiter wait_for(const std::chrono::duration<_Rep, _Period>& dt) const noexcept;
template<class _Clock, class _Duration>
timeout_awaiter wait_until(const std::chrono::time_point<_Clock, _Duration>& tp) const noexcept;
private:
event_impl_ptr _event;

timeout_awaiter wait_until_(const clock_type::time_point& tp) const noexcept;
};
}
}

+ 87
- 19
librf/src/event_v2.inl View File

@@ -11,9 +11,8 @@ RESUMEF_NS
//所以,还得用锁结构来实现(等待实现,今日不空)。
struct event_v2_impl : public std::enable_shared_from_this<event_v2_impl>
{
event_v2_impl(bool initially) noexcept
: _counter(initially ? 1 : 0)
{}
event_v2_impl(bool initially) noexcept;
~event_v2_impl();
bool is_signaled() const noexcept
{
@@ -30,7 +29,7 @@ RESUMEF_NS
static constexpr bool USE_SPINLOCK = true;
using lock_type = std::conditional_t<USE_SPINLOCK, spinlock, std::deque<std::recursive_mutex>>;
using wait_queue_type = intrusive_link_queue<state_event_t>;
using wait_queue_type = intrusive_link_queue<state_event_t, counted_ptr<state_event_t>>;
friend struct state_event_t;
@@ -69,12 +68,13 @@ RESUMEF_NS
virtual void resume() override;
virtual bool has_handler() const noexcept override;
void on_notify();
void on_cancel() noexcept;
bool on_notify();
bool on_timeout();
//将自己加入到通知链表里
template<class _PromiseT, typename = std::enable_if_t<is_promise_v<_PromiseT>>>
void on_await_suspend(coroutine_handle<_PromiseT> handler) noexcept
scheduler_t* on_await_suspend(coroutine_handle<_PromiseT> handler) noexcept
{
_PromiseT& promise = handler.promise();
auto* parent_state = promise.get_state();
@@ -82,16 +82,39 @@ RESUMEF_NS
this->_scheduler = sch;
this->_coro = handler;
return sch;
}
public:
typedef spinlock lock_type;
//为浸入式单向链表提供的next指针
state_event_t* _next = nullptr;
counted_ptr<state_event_t> _next = nullptr;
private:
//std::atomic<bool*> _value;
bool* _value;
mutable lock_type _mtx;
};
}
namespace event_v2
{
inline void event_t::signal_all() const noexcept
{
_event->signal_all();
}
inline void event_t::signal() const noexcept
{
_event->signal();
}
inline void event_t::reset() const noexcept
{
_event->reset();
}
struct [[nodiscard]] event_t::awaiter
{
awaiter(event_impl_ptr evt) noexcept
@@ -103,6 +126,7 @@ RESUMEF_NS
{
return (_value = _event->try_wait_one());
}
template<class _PromiseT, typename = std::enable_if_t<is_promise_v<_PromiseT>>>
bool await_suspend(coroutine_handle<_PromiseT> handler)
{
@@ -112,12 +136,13 @@ RESUMEF_NS
return false;
_state = new detail::state_event_t(_value);
_state->on_await_suspend(handler);
(void)_state->on_await_suspend(handler);
_event->add_wait_list(_state.get());
return true;
}
bool await_resume() noexcept
{
return _value;
@@ -128,29 +153,72 @@ RESUMEF_NS
bool _value = false;
};
inline void event_t::signal_all() const noexcept
inline event_t::awaiter event_t::operator co_await() const noexcept
{
_event->signal_all();
return { _event };
}
inline void event_t::signal() const noexcept
inline event_t::awaiter event_t::wait() const noexcept
{
_event->signal();
return { _event };
}
inline void event_t::reset() const noexcept
struct [[nodiscard]] event_t::timeout_awaiter
{
_event->reset();
}
timeout_awaiter(event_impl_ptr evt, clock_type::time_point tp) noexcept
: _event(std::move(evt))
, _tp(tp)
{
}
inline event_t::awaiter event_t::operator co_await() const noexcept
bool await_ready() noexcept
{
return (_value = _event->try_wait_one());
}
template<class _PromiseT, typename = std::enable_if_t<is_promise_v<_PromiseT>>>
bool await_suspend(coroutine_handle<_PromiseT> handler)
{
scoped_lock<detail::event_v2_impl::lock_type> lock_(_event->_lock);
if (_event->try_wait_one())
return false;
_state = new detail::state_event_t(_value);
scheduler_t* sch = _state->on_await_suspend(handler);
_event->add_wait_list(_state.get());
(void)sch->timer()->add(_tp, [_state=_state](bool canceld)
{
if (!canceld)
_state->on_timeout();
});
return true;
}
bool await_resume() noexcept
{
return _value;
}
private:
std::shared_ptr<detail::event_v2_impl> _event;
counted_ptr<detail::state_event_t> _state;
clock_type::time_point _tp;
bool _value = false;
};
template<class _Rep, class _Period>
inline event_t::timeout_awaiter event_t::wait_for(const std::chrono::duration<_Rep, _Period>& dt) const noexcept
{
return { _event };
return wait_until_(clock_type::now() + std::chrono::duration_cast<clock_type::duration>(dt));
}
inline event_t::awaiter event_t::wait() const noexcept
template<class _Clock, class _Duration>
inline event_t::timeout_awaiter event_t::wait_until(const std::chrono::time_point<_Clock, _Duration>& tp) const noexcept
{
return { _event };
return wait_until_(std::chrono::time_point_cast<clock_type::duration>(tp));
}
}
}

+ 17
- 16
librf/src/intrusive_link_queue.h View File

@@ -2,10 +2,11 @@

RESUMEF_NS
{
template<class _Node, class _Sty = uint32_t>
template<class _Node, class _Nodeptr = _Node*, class _Sty = uint32_t>
struct intrusive_link_queue
{
using node_type = _Node;
using node_ptr_type = _Nodeptr;
using size_type = _Sty;
public:
intrusive_link_queue();
@@ -17,19 +18,19 @@ RESUMEF_NS

auto size() const noexcept->size_type;
bool empty() const noexcept;
void push_back(node_type* node) noexcept;
auto try_pop() noexcept->node_type*;
void push_back(node_ptr_type node) noexcept;
auto try_pop() noexcept->node_ptr_type;
private:
node_type* _head;
node_type* _tail;
node_ptr_type _head;
node_ptr_type _tail;

#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
std::atomic<size_type> m_count;
#endif
};

template<class _Node, class _Sty>
intrusive_link_queue<_Node, _Sty>::intrusive_link_queue()
template<class _Node, class _Nodeptr, class _Sty>
intrusive_link_queue<_Node, _Nodeptr, _Sty>::intrusive_link_queue()
: _head(nullptr)
, _tail(nullptr)
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
@@ -38,8 +39,8 @@ RESUMEF_NS
{
}

template<class _Node, class _Sty>
auto intrusive_link_queue<_Node, _Sty>::size() const noexcept->size_type
template<class _Node, class _Nodeptr, class _Sty>
auto intrusive_link_queue<_Node, _Nodeptr, _Sty>::size() const noexcept->size_type
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
return m_count.load(std::memory_order_acquire);
@@ -51,14 +52,14 @@ RESUMEF_NS
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}

template<class _Node, class _Sty>
bool intrusive_link_queue<_Node, _Sty>::empty() const noexcept
template<class _Node, class _Nodeptr, class _Sty>
bool intrusive_link_queue<_Node, _Nodeptr, _Sty>::empty() const noexcept
{
return _head == nullptr;
}

template<class _Node, class _Sty>
void intrusive_link_queue<_Node, _Sty>::push_back(node_type* node) noexcept
template<class _Node, class _Nodeptr, class _Sty>
void intrusive_link_queue<_Node, _Nodeptr, _Sty>::push_back(node_ptr_type node) noexcept
{
assert(node != nullptr);

@@ -78,13 +79,13 @@ RESUMEF_NS
#endif
}

template<class _Node, class _Sty>
auto intrusive_link_queue<_Node, _Sty>::try_pop() noexcept->node_type*
template<class _Node, class _Nodeptr, class _Sty>
auto intrusive_link_queue<_Node, _Nodeptr, _Sty>::try_pop() noexcept->node_ptr_type
{
if (_head == nullptr)
return nullptr;

node_type* node = _head;
node_ptr_type node = _head;
_head = node->_next;
node->_next = nullptr;


+ 3
- 3
librf/src/spinlock.h View File

@@ -10,8 +10,8 @@ RESUMEF_NS
struct spinlock
{
static const size_t MAX_ACTIVE_SPIN = 1000;
static const size_t MAX_YIELD_SPIN = 4000;
static const size_t MAX_ACTIVE_SPIN = 4000;
static const size_t MAX_YIELD_SPIN = 8000;
static const int FREE_VALUE = 0;
static const int LOCKED_VALUE = 1;
@@ -53,7 +53,7 @@ RESUMEF_NS
else
{
std::this_thread::sleep_for(dt);
dt *= 2;
dt = (std::min)(std::chrono::milliseconds{ 128 }, dt * 2);
}
}

+ 41
- 13
tutorial/test_async_event_v2.cpp View File

@@ -8,6 +8,7 @@
#include "librf.h"

using namespace resumef;
using namespace std::chrono;

//非协程的逻辑线程,或异步代码,可以通过event_t通知到协程,并且不会阻塞协程所在的线程。
static std::thread async_set_event_all(const event_v2::event_t & e, std::chrono::milliseconds dt)
@@ -28,30 +29,33 @@ static std::thread async_set_event_one(event_v2::event_t e, std::chrono::millise
});
}


static future_t<> resumable_wait_event(event_v2::event_t e, int idx)
{
if (co_await e)
std::cout << "[" << idx << "]event signal!" << std::endl;
else
std::cout << "time out!" << std::endl;
std::cout << "[" << idx << "]time out!" << std::endl;
}

static void test_notify_all()
static future_t<> resumable_wait_timeout(event_v2::event_t e, milliseconds dt, int idx)
{
using namespace std::chrono;
if (co_await e.wait_for(dt))
std::cout << "[" << idx << "]event signal!" << std::endl;
else
std::cout << "[" << idx << "]time out!" << std::endl;
}

{
event_v2::event_t evt;
go resumable_wait_event(evt, 0);
go resumable_wait_event(evt, 1);
go resumable_wait_event(evt, 2);
static void test_notify_all()
{
event_v2::event_t evt;
go resumable_wait_event(evt, 0);
go resumable_wait_event(evt, 1);
go resumable_wait_event(evt, 2);

auto tt = async_set_event_all(evt, 100ms);
this_scheduler()->run_until_notask();
auto tt = async_set_event_all(evt, 100ms);
this_scheduler()->run_until_notask();

tt.join();
}
tt.join();
}

//目前还没法测试在多线程调度下,是否线程安全
@@ -76,6 +80,27 @@ static void test_notify_one()
}
}

static void test_wait_all_timeout()
{
using namespace std::chrono;

srand((int)time(nullptr));

event_v2::event_t evts[10];

std::vector<std::thread> vtt;
for(size_t i = 0; i < _countof(evts); ++i)
{
go resumable_wait_timeout(evts[i], 100ms, (int)i);
vtt.emplace_back(async_set_event_one(evts[i], 1ms * (50 + i * 10)));
}

this_scheduler()->run_until_notask();

for (auto& tt : vtt)
tt.join();
}

void resumable_main_event_v2()
{
test_notify_all();
@@ -83,4 +108,7 @@ void resumable_main_event_v2()

test_notify_one();
std::cout << std::endl;

test_wait_all_timeout();
std::cout << std::endl;
}

Loading…
Cancel
Save