Selaa lähdekoodia

完善协程支持stop_token相关功能

tags/2.9.10
tearshark 4 vuotta sitten
vanhempi
commit
ba29351ddd

+ 1
- 1
benchmark/benchmark_async_mem.cpp Näytä tiedosto

@@ -6,7 +6,7 @@
#include "librf.h"
const size_t N = 10000000;
const size_t N = 5000000;
const size_t LOOP_COUNT = 50;
std::atomic<size_t> globalValue{0};

+ 11
- 8
librf/src/awaitable.h Näytä tiedosto

@@ -30,8 +30,8 @@ namespace resumef
*/
void set_exception(std::exception_ptr e) const
{
this->_state->set_exception(std::move(e));
this->_state = nullptr;
counted_ptr<state_type> cp(std::move(this->_state));
cp->set_exception(std::move(e));
}

/**
@@ -82,6 +82,7 @@ namespace resumef
struct [[nodiscard]] awaitable_t : public awaitable_impl_t<_Ty>
{
using typename awaitable_impl_t<_Ty>::value_type;
using typename awaitable_impl_t<_Ty>::state_type;
using awaitable_impl_t<_Ty>::awaitable_impl_t;

/**
@@ -93,8 +94,8 @@ namespace resumef
template<class U>
void set_value(U&& value) const
{
this->_state->set_value(std::forward<U>(value));
this->_state = nullptr;
counted_ptr<state_type> cp(std::move(this->_state));
cp->set_value(std::forward<U>(value));
}
};

@@ -103,24 +104,26 @@ namespace resumef
struct [[nodiscard]] awaitable_t<_Ty&> : public awaitable_impl_t<_Ty&>
{
using typename awaitable_impl_t<_Ty&>::value_type;
using typename awaitable_impl_t<_Ty&>::state_type;
using awaitable_impl_t<_Ty&>::awaitable_impl_t;

void set_value(_Ty& value) const
{
this->_state->set_value(value);
this->_state = nullptr;
counted_ptr<state_type> cp(std::move(this->_state));
cp->set_value(value);
}
};

template<>
struct [[nodiscard]] awaitable_t<void> : public awaitable_impl_t<void>
{
using awaitable_impl_t<void>::state_type;
using awaitable_impl_t<void>::awaitable_impl_t;

void set_value() const
{
this->_state->set_value();
this->_state = nullptr;
counted_ptr<state_type> cp(std::move(this->_state));
cp->set_value();
}
};
#endif //DOXYGEN_SKIP_PROPERTY

+ 2
- 2
librf/src/channel_v2.h Näytä tiedosto

@@ -70,7 +70,7 @@ inline namespace channel_v2
#ifndef DOXYGEN_SKIP_PROPERTY
RESUMEF_REQUIRES(std::is_constructible_v<_Ty, U&&>)
#endif //DOXYGEN_SKIP_PROPERTY
write_awaiter operator << (U&& val) const noexcept(std::is_move_constructible_v<U>);
write_awaiter operator << (U&& val) const noexcept(std::is_nothrow_move_constructible_v<U>);

/**
* @brief 在协程中向channel_t里写入一个数据。
@@ -87,7 +87,7 @@ inline namespace channel_v2
#ifndef DOXYGEN_SKIP_PROPERTY
RESUMEF_REQUIRES(std::is_constructible_v<_Ty, U&&>)
#endif //DOXYGEN_SKIP_PROPERTY
write_awaiter write(U&& val) const noexcept(std::is_move_constructible_v<U>);
write_awaiter write(U&& val) const noexcept(std::is_nothrow_move_constructible_v<U>);


#ifndef DOXYGEN_SKIP_PROPERTY

+ 2
- 2
librf/src/channel_v2.inl Näytä tiedosto

@@ -490,7 +490,7 @@ inline namespace channel_v2
template<class _Ty, bool _Optional, bool _OptimizationThread>
template<class U COMMA_RESUMEF_ENABLE_IF_TYPENAME()> RESUMEF_REQUIRES(std::is_constructible_v<_Ty, U&&>)
typename channel_t<_Ty, _Optional, _OptimizationThread>::write_awaiter
channel_t<_Ty, _Optional, _OptimizationThread>::write(U&& val) const noexcept(std::is_move_constructible_v<U>)
channel_t<_Ty, _Optional, _OptimizationThread>::write(U&& val) const noexcept(std::is_nothrow_move_constructible_v<U>)
{
return write_awaiter{ _chan.get(), std::forward<U>(val) };
}
@@ -498,7 +498,7 @@ inline namespace channel_v2
template<class _Ty, bool _Optional, bool _OptimizationThread>
template<class U COMMA_RESUMEF_ENABLE_IF_TYPENAME()> RESUMEF_REQUIRES(std::is_constructible_v<_Ty, U&&>)
typename channel_t<_Ty, _Optional, _OptimizationThread>::write_awaiter
channel_t<_Ty, _Optional, _OptimizationThread>::operator << (U&& val) const noexcept(std::is_move_constructible_v<U>)
channel_t<_Ty, _Optional, _OptimizationThread>::operator << (U&& val) const noexcept(std::is_nothrow_move_constructible_v<U>)
{
return write_awaiter{ _chan.get(), std::forward<U>(val) };
}

+ 23
- 8
librf/src/counted_ptr.h Näytä tiedosto

@@ -16,7 +16,7 @@ namespace resumef
/**
* @brief 拷贝构造函数。
*/
counted_ptr(const counted_ptr& cp) : _p(cp._p)
counted_ptr(const counted_ptr& cp) noexcept : _p(cp._p)
{
_lock();
}
@@ -24,7 +24,7 @@ namespace resumef
/**
* @brief 通过裸指针构造一个计数指针。
*/
counted_ptr(T* p) : _p(p)
counted_ptr(T* p) noexcept : _p(p)
{
_lock();
}
@@ -32,9 +32,8 @@ namespace resumef
/**
* @brief 移动构造函数。
*/
counted_ptr(counted_ptr&& cp) noexcept
counted_ptr(counted_ptr&& cp) noexcept : _p(std::exchange(cp._p, nullptr))
{
std::swap(_p, cp._p);
}
/**
@@ -44,7 +43,7 @@ namespace resumef
{
if (&cp != this)
{
counted_ptr t = cp;
counted_ptr t(cp);
std::swap(_p, t._p);
}
return *this;
@@ -53,13 +52,21 @@ namespace resumef
/**
* @brief 移动赋值函数。
*/
counted_ptr& operator=(counted_ptr&& cp) noexcept
counted_ptr& operator=(counted_ptr&& cp)
{
if (&cp != this)
{
std::swap(_p, cp._p);
cp._unlock();
}
return *this;
}
void swap(counted_ptr& cp) noexcept
{
std::swap(_p, cp._p);
}
/**
* @brief 析构函数中自动做一个计数减一操作。计数减为0,则删除state对象。
*/
@@ -101,13 +108,13 @@ namespace resumef
t->unlock();
}
}
void _lock(T* p)
void _lock(T* p) noexcept
{
if (p != nullptr)
p->lock();
_p = p;
}
void _lock()
void _lock() noexcept
{
if (_p != nullptr)
_p->lock();
@@ -142,3 +149,11 @@ namespace resumef
}
}
namespace std
{
template<typename T>
inline void swap(resumef::counted_ptr<T>& a, resumef::counted_ptr<T>& b) noexcept
{
a.swap(b);
}
}

