Browse Source

优化state占用的内存

tags/v2.9.7
tearshark 4 years ago
parent
commit
421e884680

+ 1
- 0
librf/librf.h View File

@@ -24,6 +24,7 @@
#include "src/scheduler.h"
#include "src/promise.inl"
#include "src/state.inl"
#include "src/generator.h"
#include "src/sleep.h"
#include "src/when.h"

+ 0
- 17
librf/src/counted_ptr.h View File

@@ -3,23 +3,6 @@
namespace resumef
{
template<class _Derived>
struct counted_t
{
std::atomic<intptr_t> _count{ 0 };
void lock()
{
++_count;
}
void unlock()
{
if (--_count == 0)
delete static_cast<_Derived*>(this);
}
};
template <typename T>
struct counted_ptr
{

+ 3
- 5
librf/src/def.h View File

@@ -19,8 +19,6 @@
#include <experimental/coroutine>
//#include <experimental/generator>
//替代<experimental/generator>,因为VS2015/VS2017的generator<>未实现return_value,导致yield后不能return
#include "generator.h"
#define LIB_RESUMEF_VERSION 200000 // 2.0.0
@@ -32,6 +30,9 @@ namespace resumef
struct future_t;
using future_vt [[deprecated]] = future_t<>;
template <typename _Ty = std::nullptr_t, typename _Alloc = std::allocator<char>>
struct generator_t;
template<class _Ty = void>
struct promise_t;
@@ -41,9 +42,6 @@ namespace resumef
//获得当前线程下的调度器
scheduler_t* this_scheduler();
template <typename _Ty = std::nullptr_t, typename _Alloc = std::allocator<char>>
using generator_t = std::experimental::generator<_Ty, _Alloc>;
template<typename _PromiseT = void>
using coroutine_handle = std::experimental::coroutine_handle<_PromiseT>;

+ 173
- 180
librf/src/generator.h View File

@@ -1,240 +1,233 @@
/***
*generator
*
* Copyright (c) Microsoft Corporation. All rights reserved.
*
* Purpose: Library support of coroutines. generator class
* http://open-std.org/JTC1/SC22/WG21/docs/papers/2015/p0057r0.pdf
*
* [Public]
*
****/
/*
* Modify from <experimental/generator_t.h>
* Purpose: Library support of coroutines. generator_t class
* http://open-std.org/JTC1/SC22/WG21/docs/papers/2015/p0057r0.pdf
*/
#pragma once
#ifndef _EXPERIMENTAL_GENERATOR_
#define _EXPERIMENTAL_GENERATOR_
#ifndef RC_INVOKED
#include <experimental/coroutine>
#pragma pack(push,_CRT_PACKING)
#pragma push_macro("new")
#undef new
namespace std {
namespace experimental {
namespace resumef
{
template <typename _Ty, typename promise_type>
struct generator_iterator;
template<typename promise_type>
struct generator_iterator<void, promise_type>
{
typedef std::input_iterator_tag iterator_category;
typedef ptrdiff_t difference_type;
template <typename _Ty, typename promise_type>
struct generator_iterator;
coroutine_handle<promise_type> _Coro;
template<typename promise_type>
struct generator_iterator<void, promise_type>
generator_iterator(nullptr_t) : _Coro(nullptr)
{
typedef std::input_iterator_tag iterator_category;
typedef ptrdiff_t difference_type;
}
coroutine_handle<promise_type> _Coro;
generator_iterator(coroutine_handle<promise_type> _CoroArg) : _Coro(_CoroArg)
{
}
generator_iterator(nullptr_t) : _Coro(nullptr)
{
}
generator_iterator& operator++()
{
_Coro.resume();
if (_Coro.done())
_Coro = nullptr;
return *this;
}
generator_iterator(coroutine_handle<promise_type> _CoroArg) : _Coro(_CoroArg)
{
}
void operator++(int)
{
// This postincrement operator meets the requirements of the Ranges TS
// InputIterator concept, but not those of Standard C++ InputIterator.
++* this;
}
bool operator==(generator_iterator const& right_) const
{
return _Coro == right_._Coro;
}
bool operator!=(generator_iterator const& right_) const
{
return !(*this == right_);
}
};
template <typename promise_type>
struct generator_iterator<nullptr_t, promise_type> : public generator_iterator<void, promise_type>
{
generator_iterator(nullptr_t) : generator_iterator<void, promise_type>(nullptr)
{
}
generator_iterator(coroutine_handle<promise_type> _CoroArg) : generator_iterator<void, promise_type>(_CoroArg)
{
}
};
template <typename _Ty, typename promise_type>
struct generator_iterator : public generator_iterator<void, promise_type>
{
using value_type = _Ty;
using reference = _Ty const&;
using pointer = _Ty const*;
generator_iterator& operator++()
generator_iterator(nullptr_t) : generator_iterator<void, promise_type>(nullptr)
{
}
generator_iterator(coroutine_handle<promise_type> _CoroArg) : generator_iterator<void, promise_type>(_CoroArg)
{
}
reference operator*() const
{
return *this->_Coro.promise()._CurrentValue;
}
pointer operator->() const
{
return this->_Coro.promise()._CurrentValue;
}
};
template <typename _Ty, typename _Alloc>
struct generator_t
{
using value_type = _Ty;
using state_type = state_generator_t;
struct promise_type
{
using value_type = _Ty;
using state_type = state_generator_t;
using future_type = generator_t<value_type>;
_Ty const* _CurrentValue;
promise_type& get_return_object()
{
_Coro.resume();
if (_Coro.done())
_Coro = nullptr;
return *this;
}
void operator++(int)
bool initial_suspend()
{
// This postincrement operator meets the requirements of the Ranges TS
// InputIterator concept, but not those of Standard C++ InputIterator.
++* this;
return (true);
}
bool operator==(generator_iterator const& right_) const
bool final_suspend()
{
return _Coro == right_._Coro;
return (true);
}
bool operator!=(generator_iterator const& right_) const
void yield_value(_Ty const& _Value)
{
return !(*this == right_);
_CurrentValue = std::addressof(_Value);
}
};
template <typename promise_type>
struct generator_iterator<nullptr_t, promise_type> : public generator_iterator<void, promise_type>
{
generator_iterator(nullptr_t) : generator_iterator<void, promise_type>(nullptr)
template<class = std::enable_if_t<!std::is_same_v<_Ty, void>, _Ty>>
void return_value(_Ty const& _Value)
{
_CurrentValue = std::addressof(_Value);
}
generator_iterator(coroutine_handle<promise_type> _CoroArg) : generator_iterator<void, promise_type>(_CoroArg)
template<class = std::enable_if_t<std::is_same_v<_Ty, void>, _Ty>>
void return_value()
{
_CurrentValue = nullptr;
}
};
template <typename _Ty, typename promise_type>
struct generator_iterator : public generator_iterator<void, promise_type>
{
using value_type = _Ty;
using reference = _Ty const&;
using pointer = _Ty const*;
generator_iterator(nullptr_t) : generator_iterator<void, promise_type>(nullptr)
{
}
generator_iterator(coroutine_handle<promise_type> _CoroArg) : generator_iterator<void, promise_type>(_CoroArg)
template <typename _Uty>
_Uty&& await_transform(_Uty&& _Whatever)
{
static_assert(std::is_same_v<_Uty, void>,
"co_await is not supported in coroutines of type std::experiemental::generator_t");
return std::forward<_Uty>(_Whatever);
}
reference operator*() const
using _Alloc_char = typename std::allocator_traits<_Alloc>::template rebind_alloc<char>;
static_assert(std::is_same_v<char*, typename std::allocator_traits<_Alloc_char>::pointer>,
"generator_t does not support allocators with fancy pointer types");
static_assert(std::allocator_traits<_Alloc_char>::is_always_equal::value,
"generator_t only supports stateless allocators");
void* operator new(size_t _Size)
{
return *this->_Coro.promise()._CurrentValue;
_Alloc_char _Al;
void* ptr = _Al.allocate(_Size);
return ptr;
}
pointer operator->() const
void operator delete(void* _Ptr, size_t _Size)
{
return this->_Coro.promise()._CurrentValue;
_Alloc_char _Al;
return _Al.deallocate(static_cast<char*>(_Ptr), _Size);
}
};
template <typename _Ty, typename _Alloc = allocator<char>>
struct generator
typedef generator_iterator<_Ty, promise_type> iterator;
iterator begin()
{
struct promise_type
if (_Coro)
{
_Ty const* _CurrentValue;
promise_type& get_return_object()
{
return *this;
}
bool initial_suspend()
{
return (true);
}
bool final_suspend()
{
return (true);
}
void yield_value(_Ty const& _Value)
{
_CurrentValue = std::addressof(_Value);
}
template<class = std::enable_if_t<!std::is_same_v<_Ty, void>, _Ty>>
void return_value(_Ty const& _Value)
{
_CurrentValue = std::addressof(_Value);
}
template<class = std::enable_if_t<std::is_same_v<_Ty, void>, _Ty>>
void return_value()
{
_CurrentValue = nullptr;
}
template <typename _Uty>
_Uty&& await_transform(_Uty&& _Whatever)
{
static_assert(std::is_same_v<_Uty, void>,
"co_await is not supported in coroutines of type std::experiemental::generator");
return std::forward<_Uty>(_Whatever);
}
using _Alloc_char = typename allocator_traits<_Alloc>::template rebind_alloc<char>;
static_assert(std::is_same_v<char*, typename std::allocator_traits<_Alloc_char>::pointer>,
"generator does not support allocators with fancy pointer types");
static_assert(std::allocator_traits<_Alloc_char>::is_always_equal::value,
"generator only supports stateless allocators");
void* operator new(size_t _Size)
{
_Alloc_char _Al;
return _Al.allocate(_Size);
}
void operator delete(void* _Ptr, size_t _Size)
{
_Alloc_char _Al;
return _Al.deallocate(static_cast<char*>(_Ptr), _Size);
}
};
typedef generator_iterator<_Ty, promise_type> iterator;
iterator begin()
{
if (_Coro)
{
_Coro.resume();
if (_Coro.done())
return{ nullptr };
}
return { _Coro };
_Coro.resume();
if (_Coro.done())
return{ nullptr };
}
return { _Coro };
}
iterator end()
{
return{ nullptr };
}
iterator end()
{
return{ nullptr };
}
explicit generator(promise_type& _Prom)
: _Coro(coroutine_handle<promise_type>::from_promise(_Prom))
{
}
explicit generator_t(promise_type& _Prom)
: _Coro(coroutine_handle<promise_type>::from_promise(_Prom))
{
}
generator() = default;
generator_t() = default;
generator(generator const&) = delete;
generator_t(generator_t const&) = delete;
generator& operator=(generator const&) = delete;
generator_t& operator=(generator_t const&) = delete;
generator(generator&& right_) noexcept
: _Coro(right_._Coro)
{
right_._Coro = nullptr;
}
generator_t(generator_t&& right_) noexcept
: _Coro(right_._Coro)
{
right_._Coro = nullptr;
}
generator& operator=(generator&& right_) noexcept
{
if (this != std::addressof(right_)) {
_Coro = right_._Coro;
right_._Coro = nullptr;
}
return *this;
generator_t& operator=(generator_t&& right_) noexcept
{
if (this != std::addressof(right_)) {
_Coro = right_._Coro;
right_._Coro = nullptr;
}
return *this;
}
~generator()
{
if (_Coro) {
_Coro.destroy();
}
~generator_t()
{
if (_Coro) {
_Coro.destroy();
}
}
coroutine_handle<promise_type> detach()
{
auto t = _Coro;
_Coro = nullptr;
return t;
}
coroutine_handle<promise_type> detach()
{
auto t = _Coro;
_Coro = nullptr;
return t;
}
private:
coroutine_handle<promise_type> _Coro = nullptr;
};
private:
coroutine_handle<promise_type> _Coro = nullptr;
};
} // namespace experimental
} // namespace std
} // namespace resumef
#pragma pop_macro("new")
#pragma pack(pop)
#endif /* RC_INVOKED */
#endif /* _EXPERIMENTAL_GENERATOR_ */

+ 25
- 1
librf/src/promise.h View File

@@ -13,9 +13,9 @@ namespace resumef
using state_type = state_t<value_type>;
using promise_type = promise_t<value_type>;
using future_type = future_t<value_type>;
using lock_type = typename state_type::lock_type;

counted_ptr<state_type> _state = make_counted<state_type>(false);
char __xxx[16];

promise_impl_t() {}
promise_impl_t(promise_impl_t&& _Right) noexcept = default;
@@ -31,6 +31,25 @@ namespace resumef
#endif
future_type get_return_object();
void cancellation_requested();

static const size_t _ALIGN_REQ = sizeof(void*) * 2;

using _Alloc_char = std::allocator<char>;
void* operator new(size_t _Size)
{
std::cout << "promise::new, size=" << _Size << std::endl;

_Alloc_char _Al;
size_t _State_size = ((sizeof(state_type) + _ALIGN_REQ - 1) & ~(_ALIGN_REQ - 1));
char* ptr = _Al.allocate(_Size + _State_size);
return ptr + _State_size;
}

void operator delete(void* _Ptr, size_t _Size)
{
_Alloc_char _Al;
return _Al.deallocate(static_cast<char*>(_Ptr), _Size);
}
};

template<class _Ty>
@@ -52,5 +71,10 @@ namespace resumef
void yield_value();
};

template<class _Ty = void>
constexpr size_t promise_align_size()
{
return (sizeof(promise_t<_Ty>) + promise_t<_Ty>::_ALIGN_REQ - 1) & ~(promise_t<_Ty>::_ALIGN_REQ - 1);
}
}


