Browse Source

完成event_v2的wait_any

tags/v2.9.7
tearshark 4 years ago
parent
commit
ba477e9252

+ 14
- 10
librf/src/event_v2.cpp View File

@@ -22,22 +22,22 @@ RESUMEF_NS

void state_event_t::on_cancel() noexcept
{
bool* oldValue = _value.load(std::memory_order_acquire);
event_v2_impl** oldValue = _value.load(std::memory_order_acquire);
if (oldValue != nullptr && _value.compare_exchange_strong(oldValue, nullptr, std::memory_order_acq_rel))
{
*oldValue = false;
*oldValue = nullptr;
_thandler.stop();

this->_coro = nullptr;
}
}

bool state_event_t::on_notify()
bool state_event_t::on_notify(event_v2_impl* eptr)
{
bool* oldValue = _value.load(std::memory_order_acquire);
event_v2_impl** oldValue = _value.load(std::memory_order_acquire);
if (oldValue != nullptr && _value.compare_exchange_strong(oldValue, nullptr, std::memory_order_acq_rel))
{
*oldValue = true;
*oldValue = eptr;
_thandler.stop();

assert(this->_scheduler != nullptr);
@@ -51,10 +51,10 @@ RESUMEF_NS

bool state_event_t::on_timeout()
{
bool* oldValue = _value.load(std::memory_order_acquire);
event_v2_impl** oldValue = _value.load(std::memory_order_acquire);
if (oldValue != nullptr && _value.compare_exchange_strong(oldValue, nullptr, std::memory_order_acq_rel))
{
*oldValue = false;
*oldValue = nullptr;
_thandler.reset();

assert(this->_scheduler != nullptr);
@@ -112,7 +112,7 @@ RESUMEF_NS
state_event_ptr state;
for (; (state = try_pop_list(_wait_awakes)) != nullptr;)
{
(void)state->on_notify();
(void)state->on_notify(this);
}
}

@@ -123,7 +123,7 @@ RESUMEF_NS
state_event_ptr state;
for (; (state = try_pop_list(_wait_awakes)) != nullptr;)
{
if (state->on_notify())
if (state->on_notify(this))
return;
}

@@ -138,13 +138,17 @@ RESUMEF_NS
{
}

event_t::event_t(std::adopt_lock_t)
{
}

event_t::~event_t()
{
}

event_t::timeout_awaiter event_t::wait_until_(const clock_type::time_point& tp) const noexcept
{
return { _event, tp };
return { _event.get(), tp };
}
}
}

+ 6
- 12
librf/src/event_v2.h View File

@@ -15,6 +15,7 @@ RESUMEF_NS
using clock_type = std::chrono::system_clock;

event_t(bool initially = false);
event_t(std::adopt_lock_t);
~event_t();

void signal_all() const noexcept;
@@ -40,27 +41,18 @@ RESUMEF_NS
//而when_any会导致所有的event_t都被触发
//改日有空再补上

template<class _Iter>
struct [[nodiscard]] any_awaiter;

template<class _Iter
COMMA_RESUMEF_ENABLE_IF(traits::is_iterator_of_v<_Iter, event_t>)
> RESUMEF_REQUIRES(_IteratorOfT<_Iter, event_t>)
static future_t<intptr_t>
wait_any(_Iter begin_, _Iter end_)
{
when_any_pair idx = co_await when_any(begin_, end_);
co_return idx.first;
}
static auto wait_any(_Iter begin_, _Iter end_)->any_awaiter<_Iter>;

template<class _Cont
COMMA_RESUMEF_ENABLE_IF(traits::is_container_of_v<_Cont, event_t>)
> RESUMEF_REQUIRES(_ContainerOfT<_Cont, event_t>)
static future_t<intptr_t>
wait_any(_Cont& cnt_)
{
when_any_pair idx = co_await when_any(std::begin(cnt_), std::end(cnt_));
co_return idx.first;
}
static auto wait_any(_Cont& cnt_)->any_awaiter<decltype(std::begin(cnt_))>;

template<class _Rep, class _Period, class _Iter
COMMA_RESUMEF_ENABLE_IF(traits::is_iterator_of_v<_Iter, event_t>)
@@ -134,6 +126,8 @@ RESUMEF_NS
event_t& operator = (const event_t&) = default;
event_t& operator = (event_t&&) = default;
private:
//friend struct any_awaiter;

event_impl_ptr _event;

timeout_awaiter wait_until_(const clock_type::time_point& tp) const noexcept;

+ 130
- 42
librf/src/event_v2.inl View File

@@ -64,7 +64,7 @@ RESUMEF_NS
struct state_event_t : public state_base_t
{
state_event_t(bool& val)
state_event_t(event_v2_impl*& val)
: _value(&val)
{}
@@ -72,7 +72,7 @@ RESUMEF_NS
virtual bool has_handler() const noexcept override;
void on_cancel() noexcept;
bool on_notify();
bool on_notify(event_v2_impl* eptr);
bool on_timeout();
//将自己加入到通知链表里
@@ -80,8 +80,8 @@ RESUMEF_NS
scheduler_t* on_await_suspend(coroutine_handle<_PromiseT> handler) noexcept
{
_PromiseT& promise = handler.promise();
auto* parent_state = promise.get_state();
scheduler_t* sch = parent_state->get_scheduler();
auto* parent = promise.get_state();
scheduler_t* sch = parent->get_scheduler();
this->_scheduler = sch;
this->_coro = handler;
@@ -93,14 +93,14 @@ RESUMEF_NS
typedef spinlock lock_type;
//为浸入式单向链表提供的next指针
counted_ptr<state_event_t> _next = nullptr;
//counted_ptr<state_event_t> _next = nullptr;
timer_handler _thandler;
private:
//_value引用awaitor保存的值,这样可以尽可能减少创建state的可能。而不必进入没有state就没有value实体被用于返回。
//在调用on_notify()或on_timeout()任意之一后,置为nullptr。
//这样来保证要么超时了,要么响应了signal的通知了。
//这个指针在on_notify()和on_timeout()里,当作一个互斥的锁来防止同时进入两个函数
std::atomic<bool*> _value;
std::atomic<event_v2_impl**> _value;
};
}
@@ -123,77 +123,71 @@ RESUMEF_NS
struct [[nodiscard]] event_t::awaiter
{
awaiter(event_impl_ptr evt) noexcept
: _event(std::move(evt))
awaiter(detail::event_v2_impl* evt) noexcept
: _event(evt)
{
}
bool await_ready() noexcept
{
return (_value = _event->try_wait_one());
return _event->try_wait_one();
}
template<class _PromiseT, typename = std::enable_if_t<traits::is_promise_v<_PromiseT>>>
bool await_suspend(coroutine_handle<_PromiseT> handler)
{
scoped_lock<detail::event_v2_impl::lock_type> lock_(_event->_lock);
detail::event_v2_impl* evt = _event;
scoped_lock<detail::event_v2_impl::lock_type> lock_(evt->_lock);
if ((_value = _event->try_wait_one()) != false)
if (evt->try_wait_one())
return false;
_state = new detail::state_event_t(_value);
_state = new detail::state_event_t(_event);
_event = nullptr;
(void)_state->on_await_suspend(handler);
_event->add_wait_list(_state.get());
evt->add_wait_list(_state.get());
return true;
}
bool await_resume() noexcept
{
return _value;
return _event != nullptr;
}
private:
std::shared_ptr<detail::event_v2_impl> _event;
protected:
detail::event_v2_impl* _event;
counted_ptr<detail::state_event_t> _state;
bool _value = false;
};
inline event_t::awaiter event_t::operator co_await() const noexcept
{
return { _event };
return { _event.get() };
}
inline event_t::awaiter event_t::wait() const noexcept
{
return { _event };
return { _event.get() };
}
struct [[nodiscard]] event_t::timeout_awaiter
struct [[nodiscard]] event_t::timeout_awaiter : public event_t::awaiter
{
timeout_awaiter(event_impl_ptr evt, clock_type::time_point tp) noexcept
: _event(std::move(evt))
timeout_awaiter(detail::event_v2_impl* evt, clock_type::time_point tp) noexcept
: awaiter(evt)
, _tp(tp)
{
}
bool await_ready() noexcept
{
return (_value = _event->try_wait_one());
}
template<class _PromiseT, typename = std::enable_if_t<traits::is_promise_v<_PromiseT>>>
bool await_suspend(coroutine_handle<_PromiseT> handler)
{
scoped_lock<detail::event_v2_impl::lock_type> lock_(_event->_lock);
if ((_value = _event->try_wait_one()) != false)
if (!awaiter::await_suspend(handler))
return false;
_state = new detail::state_event_t(_value);
scheduler_t* sch = _state->on_await_suspend(handler);
_event->add_wait_list(_state.get());
_PromiseT& promise = handler.promise();
auto* parent_state = promise.get_state();
scheduler_t* sch = parent_state->get_scheduler();
_state->_thandler = sch->timer()->add_handler(_tp, [_state=_state](bool canceld)
{
@@ -203,16 +197,8 @@ RESUMEF_NS
return true;
}
bool await_resume() noexcept
{
return _value;
}
private:
std::shared_ptr<detail::event_v2_impl> _event;
counted_ptr<detail::state_event_t> _state;
clock_type::time_point _tp;
bool _value = false;
};
template<class _Rep, class _Period>
@@ -226,5 +212,107 @@ RESUMEF_NS
{
return wait_until_(std::chrono::time_point_cast<clock_type::duration>(tp));
}
template<class _Iter>
struct [[nodiscard]] event_t::any_awaiter
{
any_awaiter(_Iter begin, _Iter end) noexcept
: _begin(begin)
, _end(end)
{
}
bool await_ready() noexcept
{
if (_begin == _end)
return true;
for (auto iter = _begin; iter != _end; ++iter)
{
detail::event_v2_impl* evt = (*iter)._event.get();
if (evt->try_wait_one())
{
_event = evt;
return true;
}
}
return false;
}
template<class _PromiseT, typename = std::enable_if_t<traits::is_promise_v<_PromiseT>>>
bool await_suspend(coroutine_handle<_PromiseT> handler)
{
std::vector<detail::event_v2_impl::lock_type> lockes;
lockes.reserve(std::distance(_begin, _end));
for (auto iter = _begin; iter != _end; ++iter)
{
detail::event_v2_impl* evt = (*iter)._event.get();
lockes.push_back(evt->_lock);
}
scoped_lock_range<detail::event_v2_impl::lock_type> lock_(lockes);
for (auto iter = _begin; iter != _end; ++iter)
{
detail::event_v2_impl* evt = (*iter)._event.get();
if (evt->try_wait_one())
{
_event = evt;
return false;
}
}
_state = new detail::state_event_t(_event);
(void)_state->on_await_suspend(handler);
for (auto iter = _begin; iter != _end; ++iter)
{
detail::event_v2_impl* evt = (*iter)._event.get();
evt->add_wait_list(_state.get());
}
return true;
}
intptr_t await_resume() noexcept
{
if (_begin == _end)
return 0;
if (_event == nullptr)
return -1;
intptr_t idx = 0;
for (auto iter = _begin; iter != _end; ++iter, ++idx)
{
detail::event_v2_impl* evt = (*iter)._event.get();
if (evt == _event)
return idx;
}
return -1;
}
private:
detail::event_v2_impl* _event = nullptr;
counted_ptr<detail::state_event_t> _state;
_Iter _begin;
_Iter _end;
};
template<class _Iter COMMA_RESUMEF_ENABLE_IF_TYPENAME()>
RESUMEF_REQUIRES(_IteratorOfT<_Iter, event_t>)
auto event_t::wait_any(_Iter begin_, _Iter end_) ->event_t::any_awaiter<_Iter>
{
return { begin_, end_ };
}
template<class _Cont COMMA_RESUMEF_ENABLE_IF_TYPENAME()>
RESUMEF_REQUIRES(_ContainerOfT<_Cont, event_t>)
auto event_t::wait_any(_Cont& cnt_) ->event_t::any_awaiter<decltype(std::begin(cnt_))>
{
return { std::begin(cnt_), std::end(cnt_) };
}
}
}

+ 1
- 1
librf/src/scheduler.cpp View File

@@ -108,7 +108,7 @@ RESUMEF_NS
sptr->set_scheduler(this);
{
scoped_lock<spinlock, spinlock> __guard(_lock_ready, _lock_running);
scoped_lock<spinlock> __guard(_lock_ready);
_ready_task.emplace(sptr, task);
}

+ 152
- 0
librf/src/spinlock.h View File

@@ -88,4 +88,156 @@ RESUMEF_NS
};
#endif
namespace detail
{
template<class _Ty>
void _Lock_from_locks(const int _Target, std::vector<_Ty>& _LkN) { // lock _LkN[_Target]
_LkN[_Target].lock();
}
// FUNCTION TEMPLATE _Try_lock_from_locks
template<class _Ty>
bool _Try_lock_from_locks(const int _Target, std::vector<_Ty>& _LkN) { // try to lock _LkN[_Target]
return _LkN[_Target].try_lock();
}
// FUNCTION TEMPLATE _Unlock_locks
template<class _Ty>
void _Unlock_locks(int _First, int _Last, std::vector<_Ty>& _LkN) noexcept /* terminates */ {
for (; _First != _Last; ++_First) {
_LkN[_First].unlock();
}
}
// FUNCTION TEMPLATE try_lock
template<class _Ty>
int _Try_lock_range(const int _First, const int _Last, std::vector<_Ty>& _LkN) {
int _Next = _First;
try {
for (; _Next != _Last; ++_Next) {
if (!_Try_lock_from_locks(_Next, _LkN)) { // try_lock failed, backout
_Unlock_locks(_First, _Next, _LkN);
return _Next;
}
}
}
catch (...) {
_Unlock_locks(_First, _Next, _LkN);
throw;
}
return -1;
}
// FUNCTION TEMPLATE lock
template<class _Ty>
int _Lock_attempt(const int _Hard_lock, std::vector<_Ty>& _LkN) {
// attempt to lock 3 or more locks, starting by locking _LkN[_Hard_lock] and trying to lock the rest
_Lock_from_locks(_Hard_lock, _LkN);
int _Failed = -1;
int _Backout_start = _Hard_lock; // that is, unlock _Hard_lock
try {
_Failed = _Try_lock_range(0, _Hard_lock, _LkN);
if (_Failed == -1) {
_Backout_start = 0; // that is, unlock [0, _Hard_lock] if the next throws
_Failed = _Try_lock_range(_Hard_lock + 1, (int)_LkN.size(), _LkN);
if (_Failed == -1) { // we got all the locks
return -1;
}
}
}
catch (...) {
_Unlock_locks(_Backout_start, _Hard_lock + 1, _LkN);
throw;
}
// we didn't get all the locks, backout
_Unlock_locks(_Backout_start, _Hard_lock + 1, _LkN);
std::this_thread::yield();
return _Failed;
}
template<class _Ty>
void _Lock_nonmember3(std::vector<_Ty>& _LkN) {
// lock 3 or more locks, without deadlock
int _Hard_lock = 0;
while (_Hard_lock != -1) {
_Hard_lock = _Lock_attempt(_Hard_lock, _LkN);
}
}
template <class _Lock0, class _Lock1>
bool _Lock_attempt_small(_Lock0& _Lk0, _Lock1& _Lk1)
{
// attempt to lock 2 locks, by first locking _Lk0, and then trying to lock _Lk1 returns whether to try again
_Lk0.lock();
try {
if (_Lk1.try_lock())
return false;
}
catch (...) {
_Lk0.unlock();
throw;
}
_Lk0.unlock();
std::this_thread::yield();
return true;
}
template <class _Lock0, class _Lock1>
void _Lock_nonmember2(_Lock0& _Lk0, _Lock1& _Lk1)
{
// lock 2 locks, without deadlock, special case for better codegen and reduced metaprogramming for common case
while (_Lock_attempt_small(_Lk0, _Lk1) && _Lock_attempt_small(_Lk1, _Lk0)) { // keep trying
}
}
template<class _Ty>
void _Lock_range(std::vector<_Ty>& lockes)
{
if (lockes.size() == 0)
{
}
else if (lockes.size() == 1)
{
lockes[0].lock();
}
else if (lockes.size() == 2)
{
_Lock_nonmember2(lockes[0], lockes[1]);
}
else
{
_Lock_nonmember3(lockes);
}
}
}
template<class _Ty>
class scoped_lock_range { // class with destructor that unlocks mutexes
public:
explicit scoped_lock_range(std::vector<_Ty>& locks_)
: _LkN(locks_)
{
detail::_Lock_range(locks_);
}
explicit scoped_lock_range(std::adopt_lock_t, std::vector<_Ty>& locks_)
: _LkN(locks_)
{ // construct but don't lock
}
~scoped_lock_range() noexcept
{
detail::_Unlock_locks(0, (int)_LkN.size(), _LkN);
}
scoped_lock_range(const scoped_lock_range&) = delete;
scoped_lock_range& operator=(const scoped_lock_range&) = delete;
private:
std::vector<_Ty>& _LkN;
};
}