+ 1
- 7
librf/src/rf_task.cpp Näytä tiedosto

@@ -9,17 +9,11 @@ namespace resumef
task_t::~task_t()
{
///TODO : 这里有线程安全问题(2020/05/09)
_stop.clear_callback();
///TODO : 这里有线程安全问题(2020/05/09)
}
const stop_source & task_t::get_stop_source()
{
///TODO : 这里有线程安全问题(2020/05/09)
_stop.make_possible();
///TODO : 这里有线程安全问题(2020/05/09)
_stop.make_sure_possible();
return _stop;
}
}

+ 25
- 3
librf/src/rf_task.h Näytä tiedosto

@@ -19,18 +19,40 @@ namespace resumef
task_t();
virtual ~task_t();
/// TODO : 存在BUG(2020/05/09)
/**
* @brief 获取stop_source,第一次获取时,会生成一个有效的stop_source。
* @return stop_source
*/
const stop_source & get_stop_source();
/// TODO : 存在BUG(2020/05/09)
/**
* @brief 获取一个跟stop_source绑定的,新的stop_token。
* @return stop_token
*/
stop_token get_stop_token()
{
return get_stop_source().get_token();
}
/// TODO : 存在BUG(2020/05/09)
/**
* @brief 要求停止协程。
* @return bool 返回操作成功与否。
*/
bool request_stop()
{
return get_stop_source().request_stop();
}
/**
* @brief 要求停止协程。
* @return bool 返回操作成功与否。
*/
bool request_stop_if_possible()
{
if (_stop.stop_possible())
return _stop.request_stop();
return false;
}
protected:
friend scheduler_t;
counted_ptr<state_base_t> _state;

