Browse Source

改版后第一次正常跑完回调测试

tags/v2.9.7
tearshark 4 years ago
parent
commit
3f64156e82

+ 2
- 2
librf/src/asio_task_1.10.0.inl View File

@@ -43,7 +43,7 @@ namespace asio {

template<typename Allocator>
promise_handler(const rf_task_t<Allocator> &)
: state_(resumef::make_counted<state_type>())
: state_(resumef::make_counted<state_type>(true))
{
}

@@ -76,7 +76,7 @@ namespace asio {

template<typename Allocator>
promise_handler(const rf_task_t<Allocator> &)
: state_(resumef::make_counted<state_type>())
: state_(resumef::make_counted<state_type>(true))
{
}


+ 1
- 1
librf/src/asio_task_1.12.0.inl View File

@@ -28,7 +28,7 @@ namespace asio {
typedef resumef::state_t<result_type> state_type;

promise_handler_base()
: state_(resumef::make_counted<state_type>())
: state_(resumef::make_counted<state_type>(true))
{
}


+ 2
- 2
librf/src/awaitable.h View File

@@ -11,7 +11,7 @@ namespace resumef
using lock_type = typename state_type::lock_type;

private:
mutable counted_ptr<state_type> _state = make_counted<state_type>();
mutable counted_ptr<state_type> _state = make_counted<state_type>(true);
public:
awaitable_t() {}
awaitable_t(const awaitable_t&) = default;
@@ -46,7 +46,7 @@ namespace resumef
using future_type = future_t<void>;
using lock_type = typename state_type::lock_type;

mutable counted_ptr<state_type> _state = make_counted<state_type>();
mutable counted_ptr<state_type> _state = make_counted<state_type>(true);

awaitable_t() {}
awaitable_t(const awaitable_t&) = default;

+ 3
- 3
librf/src/counted_ptr.h View File

@@ -99,10 +99,10 @@ namespace resumef
T* _p = nullptr;
};
template <typename T>
counted_ptr<T> make_counted()
template <typename T, typename... Args>
counted_ptr<T> make_counted(Args&&... args)
{
return new T{};
return new T{std::forward<Args>(args)...};
}
}

+ 1
- 0
librf/src/def.h View File

@@ -37,6 +37,7 @@ extern std::mutex g_resumef_cout_mutex;
extern std::atomic<intptr_t> g_resumef_state_count;
extern std::atomic<intptr_t> g_resumef_task_count;
extern std::atomic<intptr_t> g_resumef_evtctx_count;
extern std::atomic<intptr_t> g_resumef_state_id;
#endif
namespace std

+ 1
- 1
librf/src/promise.h View File

@@ -12,7 +12,7 @@ namespace resumef
using future_type = future_t<value_type>;
using lock_type = typename state_type::lock_type;

counted_ptr<state_type> _state = make_counted<state_type>();
counted_ptr<state_type> _state = make_counted<state_type>(false);

promise_impl_t() {}
promise_impl_t(promise_impl_t&& _Right) noexcept = default;

+ 24
- 5
librf/src/promise.inl View File

@@ -7,18 +7,38 @@ namespace resumef
{
struct suspend_on_initial
{
counted_ptr<state_base_t> _state;
state_base_t* _state;

inline bool await_ready() noexcept
{
return false;
}
inline void await_suspend(coroutine_handle<> handler) noexcept
template<class _PromiseT, typename = std::enable_if_t<is_promise_v<_PromiseT>>>
inline void await_suspend(coroutine_handle<_PromiseT> handler) noexcept
{
_state->set_handler(handler);
_state->promise_initial_suspend(handler);
}
inline void await_resume() noexcept
{
_state->promise_await_resume();
}
};
struct suspend_on_final
{
state_base_t* _state;

inline bool await_ready() noexcept
{
return false;
}
template<class _PromiseT, typename = std::enable_if_t<is_promise_v<_PromiseT>>>
inline void await_suspend(coroutine_handle<_PromiseT> handler) noexcept
{
_state->promise_final_suspend(handler);
}
inline void await_resume() noexcept
{
_state->promise_await_resume();
}
};

@@ -31,8 +51,7 @@ namespace resumef
template <typename _Ty>
inline auto promise_impl_t<_Ty>::final_suspend() noexcept
{
_state->promise_final_suspend();
return std::experimental::suspend_never{};
return suspend_on_final{ _state.get() };
}

template <typename _Ty>

+ 17
- 16
librf/src/scheduler.cpp View File

@@ -6,6 +6,7 @@ std::mutex g_resumef_cout_mutex;
std::atomic<intptr_t> g_resumef_state_count = 0;
std::atomic<intptr_t> g_resumef_task_count = 0;
std::atomic<intptr_t> g_resumef_evtctx_count = 0;
std::atomic<intptr_t> g_resumef_state_id = 0;
#endif
namespace resumef
@@ -88,24 +89,21 @@ namespace resumef
}
//如果是单独的future,没有被co_await过,则handler是nullptr。
if (sptr->get_handler() != nullptr)
sptr->set_scheduler(this);
if (sptr->has_handler())
this->add_initial(sptr);
else
sptr->set_scheduler(this);
}
void scheduler_t::add_initial(state_base_t* sptr)
{
sptr->set_scheduler(this);
scoped_lock<spinlock, lock_type> __guard(_lock_ready, _lock_running);
scoped_lock<lock_type> __guard(_lock_running);
_runing_states.emplace_back(sptr);
_ready_task.try_emplace(sptr, nullptr);
}
void scheduler_t::add_await(state_base_t* sptr, coroutine_handle<> handler)
void scheduler_t::add_await(state_base_t* sptr)
{
sptr->set_scheduler(this);
sptr->set_handler(handler);
if (sptr->is_ready())
{
scoped_lock<lock_type> __guard(_lock_running);
@@ -116,25 +114,28 @@ namespace resumef
void scheduler_t::add_ready(state_base_t* sptr)
{
assert(sptr->get_scheduler() == this);
assert(sptr->is_ready());
if (sptr->get_handler() != nullptr)
if (sptr->has_handler())
{
scoped_lock<lock_type> __guard(_lock_running);
_runing_states.emplace_back(sptr);
}
else
{
scoped_lock<spinlock> __guard(_lock_ready);
this->_ready_task.erase(sptr);
}
}
void scheduler_t::del_final(state_base_t* sptr)
{
assert(sptr->get_scheduler() == this);
scoped_lock<spinlock> __guard(_lock_ready);
this->_ready_task.erase(sptr);
{
scoped_lock<spinlock> __guard(_lock_ready);
this->_ready_task.erase(sptr);
}
if (sptr->has_handler())
{
scoped_lock<lock_type> __guard(_lock_running);
_runing_states.emplace_back(sptr);
}
}
/*

+ 3
- 3
librf/src/scheduler.h View File

@@ -56,8 +56,8 @@ namespace resumef
inline bool empty() const
{
scoped_lock<spinlock> __guard(_lock_ready);
return this->_ready_task.empty() && this->_timer->empty();
scoped_lock<spinlock, lock_type> __guard(_lock_ready, _lock_running);
return _ready_task.empty() && _runing_states.empty() && _timer->empty();
}
inline timer_manager * timer() const
@@ -66,7 +66,7 @@ namespace resumef
}
void add_initial(state_base_t* sptr);
void add_await(state_base_t* sptr, coroutine_handle<> handler);
void add_await(state_base_t* sptr);
void add_ready(state_base_t* sptr);
void del_final(state_base_t* sptr);

+ 6
- 15
librf/src/state.cpp View File

@@ -5,28 +5,18 @@
namespace resumef
{
std::atomic<intptr_t> g_resumef_static_count = {0};
state_base_t::~state_base_t()
{
}
void state_base_t::promise_final_suspend()
{
scoped_lock<lock_type> __guard(this->_mtx);
scheduler_t* _scheduler = this->_scheduler;
if (_scheduler != nullptr)
_scheduler->del_final(this);
}
void state_base_t::set_exception(std::exception_ptr e)
{
scoped_lock<lock_type> __guard(this->_mtx);
this->_exception = std::move(e);
if (this->_scheduler != nullptr)
this->_scheduler->add_ready(this);
scheduler_t* sch = this->get_scheduler();
if (sch != nullptr)
sch->add_ready(this);
}
void state_t<void>::future_await_resume()
@@ -41,7 +31,8 @@ namespace resumef
scoped_lock<lock_type> __guard(this->_mtx);
this->_has_value = true;
if (this->_scheduler != nullptr)
this->_scheduler->add_ready(this);
scheduler_t* sch = this->get_scheduler();
if (sch != nullptr)
sch->add_ready(this);
}
}

+ 61
- 20
librf/src/state.h View File

@@ -8,8 +8,6 @@
namespace resumef
{
extern std::atomic<intptr_t> g_resumef_static_count;
struct state_base_t : public counted_t<state_base_t>
{
typedef std::recursive_mutex lock_type;
@@ -17,19 +15,25 @@ namespace resumef
protected:
mutable lock_type _mtx;
scheduler_t* _scheduler = nullptr;
coroutine_handle<> _initor;
//可能来自协程里的promise产生的,则经过co_await操作后,_coro在初始时不会为nullptr。
//也可能来自awaitable_t,如果
// 一、经过co_await操作后,_coro在初始时不会为nullptr。
// 二、没有co_await操作,直接加入到了调度器里,则_coro在初始时为nullptr。调度器需要特殊处理此种情况。
coroutine_handle<> _coro;
std::exception_ptr _exception;
public:
intptr_t _id;
state_base_t* _parent = nullptr;
state_base_t()
#if RESUMEF_DEBUG_COUNTER
intptr_t _id;
#endif
bool _is_awaitor;
public:
state_base_t(bool awaitor)
{
_id = ++g_resumef_static_count;
#if RESUMEF_DEBUG_COUNTER
_id = ++g_resumef_state_id;
#endif
_is_awaitor = awaitor;
}
RF_API virtual ~state_base_t();
@@ -37,51 +41,87 @@ namespace resumef
bool is_ready() const
{
return has_value() && _exception != nullptr;
return _is_awaitor == false || has_value() || _exception != nullptr;
}
void resume()
{
coroutine_handle<> handler;
scoped_lock<lock_type> __guard(_mtx);
if (_initor != nullptr)
{
handler = _initor;
_initor = nullptr;
handler();
}
else if(_coro != nullptr)
{
handler = _coro;
_coro = nullptr;
handler();
}
}
coroutine_handle<> get_handler() const
{
return _coro;
}
bool has_handler() const
{
return _initor != nullptr || _coro != nullptr;
}
auto handler = _coro;
_coro = nullptr;
handler();
void set_scheduler(scheduler_t* sch)
{
scoped_lock<lock_type> __guard(_mtx);
_scheduler = sch;
}
void set_handler(coroutine_handle<> handler)
void set_scheduler_handler(scheduler_t* sch, coroutine_handle<> handler)
{
scoped_lock<lock_type> __guard(_mtx);
_scheduler = sch;
assert(_coro == nullptr);
_coro = handler;
}
coroutine_handle<> get_handler() const
scheduler_t* get_scheduler() const
{
return _coro;
return _parent ? _parent->get_scheduler() : _scheduler;
}
void set_scheduler(scheduler_t* sch)
state_base_t * get_parent() const
{
scoped_lock<lock_type> __guard(_mtx);
_scheduler = sch;
return _parent;
}
scheduler_t* get_scheduler() const
/*
const state_base_t* root_state() const
{
return _scheduler;
return _parent ? _parent->root_state() : this;
}
state_base_t* root_state()
{
return _parent ? _parent->root_state() : this;
}
*/
void set_exception(std::exception_ptr e);
template<class _PromiseT, typename = std::enable_if_t<is_promise_v<_PromiseT>>>
void future_await_suspend(coroutine_handle<_PromiseT> handler);
void promise_final_suspend();
template<class _PromiseT, typename = std::enable_if_t<is_promise_v<_PromiseT>>>
void promise_initial_suspend(coroutine_handle<_PromiseT> handler);
void promise_await_resume();
template<class _PromiseT, typename = std::enable_if_t<is_promise_v<_PromiseT>>>
void promise_final_suspend(coroutine_handle<_PromiseT> handler);
};
template <typename _Ty>
struct state_t : public state_base_t
{
using state_base_t::state_base_t;
using state_base_t::lock_type;
using value_type = _Ty;
protected:
@@ -107,6 +147,7 @@ namespace resumef
template<>
struct state_t<void> : public state_base_t
{
using state_base_t::state_base_t;
using state_base_t::lock_type;
protected:
std::atomic<bool> _has_value{ false };

+ 45
- 5
librf/src/state.inl View File

@@ -2,6 +2,37 @@

namespace resumef
{
template<class _PromiseT, typename _Enable>
inline void state_base_t::promise_initial_suspend(coroutine_handle<_PromiseT> handler)
{
_PromiseT& promise = handler.promise();

state_base_t* parent_state = promise._state.get();
assert(this == parent_state);
assert(this->_scheduler == nullptr);
assert(this->_coro == nullptr);
this->_initor = handler;
}

inline void state_base_t::promise_await_resume()
{
}

template<class _PromiseT, typename _Enable>
inline void state_base_t::promise_final_suspend(coroutine_handle<_PromiseT> handler)
{
scoped_lock<lock_type> __guard(this->_mtx);

_PromiseT& promise = handler.promise();

state_base_t* parent_state = promise._state.get();
assert(this == parent_state);

scheduler_t* sch = this->get_scheduler();
assert(sch != nullptr);
sch->del_final(this);
}

template<class _PromiseT, typename _Enable>
inline void state_base_t::future_await_suspend(coroutine_handle<_PromiseT> handler)
{
@@ -10,9 +41,17 @@ namespace resumef
_PromiseT& promise = handler.promise();

state_base_t* parent_state = promise._state.get();
this->_parent = parent_state;
scheduler_t* sch = parent_state->_scheduler;
sch->add_await(this, handler);
scheduler_t* sch = parent_state->get_scheduler();
if (this != parent_state)
{
this->_parent = parent_state;
this->_scheduler = sch;
}

if (this->_coro == nullptr)
this->_coro = handler;
if (sch != nullptr)
sch->add_await(this);
}

template<typename _Ty>
@@ -30,8 +69,9 @@ namespace resumef
scoped_lock<lock_type> __guard(this->_mtx);

this->_value = std::move(val);
if (this->_scheduler != nullptr)
this->_scheduler->add_ready(this);
scheduler_t* sch = this->get_scheduler();
if (sch != nullptr)
sch->add_ready(this);
}
}


+ 3
- 6
tutorial/test_async_cb.cpp View File

@@ -14,7 +14,7 @@ void callback_get_long(int64_t val, _Ctype&& cb)
std::thread([val, cb = std::forward<_Ctype>(cb)]
{
//std::this_thread::sleep_for(500ms);
std::this_thread::sleep_for(10s);
std::this_thread::sleep_for(2s);
cb(val * val);
}).detach();
}
@@ -61,16 +61,13 @@ resumef::future_t<int64_t> loop_get_long(int64_t val)
void resumable_main_cb()
{
std::cout << std::this_thread::get_id() << std::endl;
go wait_get_long(3);
resumef::this_scheduler()->run_until_notask();
GO
{
auto val = co_await loop_get_long(2);
std::cout << val << std::endl;
std::cout << "GO:" << val << std::endl;
};
resumef::this_scheduler()->run_until_notask();
//go resumable_get_long(3);
go resumable_get_long(3);
resumef::this_scheduler()->run_until_notask();
}

+ 1
- 1
vs_proj/librf.vcxproj View File

@@ -104,7 +104,7 @@
<ClCompile>
<WarningLevel>Level3</WarningLevel>
<Optimization>Disabled</Optimization>
<PreprocessorDefinitions>_DEBUG;_CONSOLE;ASIO_STANDALONE;RESUMEF_DEBUG_COUNTER=0;RESUMEF_ENABLE_MULT_SCHEDULER=1;RESUMEF_USE_BOOST_ANY=0;_SILENCE_CXX17_ALLOCATOR_VOID_DEPRECATION_WARNING=1;ASIO_DISABLE_CONCEPTS=1;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<PreprocessorDefinitions>_DEBUG;_CONSOLE;ASIO_STANDALONE;RESUMEF_DEBUG_COUNTER=1;RESUMEF_ENABLE_MULT_SCHEDULER=1;RESUMEF_USE_BOOST_ANY=0;_SILENCE_CXX17_ALLOCATOR_VOID_DEPRECATION_WARNING=1;ASIO_DISABLE_CONCEPTS=1;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<SDLCheck>true</SDLCheck>
<AdditionalIncludeDirectories>..\librf;..\..\asio\asio\include;..\..\asio-1.10.6\include;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
<AdditionalOptions>/await</AdditionalOptions>

Loading…
Cancel
Save