Browse Source

channel为多线程优化提供手段

tags/v2.9.7
tearshark 4 years ago
parent
commit
a14fd0d640
3 changed files with 66 additions and 41 deletions
  1. 7
    5
      librf/src/channel_v2.h
  2. 57
    34
      librf/src/channel_v2.inl
  3. 2
    2
      tutorial/test_async_channel.cpp

+ 7
- 5
librf/src/channel_v2.h View File

inline namespace channel_v2 inline namespace channel_v2
{ {
//如果channel缓存的元素不能凭空产生,或者产生代价较大,则推荐第二个模板参数使用true。从而减小不必要的开销。 //如果channel缓存的元素不能凭空产生,或者产生代价较大,则推荐第二个模板参数使用true。从而减小不必要的开销。
template<class _Ty = bool, bool _Option = false>
template<class _Ty = bool, bool _Optional = false, bool _OptimizationThread = false>
struct channel_t struct channel_t
{ {
struct [[nodiscard]] read_awaiter; struct [[nodiscard]] read_awaiter;


using value_type = _Ty; using value_type = _Ty;


static constexpr bool use_option = _Option;
using optional_type = std::conditional_t<use_option, std::optional<value_type>, value_type>;
static constexpr bool use_optional = _Optional;
static constexpr bool optimization_for_multithreading = _OptimizationThread;

using optional_type = std::conditional_t<use_optional, std::optional<value_type>, value_type>;
using channel_type = detail::channel_impl_v2<value_type, optional_type>; using channel_type = detail::channel_impl_v2<value_type, optional_type>;
using lock_type = typename channel_type::lock_type; using lock_type = typename channel_type::lock_type;


}; };


//不支持channel_t<void> //不支持channel_t<void>
template<bool _Option>
struct channel_t<void, _Option>
template<bool _Option, bool _OptimizationThread>
struct channel_t<void, _Option, _OptimizationThread>
{ {
}; };



+ 57
- 34
librf/src/channel_v2.inl View File



state_channel_t(_Chty* ch, value_type& val) noexcept state_channel_t(_Chty* ch, value_type& val) noexcept
: _channel(ch->shared_from_this()) : _channel(ch->shared_from_this())
, _value(&val)
, _value(std::addressof(val))
{ {
} }


else else
{ {
assert(_values.size() < _max_counter); assert(_values.size() < _max_counter);
_values.push_back(*state->_value);
_values.push_back(std::move(*state->_value));
} }


state->on_notify(); state->on_notify();


inline namespace channel_v2 inline namespace channel_v2
{ {
template<class _Ty, bool _Option>
struct [[nodiscard]] channel_t<_Ty, _Option>::read_awaiter
template<class _Ty, bool _Optional, bool _OptimizationThread>
struct [[nodiscard]] channel_t<_Ty, _Optional, _OptimizationThread>::read_awaiter
{ {
using state_type = typename channel_type::state_read_t; using state_type = typename channel_type::state_read_t;


_channel->try_read(_value); _channel->try_read(_value);
} }


bool await_ready() const noexcept
bool await_ready()
{ {
//在多线程竞争较为多的时候,先检查是否可用,可以稍微提高点效率
if constexpr (optimization_for_multithreading)
{
scoped_lock<lock_type> lock_(_channel->_lock);

if (_channel->try_read_nolock(_value))
{
_channel = nullptr;
return true;
}
}
return false; return false;
} }
template<class _PromiseT, typename = std::enable_if_t<is_promise_v<_PromiseT>>> template<class _PromiseT, typename = std::enable_if_t<is_promise_v<_PromiseT>>>
if (_state.get() != nullptr) if (_state.get() != nullptr)
_state->on_await_resume(); _state->on_await_resume();


if constexpr (use_option)
if constexpr (use_optional)
return std::move(_value).value(); return std::move(_value).value();
else else
return std::move(_value); return std::move(_value);
} }
private: private:
channel_type* _channel; channel_type* _channel;
counted_ptr<state_type> _state;
counted_ptr<state_type> _state; //延迟到await_suspend()里创建,减小不必要的内存申请
mutable optional_type _value; mutable optional_type _value;
}; };


template<class _Ty, bool _Option>
struct [[nodiscard]] channel_t<_Ty, _Option>::write_awaiter
template<class _Ty, bool _Optional, bool _OptimizationThread>
struct [[nodiscard]] channel_t<_Ty, _Optional, _OptimizationThread>::write_awaiter
{ {
using state_type = typename channel_type::state_write_t; using state_type = typename channel_type::state_write_t;


write_awaiter(channel_type* ch, value_type val) noexcept(std::is_move_constructible_v<value_type>)
template<class U>
write_awaiter(channel_type* ch, U&& val) noexcept(std::is_move_constructible_v<value_type>)
: _channel(ch) : _channel(ch)
, _value(std::move(val))
, _value(std::forward<U>(val))
{} {}


~write_awaiter() ~write_awaiter()
{//为了不在协程中也能正常使用 {//为了不在协程中也能正常使用
if (_channel != nullptr) if (_channel != nullptr)
_channel->try_write(static_cast<value_type&>(_value));
_channel->try_write(_value);
} }


bool await_ready() const noexcept
bool await_ready()
{ {
//在多线程竞争较为多的时候,先检查是否可用,可以稍微提高点效率
if constexpr (optimization_for_multithreading)
{
scoped_lock<lock_type> lock_(_channel->_lock);

if (_channel->try_write_nolock(_value))
{
_channel = nullptr;
return true;
}
}
return false; return false;
} }
template<class _PromiseT, typename = std::enable_if_t<is_promise_v<_PromiseT>>> template<class _PromiseT, typename = std::enable_if_t<is_promise_v<_PromiseT>>>
{ {
scoped_lock<lock_type> lock_(_channel->_lock); scoped_lock<lock_type> lock_(_channel->_lock);


if (_channel->try_write_nolock(static_cast<value_type&>(_value)))
if (_channel->try_write_nolock(_value))
{ {
_channel = nullptr; _channel = nullptr;
return false; return false;
} }


_state = new state_type(_channel, static_cast<value_type&>(_value));
_state = new state_type(_channel, _value);
_state->on_await_suspend(handler); _state->on_await_suspend(handler);
_channel->add_write_list_nolock(_state.get()); _channel->add_write_list_nolock(_state.get());
_channel = nullptr; _channel = nullptr;
} }
private: private:
channel_type* _channel; channel_type* _channel;
counted_ptr<state_type> _state;
counted_ptr<state_type> _state; //延迟到await_suspend()里创建,减小不必要的内存申请
mutable value_type _value; mutable value_type _value;
}; };


