Browse Source

提升channel在多线程下的性能

3.0.0
tearshark 2 years ago
parent
commit
bd2a8e8cea
3 changed files with 39 additions and 40 deletions
  1. 2
    2
      include/librf/src/channel_v2.h
  2. 31
    31
      include/librf/src/channel_v2.inl
  3. 6
    7
      tutorial/test_async_channel.cpp

+ 2
- 2
include/librf/src/channel_v2.h View File

@@ -5,7 +5,7 @@ namespace librf
#ifndef DOXYGEN_SKIP_PROPERTY
namespace detail
{
template<class _Ty, bool _Optional>
template<class _Ty, bool _Optional, bool _OptimizationThread>
struct channel_impl_v2;
} //namespace detail

@@ -83,7 +83,7 @@ namespace detail
static constexpr bool optimization_for_multithreading = _OptimizationThread;

using optional_type = std::conditional_t<use_optional, std::optional<value_type>, value_type>;
using channel_type = detail::channel_impl_v2<value_type, use_optional>;
using channel_type = detail::channel_impl_v2<value_type, use_optional, optimization_for_multithreading>;
using lock_type = typename channel_type::lock_type;

channel_t(const channel_t&) = default;

+ 31
- 31
include/librf/src/channel_v2.inl View File

@@ -69,12 +69,12 @@ namespace detail

//-----------------------------------------------------------------------------------------------------------------------------------------