+ 1
- 1
librf/src/state.cpp View File

@@ -129,7 +129,7 @@ RESUMEF_NS
bool state_future_t::has_handler() const noexcept
{
scoped_lock<lock_type> __guard(_mtx);
scoped_lock<lock_type> __guard(this->_mtx);
return has_handler_skip_lock();
}

+ 4
- 2
librf/src/type_concept.inl View File

@@ -2,9 +2,9 @@
#ifndef RESUMEF_ENABLE_CONCEPT
#ifdef __cpp_lib_concepts
#define RESUMEF_ENABLE_CONCEPT 1
#define RESUMEF_ENABLE_CONCEPT 0
#else
#define RESUMEF_ENABLE_CONCEPT 1
#define RESUMEF_ENABLE_CONCEPT 0
#endif //#ifdef __cpp_lib_concepts
#endif //#ifndef RESUMEF_ENABLE_CONCEPT
@@ -89,6 +89,7 @@ RESUMEF_NS
{ *std::begin(u) } ->std::same_as<E&>;
};
#define COMMA_RESUMEF_ENABLE_IF_TYPENAME()
#define COMMA_RESUMEF_ENABLE_IF(...)
#define RESUMEF_ENABLE_IF(...)
#define RESUMEF_REQUIRES(...) requires __VA_ARGS__
@@ -108,6 +109,7 @@ RESUMEF_NS
#define _ContainerT typename
#define _ContainerOfT typename
#define COMMA_RESUMEF_ENABLE_IF_TYPENAME() ,typename _EnableIf
#define COMMA_RESUMEF_ENABLE_IF(...) ,typename=std::enable_if_t<__VA_ARGS__>
#define RESUMEF_ENABLE_IF(...) typename=std::enable_if_t<__VA_ARGS__>
#define RESUMEF_REQUIRES(...)

+ 2
- 2
tutorial/test_async_event.cpp View File

@@ -150,8 +150,8 @@ static void test_wait_all_timeout()
void resumable_main_event()
{
test_wait_one();
std::cout << std::endl;
//test_wait_one();
//std::cout << std::endl;
test_wait_any();
std::cout << std::endl;

+ 0
- 1
tutorial/test_async_event_v2.cpp View File

@@ -58,7 +58,6 @@ static void test_notify_all()
tt.join();
}

//目前还没法测试在多线程调度下,是否线程安全
static void test_notify_one()
{
using namespace std::chrono;

+ 3
- 4
vs_proj/librf.cpp View File

@@ -43,11 +43,10 @@ int main(int argc, const char* argv[])
//test_ring_queue<resumef::ring_queue_spinlock<int, false, uint32_t>>();
//test_ring_queue<resumef::ring_queue_lockfree<int, uint64_t>>();
//resumable_main_switch_scheduler();
//resumable_main_event_v2();
resumable_main_event();
resumable_main_event_v2();
resumable_main_event_timeout();
resumable_main_sleep();
//resumable_main_event_timeout();
//resumable_main_sleep();
return 0;
//if (argc > 1)

Loading…
Cancel
Save