+ 15
- 13
librf/src/state.cpp View File

@@ -38,26 +38,32 @@ namespace resumef
void state_future_t::resume()
{
coroutine_handle<> handler;
scoped_lock<lock_type> __guard(_mtx);
if (_initor != nullptr)
{
handler = _initor;
coroutine_handle<> handler = _initor;
_initor = nullptr;
handler();
handler.resume();
}
else if (_coro != nullptr)
{
handler = _coro;
coroutine_handle<> handler = _coro;
_coro = nullptr;
handler();
handler.resume();
}
}
bool state_future_t::has_handler() const
{
return _initor != nullptr || _coro != nullptr;
scoped_lock<lock_type> __guard(_mtx);
return _coro != nullptr || _initor != nullptr;
}
bool state_future_t::is_ready() const
{
scoped_lock<lock_type> __guard(this->_mtx);
return _exception != nullptr || _has_value || !_is_awaitor;
}
void state_future_t::set_exception(std::exception_ptr e)
@@ -69,18 +75,14 @@ namespace resumef
if (sch != nullptr)
sch->add_ready(this);
}
bool state_t<void>::is_ready() const
{
scoped_lock<lock_type> __guard(this->_mtx);
return _is_awaitor == false || _has_value || _exception != nullptr;
}
void state_t<void>::future_await_resume()
{
scoped_lock<lock_type> __guard(this->_mtx);
if (this->_exception)
std::rethrow_exception(std::move(this->_exception));
if (!this->_has_value)
std::rethrow_exception(std::make_exception_ptr(future_exception{error_code::not_ready}));
}
void state_t<void>::set_value()

