@@ -38,6 +38,7 @@ | |||
#else | |||
#include <experimental/coroutine> | |||
#endif | |||
#include "src/stop_token.hpp" | |||
#include "src/config.h" | |||
@@ -51,6 +51,11 @@ namespace resumef | |||
return future_type{ this->_state }; | |||
} | |||
explicit operator bool() const noexcept | |||
{ | |||
return _state.get() != nullptr; | |||
} | |||
/** | |||
* @brief 管理的state_t<>对象。 | |||
*/ |
@@ -14,9 +14,9 @@ | |||
#ifndef RESUMEF_ENABLE_CONCEPT | |||
#ifdef __cpp_lib_concepts | |||
#define RESUMEF_ENABLE_CONCEPT 0 | |||
/* #undef RESUMEF_ENABLE_CONCEPT */ | |||
#else | |||
#define RESUMEF_ENABLE_CONCEPT 0 | |||
/* #undef RESUMEF_ENABLE_CONCEPT */ | |||
#endif //#ifdef __cpp_lib_concepts | |||
#endif //#ifndef RESUMEF_ENABLE_CONCEPT | |||
@@ -34,6 +34,12 @@ namespace resumef | |||
template<class... _Mutexes> | |||
using scoped_lock = std::scoped_lock<_Mutexes...>; | |||
using stop_source = milk::concurrency::stop_source; | |||
using stop_token = milk::concurrency::stop_token; | |||
template<typename Callback> | |||
using stop_callback = milk::concurrency::stop_callback<Callback>; | |||
using milk::concurrency::nostopstate; | |||
/** | |||
* @brief 版本号。 | |||
*/ | |||
@@ -74,6 +80,17 @@ namespace resumef | |||
return std::is_empty_v<_Ty> ? 0 : | |||
(sizeof(_Ty) + _ALIGN_REQ - 1) & ~(_ALIGN_REQ - 1); | |||
} | |||
template<class _Callable> | |||
auto make_stop_callback(const stop_token& token, _Callable&& cb) ->std::unique_ptr<stop_callback<_Callable>> | |||
{ | |||
return std::make_unique<stop_callback<_Callable>>(token, cb); | |||
} | |||
template<class _Callable> | |||
auto make_stop_callback(stop_token&& token, _Callable&& cb) ->std::unique_ptr<stop_callback<_Callable>> | |||
{ | |||
return std::make_unique<stop_callback<_Callable>>(std::move(token), cb); | |||
} | |||
} | |||
#include "exception.inl" |
@@ -173,7 +173,9 @@ namespace resumef | |||
_state = new detail::state_event_t(_event); | |||
_event = nullptr; | |||
(void)_state->on_await_suspend(handler); | |||
cb(); | |||
if constexpr (!std::is_same_v<std::remove_reference_t<_Timeout>, std::nullptr_t>) | |||
cb(); | |||
evt->add_wait_list(_state.get()); | |||
@@ -183,7 +185,7 @@ namespace resumef | |||
template<class _PromiseT, typename = std::enable_if_t<traits::is_promise_v<_PromiseT>>> | |||
bool await_suspend(coroutine_handle<_PromiseT> handler) | |||
{ | |||
return await_suspend2(handler, []{}); | |||
return await_suspend2(handler, nullptr); | |||
} | |||
bool await_resume() noexcept | |||
@@ -289,7 +291,9 @@ namespace resumef | |||
_state = new detail::state_event_t(_event); | |||
(void)_state->on_await_suspend(handler); | |||
cb(); | |||
if constexpr (!std::is_same_v<std::remove_reference_t<_Timeout>, std::nullptr_t>) | |||
cb(); | |||
for (auto iter = _begin; iter != _end; ++iter) | |||
{ | |||
@@ -303,7 +307,7 @@ namespace resumef | |||
template<class _PromiseT, typename = std::enable_if_t<traits::is_promise_v<_PromiseT>>> | |||
bool await_suspend(coroutine_handle<_PromiseT> handler) | |||
{ | |||
return await_suspend2(handler, []{}); | |||
return await_suspend2(handler, nullptr); | |||
} | |||
intptr_t await_resume() noexcept | |||
@@ -404,7 +408,9 @@ namespace resumef | |||
_state = new detail::state_event_all_t(count, _value); | |||
(void)_state->on_await_suspend(handler); | |||
cb(); | |||
if constexpr (!std::is_same_v<std::remove_reference_t<_Timeout>, std::nullptr_t>) | |||
cb(); | |||
batch_lock_t<ref_lock_type> lock_(lockes); | |||
@@ -435,7 +441,7 @@ namespace resumef | |||
template<class _PromiseT, typename = std::enable_if_t<traits::is_promise_v<_PromiseT>>> | |||
bool await_suspend(coroutine_handle<_PromiseT> handler) | |||
{ | |||
return await_suspend2(handler, []{}); | |||
return await_suspend2(handler, nullptr); | |||
} | |||
bool await_resume() noexcept |
@@ -15,6 +15,7 @@ namespace resumef | |||
read_before_write, ///< 0容量的channel,先读后写 | |||
timer_canceled, ///< 定时器被意外取消 | |||
not_await_lock, ///< 没有在协程中使用co_await等待lock结果 | |||
stop_requested, ///< stop_source触发了 | |||
max__ | |||
}; | |||
@@ -67,11 +68,11 @@ namespace resumef | |||
/** | |||
* @brief 定时器提前取消导致的异常。 | |||
*/ | |||
struct timer_canceled_exception : public std::logic_error | |||
struct canceled_exception : public std::logic_error | |||
{ | |||
error_code _error; | |||
timer_canceled_exception(error_code fe) | |||
: logic_error(get_error_string(fe, "timer canceled")) | |||
canceled_exception(error_code fe) | |||
: logic_error(get_error_string(fe, "canceled_exception")) | |||
, _error(fe) | |||
{ | |||
} |
@@ -222,7 +222,9 @@ namespace resumef | |||
_state = new detail::state_mutex_t(_mutex); | |||
_state->on_await_suspend(handler, parent->get_scheduler(), _root); | |||
cb(); | |||
if constexpr (!std::is_same_v<std::remove_reference_t<_Timeout>, std::nullptr_t>) | |||
cb(); | |||
_mutex->add_wait_list_lockless(_state.get()); | |||
@@ -241,7 +243,7 @@ namespace resumef | |||
template<class _PromiseT, typename = std::enable_if_t<traits::is_promise_v<_PromiseT>>> | |||
bool await_suspend(coroutine_handle<_PromiseT> handler) | |||
{ | |||
return await_suspend2(handler, []{}); | |||
return await_suspend2(handler, nullptr); | |||
} | |||
batch_unlock_t<mutex_t> await_resume() noexcept | |||
{ | |||
@@ -268,7 +270,7 @@ namespace resumef | |||
template<class _PromiseT, typename = std::enable_if_t<traits::is_promise_v<_PromiseT>>> | |||
bool await_suspend(coroutine_handle<_PromiseT> handler) | |||
{ | |||
return await_suspend2(handler, []{}); | |||
return await_suspend2(handler, nullptr); | |||
} | |||
void await_resume() noexcept | |||
{ |
@@ -3,11 +3,23 @@ | |||
namespace resumef | |||
{ | |||
task_t::task_t() | |||
: _stop(nostopstate) | |||
{ | |||
} | |||
task_t::~task_t() | |||
{ | |||
///TODO : 这里有线程安全问题(2020/05/09) | |||
_stop.clear_callback(); | |||
///TODO : 这里有线程安全问题(2020/05/09) | |||
} | |||
const stop_source & task_t::get_stop_source() | |||
{ | |||
///TODO : 这里有线程安全问题(2020/05/09) | |||
_stop.make_possible(); | |||
///TODO : 这里有线程安全问题(2020/05/09) | |||
return _stop; | |||
} | |||
} |
@@ -19,9 +19,22 @@ namespace resumef | |||
task_t(); | |||
virtual ~task_t(); | |||
/// TODO : 存在BUG(2020/05/09) | |||
const stop_source & get_stop_source(); | |||
/// TODO : 存在BUG(2020/05/09) | |||
stop_token get_stop_token() | |||
{ | |||
return get_stop_source().get_token(); | |||
} | |||
/// TODO : 存在BUG(2020/05/09) | |||
bool request_stop() | |||
{ | |||
return get_stop_source().request_stop(); | |||
} | |||
protected: | |||
friend scheduler_t; | |||
counted_ptr<state_base_t> _state; | |||
stop_source _stop; | |||
}; | |||
#endif | |||
@@ -19,6 +19,7 @@ namespace resumef | |||
"read_before_write", | |||
"timer_canceled", | |||
"not_await_lock", | |||
"stop_requested", | |||
}; | |||
char sz_future_error_buffer[256]; |
@@ -10,7 +10,7 @@ namespace resumef | |||
[awaitable](bool cancellation_requested) | |||
{ | |||
if (cancellation_requested) | |||
awaitable.throw_exception(timer_canceled_exception{ error_code::timer_canceled }); | |||
awaitable.throw_exception(canceled_exception{ error_code::timer_canceled }); | |||
else | |||
awaitable.set_value(); | |||
}); |
@@ -0,0 +1,594 @@ | |||
/************************************************ | |||
* @ Bright Dream Robotics-Fundamental Research | |||
* @ Author BIP:zhangwencong | |||
* @ V1.0 | |||
*************************************************/ | |||
#pragma once | |||
namespace milk | |||
{ | |||
namespace concurrency | |||
{ | |||
namespace details | |||
{ | |||
struct stop_callback_base | |||
{ | |||
void(*callback)(stop_callback_base*) = nullptr; | |||
stop_callback_base* next = nullptr; | |||
stop_callback_base** prev = nullptr; | |||
bool* is_removed = nullptr; | |||
std::atomic<bool> finished_executing{false}; | |||
void execute() noexcept | |||
{ | |||
callback(this); | |||
} | |||
protected: | |||
stop_callback_base(void(*fnptr)(stop_callback_base*)) | |||
: callback(fnptr) {} | |||
~stop_callback_base() = default; | |||
}; | |||
struct stop_state | |||
{ | |||
static constexpr uint64_t kStopRequestedFlag = 1u; | |||
static constexpr uint64_t kLockedFlag = 2u; | |||
static constexpr uint64_t kTokenRefIncrement = 4u; | |||
static constexpr uint64_t kSourceRefIncrement = static_cast<uint64_t>(1u) << 33u; | |||
static constexpr uint64_t kZeroRef = kTokenRefIncrement + kSourceRefIncrement; | |||
static constexpr uint64_t kLockedAndZeroRef = kZeroRef + kZeroRef; | |||
static constexpr uint64_t kLockAndStopedFlag = kStopRequestedFlag | kLockedFlag; | |||
static constexpr uint64_t kUnlockAndTokenRefIncrement = kLockedFlag - kTokenRefIncrement; | |||
static constexpr uint64_t klockAndTokenRefIncrement = kLockedFlag + kTokenRefIncrement; | |||
// bit 0 - stop-requested | |||
// bit 1 - locked43 | |||
// bits 2-32 - token ref count (31 bits) | |||
// bits 33-63 - source ref count (31 bits) | |||
std::atomic<uint64_t> state_{kSourceRefIncrement}; | |||
stop_callback_base* head_ = nullptr; | |||
std::thread::id signalling_{}; | |||
static bool is_locked(uint64_t state) noexcept | |||
{ | |||
return (state & kLockedFlag) != 0; | |||
} | |||
static bool is_stop_requested(uint64_t state) noexcept | |||
{ | |||
return (state & kStopRequestedFlag) != 0; | |||
} | |||
static bool is_stop_requestable(uint64_t state) noexcept | |||
{ | |||
return is_stop_requested(state) || (state >= kSourceRefIncrement); | |||
} | |||
bool try_lock_and_signal_until_signalled() noexcept | |||
{ | |||
uint64_t old_state = state_.load(std::memory_order_acquire); | |||
do | |||
{ | |||
if (is_stop_requested(old_state)) | |||
{ | |||
return false; | |||
} | |||
while (is_locked(old_state)) | |||
{ | |||
std::this_thread::yield(); | |||
old_state = state_.load(std::memory_order_acquire); | |||
if (is_stop_requested(old_state)) | |||
{ | |||
return false; | |||
} | |||
} | |||
} while(!state_.compare_exchange_weak(old_state, old_state | kLockAndStopedFlag, std::memory_order_acq_rel, std::memory_order_acquire)); | |||
return true; | |||
} | |||
void lock() noexcept | |||
{ | |||
uint64_t old_state = state_.load(std::memory_order_relaxed); | |||
do | |||
{ | |||
while (is_locked(old_state)) | |||
{ | |||
std::this_thread::yield(); | |||
old_state = state_.load(std::memory_order_relaxed); | |||
} | |||
} | |||
while (!state_.compare_exchange_weak(old_state, old_state | kLockedFlag, std::memory_order_acquire, std::memory_order_relaxed)); | |||
} | |||
void unlock() noexcept | |||
{ | |||
state_.fetch_sub(kLockedFlag, std::memory_order_release); | |||
} | |||
void unlock_and_increment_token_ref_count() noexcept | |||
{ | |||
state_.fetch_sub(kUnlockAndTokenRefIncrement, std::memory_order_release); | |||
} | |||
void unlock_and_decrement_token_ref_count() noexcept | |||
{ | |||
if (state_.fetch_sub(klockAndTokenRefIncrement, std::memory_order_acq_rel) < kLockedAndZeroRef) | |||
{ | |||
clear_callback(); | |||
delete this; | |||
} | |||
} | |||
public: | |||
void add_token_reference() noexcept | |||
{ | |||
state_.fetch_add(kTokenRefIncrement, std::memory_order_relaxed); | |||
} | |||
void remove_token_reference() noexcept | |||
{ | |||
auto old_state = state_.fetch_sub(kTokenRefIncrement, std::memory_order_acq_rel); | |||
if (old_state < kZeroRef) | |||
{ | |||
clear_callback(); | |||
delete this; | |||
} | |||
} | |||
void add_source_reference() noexcept | |||
{ | |||
state_.fetch_add(kSourceRefIncrement, std::memory_order_relaxed); | |||
} | |||
void remove_source_reference() noexcept | |||
{ | |||
auto old_state = state_.fetch_sub(kSourceRefIncrement, std::memory_order_acq_rel); | |||
if (old_state < kZeroRef) | |||
{ | |||
clear_callback(); | |||
delete this; | |||
} | |||
} | |||
bool request_stop() noexcept | |||
{ | |||
if (!try_lock_and_signal_until_signalled()) | |||
{ | |||
return false; | |||
} | |||
signalling_ = std::this_thread::get_id(); | |||
while (head_) | |||
{ | |||
auto* cb = head_; | |||
head_ = cb->next; | |||
const bool any_more = head_ != nullptr; | |||
if (any_more) | |||
{ | |||
head_->prev = &head_; | |||
} | |||
cb->prev = nullptr; | |||
unlock(); | |||
bool removed = false; | |||
cb->is_removed = &removed; | |||
cb->execute(); | |||
if (!removed) | |||
{ | |||
cb->is_removed = nullptr; | |||
cb->finished_executing.store(true, std::memory_order_release); | |||
} | |||
if (!any_more) | |||
{ | |||
return true; | |||
} | |||
lock(); | |||
} | |||
unlock(); | |||
return true; | |||
} | |||
bool is_stop_requested() noexcept | |||
{ | |||
return is_stop_requested(state_.load(std::memory_order_acquire)); | |||
} | |||
bool is_stop_requestable() noexcept | |||
{ | |||
return is_stop_requestable(state_.load(std::memory_order_acquire)); | |||
} | |||
bool try_add_callback(stop_callback_base* cb, bool increment_ref_count_if_successful) noexcept | |||
{ | |||
uint64_t old_state; | |||
goto __load_state; | |||
do | |||
{ | |||
goto __check_state; | |||
do | |||
{ | |||
std::this_thread::yield(); | |||
__load_state: | |||
old_state = state_.load(std::memory_order_acquire); | |||
__check_state: | |||
if (is_stop_requested(old_state)) | |||
{ | |||
cb->execute(); | |||
return false; | |||
} | |||
else if (!is_stop_requestable(old_state)) | |||
{ | |||
return false; | |||
} | |||
} while (is_locked(old_state)); | |||
}while (!state_.compare_exchange_weak(old_state, old_state | kLockedFlag, std::memory_order_acquire)); | |||
// callback入队列 | |||
cb->next = head_; | |||
if (cb->next) | |||
{ | |||
cb->next->prev = &cb->next; | |||
} | |||
cb->prev = &head_; | |||
head_ = cb; | |||
if (increment_ref_count_if_successful) | |||
{ | |||
unlock_and_increment_token_ref_count(); | |||
} | |||
else | |||
{ | |||
unlock(); | |||
} | |||
return true; | |||
} | |||
void remove_callback(stop_callback_base* cb) noexcept | |||
{ | |||
lock(); | |||
if (cb->prev) | |||
{ | |||
*cb->prev = cb->next; | |||
if (cb->next) | |||
{ | |||
cb->next->prev = cb->prev; | |||
} | |||
unlock_and_decrement_token_ref_count(); | |||
return; | |||
} | |||
unlock(); | |||
// Callback要么已经执行,要么正在另一个线程上并发执行。 | |||
if (signalling_ == std::this_thread::get_id()) | |||
{ | |||
// 若Callback是当前线程上执行的回调或当前仍在执行并在回调中注销自身的回调。 | |||
if (cb->is_removed) | |||
{ | |||
// 当前在Callback执行中,告知request_stop()知道对象即将被销毁,不能尝试访问 | |||
*cb->is_removed = true; | |||
} | |||
} | |||
else | |||
{ | |||
// 等待其他线程执行Callback完成 | |||
while (!cb->finished_executing.load(std::memory_order_acquire)) | |||
{ | |||
std::this_thread::yield(); | |||
} | |||
} | |||
remove_token_reference(); | |||
} | |||
void clear_callback() noexcept | |||
{ | |||
lock(); | |||
stop_callback_base* cb = head_; | |||
head_ = nullptr; | |||
while (cb) | |||
{ | |||
stop_callback_base* tmp = cb->next; | |||
cb->prev = nullptr; | |||
cb->next = nullptr; | |||
cb = tmp; | |||
} | |||
unlock(); | |||
} | |||
}; | |||
}//namespace details | |||
struct nostopstate_t { explicit nostopstate_t() = default; }; | |||
inline constexpr nostopstate_t nostopstate{}; | |||
class stop_source; | |||
template<typename Callback> | |||
class stop_callback; | |||
class stop_token | |||
{ | |||
private: | |||
details::stop_state* state_; | |||
friend class stop_source; | |||
template<typename> friend class stop_callback; | |||
explicit stop_token(details::stop_state* state) noexcept | |||
: state_(state) | |||
{ | |||
if (state_) | |||
{ | |||
state_->add_token_reference(); | |||
} | |||
} | |||
public: | |||
stop_token() noexcept | |||
: state_(nullptr) | |||
{ | |||
} | |||
stop_token(const stop_token& other) noexcept | |||
: state_(other.state_) | |||
{ | |||
if (state_) | |||
{ | |||
state_->add_token_reference(); | |||
} | |||
} | |||
stop_token(stop_token&& other) noexcept | |||
: state_(std::exchange(other.state_, nullptr)) | |||
{ | |||
} | |||
~stop_token() | |||
{ | |||
if (state_) | |||
{ | |||
state_->remove_token_reference(); | |||
} | |||
} | |||
stop_token& operator=(const stop_token& other) noexcept | |||
{ | |||
if (state_ != other.state_) | |||
{ | |||
stop_token tmp{other}; | |||
swap(tmp); | |||
} | |||
return *this; | |||
} | |||
stop_token& operator=(stop_token&& other) noexcept | |||
{ | |||
stop_token tmp{std::move(other)}; | |||
swap(tmp); | |||
return *this; | |||
} | |||
void swap(stop_token& other) noexcept | |||
{ | |||
std::swap(state_, other.state_); | |||
} | |||
[[nodiscard]] | |||
bool stop_requested() const noexcept | |||
{ | |||
return state_ && state_->is_stop_requested(); | |||
} | |||
[[nodiscard]] | |||
bool stop_possible() const noexcept | |||
{ | |||
return state_ && state_->is_stop_requestable(); | |||
} | |||
[[nodiscard]] | |||
bool operator==(const stop_token& other) noexcept | |||
{ | |||
return state_ == other.state_; | |||
} | |||
[[nodiscard]] | |||
bool operator!=(const stop_token& other) noexcept | |||
{ | |||
return state_ != other.state_; | |||
} | |||
}; | |||
class stop_source | |||
{ | |||
private: | |||
details::stop_state* state_; | |||
public: | |||
stop_source() | |||
: state_(new details::stop_state()) | |||
{ | |||
} | |||
explicit stop_source(nostopstate_t) noexcept | |||
: state_(nullptr) | |||
{ | |||
} | |||
~stop_source() | |||
{ | |||
if (state_) | |||
{ | |||
state_->remove_source_reference(); | |||
} | |||
} | |||
stop_source(const stop_source& other) noexcept | |||
: state_(other.state_) | |||
{ | |||
if (state_) | |||
{ | |||
state_->add_source_reference(); | |||
} | |||
} | |||
stop_source(stop_source&& other) noexcept | |||
: state_(std::exchange(other.state_, nullptr)) | |||
{ | |||
} | |||
stop_source& operator=(stop_source&& other) noexcept | |||
{ | |||
stop_source tmp{std::move(other)}; | |||
swap(tmp); | |||
return *this; | |||
} | |||
stop_source& operator=(const stop_source& other) noexcept | |||
{ | |||
if (state_ != other.state_) | |||
{ | |||
stop_source tmp{other}; | |||
swap(tmp); | |||
} | |||
return *this; | |||
} | |||
[[nodiscard]] | |||
bool stop_requested() const noexcept | |||
{ | |||
return state_ && state_->is_stop_requested(); | |||
} | |||
[[nodiscard]] | |||
bool stop_possible() const noexcept | |||
{ | |||
return state_ != nullptr; | |||
} | |||
void make_possible() | |||
{ | |||
if (state_ == nullptr) | |||
{ | |||
details::stop_state* st = new details::stop_state(); | |||
details::stop_state* tmp = nullptr; | |||
if (!std::atomic_compare_exchange_strong_explicit( | |||
reinterpret_cast<std::atomic<details::stop_state*>*>(&state_), &tmp, st, std::memory_order_release, std::memory_order_acquire)) | |||
{ | |||
st->remove_source_reference(); | |||
} | |||
} | |||
} | |||
bool request_stop() const noexcept | |||
{ | |||
if (state_) | |||
{ | |||
return state_->request_stop(); | |||
} | |||
return false; | |||
} | |||
[[nodiscard]] | |||
stop_token get_token() const noexcept | |||
{ | |||
return stop_token{state_}; | |||
} | |||
void clear_callback() const noexcept | |||
{ | |||
if (state_) | |||
{ | |||
state_->clear_callback(); | |||
} | |||
} | |||
void swap(stop_source& other) noexcept | |||
{ | |||
std::swap(state_, other.state_); | |||
} | |||
[[nodiscard]] | |||
bool operator==(const stop_source& other) noexcept | |||
{ | |||
return state_ == other.state_; | |||
} | |||
[[nodiscard]] | |||
bool operator!=(const stop_source& other) noexcept | |||
{ | |||
return state_ != other.state_; | |||
} | |||
}; | |||
template <typename Callback> | |||
class [[nodiscard]] stop_callback : private details::stop_callback_base | |||
{ | |||
private: | |||
using details::stop_callback_base::execute; | |||
using callack_type = Callback; | |||
details::stop_state* state_; | |||
callack_type callack_; | |||
void execute() noexcept | |||
{ | |||
callack_(); | |||
} | |||
public: | |||
using callback_type = Callback; | |||
template<typename Callable, typename = typename std::enable_if<std::is_constructible<Callback, Callable>::value, int>::type> | |||
explicit stop_callback(const stop_token& token, Callable&& cb) noexcept(std::is_nothrow_constructible<Callback, Callable>::value) | |||
: details::stop_callback_base([](details::stop_callback_base* that) noexcept { static_cast<stop_callback*>(that)->execute(); }) | |||
, state_(nullptr) | |||
, callack_(static_cast<Callable&&>(cb)) | |||
{ | |||
if (token.state_ && token.state_->try_add_callback(this, true)) | |||
{ | |||
state_ = token.state_; | |||
} | |||
} | |||
template<typename Callable, typename = typename std::enable_if<std::is_constructible<Callback, Callable>::value, int>::type> | |||
explicit stop_callback(stop_token&& token, Callable&& cb) noexcept(std::is_nothrow_constructible<Callback, Callable>::value) | |||
: details::stop_callback_base([](details::stop_callback_base* that) noexcept { static_cast<stop_callback*>(that)->execute(); }) | |||
, state_(nullptr) | |||
, callack_(static_cast<Callable&&>(cb)) | |||
{ | |||
if (token.state_ && token.state_->try_add_callback(this, false)) | |||
{ | |||
state_ = std::exchange(token.state_, nullptr); | |||
} | |||
} | |||
~stop_callback() | |||
{ | |||
if (state_) | |||
{ | |||
state_->remove_callback(this); | |||
} | |||
} | |||
stop_callback& operator=(const stop_callback&) = delete; | |||
stop_callback& operator=(stop_callback&&) = delete; | |||
stop_callback(const stop_callback&) = delete; | |||
stop_callback(stop_callback&&) = delete; | |||
}; | |||
template<typename Callback> | |||
stop_callback(stop_token, Callback)->stop_callback<Callback>;//C++17 deduction guides | |||
}//namespace concurrency | |||
}//namespace milk |
@@ -22,6 +22,7 @@ extern void resumable_main_channel_mult_thread(); | |||
extern void resumable_main_when_all(); | |||
extern void resumable_main_layout(); | |||
extern void resumable_main_switch_scheduler(); | |||
extern void resumable_main_stop_token(); | |||
extern void resumable_main_benchmark_mem(bool wait_key); | |||
extern void benchmark_main_channel_passing_next(); | |||
@@ -34,7 +35,7 @@ int main(int argc, const char* argv[]) | |||
(void)argc; | |||
(void)argv; | |||
//resumable_main_mutex(); | |||
//resumable_main_stop_token(); | |||
//return 0; | |||
//if (argc > 1) | |||
@@ -63,6 +64,7 @@ int main(int argc, const char* argv[]) | |||
resumable_main_sleep(); | |||
resumable_main_when_all(); | |||
resumable_main_switch_scheduler(); | |||
//resumable_main_stop_token(); | |||
std::cout << "ALL OK!" << std::endl; | |||
benchmark_main_channel_passing_next(); //ÕâÊÇÒ»¸öËÀÑ»·²âÊÔ |
@@ -24,7 +24,7 @@ future_t<> test_sleep_use_timer() | |||
co_await sleep_until(system_clock::now() + 200ms); | |||
std::cout << "timer after 200ms." << std::endl; | |||
} | |||
catch (timer_canceled_exception) | |||
catch (canceled_exception) | |||
{ | |||
std::cout << "timer canceled." << std::endl; | |||
} |
@@ -0,0 +1,95 @@ | |||
#include <chrono> | |||
#include <iostream> | |||
#include <string> | |||
#include <thread> | |||
#include "librf.h" | |||
using namespace resumef; | |||
using namespace std::chrono; | |||
//token触发停止后,将不再调用cb | |||
template<class _Ctype> | |||
static void callback_get_long_with_stop(stop_token token, int64_t val, _Ctype&& cb) | |||
{ | |||
std::thread([val, token = std::move(token), cb = std::forward<_Ctype>(cb)] | |||
{ | |||
for (int i = 0; i < 10; ++i) | |||
{ | |||
if (token.stop_requested()) | |||
return; | |||
std::this_thread::sleep_for(10ms); | |||
} | |||
cb(val * val); | |||
}).detach(); | |||
} | |||
//token触发后,设置canceled_exception异常。 | |||
static future_t<int64_t> async_get_long_with_stop(stop_token token, int64_t val) | |||
{ | |||
awaitable_t<int64_t> awaitable; | |||
//保证stopptr的生存期,与callback_get_long_with_cancel()的回调参数的生存期一致。 | |||
//如果token已经被取消,则传入的lambda会立即被调用,则awaitable将不能再set_value | |||
auto stopptr = make_stop_callback(token, [awaitable] | |||
{ | |||
if (awaitable) | |||
awaitable.throw_exception(canceled_exception(error_code::stop_requested)); | |||
}); | |||
if (awaitable) //处理已经被取消的情况 | |||
{ | |||
callback_get_long_with_stop(token, val, [awaitable, stopptr = std::move(stopptr)](int64_t val) | |||
{ | |||
if (awaitable) | |||
awaitable.set_value(val); | |||
}); | |||
} | |||
return awaitable.get_future(); | |||
} | |||
//如果关联的协程被取消了,则触发canceled_exception异常。 | |||
static future_t<int64_t> async_get_long_with_stop(int64_t val) | |||
{ | |||
task_t* task = current_task(); | |||
co_return co_await async_get_long_with_stop(task->get_stop_token(), val); | |||
} | |||
//测试取消协程 | |||
static void test_get_long_with_stop(int64_t val) | |||
{ | |||
//异步获取值的协程 | |||
task_t* task = GO | |||
{ | |||
try | |||
{ | |||
int64_t result = co_await async_get_long_with_stop(val); | |||
std::cout << result << std::endl; | |||
} | |||
catch (std::logic_error& e) | |||
{ | |||
std::cout << e.what() << std::endl; | |||
} | |||
}; | |||
//task的生命周期只在task代表的协程生存期间存在。 | |||
//但通过复制与其关联的stop_source,生存期可以超过task的生存期。 | |||
stop_source stops = task->get_stop_source(); | |||
//取消上一个协程的延迟协程 | |||
GO | |||
{ | |||
co_await sleep_for(1ms * (rand() % 300)); | |||
stops.request_stop(); | |||
}; | |||
this_scheduler()->run_until_notask(); | |||
} | |||
void resumable_main_stop_token() | |||
{ | |||
srand((int)time(nullptr)); | |||
for (int i = 0; i < 10; ++i) | |||
test_get_long_with_stop(i); | |||
} |
@@ -34,7 +34,7 @@ | |||
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'" Label="Configuration"> | |||
<ConfigurationType>Application</ConfigurationType> | |||
<UseDebugLibraries>false</UseDebugLibraries> | |||
<PlatformToolset>v142</PlatformToolset> | |||
<PlatformToolset>ClangCL</PlatformToolset> | |||
<WholeProgramOptimization>true</WholeProgramOptimization> | |||
<CharacterSet>NotSet</CharacterSet> | |||
</PropertyGroup> | |||
@@ -46,7 +46,7 @@ | |||
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'" Label="Configuration"> | |||
<ConfigurationType>Application</ConfigurationType> | |||
<UseDebugLibraries>false</UseDebugLibraries> | |||
<PlatformToolset>v142</PlatformToolset> | |||
<PlatformToolset>ClangCL</PlatformToolset> | |||
<WholeProgramOptimization>true</WholeProgramOptimization> | |||
<CharacterSet>NotSet</CharacterSet> | |||
</PropertyGroup> | |||
@@ -213,6 +213,7 @@ | |||
<ClCompile Include="..\tutorial\test_async_resumable.cpp" /> | |||
<ClCompile Include="..\tutorial\test_async_routine.cpp" /> | |||
<ClCompile Include="..\tutorial\test_async_sleep.cpp" /> | |||
<ClCompile Include="..\tutorial\test_async_stop_token.cpp" /> | |||
<ClCompile Include="..\tutorial\test_async_suspend_always.cpp" /> | |||
<ClCompile Include="..\tutorial\test_async_switch_scheduler.cpp" /> | |||
<ClCompile Include="..\tutorial\test_async_timer.cpp" /> | |||
@@ -249,6 +250,7 @@ | |||
<ClInclude Include="..\librf\src\sleep.h" /> | |||
<ClInclude Include="..\librf\src\spinlock.h" /> | |||
<ClInclude Include="..\librf\src\state.h" /> | |||
<ClInclude Include="..\librf\src\stop_token.hpp" /> | |||
<ClInclude Include="..\librf\src\switch_scheduler.h" /> | |||
<ClInclude Include="..\librf\src\timer.h" /> | |||
<ClInclude Include="..\librf\src\unix\clang_builtin.h" /> |
@@ -130,6 +130,9 @@ | |||
<ClCompile Include="..\tutorial\gcc_bugs.cpp"> | |||
<Filter>tutorial</Filter> | |||
</ClCompile> | |||
<ClCompile Include="..\tutorial\test_async_stop_token.cpp"> | |||
<Filter>tutorial</Filter> | |||
</ClCompile> | |||
</ItemGroup> | |||
<ItemGroup> | |||
<ClInclude Include="..\librf\librf.h"> | |||
@@ -252,6 +255,9 @@ | |||
<ClInclude Include="..\asio\asio_task.h"> | |||
<Filter>asio</Filter> | |||
</ClInclude> | |||
<ClInclude Include="..\librf\src\stop_token.hpp"> | |||
<Filter>librf\src</Filter> | |||
</ClInclude> | |||
</ItemGroup> | |||
<ItemGroup> | |||
<None Include="..\librf\src\promise.inl"> |