Browse Source

优化channel的缓存数据的方式

tags/v2.9.7
tearshark 4 years ago
parent
commit
d4a95348b2
3 changed files with 120 additions and 75 deletions
  1. 85
    59
      librf/src/channel_v2.h
  2. 8
    10
      librf/src/ring_queue.h
  3. 27
    6
      tutorial/test_async_channel.cpp

+ 85
- 59
librf/src/channel_v2.h View File

{ {
namespace detail namespace detail
{ {
template<class _Ty>
struct channel_impl_v2 : public std::enable_shared_from_this<channel_impl_v2<_Ty>>
template<class _Ty, class _Opty>
struct channel_impl_v2 : public std::enable_shared_from_this<channel_impl_v2<_Ty, _Opty>>
{ {
using value_type = _Ty; using value_type = _Ty;

struct state_channel_t;
using state_read_t = state_channel_t;
using state_write_t = state_channel_t;

channel_impl_v2(size_t max_counter_);

bool try_read(value_type& val);
bool try_read_nolock(value_type& val);
void add_read_list_nolock(state_read_t* state);

bool try_write(value_type& val);
bool try_write_nolock(value_type& val);
void add_write_list_nolock(state_write_t* state);

size_t capacity() const noexcept
{
return _max_counter;
}
using optional_type = _Opty;


struct state_channel_t : public state_base_t struct state_channel_t : public state_base_t
{ {
state_channel_t(channel_impl_v2* ch, value_type& val) noexcept
state_channel_t(channel_impl_v2* ch) noexcept
: _channel(ch->shared_from_this()) : _channel(ch->shared_from_this())
, _value(&val)
{ {
} }


} }
} }


friend channel_impl_v2;
public: public:
state_channel_t* _next = nullptr; state_channel_t* _next = nullptr;
protected:
std::shared_ptr<channel_impl_v2> _channel;
error_code _error = error_code::none;
};

struct state_read_t : public state_channel_t
{
state_read_t(channel_impl_v2* ch, optional_type& val) noexcept
: state_channel_t(ch)
, _value(&val)
{
}
protected: protected:
friend channel_impl_v2; friend channel_impl_v2;
optional_type* _value;
};


std::shared_ptr<channel_impl_v2> _channel;
struct state_write_t : public state_channel_t
{
state_write_t(channel_impl_v2* ch, value_type& val) noexcept
: state_channel_t(ch)
, _value(&val)
{
}
protected:
friend channel_impl_v2;
value_type* _value; value_type* _value;
error_code _error = error_code::none;
}; };

channel_impl_v2(size_t max_counter_);

bool try_read(optional_type& val);
bool try_read_nolock(optional_type& val);
void add_read_list_nolock(state_read_t* state);

bool try_write(value_type& val);
bool try_write_nolock(value_type& val);
void add_write_list_nolock(state_write_t* state);

size_t capacity() const noexcept
{
return _max_counter;
}
private: private:
auto try_pop_reader_()->state_read_t*; auto try_pop_reader_()->state_read_t*;
auto try_pop_writer_()->state_write_t*; auto try_pop_writer_()->state_write_t*;
bool awake_one_reader_(value_type& val); bool awake_one_reader_(value_type& val);


void awake_one_writer_(); void awake_one_writer_();
bool awake_one_writer_(value_type & val);
bool awake_one_writer_(optional_type& val);


channel_impl_v2(const channel_impl_v2&) = delete; channel_impl_v2(const channel_impl_v2&) = delete;
channel_impl_v2(channel_impl_v2&&) = delete; channel_impl_v2(channel_impl_v2&&) = delete;
write_queue_type _write_awakes; //写队列 write_queue_type _write_awakes; //写队列
}; };


template<class _Ty>
channel_impl_v2<_Ty>::channel_impl_v2(size_t max_counter_)
template<class _Ty, class _Opty>
channel_impl_v2<_Ty, _Opty>::channel_impl_v2(size_t max_counter_)
: _max_counter(max_counter_) : _max_counter(max_counter_)
, _values(USE_RING_QUEUE ? max_counter_ : 0) , _values(USE_RING_QUEUE ? max_counter_ : 0)
{ {
} }


