Bladeren bron

整理channel_v2代码,并添加详细注释。

tags/v2.9.7
tearshark 4 jaren geleden
bovenliggende
commit
1589272e12
7 gewijzigde bestanden met toevoegingen van 104 en 53 verwijderingen
  1. 73
    7
      librf/src/channel_v2.h
  2. 19
    35
      librf/src/channel_v2.inl
  3. 2
    2
      librf/src/config.h
  4. 1
    1
      librf/src/def.h
  5. 4
    4
      librf/src/mutex_v2.h
  6. 3
    2
      test_librf.cpp
  7. 2
    2
      tutorial/test_async_channel.cpp

+ 73
- 7
librf/src/channel_v2.h Bestand weergeven

@@ -1,5 +1,6 @@
#pragma once

#ifndef DOXYGEN_SKIP_PROPERTY
RESUMEF_NS
{
namespace detail
@@ -10,25 +11,83 @@ namespace detail

inline namespace channel_v2
{
//如果channel缓存的元素不能凭空产生,或者产生代价较大,则推荐第二个模板参数使用true。从而减小不必要的开销。
template<class _Ty = bool, bool _Optional = false, bool _OptimizationThread = false>
#endif

/**
* @brief 可传递数据的模板信号量。
* @remarks 不支持数据类型为void的特例化。
* @param _Ty 传递的数据类型。要求此类型至少支持移动构造和移动赋值。
* @param _Optional 内部是否采用std::optional<>来存数据。\n
* 默认不是POD类型则采用std::optional<>。如果channel缓存的元素不能凭空产生,或者产生代价较大,则推荐将此参数设置为true,从而减小不必要的开销。
* @param _OptimizationThread 针对多线程优化。目前此算法提升效率不稳定,需要自行根据实际情况决定。
*/
template<class _Ty = bool, bool _Optional = !std::is_pod_v<_Ty>, bool _OptimizationThread = false>
struct channel_t
{
static_assert((std::is_copy_constructible_v<_Ty>&& std::is_copy_assignable_v<_Ty>) ||
(std::is_move_constructible_v<_Ty> && std::is_move_assignable_v<_Ty>));

struct [[nodiscard]] read_awaiter;
struct [[nodiscard]] write_awaiter;

channel_t(size_t max_counter = 1);
/**
* @brief 构造函数。
* @param cache_size 缓存的数量。0 表示内部无缓存。
*/
channel_t(size_t cache_size = 1);

/**
* @brief 获得缓存数量。
*/
size_t capacity() const noexcept;

/**
* @brief 在协程中从channel_t里读取一个数据。参考read()函数
*/
read_awaiter operator co_await() const noexcept;

/**
* @brief 在协程中从channel_t里读取一个数据
* @details 如果没有写入数据,则会阻塞协程。
* @remarks 无缓冲的时候,先读后写,不再抛channel_exception异常。这是跟channel_v1的区别。\n
* 在非协程中也可以使用。如果不能立即读取成功,则会阻塞线程。\n
* 但如此用法并不能获得读取的结果,仅仅用作同步手段。
* @return [co_await] value_type
*/
read_awaiter read() const noexcept;

template<class U>
write_awaiter write(U&& val) const noexcept(std::is_move_constructible_v<U>);
template<class U>
/**
* @brief 在协程中向channel_t里写入一个数据。参考write()函数
*/
template<class U
#ifndef DOXYGEN_SKIP_PROPERTY
COMMA_RESUMEF_ENABLE_IF(std::is_constructible_v<value_type, U&&>)
#endif
>
#ifndef DOXYGEN_SKIP_PROPERTY
RESUMEF_REQUIRES(std::is_constructible_v<_Ty, U&&>)
#endif
write_awaiter operator << (U&& val) const noexcept(std::is_move_constructible_v<U>);

/**
* @brief 在协程中向channel_t里写入一个数据。
* @details 在没有读操作等待时,且数据缓冲区满的情况下,则会阻塞协程。
* @remarks 在非协程中也可以使用。如果不能立即写入成功,则会阻塞线程。
* @param val 写入的数据。必须是可以成功构造_Ty(val)的类型。
* @return [co_await] void
*/
template<class U
#ifndef DOXYGEN_SKIP_PROPERTY
COMMA_RESUMEF_ENABLE_IF(std::is_constructible_v<value_type, U&&>)
#endif
>
#ifndef DOXYGEN_SKIP_PROPERTY
RESUMEF_REQUIRES(std::is_constructible_v<_Ty, U&&>)
#endif
write_awaiter write(U&& val) const noexcept(std::is_move_constructible_v<U>);


#ifndef DOXYGEN_SKIP_PROPERTY
using value_type = _Ty;

static constexpr bool use_optional = _Optional;
@@ -44,15 +103,22 @@ inline namespace channel_v2
channel_t& operator = (channel_t&&) = default;
private:
std::shared_ptr<channel_type> _chan;
#endif
};


#ifndef DOXYGEN_SKIP_PROPERTY
//不支持channel_t<void>
template<bool _Option, bool _OptimizationThread>
struct channel_t<void, _Option, _OptimizationThread>
{
};
#endif

using semaphore_t = channel_t<bool, false>;
/**
* @brief 利用channel_t重定义的信号量。
*/
using semaphore_t = channel_t<bool, false, true>;

} //namespace channel_v2
} //RESUMEF_NS

