Browse Source

将state和协程所用的内存分配到一起。

处理好协程的析构
tags/v2.9.7
tearshark 4 years ago
parent
commit
9dcb2d8baf

+ 13
- 1
librf/src/awaitable.h View File

using state_type = state_t<value_type>; using state_type = state_t<value_type>;
using future_type = future_t<value_type>; using future_type = future_t<value_type>;
using lock_type = typename state_type::lock_type; using lock_type = typename state_type::lock_type;
using _Alloc_char = typename state_type::_Alloc_char;


awaitable_impl_t() {} awaitable_impl_t() {}
awaitable_impl_t(const awaitable_impl_t&) = default; awaitable_impl_t(const awaitable_impl_t&) = default;
return future_type{ this->_state }; return future_type{ this->_state };
} }


mutable counted_ptr<state_type> _state = make_counted<state_type>(true);
mutable counted_ptr<state_type> _state = _Alloc_state();
private:
static state_type* _Alloc_state()
{
_Alloc_char _Al;
size_t _Size = sizeof(state_type);
#if RESUMEF_DEBUG_COUNTER
std::cout << "awaitable_t::alloc, size=" << _Size << std::endl;
#endif
char * _Ptr = _Al.allocate(_Size);
return new(_Ptr) state_type(true);
}
}; };


template<class _Ty> template<class _Ty>

+ 27
- 12
librf/src/promise.h View File

using promise_type = promise_t<value_type>; using promise_type = promise_t<value_type>;
using future_type = future_t<value_type>; using future_type = future_t<value_type>;


counted_ptr<state_type> _state = make_counted<state_type>(false);
char __xxx[16];
promise_impl_t()
{
state_type* st = get_state();
new(st) state_type();
st->lock();
}


promise_impl_t() {}
promise_impl_t(promise_impl_t&& _Right) noexcept = default; promise_impl_t(promise_impl_t&& _Right) noexcept = default;
promise_impl_t& operator = (promise_impl_t&& _Right) noexcept = default; promise_impl_t& operator = (promise_impl_t&& _Right) noexcept = default;
promise_impl_t(const promise_impl_t&) = delete; promise_impl_t(const promise_impl_t&) = delete;
promise_impl_t& operator = (const promise_impl_t&) = delete; promise_impl_t& operator = (const promise_impl_t&) = delete;


state_type* get_state()
{
size_t _State_size = _Align_size<state_type>();
char* ptr = reinterpret_cast<char*>(this) - _State_size;
return reinterpret_cast<state_type*>(ptr);
}

suspend_on_initial initial_suspend() noexcept; suspend_on_initial initial_suspend() noexcept;
suspend_on_final final_suspend() noexcept; suspend_on_final final_suspend() noexcept;
void set_exception(std::exception_ptr e); void set_exception(std::exception_ptr e);
using _Alloc_char = std::allocator<char>; using _Alloc_char = std::allocator<char>;
void* operator new(size_t _Size) void* operator new(size_t _Size)
{ {
std::cout << "promise::new, size=" << _Size << std::endl;
size_t _State_size = _Align_size<state_type>();
assert(_Size >= sizeof(uint32_t) && _Size < std::numeric_limits<uint32_t>::max() - sizeof(_State_size));
#if RESUMEF_DEBUG_COUNTER
std::cout << "promise::new, size=" << (_Size + _State_size) << std::endl;
#endif


_Alloc_char _Al; _Alloc_char _Al;
size_t _State_size = _Align_size<state_type>();
char* ptr = _Al.allocate(_Size + _State_size); char* ptr = _Al.allocate(_Size + _State_size);
return ptr;
return ptr + _State_size;
} }


