Browse Source

准备支持调度generator

tags/v2.9.7
tearshark 4 years ago
parent
commit
7289258a75

+ 9
- 0
librf/src/def.h View File

@@ -67,6 +67,8 @@ namespace resumef
template<typename _PromiseT = void>
using coroutine_handle = std::experimental::coroutine_handle<_PromiseT>;
template <typename _Ty, typename _Alloc = std::allocator<char>>
using generator_t = std::experimental::generator<_Ty, _Alloc>;
enum struct error_code
{
@@ -151,6 +153,13 @@ namespace resumef
template<class _Ty>
_INLINE_VAR constexpr bool is_future_v = is_future<std::remove_cvref_t<_Ty>>::value;
template<class _G>
struct is_generator : std::false_type {};
template <typename _Ty, typename _Alloc>
struct is_generator<generator_t<_Ty, _Alloc>> : std::true_type {};
template<class _Ty>
_INLINE_VAR constexpr bool is_generator_v = is_generator<std::remove_cvref_t<_Ty>>::value;
//获得当前线程下的调度器
scheduler_t* this_scheduler();
}

+ 27
- 25
librf/src/generator.h View File

@@ -20,7 +20,10 @@
#include <experimental/resumable>
#pragma pack(push, _CRT_PACKING)
#pragma pack(push,_CRT_PACKING)
#pragma warning(push,_STL_WARNING_LEVEL)
#pragma warning(disable: _STL_DISABLED_WARNINGS)
_STL_DISABLE_CLANG_WARNINGS
#pragma push_macro("new")
#undef new
@@ -49,23 +52,18 @@ namespace experimental {
generator_iterator &operator++()
{
_Coro.resume();
if (_Coro.done())
_Coro = nullptr;
else
_Coro.resume();
return *this;
}
generator_iterator operator++(int) = delete;
// generator iterator current_value
// is a reference to a temporary on the coroutine frame
// implementing postincrement will require storing a copy
// of the value in the iterator.
//{
// auto _Result = *this;
// ++(*this);
// return _Result;
//}
void operator++(int)
{
// This postincrement operator meets the requirements of the Ranges TS
// InputIterator concept, but not those of Standard C++ InputIterator.
++* this;
}
bool operator==(generator_iterator const &right_) const
{
@@ -92,6 +90,10 @@ namespace experimental {
template <typename _Ty, typename promise_type>
struct generator_iterator : public generator_iterator<void, promise_type>
{
using value_type = _Ty;
using reference = _Ty const&;
using pointer = _Ty const*;
generator_iterator(nullptr_t) : generator_iterator<void, promise_type>(nullptr)
{
}
@@ -99,14 +101,14 @@ namespace experimental {
{
}
_Ty const &operator*() const
reference operator*() const
{
return *this->_Coro.promise()._CurrentValue;
return *_Coro.promise()._CurrentValue;
}
_Ty const *operator->() const
pointer operator->() const
{
return _STD addressof(operator*());
return _Coro.promise()._CurrentValue;
}
};
@@ -156,9 +158,11 @@ namespace experimental {
return _STD forward<_Uty>(_Whatever);
}
using _Alloc_traits = allocator_traits<_Alloc>;
using _Alloc_of_char_type =
typename _Alloc_traits::template rebind_alloc<char>;
using _Alloc_char = _Rebind_alloc_t<_Alloc, char>;
static_assert(is_same_v<char*, typename allocator_traits<_Alloc_char>::pointer>,
"generator does not support allocators with fancy pointer types");
static_assert(allocator_traits<_Alloc_char>::is_always_equal::value,
"generator only supports stateless allocators");
void *operator new(size_t _Size)
{
@@ -179,10 +183,9 @@ namespace experimental {
{
if (_Coro)
{
_Coro.resume();
if (_Coro.done())
return{ nullptr };
_Coro.resume();
}
return { _Coro };
}
@@ -193,8 +196,7 @@ namespace experimental {
}
explicit generator(promise_type &_Prom)
: _Coro(coroutine_handle<promise_type>::from_promise(
_Prom))
: _Coro(coroutine_handle<promise_type>::from_promise(_Prom))
{
}
@@ -212,7 +214,7 @@ namespace experimental {
generator &operator=(generator &&right_) noexcept
{
if (&right_ != this) {
if (this != _STD addressof(right_)) {
_Coro = right_._Coro;
right_._Coro = nullptr;
}

+ 2
- 2
librf/src/promise.inl View File

@@ -7,7 +7,7 @@ namespace resumef
{
struct suspend_on_initial
{
state_base_t* _state;
state_future_t* _state;

inline bool await_ready() noexcept
{
@@ -25,7 +25,7 @@ namespace resumef
};
struct suspend_on_final
{
state_base_t* _state;
state_future_t* _state;

inline bool await_ready() noexcept
{

+ 6
- 3
librf/src/scheduler.cpp View File

@@ -113,7 +113,6 @@ namespace resumef
void scheduler_t::add_ready(state_base_t* sptr)
{
assert(sptr->get_scheduler() == this);
assert(sptr->is_ready());
if (sptr->has_handler())
@@ -123,10 +122,14 @@ namespace resumef
}
}
void scheduler_t::del_final(state_base_t* sptr)
void scheduler_t::add_generator(state_base_t* sptr)
{
assert(sptr->get_scheduler() == this);
scoped_lock<lock_type> __guard(_lock_running);
_runing_states.emplace_back(sptr);
}
void scheduler_t::del_final(state_base_t* sptr)
{
{
scoped_lock<spinlock> __guard(_lock_ready);
this->_ready_task.erase(sptr);

+ 1
- 5
librf/src/scheduler.h View File

@@ -49,11 +49,6 @@ namespace resumef
new_task(new ctx_task_t<_Ty>(std::forward<_Ty>(t_)));
}
inline void push_task_internal(task_base_t * t_)
{
new_task(t_);
}
inline bool empty() const
{
scoped_lock<spinlock, lock_type> __guard(_lock_ready, _lock_running);
@@ -68,6 +63,7 @@ namespace resumef
void add_initial(state_base_t* sptr);
void add_await(state_base_t* sptr);
void add_ready(state_base_t* sptr);
void add_generator(state_base_t* sptr);
void del_final(state_base_t* sptr);
friend struct task_base;

+ 35
- 2
librf/src/state.cpp View File

@@ -9,7 +9,29 @@ namespace resumef
{
}
void state_base_t::resume()
void state_generator_t::resume()
{
if (_coro != nullptr)
{
_coro.resume();
if (_coro.done())
_coro = nullptr;
else
_scheduler->add_generator(this);
}
}
bool state_generator_t::is_ready() const
{
return _coro != nullptr && !_coro.done();
}
bool state_generator_t::has_handler() const
{
return _coro != nullptr;
}
void state_future_t::resume()
{
coroutine_handle<> handler;
@@ -28,7 +50,12 @@ namespace resumef
}
}
void state_base_t::set_exception(std::exception_ptr e)
bool state_future_t::has_handler() const
{
return _initor != nullptr || _coro != nullptr;
}
void state_future_t::set_exception(std::exception_ptr e)
{
scoped_lock<lock_type> __guard(this->_mtx);
@@ -37,6 +64,12 @@ namespace resumef
if (sch != nullptr)
sch->add_ready(this);
}
bool state_t<void>::is_ready() const
{
scoped_lock<lock_type> __guard(this->_mtx);
return _is_awaitor == false || _has_value || _exception != nullptr;
}
void state_t<void>::future_await_resume()
{

+ 42
- 48
librf/src/state.h View File

@@ -10,25 +10,50 @@ namespace resumef
{
struct state_base_t : public counted_t<state_base_t>
{
typedef std::recursive_mutex lock_type;
RF_API virtual ~state_base_t();
protected:
mutable lock_type _mtx;
scheduler_t* _scheduler = nullptr;
coroutine_handle<> _initor;
//可能来自协程里的promise产生的,则经过co_await操作后,_coro在初始时不会为nullptr。
//也可能来自awaitable_t,如果
// 一、经过co_await操作后,_coro在初始时不会为nullptr。
// 二、没有co_await操作,直接加入到了调度器里,则_coro在初始时为nullptr。调度器需要特殊处理此种情况。
coroutine_handle<> _coro;
public:
virtual void resume() = 0;
virtual bool is_ready() const = 0;
virtual bool has_handler() const = 0;
void set_scheduler(scheduler_t* sch)
{
_scheduler = sch;
}
coroutine_handle<> get_handler() const
{
return _coro;
}
};
struct state_generator_t : public state_base_t
{
virtual void resume() override;
virtual bool is_ready() const override;
virtual bool has_handler() const override;
};
struct state_future_t : public state_base_t
{
typedef std::recursive_mutex lock_type;
protected:
mutable lock_type _mtx;
coroutine_handle<> _initor;
std::exception_ptr _exception;
state_base_t* _parent = nullptr;
state_future_t* _parent = nullptr;
#if RESUMEF_DEBUG_COUNTER
intptr_t _id;
#endif
bool _is_awaitor;
public:
state_base_t(bool awaitor)
state_future_t(bool awaitor)
{
#if RESUMEF_DEBUG_COUNTER
_id = ++g_resumef_state_id;
@@ -36,41 +61,13 @@ namespace resumef
_is_awaitor = awaitor;
}
RF_API virtual ~state_base_t();
virtual bool has_value() const = 0;
void resume();
bool is_ready() const
{
return _is_awaitor == false || has_value() || _exception != nullptr;
}
coroutine_handle<> get_handler() const
{
return _coro;
}
bool has_handler() const
{
return _initor != nullptr || _coro != nullptr;
}
virtual void resume() override;
virtual bool has_handler() const override;
scheduler_t* get_scheduler() const
{
return _parent ? _parent->get_scheduler() : _scheduler;
}
void set_scheduler(scheduler_t* sch)
{
scoped_lock<lock_type> __guard(_mtx);
_scheduler = sch;
}
void set_scheduler_handler(scheduler_t* sch, coroutine_handle<> handler)
{
scoped_lock<lock_type> __guard(_mtx);
_scheduler = sch;
assert(_coro == nullptr);
_coro = handler;
}
state_base_t * get_parent() const
{
@@ -90,18 +87,18 @@ namespace resumef
};
template <typename _Ty>
struct state_t : public state_base_t
struct state_t : public state_future_t
{
using state_base_t::state_base_t;
using state_base_t::lock_type;
using state_future_t::state_future_t;
using state_future_t::lock_type;
using value_type = _Ty;
protected:
std::optional<value_type> _value;
public:
virtual bool has_value() const override
virtual bool is_ready() const override
{
scoped_lock<lock_type> __guard(this->_mtx);
return _value.has_value();
return _is_awaitor == false || _value.has_value() || _exception != nullptr;
}
bool future_await_ready()
@@ -116,17 +113,14 @@ namespace resumef
};
template<>
struct state_t<void> : public state_base_t
struct state_t<void> : public state_future_t
{
using state_base_t::state_base_t;
using state_base_t::lock_type;
using state_future_t::state_future_t;
using state_future_t::lock_type;
protected:
std::atomic<bool> _has_value{ false };
public:
virtual bool has_value() const override
{
return _has_value;
}
virtual bool is_ready() const override;
bool future_await_ready()
{

+ 5
- 5
librf/src/state.inl View File

@@ -3,7 +3,7 @@
namespace resumef
{
template<class _PromiseT, typename _Enable>
inline void state_base_t::promise_initial_suspend(coroutine_handle<_PromiseT> handler)
inline void state_future_t::promise_initial_suspend(coroutine_handle<_PromiseT> handler)
{
_PromiseT& promise = handler.promise();

@@ -14,12 +14,12 @@ namespace resumef
this->_initor = handler;
}

inline void state_base_t::promise_await_resume()
inline void state_future_t::promise_await_resume()
{
}

template<class _PromiseT, typename _Enable>
inline void state_base_t::promise_final_suspend(coroutine_handle<_PromiseT> handler)
inline void state_future_t::promise_final_suspend(coroutine_handle<_PromiseT> handler)
{
scoped_lock<lock_type> __guard(this->_mtx);

@@ -34,13 +34,13 @@ namespace resumef
}

template<class _PromiseT, typename _Enable>
inline void state_base_t::future_await_suspend(coroutine_handle<_PromiseT> handler)
inline void state_future_t::future_await_suspend(coroutine_handle<_PromiseT> handler)
{
scoped_lock<lock_type> __guard(this->_mtx);

_PromiseT& promise = handler.promise();

state_base_t* parent_state = promise._state.get();
auto* parent_state = promise._state.get();
scheduler_t* sch = parent_state->get_scheduler();
if (this != parent_state)
{

+ 1
- 1
tutorial/test_async_yield_return.cpp View File

@@ -19,7 +19,7 @@ auto test_yield_int() -> std::experimental::generator<int>
std::cout << "3 will yield return" << std::endl;
co_yield 3;
std::cout << "4 will return" << std::endl;
return 4;
co_return 4;
std::cout << "5 will never yield return" << std::endl;
co_yield 5;

+ 2
- 4
vs_proj/librf.vcxproj View File

@@ -263,10 +263,7 @@
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">true</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="..\tutorial\test_async_yield_return.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">true</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="..\tutorial\test_async_yield_return.cpp" />
<ClCompile Include="librf.cpp">
<BasicRuntimeChecks Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">Default</BasicRuntimeChecks>
<BufferSecurityCheck Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">false</BufferSecurityCheck>
@@ -282,6 +279,7 @@
<ClInclude Include="..\librf\src\def.h" />
<ClInclude Include="..\librf\src\event.h" />
<ClInclude Include="..\librf\src\future.h" />
<ClInclude Include="..\librf\src\generator.h" />
<ClInclude Include="..\librf\src\promise.h" />
<ClInclude Include="..\librf\src\task_list.h" />
<ClInclude Include="..\librf\src\mutex.h" />

+ 3
- 0
vs_proj/librf.vcxproj.filters View File

@@ -174,6 +174,9 @@
<ClInclude Include="..\librf\src\awaitable.h">
<Filter>librf\src</Filter>
</ClInclude>
<ClInclude Include="..\librf\src\generator.h">
<Filter>librf\src</Filter>
</ClInclude>
</ItemGroup>
<ItemGroup>
<None Include="..\librf\src\asio_task_1.12.0.inl">

Loading…
Cancel
Save