#pragma once | #pragma once | ||||
RESUMEF_NS | |||||
{ | |||||
namespace detail | |||||
{ | |||||
struct event_impl; | |||||
typedef _awaker<event_impl> event_awaker; | |||||
typedef std::shared_ptr<event_awaker> event_awaker_ptr; | |||||
struct event_impl : public std::enable_shared_from_this<event_impl> | |||||
{ | |||||
private: | |||||
//typedef spinlock lock_type; | |||||
typedef std::recursive_mutex lock_type; | |||||
std::list<event_awaker_ptr> _awakes; | |||||
intptr_t _counter; | |||||
lock_type _lock; | |||||
public: | |||||
event_impl(intptr_t initial_counter_); | |||||
void signal(); | |||||
void reset(); | |||||
//如果已经触发了awaker,则返回true | |||||
bool wait_(const event_awaker_ptr& awaker); | |||||
template<class callee_t, class dummy_t = std::enable_if<!std::is_same<std::remove_cv_t<callee_t>, event_awaker_ptr>::value>> | |||||
decltype(auto) wait(callee_t&& awaker, dummy_t* dummy_ = nullptr) | |||||
{ | |||||
(void)dummy_; | |||||
return wait_(std::make_shared<event_awaker>(std::forward<callee_t>(awaker))); | |||||
} | |||||
event_impl(const event_impl&) = delete; | |||||
event_impl(event_impl&&) = delete; | |||||
event_impl& operator = (const event_impl&) = delete; | |||||
event_impl& operator = (event_impl&&) = delete; | |||||
}; | |||||
} | |||||
//提供一种在协程和非协程之间同步的手段。 | |||||
//典型用法是在非协程的线程,或者异步代码里,调用signal()方法触发信号, | |||||
//协程代码里,调用co_await wait()等系列方法等待同步。 | |||||
struct event_t | |||||
{ | |||||
typedef std::shared_ptr<detail::event_impl> event_impl_ptr; | |||||
typedef std::weak_ptr<detail::event_impl> event_impl_wptr; | |||||
typedef std::chrono::system_clock clock_type; | |||||
private: | |||||
event_impl_ptr _event; | |||||
struct wait_all_ctx; | |||||
public: | |||||
event_t(intptr_t initial_counter_ = 0); | |||||
void signal() const | |||||
{ | |||||
_event->signal(); | |||||
} | |||||
void reset() const | |||||
{ | |||||
_event->reset(); | |||||
} | |||||
future_t<bool> | |||||
wait() const; | |||||
template<class _Rep, class _Period> | |||||
future_t<bool> | |||||
wait_for(const std::chrono::duration<_Rep, _Period>& dt) const | |||||
{ | |||||
return wait_for_(std::chrono::duration_cast<clock_type::duration>(dt)); | |||||
} | |||||
template<class _Clock, class _Duration> | |||||
future_t<bool> | |||||
wait_until(const std::chrono::time_point<_Clock, _Duration>& tp) const | |||||
{ | |||||
return wait_until_(std::chrono::time_point_cast<clock_type::duration>(tp)); | |||||
} | |||||
template<class _Iter> | |||||
static future_t<intptr_t> | |||||
wait_any(_Iter begin_, _Iter end_) | |||||
{ | |||||
return wait_any_(make_event_vector(begin_, end_)); | |||||
} | |||||
template<class _Cont> | |||||
static future_t<intptr_t> | |||||
wait_any(const _Cont& cnt_) | |||||
{ | |||||
return wait_any_(make_event_vector(std::begin(cnt_), std::end(cnt_))); | |||||
} | |||||
template<class _Rep, class _Period, class _Iter> | |||||
static future_t<intptr_t> | |||||
wait_any_for(const std::chrono::duration<_Rep, _Period>& dt, _Iter begin_, _Iter end_) | |||||
{ | |||||
return wait_any_for_(std::chrono::duration_cast<clock_type::duration>(dt), make_event_vector(begin_, end_)); | |||||
} | |||||
template<class _Rep, class _Period, class _Cont> | |||||
static future_t<intptr_t> | |||||
wait_any_for(const std::chrono::duration<_Rep, _Period>& dt, const _Cont& cnt_) | |||||
{ | |||||
return wait_any_for_(std::chrono::duration_cast<clock_type::duration>(dt), make_event_vector(std::begin(cnt_), std::end(cnt_))); | |||||
} | |||||
template<class _Clock, class _Duration, class _Iter> | |||||
static future_t<intptr_t> | |||||
wait_any_until(const std::chrono::time_point<_Clock, _Duration>& tp, _Iter begin_, _Iter end_) | |||||
{ | |||||
return wait_any_until_(std::chrono::time_point_cast<clock_type::duration>(tp), make_event_vector(begin_, end_)); | |||||
} | |||||
template<class _Clock, class _Duration, class _Cont> | |||||
static future_t<intptr_t> | |||||
wait_any_until(const std::chrono::time_point<_Clock, _Duration>& tp, const _Cont& cnt_) | |||||
{ | |||||
return wait_any_until_(std::chrono::time_point_cast<clock_type::duration>(tp), make_event_vector(std::begin(cnt_), std::end(cnt_))); | |||||
} | |||||
template<class _Iter> | |||||
static future_t<bool> | |||||
wait_all(_Iter begin_, _Iter end_) | |||||
{ | |||||
return wait_all_(make_event_vector(begin_, end_)); | |||||
} | |||||
template<class _Cont> | |||||
static future_t<bool> | |||||
wait_all(const _Cont& cnt_) | |||||
{ | |||||
return wait_all(std::begin(cnt_), std::end(cnt_)); | |||||
} | |||||
template<class _Rep, class _Period, class _Iter> | |||||
static future_t<bool> | |||||
wait_all_for(const std::chrono::duration<_Rep, _Period>& dt, _Iter begin_, _Iter end_) | |||||
{ | |||||
return wait_all_for_(std::chrono::duration_cast<clock_type::duration>(dt), make_event_vector(begin_, end_)); | |||||
} | |||||
template<class _Rep, class _Period, class _Cont> | |||||
static future_t<bool> | |||||
wait_all_for(const std::chrono::duration<_Rep, _Period>& dt, const _Cont& cnt_) | |||||
{ | |||||
return wait_all_for_(std::chrono::duration_cast<clock_type::duration>(dt), make_event_vector(std::begin(cnt_), std::end(cnt_))); | |||||
} | |||||
template<class _Clock, class _Duration, class _Iter> | |||||
static future_t<bool> | |||||
wait_all_until(const std::chrono::time_point<_Clock, _Duration>& tp, _Iter begin_, _Iter end_) | |||||
{ | |||||
return wait_all_until_(std::chrono::time_point_cast<clock_type::duration>(tp), make_event_vector(begin_, end_)); | |||||
} | |||||
template<class _Clock, class _Duration, class _Cont> | |||||
static future_t<bool> | |||||
wait_all_until(const std::chrono::time_point<_Clock, _Duration>& tp, const _Cont& cnt_) | |||||
{ | |||||
return wait_all_until_(std::chrono::time_point_cast<clock_type::duration>(tp), make_event_vector(std::begin(cnt_), std::end(cnt_))); | |||||
} | |||||
event_t(const event_t&) = default; | |||||
event_t(event_t&&) = default; | |||||
event_t& operator = (const event_t&) = default; | |||||
event_t& operator = (event_t&&) = default; | |||||
private: | |||||
template<class _Iter> | |||||
static std::vector<event_impl_ptr> make_event_vector(_Iter begin_, _Iter end_) | |||||
{ | |||||
std::vector<event_impl_ptr> evts; | |||||
evts.reserve(std::distance(begin_, end_)); | |||||
for (auto i = begin_; i != end_; ++i) | |||||
evts.push_back((*i)._event); | |||||
return evts; | |||||
} | |||||
inline future_t<bool> wait_for_(const clock_type::duration& dt) const | |||||
{ | |||||
return wait_until_(clock_type::now() + dt); | |||||
} | |||||
future_t<bool> wait_until_(const clock_type::time_point& tp) const; | |||||
static future_t<intptr_t> wait_any_(std::vector<event_impl_ptr>&& evts); | |||||
inline static future_t<intptr_t> wait_any_for_(const clock_type::duration& dt, std::vector<event_impl_ptr>&& evts) | |||||
{ | |||||
return wait_any_until_(clock_type::now() + dt, std::forward<std::vector<event_impl_ptr>>(evts)); | |||||
} | |||||
static future_t<intptr_t> wait_any_until_(const clock_type::time_point& tp, std::vector<event_impl_ptr>&& evts); | |||||
static future_t<bool> wait_all_(std::vector<event_impl_ptr>&& evts); | |||||
inline static future_t<bool> wait_all_for_(const clock_type::duration& dt, std::vector<event_impl_ptr>&& evts) | |||||
{ | |||||
return wait_all_until_(clock_type::now() + dt, std::forward<std::vector<event_impl_ptr>>(evts)); | |||||
} | |||||
static future_t<bool> wait_all_until_(const clock_type::time_point& tp, std::vector<event_impl_ptr>&& evts); | |||||
}; | |||||
class async_manual_reset_event | |||||
{ | |||||
public: | |||||
async_manual_reset_event(bool initiallySet = false) noexcept | |||||
: m_state(initiallySet ? this : nullptr) | |||||
{} | |||||
// No copying/moving | |||||
async_manual_reset_event(const async_manual_reset_event&) = delete; | |||||
async_manual_reset_event(async_manual_reset_event&&) = delete; | |||||
async_manual_reset_event& operator=(const async_manual_reset_event&) = delete; | |||||
async_manual_reset_event& operator=(async_manual_reset_event&&) = delete; | |||||
bool is_set() const noexcept | |||||
{ | |||||
return m_state.load(std::memory_order_acquire) == this; | |||||
} | |||||
struct awaiter; | |||||
awaiter operator co_await() const noexcept; | |||||
void set() noexcept; | |||||
void reset() noexcept | |||||
{ | |||||
void* oldValue = this; | |||||
m_state.compare_exchange_strong(oldValue, nullptr, std::memory_order_acquire); | |||||
} | |||||
private: | |||||
friend struct awaiter; | |||||
// - 'this' => set state | |||||
// - otherwise => not set, head of linked list of awaiter*. | |||||
mutable std::atomic<void*> m_state; | |||||
}; | |||||
struct async_manual_reset_event::awaiter | |||||
{ | |||||
awaiter(const async_manual_reset_event& event) noexcept | |||||
: m_event(event) | |||||
{} | |||||
bool await_ready() const noexcept | |||||
{ | |||||
return m_event.is_set(); | |||||
} | |||||
bool await_suspend(coroutine_handle<> awaitingCoroutine) noexcept; | |||||
void await_resume() noexcept {} | |||||
private: | |||||
friend class async_manual_reset_event; | |||||
const async_manual_reset_event& m_event; | |||||
coroutine_handle<> m_awaitingCoroutine; | |||||
awaiter* m_next; | |||||
}; | |||||
inline async_manual_reset_event::awaiter async_manual_reset_event::operator co_await() const noexcept | |||||
{ | |||||
return awaiter{ *this }; | |||||
} | |||||
} | |||||
#include "event_v1.h" | |||||
#include "event_v2.h" |
#include "../librf.h" | |||||
RESUMEF_NS | |||||
{ | |||||
namespace detail | |||||
{ | |||||
event_impl::event_impl(intptr_t initial_counter_) | |||||
: _counter(initial_counter_) | |||||
{ | |||||
} | |||||
void event_impl::signal() | |||||
{ | |||||
scoped_lock<lock_type> lock_(this->_lock); | |||||
++this->_counter; | |||||
for (auto iter = this->_awakes.begin(); iter != this->_awakes.end(); ) | |||||
{ | |||||
auto awaker = *iter; | |||||
iter = this->_awakes.erase(iter); | |||||
if (awaker->awake(this, 1)) | |||||
{ | |||||
if (--this->_counter == 0) | |||||
break; | |||||
} | |||||
} | |||||
} | |||||
void event_impl::reset() | |||||
{ | |||||
scoped_lock<lock_type> lock_(this->_lock); | |||||
this->_awakes.clear(); | |||||
this->_counter = 0; | |||||
} | |||||
bool event_impl::wait_(const event_awaker_ptr & awaker) | |||||
{ | |||||
assert(awaker); | |||||
scoped_lock<lock_type> lock_(this->_lock); | |||||
if (this->_counter > 0) | |||||
{ | |||||
if(awaker->awake(this, 1)) | |||||
{ | |||||
--this->_counter; | |||||
return true; | |||||
} | |||||
} | |||||
else | |||||
{ | |||||
this->_awakes.push_back(awaker); | |||||
} | |||||
return false; | |||||
} | |||||
} | |||||
event_t::event_t(intptr_t initial_counter_) | |||||
: _event(std::make_shared<detail::event_impl>(initial_counter_)) | |||||
{ | |||||
} | |||||
future_t<bool> event_t::wait() const | |||||
{ | |||||
awaitable_t<bool> awaitable; | |||||
auto awaker = std::make_shared<detail::event_awaker>( | |||||
[st = awaitable._state](detail::event_impl * e) -> bool | |||||
{ | |||||
st->set_value(e ? true : false); | |||||
return true; | |||||
}); | |||||
_event->wait_(awaker); | |||||
return awaitable.get_future(); | |||||
} | |||||
future_t<bool> event_t::wait_until_(const clock_type::time_point & tp) const | |||||
{ | |||||
awaitable_t<bool> awaitable; | |||||
auto awaker = std::make_shared<detail::event_awaker>( | |||||
[st = awaitable._state](detail::event_impl * e) -> bool | |||||
{ | |||||
st->set_value(e ? true : false); | |||||
return true; | |||||
}); | |||||
_event->wait_(awaker); | |||||
(void)this_scheduler()->timer()->add(tp, | |||||
[awaker](bool ) | |||||
{ | |||||
awaker->awake(nullptr, 1); | |||||
}); | |||||
return awaitable.get_future(); | |||||
} | |||||
struct wait_any_awaker | |||||
{ | |||||
typedef state_t<intptr_t> state_type; | |||||
counted_ptr<state_type> st; | |||||
std::vector<detail::event_impl *> evts; | |||||
wait_any_awaker(const counted_ptr<state_type> & st_, std::vector<detail::event_impl *> && evts_) | |||||
: st(st_) | |||||
, evts(std::forward<std::vector<detail::event_impl *>>(evts_)) | |||||
{} | |||||
wait_any_awaker(const wait_any_awaker &) = delete; | |||||
wait_any_awaker(wait_any_awaker &&) = default; | |||||
bool operator()(detail::event_impl * e) const | |||||
{ | |||||
if (e) | |||||
{ | |||||
for (auto i = evts.begin(); i != evts.end(); ++i) | |||||
{ | |||||
if (e == (*i)) | |||||
{ | |||||
st->set_value((intptr_t)(i - evts.begin())); | |||||
return true; | |||||
} | |||||
} | |||||
} | |||||
else | |||||
{ | |||||
st->set_value(-1); | |||||
} | |||||
return false; | |||||
} | |||||
}; | |||||
future_t<intptr_t> event_t::wait_any_(std::vector<event_impl_ptr> && evts) | |||||
{ | |||||
awaitable_t<intptr_t> awaitable; | |||||
if (evts.size() <= 0) | |||||
{ | |||||
awaitable._state->set_value(-1); | |||||
return awaitable.get_future(); | |||||
} | |||||
auto awaker = std::make_shared<detail::event_awaker>( | |||||
[st = awaitable._state, evts](detail::event_impl * e) -> bool | |||||
{ | |||||
if (e) | |||||
{ | |||||
for (auto i = evts.begin(); i != evts.end(); ++i) | |||||
{ | |||||
if (e == (*i).get()) | |||||
{ | |||||
st->set_value((intptr_t)(i - evts.begin())); | |||||
return true; | |||||
} | |||||
} | |||||
} | |||||
else | |||||
{ | |||||
st->set_value(-1); | |||||
} | |||||
return false; | |||||
}); | |||||
for (auto e : evts) | |||||
{ | |||||
e->wait_(awaker); | |||||
} | |||||
return awaitable.get_future(); | |||||
} | |||||
future_t<intptr_t> event_t::wait_any_until_(const clock_type::time_point & tp, std::vector<event_impl_ptr> && evts) | |||||
{ | |||||
awaitable_t<intptr_t> awaitable; | |||||
auto awaker = std::make_shared<detail::event_awaker>( | |||||
[st = awaitable._state, evts](detail::event_impl * e) -> bool | |||||
{ | |||||
if (e) | |||||
{ | |||||
for (auto i = evts.begin(); i != evts.end(); ++i) | |||||
{ | |||||
if (e == (*i).get()) | |||||
{ | |||||
st->set_value((intptr_t)(i - evts.begin())); | |||||
return true; | |||||
} | |||||
} | |||||
} | |||||
else | |||||
{ | |||||
st->set_value(-1); | |||||
} | |||||
return false; | |||||
}); | |||||
for (auto e : evts) | |||||
{ | |||||
e->wait_(awaker); | |||||
} | |||||
(void)this_scheduler()->timer()->add(tp, | |||||
[awaker](bool ) | |||||
{ | |||||
awaker->awake(nullptr, 1); | |||||
}); | |||||
return awaitable.get_future(); | |||||
} | |||||
future_t<bool> event_t::wait_all_(std::vector<event_impl_ptr> && evts) | |||||
{ | |||||
awaitable_t<bool> awaitable; | |||||
if (evts.size() <= 0) | |||||
{ | |||||
awaitable._state->set_value(false); | |||||
return awaitable.get_future(); | |||||
} | |||||
auto awaker = std::make_shared<detail::event_awaker>( | |||||
[st = awaitable._state](detail::event_impl * e) -> bool | |||||
{ | |||||
st->set_value(e ? true : false); | |||||
return true; | |||||
}, | |||||
evts.size()); | |||||
for (auto e : evts) | |||||
{ | |||||
e->wait_(awaker); | |||||
} | |||||
return awaitable.get_future(); | |||||
} | |||||
struct event_t::wait_all_ctx | |||||
{ | |||||
//typedef spinlock lock_type; | |||||
typedef std::recursive_mutex lock_type; | |||||
counted_ptr<state_t<bool>> st; | |||||
std::vector<event_impl_ptr> evts; | |||||
std::vector<event_impl_ptr> evts_waited; | |||||
timer_handler th; | |||||
lock_type _lock; | |||||
wait_all_ctx() | |||||
{ | |||||
#if RESUMEF_DEBUG_COUNTER | |||||
++g_resumef_evtctx_count; | |||||
#endif | |||||
} | |||||
~wait_all_ctx() | |||||
{ | |||||
th.stop(); | |||||
#if RESUMEF_DEBUG_COUNTER | |||||
--g_resumef_evtctx_count; | |||||
#endif | |||||
} | |||||
bool awake(detail::event_impl * eptr) | |||||
{ | |||||
scoped_lock<lock_type> lock_(this->_lock); | |||||
//如果st为nullptr,则说明之前已经返回过值了。本环境无效了。 | |||||
if (!st.get()) | |||||
return false; | |||||
if (eptr) | |||||
{ | |||||
//记录已经等到的事件 | |||||
evts_waited.emplace_back(eptr->shared_from_this()); | |||||
//已经等到的事件达到预期 | |||||
if (evts_waited.size() == evts.size()) | |||||
{ | |||||
evts_waited.clear(); | |||||
//返回true表示等待成功 | |||||
st->set_value(true); | |||||
//丢弃st,以便于还有其它持有的ctx返回false | |||||
st.reset(); | |||||
//取消定时器 | |||||
th.stop(); | |||||
} | |||||
} | |||||
else | |||||
{ | |||||
//超时后,恢复已经等待的事件计数 | |||||
for (auto sptr : evts_waited) | |||||
{ | |||||
sptr->signal(); | |||||
} | |||||
evts_waited.clear(); | |||||
//返回true表示等待失败 | |||||
st->set_value(false); | |||||
//丢弃st,以便于还有其它持有的ctx返回false | |||||
st.reset(); | |||||
//定时器句柄已经无意义了 | |||||
th.reset(); | |||||
} | |||||
return true; | |||||
} | |||||
}; | |||||
//等待所有的事件 | |||||
//超时后的行为应该表现为: | |||||
//要么所有的事件计数减一,要么所有事件计数不动 | |||||
//则需要超时后,恢复已经等待的事件计数 | |||||
future_t<bool> event_t::wait_all_until_(const clock_type::time_point & tp, std::vector<event_impl_ptr> && evts) | |||||
{ | |||||
awaitable_t<bool> awaitable; | |||||
if (evts.size() <= 0) | |||||
{ | |||||
(void)this_scheduler()->timer()->add_handler(tp, | |||||
[st = awaitable._state](bool ) | |||||
{ | |||||
st->set_value(false); | |||||
}); | |||||
return awaitable.get_future(); | |||||
} | |||||
auto ctx = std::make_shared<wait_all_ctx>(); | |||||
ctx->st = awaitable._state; | |||||
ctx->evts_waited.reserve(evts.size()); | |||||
ctx->evts = std::move(evts); | |||||
ctx->th = this_scheduler()->timer()->add_handler(tp, | |||||
[ctx](bool ) | |||||
{ | |||||
ctx->awake(nullptr); | |||||
}); | |||||
for (auto e : ctx->evts) | |||||
{ | |||||
auto awaker = std::make_shared<detail::event_awaker>( | |||||
[ctx](detail::event_impl * eptr) -> bool | |||||
{ | |||||
return ctx->awake(eptr); | |||||
}); | |||||
e->wait_(awaker); | |||||
} | |||||
return awaitable.get_future(); | |||||
} | |||||
void async_manual_reset_event::set() noexcept | |||||
{ | |||||
// Needs to be 'release' so that subsequent 'co_await' has | |||||
// visibility of our prior writes. | |||||
// Needs to be 'acquire' so that we have visibility of prior | |||||
// writes by awaiting coroutines. | |||||
void* oldValue = m_state.exchange(this, std::memory_order_acq_rel); | |||||
if (oldValue != this) | |||||
{ | |||||
// Wasn't already in 'set' state. | |||||
// Treat old value as head of a linked-list of waiters | |||||
// which we have now acquired and need to resume. | |||||
auto* waiters = static_cast<awaiter*>(oldValue); | |||||
while (waiters != nullptr) | |||||
{ | |||||
// Read m_next before resuming the coroutine as resuming | |||||
// the coroutine will likely destroy the awaiter object. | |||||
auto* next = waiters->m_next; | |||||
waiters->m_awaitingCoroutine.resume(); | |||||
waiters = next; | |||||
} | |||||
} | |||||
} | |||||
bool async_manual_reset_event::awaiter::await_suspend( | |||||
coroutine_handle<> awaitingCoroutine) noexcept | |||||
{ | |||||
// Special m_state value that indicates the event is in the 'set' state. | |||||
const void* const setState = &m_event; | |||||
// Remember the handle of the awaiting coroutine. | |||||
m_awaitingCoroutine = awaitingCoroutine; | |||||
// Try to atomically push this awaiter onto the front of the list. | |||||
void* oldValue = m_event.m_state.load(std::memory_order_acquire); | |||||
do | |||||
{ | |||||
// Resume immediately if already in 'set' state. | |||||
if (oldValue == setState) return false; | |||||
// Update linked list to point at current head. | |||||
m_next = static_cast<awaiter*>(oldValue); | |||||
// Finally, try to swap the old list head, inserting this awaiter | |||||
// as the new list head. | |||||
} while (!m_event.m_state.compare_exchange_weak( | |||||
oldValue, | |||||
this, | |||||
std::memory_order_release, | |||||
std::memory_order_acquire)); | |||||
// Successfully enqueued. Remain suspended. | |||||
return true; | |||||
} | |||||
} | |||||
#include "../librf.h" | |||||
RESUMEF_NS | |||||
{ | |||||
namespace detail | |||||
{ | |||||
event_impl::event_impl(intptr_t initial_counter_) | |||||
: _counter(initial_counter_) | |||||
{ | |||||
} | |||||
void event_impl::signal() | |||||
{ | |||||
scoped_lock<lock_type> lock_(this->_lock); | |||||
++this->_counter; | |||||
for (auto iter = this->_awakes.begin(); iter != this->_awakes.end(); ) | |||||
{ | |||||
auto awaker = *iter; | |||||
iter = this->_awakes.erase(iter); | |||||
if (awaker->awake(this, 1)) | |||||
{ | |||||
if (--this->_counter == 0) | |||||
break; | |||||
} | |||||
} | |||||
} | |||||
void event_impl::reset() | |||||
{ | |||||
scoped_lock<lock_type> lock_(this->_lock); | |||||
this->_awakes.clear(); | |||||
this->_counter = 0; | |||||
} | |||||
bool event_impl::wait_(const event_awaker_ptr& awaker) | |||||
{ | |||||
assert(awaker); | |||||
scoped_lock<lock_type> lock_(this->_lock); | |||||
if (this->_counter > 0) | |||||
{ | |||||
if (awaker->awake(this, 1)) | |||||
{ | |||||
--this->_counter; | |||||
return true; | |||||
} | |||||
} | |||||
else | |||||
{ | |||||
this->_awakes.push_back(awaker); | |||||
} | |||||
return false; | |||||
} | |||||
} | |||||
inline namespace v1 | |||||
{ | |||||
event_t::event_t(intptr_t initial_counter_) | |||||
: _event(std::make_shared<detail::event_impl>(initial_counter_)) | |||||
{ | |||||
} | |||||
future_t<bool> event_t::wait() const | |||||
{ | |||||
awaitable_t<bool> awaitable; | |||||
auto awaker = std::make_shared<detail::event_awaker>( | |||||
[st = awaitable._state](detail::event_impl* e) -> bool | |||||
{ | |||||
st->set_value(e ? true : false); | |||||
return true; | |||||
}); | |||||
_event->wait_(awaker); | |||||
return awaitable.get_future(); | |||||
} | |||||
future_t<bool> event_t::wait_until_(const clock_type::time_point& tp) const | |||||
{ | |||||
awaitable_t<bool> awaitable; | |||||
auto awaker = std::make_shared<detail::event_awaker>( | |||||
[st = awaitable._state](detail::event_impl* e) -> bool | |||||
{ | |||||
st->set_value(e ? true : false); | |||||
return true; | |||||
}); | |||||
_event->wait_(awaker); | |||||
(void)this_scheduler()->timer()->add(tp, | |||||
[awaker](bool) | |||||
{ | |||||
awaker->awake(nullptr, 1); | |||||
}); | |||||
return awaitable.get_future(); | |||||
} | |||||
struct wait_any_awaker | |||||
{ | |||||
typedef state_t<intptr_t> state_type; | |||||
counted_ptr<state_type> st; | |||||
std::vector<detail::event_impl*> evts; | |||||
wait_any_awaker(const counted_ptr<state_type>& st_, std::vector<detail::event_impl*>&& evts_) | |||||
: st(st_) | |||||
, evts(std::forward<std::vector<detail::event_impl*>>(evts_)) | |||||
{} | |||||
wait_any_awaker(const wait_any_awaker&) = delete; | |||||
wait_any_awaker(wait_any_awaker&&) = default; | |||||
bool operator()(detail::event_impl* e) const | |||||
{ | |||||
if (e) | |||||
{ | |||||
for (auto i = evts.begin(); i != evts.end(); ++i) | |||||
{ | |||||
if (e == (*i)) | |||||
{ | |||||
st->set_value((intptr_t)(i - evts.begin())); | |||||
return true; | |||||
} | |||||
} | |||||
} | |||||
else | |||||
{ | |||||
st->set_value(-1); | |||||
} | |||||
return false; | |||||
} | |||||
}; | |||||
future_t<intptr_t> event_t::wait_any_(std::vector<event_impl_ptr>&& evts) | |||||
{ | |||||
awaitable_t<intptr_t> awaitable; | |||||
if (evts.size() <= 0) | |||||
{ | |||||
awaitable._state->set_value(-1); | |||||
return awaitable.get_future(); | |||||
} | |||||
auto awaker = std::make_shared<detail::event_awaker>( | |||||
[st = awaitable._state, evts](detail::event_impl* e) -> bool | |||||
{ | |||||
if (e) | |||||
{ | |||||
for (auto i = evts.begin(); i != evts.end(); ++i) | |||||
{ | |||||
if (e == (*i).get()) | |||||
{ | |||||
st->set_value((intptr_t)(i - evts.begin())); | |||||
return true; | |||||
} | |||||
} | |||||
} | |||||
else | |||||
{ | |||||
st->set_value(-1); | |||||
} | |||||
return false; | |||||
}); | |||||
for (auto e : evts) | |||||
{ | |||||
e->wait_(awaker); | |||||
} | |||||
return awaitable.get_future(); | |||||
} | |||||
future_t<intptr_t> event_t::wait_any_until_(const clock_type::time_point& tp, std::vector<event_impl_ptr>&& evts) | |||||
{ | |||||
awaitable_t<intptr_t> awaitable; | |||||
auto awaker = std::make_shared<detail::event_awaker>( | |||||
[st = awaitable._state, evts](detail::event_impl* e) -> bool | |||||
{ | |||||
if (e) | |||||
{ | |||||
for (auto i = evts.begin(); i != evts.end(); ++i) | |||||
{ | |||||
if (e == (*i).get()) | |||||
{ | |||||
st->set_value((intptr_t)(i - evts.begin())); | |||||
return true; | |||||
} | |||||
} | |||||
} | |||||
else | |||||
{ | |||||
st->set_value(-1); | |||||
} | |||||
return false; | |||||
}); | |||||
for (auto e : evts) | |||||
{ | |||||
e->wait_(awaker); | |||||
} | |||||
(void)this_scheduler()->timer()->add(tp, | |||||
[awaker](bool) | |||||
{ | |||||
awaker->awake(nullptr, 1); | |||||
}); | |||||
return awaitable.get_future(); | |||||
} | |||||
future_t<bool> event_t::wait_all_(std::vector<event_impl_ptr>&& evts) | |||||
{ | |||||
awaitable_t<bool> awaitable; | |||||
if (evts.size() <= 0) | |||||
{ | |||||
awaitable._state->set_value(false); | |||||
return awaitable.get_future(); | |||||
} | |||||
auto awaker = std::make_shared<detail::event_awaker>( | |||||
[st = awaitable._state](detail::event_impl* e) -> bool | |||||
{ | |||||
st->set_value(e ? true : false); | |||||
return true; | |||||
}, | |||||
evts.size()); | |||||
for (auto e : evts) | |||||
{ | |||||
e->wait_(awaker); | |||||
} | |||||
return awaitable.get_future(); | |||||
} | |||||
struct event_t::wait_all_ctx | |||||
{ | |||||
//typedef spinlock lock_type; | |||||
typedef std::recursive_mutex lock_type; | |||||
counted_ptr<state_t<bool>> st; | |||||
std::vector<event_impl_ptr> evts; | |||||
std::vector<event_impl_ptr> evts_waited; | |||||
timer_handler th; | |||||
lock_type _lock; | |||||
wait_all_ctx() | |||||
{ | |||||
#if RESUMEF_DEBUG_COUNTER | |||||
++g_resumef_evtctx_count; | |||||
#endif | |||||
} | |||||
~wait_all_ctx() | |||||
{ | |||||
th.stop(); | |||||
#if RESUMEF_DEBUG_COUNTER | |||||
--g_resumef_evtctx_count; | |||||
#endif | |||||
} | |||||
bool awake(detail::event_impl* eptr) | |||||
{ | |||||
scoped_lock<lock_type> lock_(this->_lock); | |||||
//如果st为nullptr,则说明之前已经返回过值了。本环境无效了。 | |||||
if (!st.get()) | |||||
return false; | |||||
if (eptr) | |||||
{ | |||||
//记录已经等到的事件 | |||||
evts_waited.emplace_back(eptr->shared_from_this()); | |||||
//已经等到的事件达到预期 | |||||
if (evts_waited.size() == evts.size()) | |||||
{ | |||||
evts_waited.clear(); | |||||
//返回true表示等待成功 | |||||
st->set_value(true); | |||||
//丢弃st,以便于还有其它持有的ctx返回false | |||||
st.reset(); | |||||
//取消定时器 | |||||
th.stop(); | |||||
} | |||||
} | |||||
else | |||||
{ | |||||
//超时后,恢复已经等待的事件计数 | |||||
for (auto sptr : evts_waited) | |||||
{ | |||||
sptr->signal(); | |||||
} | |||||
evts_waited.clear(); | |||||
//返回true表示等待失败 | |||||
st->set_value(false); | |||||
//丢弃st,以便于还有其它持有的ctx返回false | |||||
st.reset(); | |||||
//定时器句柄已经无意义了 | |||||
th.reset(); | |||||
} | |||||
return true; | |||||
} | |||||
}; | |||||
//等待所有的事件 | |||||
//超时后的行为应该表现为: | |||||
//要么所有的事件计数减一,要么所有事件计数不动 | |||||
//则需要超时后,恢复已经等待的事件计数 | |||||
future_t<bool> event_t::wait_all_until_(const clock_type::time_point& tp, std::vector<event_impl_ptr>&& evts) | |||||
{ | |||||
awaitable_t<bool> awaitable; | |||||
if (evts.size() <= 0) | |||||
{ | |||||
(void)this_scheduler()->timer()->add_handler(tp, | |||||
[st = awaitable._state](bool) | |||||
{ | |||||
st->set_value(false); | |||||
}); | |||||
return awaitable.get_future(); | |||||
} | |||||
auto ctx = std::make_shared<wait_all_ctx>(); | |||||
ctx->st = awaitable._state; | |||||
ctx->evts_waited.reserve(evts.size()); | |||||
ctx->evts = std::move(evts); | |||||
ctx->th = this_scheduler()->timer()->add_handler(tp, | |||||
[ctx](bool) | |||||
{ | |||||
ctx->awake(nullptr); | |||||
}); | |||||
for (auto e : ctx->evts) | |||||
{ | |||||
auto awaker = std::make_shared<detail::event_awaker>( | |||||
[ctx](detail::event_impl* eptr) -> bool | |||||
{ | |||||
return ctx->awake(eptr); | |||||
}); | |||||
e->wait_(awaker); | |||||
} | |||||
return awaitable.get_future(); | |||||
} | |||||
} | |||||
} |
#pragma once | |||||
RESUMEF_NS | |||||
{ | |||||
namespace detail | |||||
{ | |||||
struct event_impl; | |||||
typedef _awaker<event_impl> event_awaker; | |||||
typedef std::shared_ptr<event_awaker> event_awaker_ptr; | |||||
struct event_impl : public std::enable_shared_from_this<event_impl> | |||||
{ | |||||
private: | |||||
//typedef spinlock lock_type; | |||||
typedef std::recursive_mutex lock_type; | |||||
std::list<event_awaker_ptr> _awakes; | |||||
intptr_t _counter; | |||||
lock_type _lock; | |||||
public: | |||||
event_impl(intptr_t initial_counter_); | |||||
void signal(); | |||||
void reset(); | |||||
//如果已经触发了awaker,则返回true | |||||
bool wait_(const event_awaker_ptr& awaker); | |||||
template<class callee_t, class dummy_t = std::enable_if<!std::is_same<std::remove_cv_t<callee_t>, event_awaker_ptr>::value>> | |||||
decltype(auto) wait(callee_t&& awaker, dummy_t* dummy_ = nullptr) | |||||
{ | |||||
(void)dummy_; | |||||
return wait_(std::make_shared<event_awaker>(std::forward<callee_t>(awaker))); | |||||
} | |||||
event_impl(const event_impl&) = delete; | |||||
event_impl(event_impl&&) = delete; | |||||
event_impl& operator = (const event_impl&) = delete; | |||||
event_impl& operator = (event_impl&&) = delete; | |||||
}; | |||||
} | |||||
inline namespace v1 | |||||
{ | |||||
//提供一种在协程和非协程之间同步的手段。 | |||||
//典型用法是在非协程的线程,或者异步代码里,调用signal()方法触发信号, | |||||
//协程代码里,调用co_await wait()等系列方法等待同步。 | |||||
struct event_t | |||||
{ | |||||
typedef std::shared_ptr<detail::event_impl> event_impl_ptr; | |||||
typedef std::weak_ptr<detail::event_impl> event_impl_wptr; | |||||
typedef std::chrono::system_clock clock_type; | |||||
private: | |||||
event_impl_ptr _event; | |||||
struct wait_all_ctx; | |||||
public: | |||||
event_t(intptr_t initial_counter_ = 0); | |||||
void signal() const | |||||
{ | |||||
_event->signal(); | |||||
} | |||||
void reset() const | |||||
{ | |||||
_event->reset(); | |||||
} | |||||
future_t<bool> | |||||
wait() const; | |||||
template<class _Rep, class _Period> | |||||
future_t<bool> | |||||
wait_for(const std::chrono::duration<_Rep, _Period>& dt) const | |||||
{ | |||||
return wait_for_(std::chrono::duration_cast<clock_type::duration>(dt)); | |||||
} | |||||
template<class _Clock, class _Duration> | |||||
future_t<bool> | |||||
wait_until(const std::chrono::time_point<_Clock, _Duration>& tp) const | |||||
{ | |||||
return wait_until_(std::chrono::time_point_cast<clock_type::duration>(tp)); | |||||
} | |||||
template<class _Iter> | |||||
static future_t<intptr_t> | |||||
wait_any(_Iter begin_, _Iter end_) | |||||
{ | |||||
return wait_any_(make_event_vector(begin_, end_)); | |||||
} | |||||
template<class _Cont> | |||||
static future_t<intptr_t> | |||||
wait_any(const _Cont& cnt_) | |||||
{ | |||||
return wait_any_(make_event_vector(std::begin(cnt_), std::end(cnt_))); | |||||
} | |||||
template<class _Rep, class _Period, class _Iter> | |||||
static future_t<intptr_t> | |||||
wait_any_for(const std::chrono::duration<_Rep, _Period>& dt, _Iter begin_, _Iter end_) | |||||
{ | |||||
return wait_any_for_(std::chrono::duration_cast<clock_type::duration>(dt), make_event_vector(begin_, end_)); | |||||
} | |||||
template<class _Rep, class _Period, class _Cont> | |||||
static future_t<intptr_t> | |||||
wait_any_for(const std::chrono::duration<_Rep, _Period>& dt, const _Cont& cnt_) | |||||
{ | |||||
return wait_any_for_(std::chrono::duration_cast<clock_type::duration>(dt), make_event_vector(std::begin(cnt_), std::end(cnt_))); | |||||
} | |||||
template<class _Clock, class _Duration, class _Iter> | |||||
static future_t<intptr_t> | |||||
wait_any_until(const std::chrono::time_point<_Clock, _Duration>& tp, _Iter begin_, _Iter end_) | |||||
{ | |||||
return wait_any_until_(std::chrono::time_point_cast<clock_type::duration>(tp), make_event_vector(begin_, end_)); | |||||
} | |||||
template<class _Clock, class _Duration, class _Cont> | |||||
static future_t<intptr_t> | |||||
wait_any_until(const std::chrono::time_point<_Clock, _Duration>& tp, const _Cont& cnt_) | |||||
{ | |||||
return wait_any_until_(std::chrono::time_point_cast<clock_type::duration>(tp), make_event_vector(std::begin(cnt_), std::end(cnt_))); | |||||
} | |||||
template<class _Iter> | |||||
static future_t<bool> | |||||
wait_all(_Iter begin_, _Iter end_) | |||||
{ | |||||
return wait_all_(make_event_vector(begin_, end_)); | |||||
} | |||||
template<class _Cont> | |||||
static future_t<bool> | |||||
wait_all(const _Cont& cnt_) | |||||
{ | |||||
return wait_all(std::begin(cnt_), std::end(cnt_)); | |||||
} | |||||
template<class _Rep, class _Period, class _Iter> | |||||
static future_t<bool> | |||||
wait_all_for(const std::chrono::duration<_Rep, _Period>& dt, _Iter begin_, _Iter end_) | |||||
{ | |||||
return wait_all_for_(std::chrono::duration_cast<clock_type::duration>(dt), make_event_vector(begin_, end_)); | |||||
} | |||||
template<class _Rep, class _Period, class _Cont> | |||||
static future_t<bool> | |||||
wait_all_for(const std::chrono::duration<_Rep, _Period>& dt, const _Cont& cnt_) | |||||
{ | |||||
return wait_all_for_(std::chrono::duration_cast<clock_type::duration>(dt), make_event_vector(std::begin(cnt_), std::end(cnt_))); | |||||
} | |||||
template<class _Clock, class _Duration, class _Iter> | |||||
static future_t<bool> | |||||
wait_all_until(const std::chrono::time_point<_Clock, _Duration>& tp, _Iter begin_, _Iter end_) | |||||
{ | |||||
return wait_all_until_(std::chrono::time_point_cast<clock_type::duration>(tp), make_event_vector(begin_, end_)); | |||||
} | |||||
template<class _Clock, class _Duration, class _Cont> | |||||
static future_t<bool> | |||||
wait_all_until(const std::chrono::time_point<_Clock, _Duration>& tp, const _Cont& cnt_) | |||||
{ | |||||
return wait_all_until_(std::chrono::time_point_cast<clock_type::duration>(tp), make_event_vector(std::begin(cnt_), std::end(cnt_))); | |||||
} | |||||
event_t(const event_t&) = default; | |||||
event_t(event_t&&) = default; | |||||
event_t& operator = (const event_t&) = default; | |||||
event_t& operator = (event_t&&) = default; | |||||
private: | |||||
template<class _Iter> | |||||
static std::vector<event_impl_ptr> make_event_vector(_Iter begin_, _Iter end_) | |||||
{ | |||||
std::vector<event_impl_ptr> evts; | |||||
evts.reserve(std::distance(begin_, end_)); | |||||
for (auto i = begin_; i != end_; ++i) | |||||
evts.push_back((*i)._event); | |||||
return evts; | |||||
} | |||||
inline future_t<bool> wait_for_(const clock_type::duration& dt) const | |||||
{ | |||||
return wait_until_(clock_type::now() + dt); | |||||
} | |||||
future_t<bool> wait_until_(const clock_type::time_point& tp) const; | |||||
static future_t<intptr_t> wait_any_(std::vector<event_impl_ptr>&& evts); | |||||
inline static future_t<intptr_t> wait_any_for_(const clock_type::duration& dt, std::vector<event_impl_ptr>&& evts) | |||||
{ | |||||
return wait_any_until_(clock_type::now() + dt, std::forward<std::vector<event_impl_ptr>>(evts)); | |||||
} | |||||
static future_t<intptr_t> wait_any_until_(const clock_type::time_point& tp, std::vector<event_impl_ptr>&& evts); | |||||
static future_t<bool> wait_all_(std::vector<event_impl_ptr>&& evts); | |||||
inline static future_t<bool> wait_all_for_(const clock_type::duration& dt, std::vector<event_impl_ptr>&& evts) | |||||
{ | |||||
return wait_all_until_(clock_type::now() + dt, std::forward<std::vector<event_impl_ptr>>(evts)); | |||||
} | |||||
static future_t<bool> wait_all_until_(const clock_type::time_point& tp, std::vector<event_impl_ptr>&& evts); | |||||
}; | |||||
} | |||||
} |
#include "../librf.h" | |||||
RESUMEF_NS | |||||
{ | |||||
namespace detail | |||||
{ | |||||
void event_v2_impl::notify_all() noexcept | |||||
{ | |||||
// Needs to be 'release' so that subsequent 'co_await' has | |||||
// visibility of our prior writes. | |||||
// Needs to be 'acquire' so that we have visibility of prior | |||||
// writes by awaiting coroutines. | |||||
void* oldValue = m_state.exchange(this, std::memory_order_acq_rel); | |||||
if (oldValue != this) | |||||
{ | |||||
// Wasn't already in 'set' state. | |||||
// Treat old value as head of a linked-list of waiters | |||||
// which we have now acquired and need to resume. | |||||
state_event_t* state = static_cast<state_event_t*>(oldValue); | |||||
while (state != nullptr) | |||||
{ | |||||
// Read m_next before resuming the coroutine as resuming | |||||
// the coroutine will likely destroy the awaiter object. | |||||
auto* next = state->m_next; | |||||
state->on_notify(); | |||||
state = next; | |||||
} | |||||
} | |||||
} | |||||
void event_v2_impl::notify_one() noexcept | |||||
{ | |||||
// Needs to be 'release' so that subsequent 'co_await' has | |||||
// visibility of our prior writes. | |||||
// Needs to be 'acquire' so that we have visibility of prior | |||||
// writes by awaiting coroutines. | |||||
void* oldValue = m_state.exchange(nullptr, std::memory_order_acq_rel); | |||||
if (oldValue != this) | |||||
{ | |||||
// Wasn't already in 'set' state. | |||||
// Treat old value as head of a linked-list of waiters | |||||
// which we have now acquired and need to resume. | |||||
state_event_t* state = static_cast<state_event_t*>(oldValue); | |||||
if (state != nullptr) | |||||
{ | |||||
// Read m_next before resuming the coroutine as resuming | |||||
// the coroutine will likely destroy the awaiter object. | |||||
auto* next = state->m_next; | |||||
state->on_notify(); | |||||
if (next != nullptr) | |||||
add_notify_list(next); | |||||
} | |||||
} | |||||
} | |||||
bool event_v2_impl::add_notify_list(state_event_t* state) noexcept | |||||
{ | |||||
// Try to atomically push this awaiter onto the front of the list. | |||||
void* oldValue = m_state.load(std::memory_order_acquire); | |||||
do | |||||
{ | |||||
// Resume immediately if already in 'set' state. | |||||
if (oldValue == this) return false; | |||||
// Update linked list to point at current head. | |||||
state->m_next = static_cast<state_event_t*>(oldValue); | |||||
// Finally, try to swap the old list head, inserting this awaiter | |||||
// as the new list head. | |||||
} while (!m_state.compare_exchange_weak( | |||||
oldValue, | |||||
state, | |||||
std::memory_order_release, | |||||
std::memory_order_acquire)); | |||||
return true; | |||||
} | |||||
void state_event_t::destroy_deallocate() | |||||
{ | |||||
size_t _Size = sizeof(state_event_t); | |||||
#if RESUMEF_DEBUG_COUNTER | |||||
std::cout << "destroy_deallocate, size=" << _Size << std::endl; | |||||
#endif | |||||
this->~state_event_t(); | |||||
_Alloc_char _Al; | |||||
return _Al.deallocate(reinterpret_cast<char*>(this), _Size); | |||||
} | |||||
void state_event_t::resume() | |||||
{ | |||||
coroutine_handle<> handler = _coro; | |||||
if (handler) | |||||
{ | |||||
_coro = nullptr; | |||||
_scheduler->del_final(this); | |||||
handler.resume(); | |||||
} | |||||
} | |||||
bool state_event_t::has_handler() const noexcept | |||||
{ | |||||
return (bool)_coro; | |||||
} | |||||
} | |||||
namespace v2 | |||||
{ | |||||
event_t::event_t(bool initially) | |||||
:_event(std::make_shared<detail::event_v2_impl>(initially)) | |||||
{ | |||||
} | |||||
} | |||||
void async_manual_reset_event::set() noexcept | |||||
{ | |||||
// Needs to be 'release' so that subsequent 'co_await' has | |||||
// visibility of our prior writes. | |||||
// Needs to be 'acquire' so that we have visibility of prior | |||||
// writes by awaiting coroutines. | |||||
void* oldValue = m_state.exchange(this, std::memory_order_acq_rel); | |||||
if (oldValue != this) | |||||
{ | |||||
// Wasn't already in 'set' state. | |||||
// Treat old value as head of a linked-list of waiters | |||||
// which we have now acquired and need to resume. | |||||
auto* waiters = static_cast<awaiter*>(oldValue); | |||||
while (waiters != nullptr) | |||||
{ | |||||
// Read m_next before resuming the coroutine as resuming | |||||
// the coroutine will likely destroy the awaiter object. | |||||
auto* next = waiters->m_next; | |||||
waiters->m_awaitingCoroutine.resume(); | |||||
waiters = next; | |||||
} | |||||
} | |||||
} | |||||
bool async_manual_reset_event::awaiter::await_suspend( | |||||
coroutine_handle<> awaitingCoroutine) noexcept | |||||
{ | |||||
// Special m_state value that indicates the event is in the 'set' state. | |||||
const void* const setState = &m_event; | |||||
// Remember the handle of the awaiting coroutine. | |||||
m_awaitingCoroutine = awaitingCoroutine; | |||||
// Try to atomically push this awaiter onto the front of the list. | |||||
void* oldValue = m_event.m_state.load(std::memory_order_acquire); | |||||
do | |||||
{ | |||||
// Resume immediately if already in 'set' state. | |||||
if (oldValue == setState) return false; | |||||
// Update linked list to point at current head. | |||||
m_next = static_cast<awaiter*>(oldValue); | |||||
// Finally, try to swap the old list head, inserting this awaiter | |||||
// as the new list head. | |||||
} while (!m_event.m_state.compare_exchange_weak( | |||||
oldValue, | |||||
this, | |||||
std::memory_order_release, | |||||
std::memory_order_acquire)); | |||||
// Successfully enqueued. Remain suspended. | |||||
return true; | |||||
} | |||||
} |
#pragma once | |||||
RESUMEF_NS | |||||
{ | |||||
namespace detail | |||||
{ | |||||
struct state_event_t; | |||||
//仿照cppcoro的event是行不通的。 | |||||
//虽然cppcoro的event的触发和等待之间是线程安全的,但是并不能实现只触发指定数量。并且多线程触发之间是不安全的。 | |||||
//所以,还得用锁结构来实现(等待实现,今日不空)。 | |||||
struct event_v2_impl : public std::enable_shared_from_this<event_v2_impl> | |||||
{ | |||||
event_v2_impl(bool initially = false) noexcept | |||||
: m_state(initially ? this : nullptr) | |||||
{} | |||||
// No copying/moving | |||||
event_v2_impl(const event_v2_impl&) = delete; | |||||
event_v2_impl(event_v2_impl&&) = delete; | |||||
event_v2_impl& operator=(const event_v2_impl&) = delete; | |||||
event_v2_impl& operator=(event_v2_impl&&) = delete; | |||||
bool is_set() const noexcept | |||||
{ | |||||
return m_state.load(std::memory_order_acquire) == this; | |||||
} | |||||
void reset() noexcept | |||||
{ | |||||
void* oldValue = this; | |||||
m_state.compare_exchange_strong(oldValue, nullptr, std::memory_order_acquire); | |||||
} | |||||
void notify_all() noexcept; //多线程同时调用notify_one/notify_all是非线程安全的 | |||||
void notify_one() noexcept; //多线程同时调用notify_one/notify_all是非线程安全的 | |||||
bool add_notify_list(state_event_t* state) noexcept; | |||||
private: | |||||
mutable std::atomic<void*> m_state; //event_v2_impl or state_event_t | |||||
}; | |||||
struct state_event_t : public state_base_t | |||||
{ | |||||
virtual void resume() override; | |||||
virtual bool has_handler() const noexcept override; | |||||
void on_notify() | |||||
{ | |||||
assert(this->_scheduler != nullptr); | |||||
if (this->_coro) | |||||
this->_scheduler->add_generator(this); | |||||
} | |||||
//将自己加入到通知链表里 | |||||
template<class _PromiseT, typename = std::enable_if_t<is_promise_v<_PromiseT>>> | |||||
bool event_await_suspend(coroutine_handle<_PromiseT> handler) noexcept | |||||
{ | |||||
_PromiseT& promise = handler.promise(); | |||||
auto* parent_state = promise.get_state(); | |||||
scheduler_t* sch = parent_state->get_scheduler(); | |||||
this->_scheduler = sch; | |||||
this->_coro = handler; | |||||
return m_event->add_notify_list(this); | |||||
} | |||||
static state_event_t* _Alloc_state(event_v2_impl * e) | |||||
{ | |||||
_Alloc_char _Al; | |||||
size_t _Size = sizeof(state_event_t); | |||||
#if RESUMEF_DEBUG_COUNTER | |||||
std::cout << "state_event_t::alloc, size=" << sizeof(state_event_t) << std::endl; | |||||
#endif | |||||
char* _Ptr = _Al.allocate(_Size); | |||||
return new(_Ptr) state_event_t(e); | |||||
} | |||||
private: | |||||
friend struct event_v2_impl; | |||||
state_event_t(event_v2_impl * e) noexcept | |||||
{ | |||||
if (e != nullptr) | |||||
m_event = e->shared_from_this(); | |||||
} | |||||
std::shared_ptr<event_v2_impl> m_event; | |||||
state_event_t* m_next = nullptr; | |||||
virtual void destroy_deallocate() override; | |||||
}; | |||||
} | |||||
namespace v2 | |||||
{ | |||||
struct event_t | |||||
{ | |||||
typedef std::shared_ptr<detail::event_v2_impl> event_impl_ptr; | |||||
typedef std::weak_ptr<detail::event_v2_impl> event_impl_wptr; | |||||
typedef std::chrono::system_clock clock_type; | |||||
event_t(bool initially = false); | |||||
void notify_all() const noexcept | |||||
{ | |||||
_event->notify_all(); | |||||
} | |||||
void notify_one() const noexcept | |||||
{ | |||||
_event->notify_one(); | |||||
} | |||||
void reset() const noexcept | |||||
{ | |||||
_event->reset(); | |||||
} | |||||
struct awaiter | |||||
{ | |||||
awaiter(detail::event_v2_impl* e) noexcept | |||||
: _event(e) | |||||
{} | |||||
bool await_ready() const noexcept | |||||
{ | |||||
return _event->is_set(); | |||||
} | |||||
template<class _PromiseT, typename = std::enable_if_t<is_promise_v<_PromiseT>>> | |||||
bool await_suspend(coroutine_handle<_PromiseT> handler) noexcept | |||||
{ | |||||
_state = detail::state_event_t::_Alloc_state(_event); | |||||
return _state->event_await_suspend(handler); | |||||
} | |||||
void await_resume() noexcept | |||||
{ | |||||
} | |||||
private: | |||||
detail::event_v2_impl * _event; | |||||
counted_ptr<detail::state_event_t> _state; | |||||
}; | |||||
awaiter operator co_await() const noexcept | |||||
{ | |||||
return { _event.get() }; | |||||
} | |||||
private: | |||||
event_impl_ptr _event; | |||||
}; | |||||
} | |||||
class async_manual_reset_event | |||||
{ | |||||
public: | |||||
async_manual_reset_event(bool initiallySet = false) noexcept | |||||
: m_state(initiallySet ? this : nullptr) | |||||
{} | |||||
// No copying/moving | |||||
async_manual_reset_event(const async_manual_reset_event&) = delete; | |||||
async_manual_reset_event(async_manual_reset_event&&) = delete; | |||||
async_manual_reset_event& operator=(const async_manual_reset_event&) = delete; | |||||
async_manual_reset_event& operator=(async_manual_reset_event&&) = delete; | |||||
bool is_set() const noexcept | |||||
{ | |||||
return m_state.load(std::memory_order_acquire) == this; | |||||
} | |||||
struct awaiter; | |||||
awaiter operator co_await() const noexcept; | |||||
void set() noexcept; | |||||
void reset() noexcept | |||||
{ | |||||
void* oldValue = this; | |||||
m_state.compare_exchange_strong(oldValue, nullptr, std::memory_order_acquire); | |||||
} | |||||
private: | |||||
friend struct awaiter; | |||||
// - 'this' => set state | |||||
// - otherwise => not set, head of linked list of awaiter*. | |||||
mutable std::atomic<void*> m_state; | |||||
}; | |||||
struct async_manual_reset_event::awaiter | |||||
{ | |||||
awaiter(const async_manual_reset_event& event) noexcept | |||||
: m_event(event) | |||||
{} | |||||
bool await_ready() const noexcept | |||||
{ | |||||
return m_event.is_set(); | |||||
} | |||||
bool await_suspend(coroutine_handle<> awaitingCoroutine) noexcept; | |||||
void await_resume() noexcept {} | |||||
private: | |||||
friend class async_manual_reset_event; | |||||
const async_manual_reset_event& m_event; | |||||
coroutine_handle<> m_awaitingCoroutine; | |||||
awaiter* m_next; | |||||
}; | |||||
inline async_manual_reset_event::awaiter async_manual_reset_event::operator co_await() const noexcept | |||||
{ | |||||
return awaiter{ *this }; | |||||
} | |||||
} |
namespace detail | namespace detail | ||||
{ | { | ||||
struct mutex_impl; | struct mutex_impl; | ||||
typedef _awaker<mutex_impl> mutex_awaker; | |||||
typedef ::resumef::detail::_awaker<mutex_impl> mutex_awaker; | |||||
typedef std::shared_ptr<mutex_awaker> mutex_awaker_ptr; | typedef std::shared_ptr<mutex_awaker> mutex_awaker_ptr; | ||||
struct mutex_impl : public std::enable_shared_from_this<mutex_impl> | struct mutex_impl : public std::enable_shared_from_this<mutex_impl> |
#pragma once | |||||
//BUILTIN(__builtin_coro_resume, "vv*", "") | |||||
//BUILTIN(__builtin_coro_destroy, "vv*", "") | |||||
//BUILTIN(__builtin_coro_done, "bv*", "n") | |||||
//BUILTIN(__builtin_coro_promise, "v*v*IiIb", "n") | |||||
// | |||||
//BUILTIN(__builtin_coro_size, "z", "n") | |||||
//BUILTIN(__builtin_coro_frame, "v*", "n") | |||||
//BUILTIN(__builtin_coro_noop, "v*", "n") | |||||
//BUILTIN(__builtin_coro_free, "v*v*", "n") | |||||
// | |||||
//BUILTIN(__builtin_coro_id, "v*Iiv*v*v*", "n") | |||||
//BUILTIN(__builtin_coro_alloc, "b", "n") | |||||
//BUILTIN(__builtin_coro_begin, "v*v*", "n") | |||||
//BUILTIN(__builtin_coro_end, "bv*Ib", "n") | |||||
//BUILTIN(__builtin_coro_suspend, "cIb", "n") | |||||
//BUILTIN(__builtin_coro_param, "bv*v*", "n") | |||||
extern "C" void __builtin_coro_resume(void* addr); | |||||
extern "C" void __builtin_coro_destroy(void* addr); | |||||
extern "C" bool __builtin_coro_done(void* addr); | |||||
extern "C" void* __builtin_coro_promise(void* addr, int alignment, bool from_promise); | |||||
#pragma intrinsic(__builtin_coro_resume) | |||||
#pragma intrinsic(__builtin_coro_destroy) | |||||
#pragma intrinsic(__builtin_coro_done) | |||||
#pragma intrinsic(__builtin_coro_promise) | |||||
extern "C" size_t __builtin_coro_size(); | |||||
extern "C" void* __builtin_coro_frame(); | |||||
extern "C" void* __builtin_coro_free(void* coro_frame); | |||||
#pragma intrinsic(__builtin_coro_size) | |||||
#pragma intrinsic(__builtin_coro_frame) | |||||
#pragma intrinsic(__builtin_coro_free) | |||||
extern "C" void* __builtin_coro_id(int align, void* promise, void* fnaddr, void* parts); | |||||
extern "C" bool __builtin_coro_alloc(); | |||||
//extern "C" void* __builtin_coro_begin(void* memory); | |||||
//extern "C" void* __builtin_coro_end(void* coro_frame, bool unwind); | |||||
extern "C" char __builtin_coro_suspend(bool final); | |||||
extern "C" bool __builtin_coro_param(void* original, void* copy); | |||||
#pragma intrinsic(__builtin_coro_id) | |||||
#pragma intrinsic(__builtin_coro_alloc) | |||||
//#pragma intrinsic(__builtin_coro_begin) | |||||
//#pragma intrinsic(__builtin_coro_end) | |||||
#pragma intrinsic(__builtin_coro_suspend) | |||||
#pragma intrinsic(__builtin_coro_param) | |||||
#ifdef __clang__ | |||||
#define _coro_frame_size() __builtin_coro_size() | |||||
#define _coro_frame_ptr() __builtin_coro_frame() | |||||
#endif |
| |||||
#include <chrono> | |||||
#include <iostream> | |||||
#include <string> | |||||
#include <conio.h> | |||||
#include <thread> | |||||
#include "librf.h" | |||||
using namespace resumef; | |||||
//非协程的逻辑线程,或异步代码,可以通过event_t通知到协程,并且不会阻塞协程所在的线程。 | |||||
std::thread async_set_event_all(const v2::event_t & e, std::chrono::milliseconds dt) | |||||
{ | |||||
return std::thread([=] | |||||
{ | |||||
std::this_thread::sleep_for(dt); | |||||
e.notify_all(); | |||||
}); | |||||
} | |||||
std::thread async_set_event_one(const v2::event_t& e, std::chrono::milliseconds dt) | |||||
{ | |||||
return std::thread([=] | |||||
{ | |||||
std::this_thread::sleep_for(dt); | |||||
e.notify_one(); | |||||
}); | |||||
} | |||||
future_t<> resumable_wait_event(const v2::event_t & e, int idx) | |||||
{ | |||||
co_await e; | |||||
std::cout << "[" << idx << "]event signal!" << std::endl; | |||||
} | |||||
void test_notify_all() | |||||
{ | |||||
using namespace std::chrono; | |||||
{ | |||||
v2::event_t evt; | |||||
go resumable_wait_event(evt, 0); | |||||
go resumable_wait_event(evt, 1); | |||||
go resumable_wait_event(evt, 2); | |||||
auto tt = async_set_event_all(evt, 100ms); | |||||
this_scheduler()->run_until_notask(); | |||||
tt.join(); | |||||
} | |||||
} | |||||
//目前还没法测试在多线程调度下,是否线程安全 | |||||
void test_notify_one() | |||||
{ | |||||
using namespace std::chrono; | |||||
{ | |||||
v2::event_t evt; | |||||
go resumable_wait_event(evt, 10); | |||||
go resumable_wait_event(evt, 11); | |||||
go resumable_wait_event(evt, 12); | |||||
auto tt1 = async_set_event_one(evt, 100ms); | |||||
auto tt2 = async_set_event_one(evt, 500ms); | |||||
auto tt3 = async_set_event_one(evt, 800ms); | |||||
this_scheduler()->run_until_notask(); | |||||
tt1.join(); | |||||
tt2.join(); | |||||
tt3.join(); | |||||
} | |||||
} | |||||
void resumable_main_event_v2() | |||||
{ | |||||
test_notify_all(); | |||||
std::cout << std::endl; | |||||
test_notify_one(); | |||||
std::cout << std::endl; | |||||
} |
extern void resumable_main_mutex(); | extern void resumable_main_mutex(); | ||||
extern void resumable_main_exception(); | extern void resumable_main_exception(); | ||||
extern void resumable_main_event(); | extern void resumable_main_event(); | ||||
extern void resumable_main_event_v2(); | |||||
extern void resumable_main_event_timeout(); | extern void resumable_main_event_timeout(); | ||||
extern void resumable_main_dynamic_go(); | extern void resumable_main_dynamic_go(); | ||||
extern void resumable_main_channel(); | extern void resumable_main_channel(); | ||||
{ | { | ||||
(void)argc; | (void)argc; | ||||
(void)argv; | (void)argv; | ||||
//resumable_main_layout(); | |||||
//return 0; | |||||
resumable_main_event_v2(); | |||||
return 0; | |||||
//if (argc > 1) | //if (argc > 1) | ||||
// resumable_main_benchmark_asio_client(atoi(argv[1])); | // resumable_main_benchmark_asio_client(atoi(argv[1])); | ||||
resumable_main_benchmark_mem(false); | resumable_main_benchmark_mem(false); | ||||
resumable_main_mutex(); | resumable_main_mutex(); | ||||
resumable_main_event(); | resumable_main_event(); | ||||
resumable_main_event_v2(); | |||||
resumable_main_event_timeout(); | resumable_main_event_timeout(); | ||||
resumable_main_channel(); | resumable_main_channel(); | ||||
resumable_main_channel_mult_thread(); | resumable_main_channel_mult_thread(); |
</PropertyGroup> | </PropertyGroup> | ||||
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'" Label="Configuration"> | <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'" Label="Configuration"> | ||||
<ConfigurationType>Application</ConfigurationType> | <ConfigurationType>Application</ConfigurationType> | ||||
<PlatformToolset>ClangCL</PlatformToolset> | |||||
<PlatformToolset>v142</PlatformToolset> | |||||
<UseDebugLibraries>true</UseDebugLibraries> | <UseDebugLibraries>true</UseDebugLibraries> | ||||
</PropertyGroup> | </PropertyGroup> | ||||
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'" Label="Configuration"> | <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'" Label="Configuration"> | ||||
<ClCompile Include="..\benchmark\benchmark_channel_passing_next.cpp"> | <ClCompile Include="..\benchmark\benchmark_channel_passing_next.cpp"> | ||||
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild> | <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild> | ||||
</ClCompile> | </ClCompile> | ||||
<ClCompile Include="..\librf\src\event.cpp" /> | |||||
<ClCompile Include="..\librf\src\event_v1.cpp" /> | |||||
<ClCompile Include="..\librf\src\event_v2.cpp" /> | |||||
<ClCompile Include="..\librf\src\mutex.cpp" /> | <ClCompile Include="..\librf\src\mutex.cpp" /> | ||||
<ClCompile Include="..\librf\src\rf_task.cpp" /> | <ClCompile Include="..\librf\src\rf_task.cpp" /> | ||||
<ClCompile Include="..\librf\src\scheduler.cpp" /> | <ClCompile Include="..\librf\src\scheduler.cpp" /> | ||||
<ClCompile Include="..\tutorial\test_async_dynamic_go.cpp" /> | <ClCompile Include="..\tutorial\test_async_dynamic_go.cpp" /> | ||||
<ClCompile Include="..\tutorial\test_async_event.cpp" /> | <ClCompile Include="..\tutorial\test_async_event.cpp" /> | ||||
<ClCompile Include="..\tutorial\test_async_event_timeout.cpp" /> | <ClCompile Include="..\tutorial\test_async_event_timeout.cpp" /> | ||||
<ClCompile Include="..\tutorial\test_async_event_v2.cpp" /> | |||||
<ClCompile Include="..\tutorial\test_async_exception.cpp" /> | <ClCompile Include="..\tutorial\test_async_exception.cpp" /> | ||||
<ClCompile Include="..\tutorial\test_async_memory_layout.cpp" /> | <ClCompile Include="..\tutorial\test_async_memory_layout.cpp" /> | ||||
<ClCompile Include="..\tutorial\test_async_modern_cb.cpp" /> | <ClCompile Include="..\tutorial\test_async_modern_cb.cpp" /> | ||||
<ClInclude Include="..\librf\src\counted_ptr.h" /> | <ClInclude Include="..\librf\src\counted_ptr.h" /> | ||||
<ClInclude Include="..\librf\src\def.h" /> | <ClInclude Include="..\librf\src\def.h" /> | ||||
<ClInclude Include="..\librf\src\event.h" /> | <ClInclude Include="..\librf\src\event.h" /> | ||||
<ClInclude Include="..\librf\src\event_v1.h" /> | |||||
<ClInclude Include="..\librf\src\event_v2.h" /> | |||||
<ClInclude Include="..\librf\src\future.h" /> | <ClInclude Include="..\librf\src\future.h" /> | ||||
<ClInclude Include="..\librf\src\generator.h" /> | <ClInclude Include="..\librf\src\generator.h" /> | ||||
<ClInclude Include="..\librf\src\promise.h" /> | <ClInclude Include="..\librf\src\promise.h" /> |
<ClCompile Include="librf.cpp"> | <ClCompile Include="librf.cpp"> | ||||
<Filter>Source Files</Filter> | <Filter>Source Files</Filter> | ||||
</ClCompile> | </ClCompile> | ||||
<ClCompile Include="..\librf\src\event.cpp"> | |||||
<Filter>librf\src</Filter> | |||||
</ClCompile> | |||||
<ClCompile Include="..\librf\src\mutex.cpp"> | <ClCompile Include="..\librf\src\mutex.cpp"> | ||||
<Filter>librf\src</Filter> | <Filter>librf\src</Filter> | ||||
</ClCompile> | </ClCompile> | ||||
<ClCompile Include="..\tutorial\test_async_switch_scheduler.cpp"> | <ClCompile Include="..\tutorial\test_async_switch_scheduler.cpp"> | ||||
<Filter>tutorial</Filter> | <Filter>tutorial</Filter> | ||||
</ClCompile> | </ClCompile> | ||||
<ClCompile Include="..\librf\src\event_v2.cpp"> | |||||
<Filter>librf\src</Filter> | |||||
</ClCompile> | |||||
<ClCompile Include="..\librf\src\event_v1.cpp"> | |||||
<Filter>librf\src</Filter> | |||||
</ClCompile> | |||||
<ClCompile Include="..\tutorial\test_async_event_v2.cpp"> | |||||
<Filter>tutorial</Filter> | |||||
</ClCompile> | |||||
</ItemGroup> | </ItemGroup> | ||||
<ItemGroup> | <ItemGroup> | ||||
<ClInclude Include="..\librf\librf.h"> | <ClInclude Include="..\librf\librf.h"> | ||||
<ClInclude Include="..\librf\src\unix\clang_builtin.h"> | <ClInclude Include="..\librf\src\unix\clang_builtin.h"> | ||||
<Filter>librf\src\unix</Filter> | <Filter>librf\src\unix</Filter> | ||||
</ClInclude> | </ClInclude> | ||||
<ClInclude Include="..\librf\src\event_v1.h"> | |||||
<Filter>librf\src</Filter> | |||||
</ClInclude> | |||||
<ClInclude Include="..\librf\src\event_v2.h"> | |||||
<Filter>librf\src</Filter> | |||||
</ClInclude> | |||||
</ItemGroup> | </ItemGroup> | ||||
<ItemGroup> | <ItemGroup> | ||||
<None Include="..\librf\src\asio_task_1.12.0.inl"> | <None Include="..\librf\src\asio_task_1.12.0.inl"> |