void operator delete(void* _Ptr, size_t _Size) void operator delete(void* _Ptr, size_t _Size)
{ {
size_t _State_size = _Align_size<state_type>();
assert(_Size >= sizeof(uint32_t) && _Size < std::numeric_limits<uint32_t>::max() - sizeof(_State_size));

_Alloc_char _Al; _Alloc_char _Al;
return _Al.deallocate(static_cast<char*>(_Ptr), _Size);
state_type* st = reinterpret_cast<state_type*>(static_cast<char*>(_Ptr) - _State_size);
st->set_alloc_size(static_cast<uint32_t>(_Size + _State_size));
st->unlock();
} }
}; };


template<class _Ty> template<class _Ty>
struct promise_t : public promise_impl_t<_Ty>
struct promise_t sealed : public promise_impl_t<_Ty>
{ {
using typename promise_impl_t<_Ty>::value_type; using typename promise_impl_t<_Ty>::value_type;
using promise_impl_t<_Ty>::_state;


void return_value(value_type val); void return_value(value_type val);
void yield_value(value_type val); void yield_value(value_type val);
}; };


template<> template<>
struct promise_t<void> : public promise_impl_t<void>
struct promise_t<void> sealed : public promise_impl_t<void>
{ {
using promise_impl_t<void>::_state;

void return_void(); void return_void();
void yield_value(); void yield_value();
}; };

+ 8
- 8
librf/src/promise.inl View File

template <typename _Ty> template <typename _Ty>
inline suspend_on_initial promise_impl_t<_Ty>::initial_suspend() noexcept inline suspend_on_initial promise_impl_t<_Ty>::initial_suspend() noexcept
{ {
return { _state.get() };
return { get_state() };
} }


template <typename _Ty> template <typename _Ty>
inline suspend_on_final promise_impl_t<_Ty>::final_suspend() noexcept inline suspend_on_final promise_impl_t<_Ty>::final_suspend() noexcept
{ {
return { _state.get() };
return { get_state() };
} }


template <typename _Ty> template <typename _Ty>
inline void promise_impl_t<_Ty>::set_exception(std::exception_ptr e) inline void promise_impl_t<_Ty>::set_exception(std::exception_ptr e)
{ {
_state->set_exception(std::move(e));
get_state()->set_exception(std::move(e));
} }


#ifdef __clang__ #ifdef __clang__
template <typename _Ty> template <typename _Ty>
inline future_t<_Ty> promise_impl_t<_Ty>::get_return_object() inline future_t<_Ty> promise_impl_t<_Ty>::get_return_object()
{ {
return { _state };
return { get_state() };
} }


template <typename _Ty> template <typename _Ty>
template<class _Ty> template<class _Ty>
inline void promise_t<_Ty>::return_value(value_type val) inline void promise_t<_Ty>::return_value(value_type val)
{ {
_state->set_value(std::move(val));
get_state()->set_value(std::move(val));
} }


template<class _Ty> template<class _Ty>
inline void promise_t<_Ty>::yield_value(value_type val) inline void promise_t<_Ty>::yield_value(value_type val)
{ {
_state->promise_yield_value(this, std::move(val));
get_state()->promise_yield_value(this, std::move(val));
} }


inline void promise_t<void>::return_void() inline void promise_t<void>::return_void()
{ {
_state->set_value();
get_state()->set_value();
} }


inline void promise_t<void>::yield_value() inline void promise_t<void>::yield_value()
{ {
_state->promise_yield_value(this);
get_state()->promise_yield_value(this);
} }


} }

+ 1
- 1
librf/src/scheduler.h View File

using state_vector = std::vector<state_sptr>; using state_vector = std::vector<state_sptr>;
private: private:
using lock_type = std::recursive_mutex; using lock_type = std::recursive_mutex;
using task_dictionary_type = std::unordered_map<state_base_t*, task_base_t*>;
using task_dictionary_type = std::unordered_map<state_base_t*, std::unique_ptr<task_base_t>>;
mutable lock_type _lock_running; mutable lock_type _lock_running;
state_vector _runing_states; state_vector _runing_states;

+ 37
- 3
librf/src/state.cpp View File