template<class _Ty>
inline bool channel_impl_v2<_Ty>::try_read(value_type& val)
template<class _Ty, class _Opty>
inline bool channel_impl_v2<_Ty, _Opty>::try_read(optional_type& val)
{ {
scoped_lock<lock_type> lock_(this->_lock); scoped_lock<lock_type> lock_(this->_lock);
return try_read_nolock(val); return try_read_nolock(val);
} }


template<class _Ty>
bool channel_impl_v2<_Ty>::try_read_nolock(value_type& val)
template<class _Ty, class _Opty>
bool channel_impl_v2<_Ty, _Opty>::try_read_nolock(optional_type& val)
{ {
if constexpr (USE_RING_QUEUE) if constexpr (USE_RING_QUEUE)
{ {
return awake_one_writer_(val); return awake_one_writer_(val);
} }


template<class _Ty>
inline void channel_impl_v2<_Ty>::add_read_list_nolock(state_read_t* state)
template<class _Ty, class _Opty>
inline void channel_impl_v2<_Ty, _Opty>::add_read_list_nolock(state_read_t* state)
{ {
assert(state != nullptr); assert(state != nullptr);
_read_awakes.push_back(state); _read_awakes.push_back(state);
} }


template<class _Ty>
inline bool channel_impl_v2<_Ty>::try_write(value_type& val)
template<class _Ty, class _Opty>
inline bool channel_impl_v2<_Ty, _Opty>::try_write(value_type& val)
{ {
scoped_lock<lock_type> lock_(this->_lock); scoped_lock<lock_type> lock_(this->_lock);
return try_write_nolock(val); return try_write_nolock(val);
} }


template<class _Ty>
bool channel_impl_v2<_Ty>::try_write_nolock(value_type& val)
template<class _Ty, class _Opty>
bool channel_impl_v2<_Ty, _Opty>::try_write_nolock(value_type& val)
{ {
if constexpr (USE_RING_QUEUE) if constexpr (USE_RING_QUEUE)
{ {
return awake_one_reader_(val); return awake_one_reader_(val);
} }


template<class _Ty>
inline void channel_impl_v2<_Ty>::add_write_list_nolock(state_write_t* state)
template<class _Ty, class _Opty>
inline void channel_impl_v2<_Ty, _Opty>::add_write_list_nolock(state_write_t* state)
{ {
assert(state != nullptr); assert(state != nullptr);
_write_awakes.push_back(state); _write_awakes.push_back(state);
} }


template<class _Ty>
auto channel_impl_v2<_Ty>::try_pop_reader_()->state_read_t*
template<class _Ty, class _Opty>
auto channel_impl_v2<_Ty, _Opty>::try_pop_reader_()->state_read_t*
{ {
if constexpr (USE_LINK_QUEUE) if constexpr (USE_LINK_QUEUE)
{ {
} }
} }


template<class _Ty>
auto channel_impl_v2<_Ty>::try_pop_writer_()->state_write_t*
template<class _Ty, class _Opty>
auto channel_impl_v2<_Ty, _Opty>::try_pop_writer_()->state_write_t*
{ {
if constexpr (USE_LINK_QUEUE) if constexpr (USE_LINK_QUEUE)
{ {
} }
} }


template<class _Ty>
void channel_impl_v2<_Ty>::awake_one_reader_()
template<class _Ty, class _Opty>
void channel_impl_v2<_Ty, _Opty>::awake_one_reader_()
{ {
state_read_t* state = try_pop_reader_(); state_read_t* state = try_pop_reader_();
if (state != nullptr) if (state != nullptr)
} }
} }


template<class _Ty>
bool channel_impl_v2<_Ty>::awake_one_reader_(value_type& val)
template<class _Ty, class _Opty>
bool channel_impl_v2<_Ty, _Opty>::awake_one_reader_(value_type& val)
{ {
state_read_t* state = try_pop_reader_(); state_read_t* state = try_pop_reader_();
if (state != nullptr) if (state != nullptr)
return false; return false;
} }


template<class _Ty>
void channel_impl_v2<_Ty>::awake_one_writer_()
template<class _Ty, class _Opty>
void channel_impl_v2<_Ty, _Opty>::awake_one_writer_()
{ {
state_write_t* state = try_pop_writer_(); state_write_t* state = try_pop_writer_();
if (state != nullptr) if (state != nullptr)
} }
} }