+ 19
- 35
librf/src/channel_v2.inl Bestand weergeven

@@ -54,14 +54,6 @@ namespace detail
this->_coro = handler;
}

void on_await_resume()
{
if (_error != error_code::none)
{
std::rethrow_exception(std::make_exception_ptr(channel_exception{ _error }));
}
}

friend _Chty;
public:
//为浸入式单向链表提供的next指针
@@ -73,7 +65,6 @@ namespace detail
std::shared_ptr<_Chty> _channel;
protected:
value_type* _value;
error_code _error = error_code::none;
};


@@ -90,7 +81,7 @@ namespace detail
using state_read_t = state_channel_t<optional_type, this_type>;
using state_write_t = state_channel_t<value_type, this_type>;

channel_impl_v2(size_t max_counter_);
channel_impl_v2(size_t cache_size);

bool try_read(optional_type& val);
bool try_read_nolock(optional_type& val);
@@ -142,9 +133,9 @@ namespace detail
//-----------------------------------------------------------------------------------------------------------------------------------------

template<class _Ty, class _Opty>
channel_impl_v2<_Ty, _Opty>::channel_impl_v2(size_t max_counter_)
: _max_counter(max_counter_)
, _values(USE_RING_QUEUE ? max_counter_ : 0)
channel_impl_v2<_Ty, _Opty>::channel_impl_v2(size_t cache_size)
: _max_counter(cache_size)
, _values(USE_RING_QUEUE ? cache_size : 0)
{
}

@@ -271,22 +262,16 @@ namespace detail
state_read_t* state = try_pop_reader_();
if (state != nullptr)
{
assert(!_values.empty());

if constexpr (USE_RING_QUEUE)
{
if (!_values.try_pop(*state->_value))
state->_error = error_code::read_before_write;
_values.try_pop(*state->_value);
}
else
{
if (_values.size() > 0)
{
*state->_value = std::move(_values.front());
_values.pop_front();
}
else
{
state->_error = error_code::read_before_write;
}
*state->_value = std::move(_values.front());
_values.pop_front();
}

state->on_notify();
@@ -364,7 +349,9 @@ inline namespace channel_v2
~read_awaiter()
{//为了不在协程中也能正常使用
if (_channel != nullptr)
_channel->try_read(_value);
{
while(!_channel->try_read(_value));
}
}

bool await_ready()
@@ -402,9 +389,6 @@ inline namespace channel_v2
}
value_type await_resume()
{
if (_state.get() != nullptr)
_state->on_await_resume();

if constexpr (use_optional)
return std::move(_value).value();
else
@@ -430,7 +414,9 @@ inline namespace channel_v2
~write_awaiter()
{//为了不在协程中也能正常使用
if (_channel != nullptr)
_channel->try_write(_value);
{
while(!_channel->try_write(_value));
}
}

bool await_ready()
@@ -468,8 +454,6 @@ inline namespace channel_v2
}
void await_resume()
{
if (_state.get() != nullptr)
_state->on_await_resume();
}
private:
channel_type* _channel;
@@ -478,8 +462,8 @@ inline namespace channel_v2
};

template<class _Ty, bool _Optional, bool _OptimizationThread>
channel_t<_Ty, _Optional, _OptimizationThread>::channel_t(size_t max_counter)
:_chan(std::make_shared<channel_type>(max_counter))
channel_t<_Ty, _Optional, _OptimizationThread>::channel_t(size_t cache_size)
:_chan(std::make_shared<channel_type>(cache_size))
{

}
@@ -505,7 +489,7 @@ inline namespace channel_v2
}

template<class _Ty, bool _Optional, bool _OptimizationThread>
template<class U>
template<class U COMMA_RESUMEF_ENABLE_IF_TYPENAME()> RESUMEF_REQUIRES(std::is_constructible_v<_Ty, U&&>)
typename channel_t<_Ty, _Optional, _OptimizationThread>::write_awaiter
channel_t<_Ty, _Optional, _OptimizationThread>::write(U&& val) const noexcept(std::is_move_constructible_v<U>)
{
@@ -513,7 +497,7 @@ inline namespace channel_v2
}