template<class _Ty, bool _Optional>
struct channel_impl_v2 : public std::enable_shared_from_this<channel_impl_v2<_Ty, _Optional>>
template<class _Ty, bool _Optional, bool _OptimizationThread>
struct channel_impl_v2 : public std::enable_shared_from_this<channel_impl_v2<_Ty, _Optional, _OptimizationThread>>
{
using value_type = _Ty;
using optional_type = std::conditional_t<_Optional, std::optional<value_type>, value_type>;
using this_type = channel_impl_v2<value_type, _Optional>;
using this_type = channel_impl_v2<value_type, _Optional, _OptimizationThread>;

using state_read_t = state_channel_t<optional_type, this_type>;
using state_write_t = state_channel_t<value_type, this_type>;
@@ -107,7 +107,7 @@ namespace detail
channel_impl_v2& operator = (const channel_impl_v2&) = delete;
channel_impl_v2& operator = (channel_impl_v2&&) = delete;

static constexpr bool USE_SPINLOCK = true;
static constexpr bool USE_SPINLOCK = !_OptimizationThread;
static constexpr bool USE_RING_QUEUE = true;
static constexpr bool USE_LINK_QUEUE = true;

@@ -117,7 +117,7 @@ namespace detail

const size_t _max_counter; //数据队列的容量上限
public:
using lock_type = std::conditional_t<USE_SPINLOCK, spinlock, std::deque<std::recursive_mutex>>;
using lock_type = std::conditional_t<USE_SPINLOCK, spinlock, std::mutex>;
lock_type _lock; //保证访问本对象是线程安全的
private:
queue_type _values; //数据队列
@@ -129,22 +129,22 @@ namespace detail

//-----------------------------------------------------------------------------------------------------------------------------------------

template<class _Ty, bool _Optional>
channel_impl_v2<_Ty, _Optional>::channel_impl_v2(size_t cache_size)
template<class _Ty, bool _Optional, bool _OptimizationThread>
channel_impl_v2<_Ty, _Optional, _OptimizationThread>::channel_impl_v2(size_t cache_size)
: _max_counter(cache_size)
, _values(USE_RING_QUEUE ? cache_size : 0)
{
}

template<class _Ty, bool _Optional>
inline bool channel_impl_v2<_Ty, _Optional>::try_read(optional_type& val)
template<class _Ty, bool _Optional, bool _OptimizationThread>
inline bool channel_impl_v2<_Ty, _Optional, _OptimizationThread>::try_read(optional_type& val)
{
scoped_lock<lock_type> lock_(this->_lock);
return try_read_nolock(val);
}

template<class _Ty, bool _Optional>
bool channel_impl_v2<_Ty, _Optional>::try_read_nolock(optional_type& val)
template<class _Ty, bool _Optional, bool _OptimizationThread>
bool channel_impl_v2<_Ty, _Optional, _OptimizationThread>::try_read_nolock(optional_type& val)
{
if constexpr (USE_RING_QUEUE)
{
@@ -169,22 +169,22 @@ namespace detail
return awake_one_writer_(val);
}

template<class _Ty, bool _Optional>
inline void channel_impl_v2<_Ty, _Optional>::add_read_list_nolock(state_read_t* state)
template<class _Ty, bool _Optional, bool _OptimizationThread>
inline void channel_impl_v2<_Ty, _Optional, _OptimizationThread>::add_read_list_nolock(state_read_t* state)
{
assert(state != nullptr);
_read_awakes.push_back(state);
}

template<class _Ty, bool _Optional>
inline bool channel_impl_v2<_Ty, _Optional>::try_write(value_type& val)
template<class _Ty, bool _Optional, bool _OptimizationThread>
inline bool channel_impl_v2<_Ty, _Optional, _OptimizationThread>::try_write(value_type& val)
{
scoped_lock<lock_type> lock_(this->_lock);
return try_write_nolock(val);
}

template<class _Ty, bool _Optional>
bool channel_impl_v2<_Ty, _Optional>::try_write_nolock(value_type& val)
template<class _Ty, bool _Optional, bool _OptimizationThread>
bool channel_impl_v2<_Ty, _Optional, _OptimizationThread>::try_write_nolock(value_type& val)
{
if constexpr (USE_RING_QUEUE)
{
@@ -208,15 +208,15 @@ namespace detail
return awake_one_reader_(val);
}

template<class _Ty, bool _Optional>
inline void channel_impl_v2<_Ty, _Optional>::add_write_list_nolock(state_write_t* state)
template<class _Ty, bool _Optional, bool _OptimizationThread>
inline void channel_impl_v2<_Ty, _Optional, _OptimizationThread>::add_write_list_nolock(state_write_t* state)
{
assert(state != nullptr);
_write_awakes.push_back(state);
}

template<class _Ty, bool _Optional>
auto channel_impl_v2<_Ty, _Optional>::try_pop_reader_()->state_read_t*
template<class _Ty, bool _Optional, bool _OptimizationThread>
auto channel_impl_v2<_Ty, _Optional, _OptimizationThread>::try_pop_reader_()->state_read_t*
{
if constexpr (USE_LINK_QUEUE)
{
@@ -234,8 +234,8 @@ namespace detail
}
}

template<class _Ty, bool _Optional>
auto channel_impl_v2<_Ty, _Optional>::try_pop_writer_()->state_write_t*
template<class _Ty, bool _Optional, bool _OptimizationThread>
auto channel_impl_v2<_Ty, _Optional, _OptimizationThread>::try_pop_writer_()->state_write_t*
{
if constexpr (USE_LINK_QUEUE)
{
@@ -253,8 +253,8 @@ namespace detail
}
}

template<class _Ty, bool _Optional>
void channel_impl_v2<_Ty, _Optional>::awake_one_reader_()
template<class _Ty, bool _Optional, bool _OptimizationThread>
void channel_impl_v2<_Ty, _Optional, _OptimizationThread>::awake_one_reader_()
{
state_read_t* state = try_pop_reader_();
if (state != nullptr)
@@ -275,8 +275,8 @@ namespace detail
}
}

template<class _Ty, bool _Optional>
bool channel_impl_v2<_Ty, _Optional>::awake_one_reader_(value_type& val)
template<class _Ty, bool _Optional, bool _OptimizationThread>
bool channel_impl_v2<_Ty, _Optional, _OptimizationThread>::awake_one_reader_(value_type& val)
{
state_read_t* state = try_pop_reader_();
if (state != nullptr)
@@ -289,8 +289,8 @@ namespace detail
return false;
}

template<class _Ty, bool _Optional>
void channel_impl_v2<_Ty, _Optional>::awake_one_writer_()
template<class _Ty, bool _Optional, bool _OptimizationThread>
void channel_impl_v2<_Ty, _Optional, _OptimizationThread>::awake_one_writer_()
{
state_write_t* state = try_pop_writer_();
if (state != nullptr)
@@ -311,8 +311,8 @@ namespace detail
}
}

template<class _Ty, bool _Optional>
bool channel_impl_v2<_Ty, _Optional>::awake_one_writer_(optional_type& val)
template<class _Ty, bool _Optional, bool _OptimizationThread>
bool channel_impl_v2<_Ty, _Optional, _OptimizationThread>::awake_one_writer_(optional_type& val)
{
state_write_t* writer = try_pop_writer_();
if (writer != nullptr)

+ 6
- 7
tutorial/test_async_channel.cpp View File

@@ -244,13 +244,12 @@ void test_channel_performance_four_coroutine(size_t capacity, size_t nThreads, i
void resumable_main_channel()
{
//test_channel_read_first();
//std::cout << std::endl;
test_channel_read_first();
std::cout << std::endl;
//test_channel_write_first();
//std::cout << std::endl;
test_channel_write_first();
std::cout << std::endl;
/*
std::cout << "single thread" << std::endl;
test_channel_performance_single_thread(1);
test_channel_performance_single_thread(10);
@@ -262,13 +261,13 @@ void resumable_main_channel()
test_channel_performance_double_thread(10);
test_channel_performance_double_thread(100);
test_channel_performance_double_thread(1000);
*/
std::cout << "four thread" << std::endl;
//test_channel_performance_four_thread(1);
test_channel_performance_four_thread(1);
test_channel_performance_four_thread(1000);
std::cout << "four coroutine" << std::endl;
test_channel_performance_four_coroutine(1, 4, N);
test_channel_performance_four_coroutine(1000, 4, N);
}

Loading…
Cancel
Save