template<class _Ty>
bool channel_impl_v2<_Ty>::awake_one_writer_(value_type& val)
template<class _Ty, class _Opty>
bool channel_impl_v2<_Ty, _Opty>::awake_one_writer_(optional_type& val)
{ {
state_write_t* writer = try_pop_writer_(); state_write_t* writer = try_pop_writer_();
if (writer != nullptr) if (writer != nullptr)


inline namespace channel_v2 inline namespace channel_v2
{ {
template<class _Ty = bool>
//如果channel缓存的元素不能凭空产生,或者产生代价较大,则推荐第二个模板参数使用true。从而减小不必要的开销。
template<class _Ty = bool, bool _Option = false>
struct channel_t struct channel_t
{ {
using value_type = _Ty; using value_type = _Ty;
using channel_type = detail::channel_impl_v2<value_type>;

static constexpr bool use_option = _Option;
using optional_type = std::conditional_t<use_option, std::optional<value_type>, value_type>;
using channel_type = detail::channel_impl_v2<value_type, optional_type>;
using lock_type = typename channel_type::lock_type; using lock_type = typename channel_type::lock_type;


channel_t(size_t max_counter = 0) channel_t(size_t max_counter = 0)
return false; return false;
} }


_state = new state_type(_channel, static_cast<value_type&>(_value));
_state = new state_type(_channel, _value);
_state->on_await_suspend(handler); _state->on_await_suspend(handler);
_channel->add_read_list_nolock(_state.get()); _channel->add_read_list_nolock(_state.get());
_channel = nullptr; _channel = nullptr;
{ {
if (_state.get() != nullptr) if (_state.get() != nullptr)
_state->on_await_resume(); _state->on_await_resume();
return std::move(_value);

if constexpr(use_option)
return std::move(_value).value();
else
return std::move(_value);
} }
private: private:
channel_type* _channel; channel_type* _channel;
counted_ptr<state_type> _state; counted_ptr<state_type> _state;
mutable value_type _value;
mutable optional_type _value;
}; };


read_awaiter operator co_await() const noexcept read_awaiter operator co_await() const noexcept
}; };


//不支持channel_t<void> //不支持channel_t<void>
template<>
struct channel_t<void>
template<bool _Option>
struct channel_t<void, _Option>
{ {
}; };



+ 8
- 10
librf/src/ring_queue.h View File

bool full() const noexcept; bool full() const noexcept;
template<class U> template<class U>
bool try_push(U&& value) noexcept(std::is_nothrow_move_assignable_v<U>); bool try_push(U&& value) noexcept(std::is_nothrow_move_assignable_v<U>);
bool try_pop(value_type& value) noexcept(std::is_nothrow_move_assignable_v<value_type>);
template<class U>
bool try_pop(U& value) noexcept(std::is_nothrow_move_assignable_v<value_type>);
private: private:
using container_type = std::conditional_t<std::is_same_v<value_type, bool>, std::unique_ptr<optional_type[]>, std::vector<optional_type>>;
using container_type = std::unique_ptr<optional_type[]>;
container_type m_bufferPtr; container_type m_bufferPtr;
size_type m_bufferSize; size_type m_bufferSize;




template<class _Ty, bool _Option, class _Sty> template<class _Ty, bool _Option, class _Sty>
ring_queue<_Ty, _Option, _Sty>::ring_queue(size_t sz) ring_queue<_Ty, _Option, _Sty>::ring_queue(size_t sz)
: m_bufferSize(static_cast<size_type>(sz + 1))
: m_bufferPtr(new optional_type[sz + 1])
, m_bufferSize(static_cast<size_type>(sz + 1))
, m_writeIndex(0) , m_writeIndex(0)
, m_readIndex(0) , m_readIndex(0)
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
, m_count(0) , m_count(0)
#endif #endif
{ {
if constexpr (std::is_same_v<value_type, bool>)
m_bufferPtr = container_type{ new optional_type[sz + 1] };
else
m_bufferPtr.resize(sz + 1);

assert(sz < (std::numeric_limits<size_type>::max)()); assert(sz < (std::numeric_limits<size_type>::max)());
} }


} }