{ {
} }
void state_future_t::destroy_deallocate()
{
size_t _Size = this->_Alloc_size;
#if RESUMEF_DEBUG_COUNTER
std::cout << "destroy_deallocate, size=" << _Size << std::endl;
#endif
this->~state_future_t();
_Alloc_char _Al;
return _Al.deallocate(reinterpret_cast<char*>(this), _Size);
}
void state_generator_t::destroy_deallocate()
{
delete this;
}
void state_generator_t::resume() void state_generator_t::resume()
{ {
if (_coro != nullptr) if (_coro != nullptr)
void state_future_t::resume() void state_future_t::resume()
{ {
scoped_lock<lock_type> __guard(_mtx);
std::unique_lock<lock_type> __guard(_mtx);
if (_initor != nullptr)
if (_initor != nullptr && _is_initor)
{ {
coroutine_handle<> handler = _initor; coroutine_handle<> handler = _initor;
_initor = nullptr; _initor = nullptr;
__guard.unlock();
handler.resume(); handler.resume();
return;
} }
else if (_coro != nullptr)
if (_coro != nullptr)
{ {
coroutine_handle<> handler = _coro; coroutine_handle<> handler = _coro;
_coro = nullptr; _coro = nullptr;
__guard.unlock();
handler.resume(); handler.resume();
return;
}
if (_initor != nullptr && !_is_initor)
{
coroutine_handle<> handler = _initor;
_initor = nullptr;
__guard.unlock();
handler.destroy();
return;
} }
} }

+ 37
- 7
librf/src/state.h View File

{ {
struct state_base_t struct state_base_t
{ {
using _Alloc_char = std::allocator<char>;
RF_API virtual ~state_base_t(); RF_API virtual ~state_base_t();
private: private:
std::atomic<intptr_t> _count{ 0 };
std::atomic<intptr_t> _count{0};
public: public:
void lock() void lock()
{ {
void unlock() void unlock()
{ {
if (--_count == 0) if (--_count == 0)
delete this;
{
destroy_deallocate();
}
} }
protected: protected:
scheduler_t* _scheduler = nullptr; scheduler_t* _scheduler = nullptr;
// 一、经过co_await操作后,_coro在初始时不会为nullptr。 // 一、经过co_await操作后,_coro在初始时不会为nullptr。
// 二、没有co_await操作,直接加入到了调度器里,则_coro在初始时为nullptr。调度器需要特殊处理此种情况。 // 二、没有co_await操作,直接加入到了调度器里,则_coro在初始时为nullptr。调度器需要特殊处理此种情况。
coroutine_handle<> _coro; coroutine_handle<> _coro;
private:
virtual void destroy_deallocate() = 0;
public: public:
virtual void resume() = 0; virtual void resume() = 0;
virtual bool has_handler() const = 0; virtual bool has_handler() const = 0;
_coro = handler; _coro = handler;
} }
virtual void destroy_deallocate() override;
virtual void resume() override; virtual void resume() override;
virtual bool has_handler() const override; virtual bool has_handler() const override;
virtual bool is_ready() const override; virtual bool is_ready() const override;
intptr_t _id; intptr_t _id;
#endif #endif
std::exception_ptr _exception; std::exception_ptr _exception;
uint32_t _Alloc_size;
bool _has_value = false; bool _has_value = false;
bool _is_awaitor; bool _is_awaitor;
bool _is_initor = false;
public: public:
state_future_t() state_future_t()
{ {
#endif #endif
_is_awaitor = false; _is_awaitor = false;
} }
state_future_t(bool awaitor)
explicit state_future_t(bool awaitor)
{ {
#if RESUMEF_DEBUG_COUNTER #if RESUMEF_DEBUG_COUNTER
_id = ++g_resumef_state_id; _id = ++g_resumef_state_id;
_is_awaitor = awaitor; _is_awaitor = awaitor;
} }
virtual void destroy_deallocate() override;
virtual void resume() override; virtual void resume() override;
virtual bool has_handler() const override; virtual bool has_handler() const override;
virtual bool is_ready() const override; virtual bool is_ready() const override;
{ {
return _parent; return _parent;
} }
void set_alloc_size(uint32_t val)
{
_Alloc_size = val;
}
void set_exception(std::exception_ptr e); void set_exception(std::exception_ptr e);
}; };
template <typename _Ty> template <typename _Ty>
struct state_t : public state_future_t
struct state_t sealed : public state_future_t
{ {
using state_future_t::state_future_t;
using state_future_t::lock_type; using state_future_t::lock_type;
using value_type = _Ty; using value_type = _Ty;
state_t() :state_future_t()
{
_Alloc_size = sizeof(*this);
}
explicit state_t(bool awaitor) :state_future_t(awaitor)
{
_Alloc_size = sizeof(*this);
}
private: private:
union union_value_type union union_value_type
{ {
}; };
template<> template<>
struct state_t<void> : public state_future_t
struct state_t<void> sealed : public state_future_t
{ {
using state_future_t::state_future_t;
using state_future_t::lock_type; using state_future_t::lock_type;
state_t() :state_future_t()
{
_Alloc_size = sizeof(*this);
}
explicit state_t(bool awaitor) :state_future_t(awaitor)
{
_Alloc_size = sizeof(*this);
}
public: public:
void future_await_resume(); void future_await_resume();
template<class _PromiseT, typename = std::enable_if_t<is_promise_v<_PromiseT>>> template<class _PromiseT, typename = std::enable_if_t<is_promise_v<_PromiseT>>>

+ 6
- 3
librf/src/state.inl View File

{ {
_PromiseT& promise = handler.promise(); _PromiseT& promise = handler.promise();


state_base_t* parent_state = promise._state.get();
state_base_t* parent_state = promise.get_state();
(void)parent_state; (void)parent_state;
assert(this == parent_state); assert(this == parent_state);
assert(this->_scheduler == nullptr); assert(this->_scheduler == nullptr);
assert(this->_coro == nullptr); assert(this->_coro == nullptr);
this->_initor = handler; this->_initor = handler;
this->_is_initor = true;
} }


inline void state_future_t::promise_await_resume() inline void state_future_t::promise_await_resume()


_PromiseT& promise = handler.promise(); _PromiseT& promise = handler.promise();


state_base_t* parent_state = promise._state.get();
state_base_t* parent_state = promise.get_state();
(void)parent_state; (void)parent_state;
assert(this == parent_state); assert(this == parent_state);
this->_initor = handler;
this->_is_initor = false;


scheduler_t* sch = this->get_scheduler(); scheduler_t* sch = this->get_scheduler();
assert(sch != nullptr); assert(sch != nullptr);


_PromiseT& promise = handler.promise(); _PromiseT& promise = handler.promise();


auto* parent_state = promise._state.get();
auto* parent_state = promise.get_state();
scheduler_t* sch = parent_state->get_scheduler(); scheduler_t* sch = parent_state->get_scheduler();
if (this != parent_state) if (this != parent_state)
{ {

+ 1
- 13
tutorial/test_async_cb.cpp View File

//这种情况下,没有生成 frame-context,因此,并没有promise_type被内嵌在frame-context里 //这种情况下,没有生成 frame-context,因此,并没有promise_type被内嵌在frame-context里
future_t<int64_t> async_get_long(int64_t val) future_t<int64_t> async_get_long(int64_t val)
{ {
/*
void* frame_ptr = _coro_frame_ptr();
size_t frame_size = _coro_frame_size();
std::cout << "test_routine_use_timer" << std::endl;
std::cout << "frame point=" << frame_ptr << ", size=" << frame_size << ", promise_size=" << _Align_size<promise_t<>>() << std::endl;
auto handler = coroutine_handle<promise_t<>>::from_address(frame_ptr);
auto st = handler.promise()._state;
scheduler_t* sch = st->get_scheduler();
auto parent = st->get_parent();
std::cout << "st=" << st.get() << ", scheduler=" << sch << ", parent=" << parent << std::endl;
*/
resumef::awaitable_t<int64_t> awaitable; resumef::awaitable_t<int64_t> awaitable;
callback_get_long(val, [awaitable](int64_t val) callback_get_long(val, [awaitable](int64_t val)
{ {
std::cout << "GO:" << val << std::endl; std::cout << "GO:" << val << std::endl;
}; };
go resumable_get_long(3);
go loop_get_long(3);
resumef::this_scheduler()->run_until_notask(); resumef::this_scheduler()->run_until_notask();
} }

+ 2
- 2
tutorial/test_async_routine.cpp View File

std::cout << "frame point=" << frame_ptr << ", size=" << frame_size << ", promise_size=" << _Align_size<promise_t<>>() << std::endl; std::cout << "frame point=" << frame_ptr << ", size=" << frame_size << ", promise_size=" << _Align_size<promise_t<>>() << std::endl;
auto handler = coroutine_handle<promise_t<>>::from_address(frame_ptr); auto handler = coroutine_handle<promise_t<>>::from_address(frame_ptr);
auto st = handler.promise()._state;
auto st = handler.promise().get_state();
scheduler_t* sch = st->get_scheduler(); scheduler_t* sch = st->get_scheduler();
auto parent = st->get_parent(); auto parent = st->get_parent();
std::cout << "st=" << st.get() << ", scheduler=" << sch << ", parent=" << parent << std::endl;
std::cout << "st=" << st << ", scheduler=" << sch << ", parent=" << parent << std::endl;
for (size_t i = 0; i < 3; ++i) for (size_t i = 0; i < 3; ++i)
{ {

+ 2
- 2
vs_proj/librf.cpp View File

{ {
(void)argc; (void)argc;
(void)argv; (void)argv;
//resumable_main_routine();
resumable_main_routine();
//if (argc > 1) //if (argc > 1)
// resumable_main_benchmark_asio_client(atoi(argv[1])); // resumable_main_benchmark_asio_client(atoi(argv[1]));
//else //else
// resumable_main_benchmark_asio_server(); // resumable_main_benchmark_asio_server();
resumable_main_cb();
//resumable_main_cb();
//resumable_main_modern_cb(); //resumable_main_modern_cb();
//resumable_main_suspend_always(); //resumable_main_suspend_always();
//resumable_main_yield_return(); //resumable_main_yield_return();

+ 1
- 1
vs_proj/librf.vcxproj View File

<ClCompile> <ClCompile>
<WarningLevel>Level3</WarningLevel> <WarningLevel>Level3</WarningLevel>
<Optimization>Disabled</Optimization> <Optimization>Disabled</Optimization>
<PreprocessorDefinitions>_DEBUG;_CONSOLE;ASIO_STANDALONE;RESUMEF_DEBUG_COUNTER=0;RESUMEF_ENABLE_MULT_SCHEDULER=1;RESUMEF_USE_BOOST_ANY=0;_SILENCE_CXX17_ALLOCATOR_VOID_DEPRECATION_WARNING=1;ASIO_DISABLE_CONCEPTS=1;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<PreprocessorDefinitions>_DEBUG;_CONSOLE;ASIO_STANDALONE;RESUMEF_DEBUG_COUNTER=1;RESUMEF_ENABLE_MULT_SCHEDULER=1;RESUMEF_USE_BOOST_ANY=0;_SILENCE_CXX17_ALLOCATOR_VOID_DEPRECATION_WARNING=1;ASIO_DISABLE_CONCEPTS=1;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<SDLCheck>true</SDLCheck> <SDLCheck>true</SDLCheck>
<AdditionalIncludeDirectories>..\librf;..\..\asio\asio\include;..\..\asio-1.10.6\include;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories> <AdditionalIncludeDirectories>..\librf;..\..\asio\asio\include;..\..\asio-1.10.6\include;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
<AdditionalOptions>/await</AdditionalOptions> <AdditionalOptions>/await</AdditionalOptions>

Loading…
Cancel
Save