template<class _Ty, bool _Option>
channel_t<_Ty, _Option>::channel_t(size_t max_counter)
template<class _Ty, bool _Optional, bool _OptimizationThread>
channel_t<_Ty, _Optional, _OptimizationThread>::channel_t(size_t max_counter)
:_chan(std::make_shared<channel_type>(max_counter)) :_chan(std::make_shared<channel_type>(max_counter))
{ {


} }


template<class _Ty, bool _Option>
size_t channel_t<_Ty, _Option>::capacity() const noexcept
template<class _Ty, bool _Optional, bool _OptimizationThread>
size_t channel_t<_Ty, _Optional, _OptimizationThread>::capacity() const noexcept
{ {
return _chan->capacity(); return _chan->capacity();
} }


template<class _Ty, bool _Option>
typename channel_t<_Ty, _Option>::read_awaiter
channel_t<_Ty, _Option>::operator co_await() const noexcept
template<class _Ty, bool _Optional, bool _OptimizationThread>
typename channel_t<_Ty, _Optional, _OptimizationThread>::read_awaiter
channel_t<_Ty, _Optional, _OptimizationThread>::operator co_await() const noexcept
{ {
return { _chan.get() }; return { _chan.get() };
} }


template<class _Ty, bool _Option>
typename channel_t<_Ty, _Option>::read_awaiter
channel_t<_Ty, _Option>::read() const noexcept
template<class _Ty, bool _Optional, bool _OptimizationThread>
typename channel_t<_Ty, _Optional, _OptimizationThread>::read_awaiter
channel_t<_Ty, _Optional, _OptimizationThread>::read() const noexcept
{ {
return { _chan.get() }; return { _chan.get() };
} }


template<class _Ty, bool _Option>
template<class _Ty, bool _Optional, bool _OptimizationThread>
template<class U> template<class U>
typename channel_t<_Ty, _Option>::write_awaiter
channel_t<_Ty, _Option>::write(U&& val) const noexcept(std::is_move_constructible_v<U>)
typename channel_t<_Ty, _Optional, _OptimizationThread>::write_awaiter
channel_t<_Ty, _Optional, _OptimizationThread>::write(U&& val) const noexcept(std::is_move_constructible_v<U>)
{ {
return write_awaiter{ _chan.get(), std::move(val) };
return write_awaiter{ _chan.get(), std::forward<U>(val) };
} }


template<class _Ty, bool _Option>
template<class _Ty, bool _Optional, bool _OptimizationThread>
template<class U> template<class U>
typename channel_t<_Ty, _Option>::write_awaiter
channel_t<_Ty, _Option>::operator << (U&& val) const noexcept(std::is_move_constructible_v<U>)
typename channel_t<_Ty, _Optional, _OptimizationThread>::write_awaiter
channel_t<_Ty, _Optional, _OptimizationThread>::operator << (U&& val) const noexcept(std::is_move_constructible_v<U>)
{ {
return write_awaiter{ _chan.get(), std::move(val) };
return write_awaiter{ _chan.get(), std::forward<U>(val) };
} }


} //namespace channel_v2 } //namespace channel_v2

+ 2
- 2
tutorial/test_async_channel.cpp View File

}; };
//如果channel缓存的元素不能凭空产生,或者产生代价较大,则推荐第二个模板参数使用true。从而减小不必要的开销。 //如果channel缓存的元素不能凭空产生,或者产生代价较大,则推荐第二个模板参数使用true。从而减小不必要的开销。
using string_channel_t = channel_t<move_only_type<std::string>, true>;
using string_channel_t = channel_t<move_only_type<std::string>, false, true>;
//channel其实内部引用了一个channel实现体,故可以支持复制拷贝操作 //channel其实内部引用了一个channel实现体,故可以支持复制拷贝操作
future_t<> test_channel_read(string_channel_t c) future_t<> test_channel_read(string_channel_t c)
{ {
//1的话,效率跟golang比,有点惨不忍睹。 //1的话,效率跟golang比,有点惨不忍睹。
//1000的话,由于几乎不需要调度器接入,效率就很高了,随便过千万数量级。 //1000的话,由于几乎不需要调度器接入,效率就很高了,随便过千万数量级。
channel_t<int> c{ buff_size };
channel_t<int, false, true> c{ buff_size };
go[&]() -> future_t<> go[&]() -> future_t<>
{ {

Loading…
Cancel
Save