bool try_wait_one() noexcept | bool try_wait_one() noexcept | ||||
{ | { | ||||
if (_counter.load(std::memory_order_acquire) > 0) | |||||
if (_counter.load(std::memory_order_consume) > 0) | |||||
{ | { | ||||
if (_counter.fetch_add(-1, std::memory_order_acq_rel) > 0) | if (_counter.fetch_add(-1, std::memory_order_acq_rel) > 0) | ||||
return true; | return true; | ||||
bool await_ready() noexcept | bool await_ready() noexcept | ||||
{ | { | ||||
scoped_lock<detail::event_v2_impl::lock_type> lock_(_event->_lock); | |||||
return _event->try_wait_one(); | return _event->try_wait_one(); | ||||
} | } | ||||
auto* parent_state = promise.get_state(); | auto* parent_state = promise.get_state(); | ||||
scheduler_t* sch = parent_state->get_scheduler(); | scheduler_t* sch = parent_state->get_scheduler(); | ||||
_state->_thandler = sch->timer()->add_handler(_tp, [_state = _state](bool canceld) | |||||
this->_state->_thandler = sch->timer()->add_handler(_tp, [st = this->_state](bool canceld) | |||||
{ | { | ||||
if (!canceld) | if (!canceld) | ||||
_state->on_timeout(); | |||||
st->on_timeout(); | |||||
}); | }); | ||||
return true; | return true; | ||||
bool await_ready() noexcept | bool await_ready() noexcept | ||||
{ | { | ||||
if (_begin == _end) | |||||
return true; | |||||
for (auto iter = _begin; iter != _end; ++iter) | |||||
{ | |||||
detail::event_v2_impl* evt = (*iter)._event.get(); | |||||
if (evt->try_wait_one()) | |||||
{ | |||||
_event = evt; | |||||
return true; | |||||
} | |||||
} | |||||
return false; | |||||
return _begin == _end; | |||||
} | } | ||||
template<class _PromiseT, typename = std::enable_if_t<traits::is_promise_v<_PromiseT>>> | template<class _PromiseT, typename = std::enable_if_t<traits::is_promise_v<_PromiseT>>> | ||||
bool await_suspend(coroutine_handle<_PromiseT> handler) | bool await_suspend(coroutine_handle<_PromiseT> handler) | ||||
{ | { | ||||
std::vector<detail::event_v2_impl::lock_type> lockes; | |||||
using ref_lock_type = std::reference_wrapper<detail::event_v2_impl::lock_type>; | |||||
std::vector<ref_lock_type> lockes; | |||||
lockes.reserve(std::distance(_begin, _end)); | lockes.reserve(std::distance(_begin, _end)); | ||||
for (auto iter = _begin; iter != _end; ++iter) | for (auto iter = _begin; iter != _end; ++iter) | ||||
{ | { | ||||
detail::event_v2_impl* evt = (*iter)._event.get(); | detail::event_v2_impl* evt = (*iter)._event.get(); | ||||
lockes.push_back(evt->_lock); | |||||
lockes.emplace_back(std::ref(evt->_lock)); | |||||
} | } | ||||
scoped_lock_range<detail::event_v2_impl::lock_type> lock_(lockes); | |||||
scoped_lock_range<ref_lock_type> lock_(lockes); | |||||
for (auto iter = _begin; iter != _end; ++iter) | for (auto iter = _begin; iter != _end; ++iter) | ||||
{ | { | ||||
struct [[nodiscard]] event_t::timeout_any_awaiter : timeout_awaitor_impl<event_t::any_awaiter<_Iter>> | struct [[nodiscard]] event_t::timeout_any_awaiter : timeout_awaitor_impl<event_t::any_awaiter<_Iter>> | ||||
{ | { | ||||
timeout_any_awaiter(clock_type::time_point tp, _Iter begin, _Iter end) noexcept | timeout_any_awaiter(clock_type::time_point tp, _Iter begin, _Iter end) noexcept | ||||
: timeout_awaitor_impl(tp, begin, end) | |||||
: timeout_awaitor_impl<event_t::any_awaiter<_Iter>>(tp, begin, end) | |||||
{} | {} | ||||
}; | }; | ||||
{ | { | ||||
intptr_t count = std::distance(_begin, _end); | intptr_t count = std::distance(_begin, _end); | ||||
std::vector<detail::event_v2_impl::lock_type> lockes; | |||||
using ref_lock_type = std::reference_wrapper<detail::event_v2_impl::lock_type>; | |||||
std::vector<ref_lock_type> lockes; | |||||
lockes.reserve(count); | lockes.reserve(count); | ||||
for (auto iter = _begin; iter != _end; ++iter) | for (auto iter = _begin; iter != _end; ++iter) | ||||
_state = new detail::state_event_all_t(count, _value); | _state = new detail::state_event_all_t(count, _value); | ||||
(void)_state->on_await_suspend(handler); | (void)_state->on_await_suspend(handler); | ||||
scoped_lock_range<detail::event_v2_impl::lock_type> lock_(lockes); | |||||
scoped_lock_range<ref_lock_type> lock_(lockes); | |||||
for (auto iter = _begin; iter != _end; ++iter) | for (auto iter = _begin; iter != _end; ++iter) | ||||
{ | { | ||||
struct [[nodiscard]] event_t::timeout_all_awaiter : timeout_awaitor_impl<event_t::all_awaiter<_Iter>> | struct [[nodiscard]] event_t::timeout_all_awaiter : timeout_awaitor_impl<event_t::all_awaiter<_Iter>> | ||||
{ | { | ||||
timeout_all_awaiter(clock_type::time_point tp, _Iter begin, _Iter end) noexcept | timeout_all_awaiter(clock_type::time_point tp, _Iter begin, _Iter end) noexcept | ||||
: timeout_awaitor_impl(tp, begin, end) | |||||
: timeout_awaitor_impl<event_t::all_awaiter<_Iter>>(tp, begin, end) | |||||
{} | {} | ||||
}; | }; | ||||
namespace detail | namespace detail | ||||
{ | { | ||||
template<class _Ty> | |||||
void _Lock_ref(_Ty& _LkN) | |||||
{ | |||||
_LkN.lock(); | |||||
} | |||||
template<class _Ty> | |||||
void _Lock_ref(std::reference_wrapper<_Ty> _LkN) | |||||
{ | |||||
_LkN.get().lock(); | |||||
} | |||||
template<class _Ty> | |||||
void _Unlock_ref(_Ty& _LkN) | |||||
{ | |||||
_LkN.unlock(); | |||||
} | |||||
template<class _Ty> | |||||
void _Unlock_ref(std::reference_wrapper<_Ty> _LkN) | |||||
{ | |||||
_LkN.get().unlock(); | |||||
} | |||||
template<class _Ty> | |||||
bool _Try_lock_ref(_Ty& _LkN) | |||||
{ | |||||
return _LkN.try_lock(); | |||||
} | |||||
template<class _Ty> | |||||
bool _Try_lock_ref(std::reference_wrapper<_Ty> _LkN) | |||||
{ | |||||
return _LkN.get().try_lock(); | |||||
} | |||||
template<class _Ty> | template<class _Ty> | ||||
void _Lock_from_locks(const int _Target, std::vector<_Ty>& _LkN) { // lock _LkN[_Target] | void _Lock_from_locks(const int _Target, std::vector<_Ty>& _LkN) { // lock _LkN[_Target] | ||||
_LkN[_Target].lock(); | |||||
_Lock_ref(_LkN[_Target]); | |||||
} | } | ||||
// FUNCTION TEMPLATE _Try_lock_from_locks | // FUNCTION TEMPLATE _Try_lock_from_locks | ||||
template<class _Ty> | template<class _Ty> | ||||
bool _Try_lock_from_locks(const int _Target, std::vector<_Ty>& _LkN) { // try to lock _LkN[_Target] | bool _Try_lock_from_locks(const int _Target, std::vector<_Ty>& _LkN) { // try to lock _LkN[_Target] | ||||
return _LkN[_Target].try_lock(); | |||||
return _Try_lock_ref(_LkN[_Target]); | |||||
} | } | ||||
// FUNCTION TEMPLATE _Unlock_locks | // FUNCTION TEMPLATE _Unlock_locks | ||||
template<class _Ty> | template<class _Ty> | ||||
void _Unlock_locks(int _First, int _Last, std::vector<_Ty>& _LkN) noexcept /* terminates */ { | void _Unlock_locks(int _First, int _Last, std::vector<_Ty>& _LkN) noexcept /* terminates */ { | ||||
for (; _First != _Last; ++_First) { | for (; _First != _Last; ++_First) { | ||||
_LkN[_First].unlock(); | |||||
_Unlock_ref(_LkN[_First]); | |||||
} | } | ||||
} | } | ||||
} | } | ||||
template <class _Lock0, class _Lock1> | template <class _Lock0, class _Lock1> | ||||
bool _Lock_attempt_small(_Lock0& _Lk0, _Lock1& _Lk1) | |||||
bool _Lock_attempt_small2(_Lock0& _Lk0, _Lock1& _Lk1) | |||||
{ | { | ||||
// attempt to lock 2 locks, by first locking _Lk0, and then trying to lock _Lk1 returns whether to try again | // attempt to lock 2 locks, by first locking _Lk0, and then trying to lock _Lk1 returns whether to try again | ||||
_Lk0.lock(); | |||||
_Lock_ref(_Lk0); | |||||
try { | try { | ||||
if (_Lk1.try_lock()) | |||||
if (_Try_lock_ref(_Lk1)) | |||||
return false; | return false; | ||||
} | } | ||||
catch (...) { | catch (...) { | ||||
_Lk0.unlock(); | |||||
_Unlock_ref(_Lk0); | |||||
throw; | throw; | ||||
} | } | ||||
_Lk0.unlock(); | |||||
_Unlock_ref(_Lk0); | |||||
std::this_thread::yield(); | std::this_thread::yield(); | ||||
return true; | return true; | ||||
} | } | ||||
void _Lock_nonmember2(_Lock0& _Lk0, _Lock1& _Lk1) | void _Lock_nonmember2(_Lock0& _Lk0, _Lock1& _Lk1) | ||||
{ | { | ||||
// lock 2 locks, without deadlock, special case for better codegen and reduced metaprogramming for common case | // lock 2 locks, without deadlock, special case for better codegen and reduced metaprogramming for common case | ||||
while (_Lock_attempt_small(_Lk0, _Lk1) && _Lock_attempt_small(_Lk1, _Lk0)) { // keep trying | |||||
while (_Lock_attempt_small2(_Lk0, _Lk1) && _Lock_attempt_small2(_Lk1, _Lk0)) { // keep trying | |||||
} | } | ||||
} | } | ||||
} | } | ||||
else if (lockes.size() == 1) | else if (lockes.size() == 1) | ||||
{ | { | ||||
lockes[0].lock(); | |||||
_Lock_ref(lockes[0]); | |||||
} | } | ||||
else if (lockes.size() == 2) | else if (lockes.size() == 2) | ||||
{ | { |
intptr_t counter = 0; | intptr_t counter = 0; | ||||
for (;;) | for (;;) | ||||
{ | { | ||||
if (co_await evt.wait_for(500ms)) | |||||
if (co_await evt.wait_for(100ms)) | |||||
break; | break; | ||||
++counter; | ++counter; | ||||
std::cout << "."; | std::cout << "."; | ||||
std::cout << counter << std::endl; | std::cout << counter << std::endl; | ||||
}; | }; | ||||
async_set_event(evt, 10s + 50ms); | |||||
//go resumalbe_set_event(evt, 10s + 50ms); | |||||
async_set_event(evt, 2s + 50ms); | |||||
//go resumalbe_set_event(evt, 2s + 50ms); | |||||
this_scheduler()->run_until_notask(); | this_scheduler()->run_until_notask(); | ||||
} | } | ||||
intptr_t counter = 0; | intptr_t counter = 0; | ||||
for (;;) | for (;;) | ||||
{ | { | ||||
if (co_await event_t::wait_all_for(500ms, evts)) | |||||
if (co_await event_t::wait_all_for(1500ms, evts)) | |||||
{ | { | ||||
std::cout << counter << std::endl; | std::cout << counter << std::endl; | ||||
std::cout << "all event signal!" << std::endl; | std::cout << "all event signal!" << std::endl; | ||||
++counter; | ++counter; | ||||
std::cout << "."; | std::cout << "."; | ||||
std::cout << "timeout!" << std::endl; | |||||
break; | |||||
} | } | ||||
}; | }; | ||||
srand((int)time(nullptr)); | srand((int)time(nullptr)); | ||||
for (auto & e : evts) | for (auto & e : evts) | ||||
{ | { | ||||
go resumalbe_set_event(e, 1ms * (1000 + rand() % 5000)); | |||||
//async_set_event(e, 1ms * (1000 + rand() % 5000)); | |||||
//go resumalbe_set_event(e, 1ms * (1000 + rand() % 5000)); | |||||
async_set_event(e, 1ms * (1000 + rand() % 1000)); | |||||
} | } | ||||
this_scheduler()->run_until_notask(); | this_scheduler()->run_until_notask(); |
{ | { | ||||
using namespace std::chrono; | using namespace std::chrono; | ||||
(void)resumef::sleep_for(100ms); //incorrect!!! | |||||
(void)sleep_for(100ms); //incorrect!!! | |||||
co_await resumef::sleep_for(100ms); | |||||
co_await sleep_for(100ms); | |||||
std::cout << "sleep_for 100ms." << std::endl; | std::cout << "sleep_for 100ms." << std::endl; | ||||
co_await 100ms; | co_await 100ms; | ||||
std::cout << "co_await 100ms." << std::endl; | std::cout << "co_await 100ms." << std::endl; | ||||
try | try | ||||
{ | { | ||||
co_await resumef::sleep_until(system_clock::now() + 200ms); | |||||
co_await sleep_until(system_clock::now() + 200ms); | |||||
std::cout << "timer after 200ms." << std::endl; | std::cout << "timer after 200ms." << std::endl; | ||||
} | } | ||||
catch (resumef::timer_canceled_exception) | |||||
catch (timer_canceled_exception) | |||||
{ | { | ||||
std::cout << "timer canceled." << std::endl; | std::cout << "timer canceled." << std::endl; | ||||
} | } | ||||
{ | { | ||||
go[&, i]() -> future_t<> | go[&, i]() -> future_t<> | ||||
{ | { | ||||
co_await resumef::sleep_for(1ms * (500 + rand() % 1000)); | |||||
co_await sleep_for(1ms * (500 + rand() % 1000)); | |||||
evts[i].signal(); | evts[i].signal(); | ||||
std::cout << "event[ " << i << " ] signal!" << std::endl; | std::cout << "event[ " << i << " ] signal!" << std::endl; | ||||
}; | }; |
//test_ring_queue<resumef::ring_queue_lockfree<int, uint64_t>>(); | //test_ring_queue<resumef::ring_queue_lockfree<int, uint64_t>>(); | ||||
//resumable_main_event_v2(); | //resumable_main_event_v2(); | ||||
resumable_main_event(); | |||||
//resumable_main_event(); | |||||
//resumable_main_event_timeout(); | //resumable_main_event_timeout(); | ||||
//resumable_main_sleep(); | //resumable_main_sleep(); | ||||
return 0; | |||||
//return 0; | |||||
//if (argc > 1) | //if (argc > 1) | ||||
// resumable_main_benchmark_asio_client(atoi(argv[1])); | // resumable_main_benchmark_asio_client(atoi(argv[1])); |