template<class _Ty, bool _Optional, bool _OptimizationThread>
template<class U>
template<class U COMMA_RESUMEF_ENABLE_IF_TYPENAME()> RESUMEF_REQUIRES(std::is_constructible_v<_Ty, U&&>)
typename channel_t<_Ty, _Optional, _OptimizationThread>::write_awaiter
channel_t<_Ty, _Optional, _OptimizationThread>::operator << (U&& val) const noexcept(std::is_move_constructible_v<U>)
{

+ 2
- 2
librf/src/config.h Bestand weergeven

@@ -18,9 +18,9 @@
#ifndef RESUMEF_ENABLE_CONCEPT
#ifdef __cpp_lib_concepts
#define RESUMEF_ENABLE_CONCEPT 1
/* #undef RESUMEF_ENABLE_CONCEPT */
#else
#define RESUMEF_ENABLE_CONCEPT 1
/* #undef RESUMEF_ENABLE_CONCEPT */
#endif //#ifdef __cpp_lib_concepts
#endif //#ifndef RESUMEF_ENABLE_CONCEPT

+ 1
- 1
librf/src/def.h Bestand weergeven

@@ -1,6 +1,6 @@
#pragma once
#define LIB_RESUMEF_VERSION 20903 // 2.9.3
#define LIB_RESUMEF_VERSION 20905 // 2.9.5
#if defined(RESUMEF_MODULE_EXPORT)
#define RESUMEF_NS export namespace resumef

+ 4
- 4
librf/src/mutex_v2.h Bestand weergeven

@@ -29,7 +29,7 @@ RESUMEF_NS

/**
* @brief 支持递归的锁。
* 锁被本协程所在的跟协程所拥有。支持在跟协程下的所有协程里递归加锁。
* @details 锁被本协程所在的跟协程所拥有。支持在跟协程下的所有协程里递归加锁。
*/
struct mutex_t
{
@@ -53,7 +53,7 @@ RESUMEF_NS

/**
* @brief 在协程中加锁,如果不能立即获得锁,则阻塞当前协程。但不会阻塞当前线程。
* 需要随后调用unlock()函数解锁。lock()/unlock()调用必须在同一个跟协程下配对调用。
* @details 需要随后调用unlock()函数解锁。lock()/unlock()调用必须在同一个跟协程下配对调用。
* @param manual_unlock_tag 提示手工解锁
* @return [co_await] void
*/
@@ -64,7 +64,7 @@ RESUMEF_NS

/**
* @brief 尝试在协程中加锁。此操作无论成功与否都会立即返回,不会有协程切换。
* 如果加锁成功,则需要调用co_await unlock()解锁。或者使用unlock(root_state())解锁。
* @details 如果加锁成功,则需要调用co_await unlock()解锁。或者使用unlock(root_state())解锁。\n
* 如果加锁失败,且要循环尝试加锁,则最好调用co_await yield()让出一次调度。否则,可能造成本调度器死循环。
* @return [co_await] bool
*/
@@ -100,7 +100,7 @@ RESUMEF_NS

/**
* @brief 在非协程中加锁。如果不能立即获得锁,则反复尝试,直到获得锁。故会阻塞当前协程
* @param unique_address 代表获得锁的拥有者。此地址应当与随后的unlock()的地址一致。
* @param unique_address 代表获得锁的拥有者。此地址应当与随后的unlock()的地址一致。\n
* 一般做法,是申明一个跟当前线程关联的局部变量,以此局部变量的地址为参数。
*/
void lock(void* unique_address) const;

+ 3
- 2
test_librf.cpp Bestand weergeven

@@ -38,8 +38,9 @@ int main(int argc, const char* argv[])
//test_ring_queue<resumef::ring_queue_spinlock<int, false, uint32_t>>();
//test_ring_queue<resumef::ring_queue_lockfree<int, uint64_t>>();

//resumable_main_mutex();
//return 0;
resumable_main_channel();
resumable_main_channel_mult_thread();
return 0;

//if (argc > 1)
// resumable_main_benchmark_asio_client(atoi(argv[1]));

+ 2
- 2
tutorial/test_async_channel.cpp Bestand weergeven

@@ -19,8 +19,8 @@ struct move_only_type
_Ty value;
move_only_type() = default;
move_only_type(const _Ty& val) : value(val) {}
move_only_type(_Ty&& val) : value(std::forward<_Ty>(val)) {}
explicit move_only_type(const _Ty& val) : value(val) {}
explicit move_only_type(_Ty&& val) : value(std::forward<_Ty>(val)) {}
move_only_type(const move_only_type&) = delete;
move_only_type& operator =(const move_only_type&) = delete;

Laden…
Annuleren
Opslaan