+ 33
- 22
librf/src/state.h View File

@@ -8,9 +8,21 @@
namespace resumef
{
struct state_base_t : public counted_t<state_base_t>
struct state_base_t
{
RF_API virtual ~state_base_t();
private:
std::atomic<intptr_t> _count{ 0 };
public:
void lock()
{
++_count;
}
void unlock()
{
if (--_count == 0)
delete this;
}
protected:
scheduler_t* _scheduler = nullptr;
//可能来自协程里的promise产生的,则经过co_await操作后,_coro在初始时不会为nullptr。
@@ -52,11 +64,12 @@ namespace resumef
protected:
mutable lock_type _mtx;
coroutine_handle<> _initor;
std::exception_ptr _exception;
state_future_t* _parent = nullptr;
#if RESUMEF_DEBUG_COUNTER
intptr_t _id;
#endif
std::exception_ptr _exception;
bool _has_value = false;
bool _is_awaitor;
public:
state_future_t()
@@ -76,6 +89,7 @@ namespace resumef
virtual void resume() override;
virtual bool has_handler() const override;
virtual bool is_ready() const override;
scheduler_t* get_scheduler() const
{
@@ -97,6 +111,11 @@ namespace resumef
template<class _PromiseT, typename = std::enable_if_t<is_promise_v<_PromiseT>>>
void future_await_suspend(coroutine_handle<_PromiseT> handler);
bool future_await_ready()
{
scoped_lock<lock_type> __guard(this->_mtx);
return _has_value;
}
template<class _PromiseT, typename = std::enable_if_t<is_promise_v<_PromiseT>>>
void promise_initial_suspend(coroutine_handle<_PromiseT> handler);
@@ -111,21 +130,22 @@ namespace resumef
using state_future_t::state_future_t;
using state_future_t::lock_type;
using value_type = _Ty;
protected:
std::optional<value_type> _value;
public:
virtual bool is_ready() const override
private:
union union_value_type
{
scoped_lock<lock_type> __guard(this->_mtx);
return _is_awaitor == false || _value.has_value() || _exception != nullptr;
}
value_type _value;
char _[1];
bool future_await_ready()
union_value_type() {}
~union_value_type() {}
};
union_value_type uv;
public:
~state_t()
{
scoped_lock<lock_type> __guard(this->_mtx);
return _value.has_value();
if (_has_value)
uv._value.~value_type();
}
auto future_await_resume() -> value_type;
template<class _PromiseT, typename = std::enable_if_t<is_promise_v<_PromiseT>>>
void promise_yield_value(_PromiseT* promise, value_type val);
@@ -138,16 +158,7 @@ namespace resumef
{
using state_future_t::state_future_t;
using state_future_t::lock_type;
protected:
std::atomic<bool> _has_value{ false };
public:
virtual bool is_ready() const override;
bool future_await_ready()
{
return _has_value;
}
void future_await_resume();
template<class _PromiseT, typename = std::enable_if_t<is_promise_v<_PromiseT>>>
void promise_yield_value(_PromiseT* promise);

+ 25
- 5
librf/src/state.inl View File

@@ -49,9 +49,9 @@ namespace resumef
this->_parent = parent_state;
this->_scheduler = sch;
}

if (this->_coro == nullptr)
if (_coro == nullptr)
this->_coro = handler;

if (sch != nullptr)
sch->add_await(this);
}
@@ -80,7 +80,15 @@ namespace resumef
{
scoped_lock<lock_type> __guard(this->_mtx);

this->_value = std::move(val);
if (this->_has_value)
{
this->uv._value = std::move(val);
}
else
{
this->_has_value = true;
new(&this->uv._value) value_type(std::move(val));
}

coroutine_handle<_PromiseT> handler = coroutine_handle<_PromiseT>::from_promise(*promise);
if (!handler.done())
@@ -99,7 +107,10 @@ namespace resumef
scoped_lock<lock_type> __guard(this->_mtx);
if (this->_exception)
std::rethrow_exception(std::move(this->_exception));
return std::move(this->_value.value());
if (!this->_has_value)
std::rethrow_exception(std::make_exception_ptr(future_exception{error_code::not_ready}));

return std::move(this->uv._value);
}

template<typename _Ty>
@@ -107,7 +118,16 @@ namespace resumef
{
scoped_lock<lock_type> __guard(this->_mtx);

this->_value = std::move(val);
if (this->_has_value)
{
this->uv._value = std::move(val);
}
else
{
this->_has_value = true;
new(&this->uv._value) value_type(std::move(val));
}

scheduler_t* sch = this->get_scheduler();
if (sch != nullptr)
sch->add_ready(this);

+ 23
- 8
tutorial/test_async_cb.cpp View File

@@ -7,6 +7,8 @@
#include "librf.h"
using namespace resumef;
template<class _Ctype>
void callback_get_long(int64_t val, _Ctype&& cb)
{
@@ -19,23 +21,36 @@ void callback_get_long(int64_t val, _Ctype&& cb)
}
//这种情况下,没有生成 frame-context,因此,并没有promise_type被内嵌在frame-context里
auto async_get_long(int64_t val)
future_t<int64_t> async_get_long(int64_t val)
{
resumef::awaitable_t<int64_t> st;
callback_get_long(val, [st](int64_t val)
/*
void* frame_ptr = _coro_frame_ptr();
size_t frame_size = _coro_frame_size();
std::cout << "test_routine_use_timer" << std::endl;
std::cout << "frame point=" << frame_ptr << ", size=" << frame_size << ", promise_size=" << promise_align_size<>() << std::endl;
auto handler = coroutine_handle<promise_t<>>::from_address(frame_ptr);
auto st = handler.promise()._state;
scheduler_t* sch = st->get_scheduler();
auto parent = st->get_parent();
std::cout << "st=" << st.get() << ", scheduler=" << sch << ", parent=" << parent << std::endl;
*/
resumef::awaitable_t<int64_t> awaitable;
callback_get_long(val, [awaitable](int64_t val)
{
st.set_value(val);
awaitable.set_value(val);
});
return st.get_future();
return awaitable.get_future();
}
resumef::future_t<> wait_get_long(int64_t val)
future_t<> wait_get_long(int64_t val)
{
co_await async_get_long(val);
}
//这种情况下,会生成对应的 frame-context,一个promise_type被内嵌在frame-context里
resumef::future_t<> resumable_get_long(int64_t val)
future_t<> resumable_get_long(int64_t val)
{
std::cout << val << std::endl;
val = co_await async_get_long(val);
@@ -46,7 +61,7 @@ resumef::future_t<> resumable_get_long(int64_t val)
std::cout << val << std::endl;
}
resumef::future_t<int64_t> loop_get_long(int64_t val)
future_t<int64_t> loop_get_long(int64_t val)
{
std::cout << val << std::endl;
for (int i = 0; i < 5; ++i)

+ 2
- 2
tutorial/test_async_resumable.cpp View File

@@ -26,7 +26,7 @@ void dump(size_t idx, std::string name, T start, T end, intptr_t count)
static const intptr_t N = 3000000;
//static const int N = 10;
auto yield_switch(intptr_t coro) -> std::experimental::generator<intptr_t>
auto yield_switch(intptr_t coro) -> resumef::generator_t<intptr_t>
{
for (intptr_t i = 0; i < N / coro; ++i)
co_yield i;
@@ -42,7 +42,7 @@ void resumable_switch(intptr_t coro, size_t idx)
for (intptr_t i = 0; i < coro; ++i)
{
//go yield_switch(coro);
go [=] ()->std::experimental::generator<intptr_t>
go [=] ()->resumef::generator_t<intptr_t>
{
for (intptr_t i = 0; i < N / coro; ++i)
co_yield i;

+ 12
- 2
tutorial/test_async_routine.cpp View File

@@ -12,7 +12,17 @@ using namespace resumef;
future_t<> test_routine_use_timer()
{
using namespace std::chrono;
void* frame_ptr = _coro_frame_ptr();
size_t frame_size = _coro_frame_size();
std::cout << "test_routine_use_timer" << std::endl;
std::cout << "frame point=" << frame_ptr << ", size=" << frame_size << ", promise_size=" << promise_align_size<>() << std::endl;
auto handler = coroutine_handle<promise_t<>>::from_address(frame_ptr);
auto st = handler.promise()._state;
scheduler_t* sch = st->get_scheduler();
auto parent = st->get_parent();
std::cout << "st=" << st.get() << ", scheduler=" << sch << ", parent=" << parent << std::endl;
for (size_t i = 0; i < 3; ++i)
{
@@ -36,7 +46,7 @@ future_t<> test_routine_use_timer_2()
void resumable_main_routine()
{
go test_routine_use_timer_2();
//go test_routine_use_timer();
//go test_routine_use_timer_2();
go test_routine_use_timer();
this_scheduler()->run_until_notask();
}

+ 6
- 5
vs_proj/librf.cpp View File

@@ -31,13 +31,14 @@ int main(int argc, const char* argv[])
{
(void)argc;
(void)argv;
//resumable_main_routine();
if (argc > 1)
resumable_main_benchmark_asio_client(atoi(argv[1]));
else
resumable_main_benchmark_asio_server();
//if (argc > 1)
// resumable_main_benchmark_asio_client(atoi(argv[1]));
//else
// resumable_main_benchmark_asio_server();
//resumable_main_cb();
resumable_main_cb();
//resumable_main_modern_cb();
//resumable_main_suspend_always();
//resumable_main_yield_return();

Loading…
Cancel
Save