template<class _Ty, bool _Option, class _Sty> template<class _Ty, bool _Option, class _Sty>
bool ring_queue<_Ty, _Option, _Sty>::try_pop(value_type& value) noexcept(std::is_nothrow_move_assignable_v<value_type>)
template<class U>
bool ring_queue<_Ty, _Option, _Sty>::try_pop(U& value) noexcept(std::is_nothrow_move_assignable_v<value_type>)
{ {
if (m_readIndex == m_writeIndex) if (m_readIndex == m_writeIndex)
return false; return false;
optional_type& ov = m_bufferPtr[m_readIndex]; optional_type& ov = m_bufferPtr[m_readIndex];
if constexpr (use_option) if constexpr (use_option)
{ {
value = std::move(ov.value());
value = std::move(ov).value();
ov = std::nullopt; ov = std::nullopt;
} }
else else

+ 27
- 6
tutorial/test_async_channel.cpp View File

const size_t MAX_CHANNEL_QUEUE = 5; //0, 1, 5, 10, -1 const size_t MAX_CHANNEL_QUEUE = 5; //0, 1, 5, 10, -1
future_t<> test_channel_read(const channel_t<std::string> & c)
//如果使用move_only_type来操作channel失败,说明中间过程发生了拷贝操作----这不是设计目标。
template<class _Ty>
struct move_only_type
{
_Ty value;
move_only_type() = default;
move_only_type(const _Ty& val) : value(val) {}
move_only_type(_Ty&& val) : value(std::forward<_Ty>(val)) {}
move_only_type(const move_only_type&) = delete;
move_only_type& operator =(const move_only_type&) = delete;
move_only_type(move_only_type&&) = default;
move_only_type& operator =(move_only_type&&) = default;
};
//如果channel缓存的元素不能凭空产生,或者产生代价较大,则推荐第二个模板参数使用true。从而减小不必要的开销。
using string_channel_t = channel_t<move_only_type<std::string>, true>;
//channel其实内部引用了一个channel实现体,故可以支持复制拷贝操作
future_t<> test_channel_read(string_channel_t c)
{ {
using namespace std::chrono; using namespace std::chrono;
//auto val = co_await c.read(); //auto val = co_await c.read();
auto val = co_await c; //第二种从channel读出数据的方法。利用重载operator co_await(),而不是c是一个awaitable_t。 auto val = co_await c; //第二种从channel读出数据的方法。利用重载operator co_await(),而不是c是一个awaitable_t。
std::cout << val << ":";
std::cout << val.value << ":";
std::cout << std::endl; std::cout << std::endl;
} }
#ifndef __clang__ #ifndef __clang__
} }
} }
future_t<> test_channel_write(const channel_t<std::string> & c)
future_t<> test_channel_write(string_channel_t c)
{ {
using namespace std::chrono; using namespace std::chrono;
for (size_t i = 0; i < 10; ++i) for (size_t i = 0; i < 10; ++i)
{ {
//co_await c.write(std::to_string(i)); //co_await c.write(std::to_string(i));
co_await (c << std::to_string(i)); //第二种写入数据到channel的方法。因为优先级关系,需要将'c << i'括起来
co_await(c << std::to_string(i)); //第二种写入数据到channel的方法。因为优先级关系,需要将'c << i'括起来
std::cout << "<" << i << ">:"; std::cout << "<" << i << ">:";
std::cout << std::endl; std::cout << std::endl;
void test_channel_read_first() void test_channel_read_first()
{ {
channel_t<std::string> c(MAX_CHANNEL_QUEUE);
string_channel_t c(MAX_CHANNEL_QUEUE);
go test_channel_read(c); go test_channel_read(c);
go test_channel_write(c); go test_channel_write(c);
void test_channel_write_first() void test_channel_write_first()
{ {
channel_t<std::string> c(MAX_CHANNEL_QUEUE);
string_channel_t c(MAX_CHANNEL_QUEUE);
go test_channel_write(c); go test_channel_write(c);
go test_channel_read(c); go test_channel_read(c);

Loading…
Cancel
Save