+ 10
- 10
librf/src/state.h Näytä tiedosto

@@ -11,7 +11,7 @@ namespace resumef
private:
std::atomic<intptr_t> _count{0};
public:
void lock()
void lock() noexcept
{
++_count;
}
@@ -38,16 +38,16 @@ namespace resumef
virtual bool has_handler() const noexcept = 0;
virtual state_base_t* get_parent() const noexcept;
void set_scheduler(scheduler_t* sch)
void set_scheduler(scheduler_t* sch) noexcept
{
_scheduler = sch;
}
coroutine_handle<> get_handler() const
coroutine_handle<> get_handler() const noexcept
{
return _coro;
}
state_base_t* get_root() const noexcept
state_base_t* get_root() const
{
state_base_t* root = const_cast<state_base_t*>(this);
state_base_t* next = root->get_parent();
@@ -78,7 +78,7 @@ namespace resumef
bool switch_scheduler_await_suspend(scheduler_t* sch);
void set_initial_suspend(coroutine_handle<> handler)
void set_initial_suspend(coroutine_handle<> handler) noexcept
{
_coro = handler;
}
@@ -131,7 +131,7 @@ namespace resumef
static_assert(sizeof(std::atomic<initor_type>) == 1);
static_assert(alignof(std::atomic<initor_type>) == 1);
protected:
explicit state_future_t(bool awaitor)
explicit state_future_t(bool awaitor) noexcept
{
#if RESUMEF_DEBUG_COUNTER
_id = ++g_resumef_state_id;
@@ -169,7 +169,7 @@ namespace resumef
return _alloc_size;
}
inline bool future_await_ready() noexcept
inline bool future_await_ready() const noexcept
{
//scoped_lock<lock_type> __guard(this->_mtx);
return _has_value.load(std::memory_order_acquire) != result_type::None;
@@ -221,7 +221,7 @@ namespace resumef
using state_future_t::lock_type;
using value_type = _Ty;
private:
explicit state_t(bool awaitor) :state_future_t(awaitor) {}
explicit state_t(bool awaitor) noexcept :state_future_t(awaitor) {}
public:
~state_t()
{
@@ -273,7 +273,7 @@ namespace resumef
using value_type = _Ty;
using reference_type = _Ty&;
private:
explicit state_t(bool awaitor) :state_future_t(awaitor) {}
explicit state_t(bool awaitor) noexcept :state_future_t(awaitor) {}
public:
~state_t()
{
@@ -310,7 +310,7 @@ namespace resumef
friend state_future_t;
using state_future_t::lock_type;
private:
explicit state_t(bool awaitor) :state_future_t(awaitor) {}
explicit state_t(bool awaitor) noexcept :state_future_t(awaitor) {}
public:
void future_await_resume();
template<class _PromiseT, typename = std::enable_if_t<traits::is_promise_v<_PromiseT>>>

+ 8
- 29
librf/src/stop_token.hpp Näytä tiedosto

@@ -4,6 +4,8 @@
* @ V1.0
*************************************************/

//librf注:暂时使用一个网友提供的stop_token实现。
//等待C++20的stop_token被各个编译器都实现后,再使用STL里的stop_token来完成对应功能。
#pragma once

namespace milk
@@ -123,7 +125,6 @@ struct stop_state
{
if (state_.fetch_sub(klockAndTokenRefIncrement, std::memory_order_acq_rel) < kLockedAndZeroRef)
{
clear_callback();
delete this;
}
}
@@ -139,7 +140,6 @@ public:
auto old_state = state_.fetch_sub(kTokenRefIncrement, std::memory_order_acq_rel);
if (old_state < kZeroRef)
{
clear_callback();
delete this;
}
}
@@ -154,7 +154,6 @@ public:
auto old_state = state_.fetch_sub(kSourceRefIncrement, std::memory_order_acq_rel);
if (old_state < kZeroRef)
{
clear_callback();
delete this;
}
}
@@ -286,22 +285,6 @@ __check_state:
remove_token_reference();
}

void clear_callback() noexcept
{
lock();
stop_callback_base* cb = head_;
head_ = nullptr;

while (cb)
{
stop_callback_base* tmp = cb->next;
cb->prev = nullptr;
cb->next = nullptr;
cb = tmp;
}
unlock();
}

};

}//namespace details
@@ -474,14 +457,18 @@ public:
return state_ != nullptr;
}

void make_possible()
void make_sure_possible()
{
if (state_ == nullptr)
{
details::stop_state* st = new details::stop_state();
details::stop_state* tmp = nullptr;
if (!std::atomic_compare_exchange_strong_explicit(
reinterpret_cast<std::atomic<details::stop_state*>*>(&state_), &tmp, st, std::memory_order_release, std::memory_order_acquire))
reinterpret_cast<std::atomic<details::stop_state*>*>(&state_),
&tmp,
st,
std::memory_order_release,
std::memory_order_acquire))
{
st->remove_source_reference();
}
@@ -503,14 +490,6 @@ public:
return stop_token{state_};
}

void clear_callback() const noexcept
{
if (state_)
{
state_->clear_callback();
}
}

void swap(stop_source& other) noexcept
{
std::swap(state_, other.state_);

+ 2
- 2
test_librf.cpp Näytä tiedosto

@@ -35,7 +35,7 @@ int main(int argc, const char* argv[])
(void)argc;
(void)argv;

//resumable_main_mutex();
//resumable_main_stop_token();
//return 0;

//if (argc > 1)
@@ -64,7 +64,7 @@ int main(int argc, const char* argv[])
resumable_main_sleep();
resumable_main_when_all();
resumable_main_switch_scheduler();
//resumable_main_stop_token();
resumable_main_stop_token();
std::cout << "ALL OK!" << std::endl;

benchmark_main_channel_passing_next(); //这是一个死循环测试

+ 21
- 19
tutorial/test_async_stop_token.cpp Näytä tiedosto

@@ -8,8 +8,8 @@
using namespace resumef;
using namespace std::chrono;

//token触发停止后,将不再调用cb
template<class _Ctype>
//_Ctype签名:void(bool, int64_t)
template<class _Ctype, typename=std::enable_if_t<std::is_invocable_v<_Ctype, bool, int64_t>>>
static void callback_get_long_with_stop(stop_token token, int64_t val, _Ctype&& cb)
{
std::thread([val, token = std::move(token), cb = std::forward<_Ctype>(cb)]
@@ -17,11 +17,17 @@ static void callback_get_long_with_stop(stop_token token, int64_t val, _Ctype&&
for (int i = 0; i < 10; ++i)
{
if (token.stop_requested())
{
cb(false, 0);
return;
}
std::this_thread::sleep_for(10ms);
}

cb(val * val);
//有可能未检测到token的停止要求
//如果使用stop_callback来停止,则务必保证检测到的退出要求是唯一的,且线程安全的
//否则,多次调用cb,会导致协程在半退出状态下,外部的awaitable_t管理的state获取跟root出现错误。
cb(true, val * val);
}).detach();
}

@@ -29,24 +35,18 @@ static void callback_get_long_with_stop(stop_token token, int64_t val, _Ctype&&
static future_t<int64_t> async_get_long_with_stop(stop_token token, int64_t val)
{
awaitable_t<int64_t> awaitable;
//保证stopptr的生存期,与callback_get_long_with_cancel()的回调参数的生存期一致。
//如果token已经被取消,则传入的lambda会立即被调用,则awaitable将不能再set_value
auto stopptr = make_stop_callback(token, [awaitable]
{
if (awaitable)
awaitable.throw_exception(canceled_exception(error_code::stop_requested));
});

if (awaitable) //处理已经被取消的情况
{
callback_get_long_with_stop(token, val, [awaitable, stopptr = std::move(stopptr)](int64_t val)
{
if (awaitable)
awaitable.set_value(val);
});
}
//在这里通过stop_callback来处理退出,并将退出转化为error_code::stop_requested异常。
//则必然会存在线程竞争问题,导致协程提前于callback_get_long_with_stop的回调之前而退出。
//同时,callback_get_long_with_stop还未必一定能检测到退出要求----毕竟只是一个要求,而不是强制。

callback_get_long_with_stop(token, val, [awaitable](bool ok, int64_t val)
{
if (ok)
awaitable.set_value(val);
else
awaitable.throw_exception(canceled_exception{error_code::stop_requested});
});
return awaitable.get_future();
}

@@ -92,4 +92,6 @@ void resumable_main_stop_token()
srand((int)time(nullptr));
for (int i = 0; i < 10; ++i)
test_get_long_with_stop(i);

std::cout << "OK - stop_token!" << std::endl;
}

Loading…
Peruuta
Tallenna