Browse Source

修改channel的实现细节,便于阅读理解

tags/v2.9.7
tearshark 4 years ago
parent
commit
c93e99bea7
1 changed files with 78 additions and 76 deletions
  1. 78
    76
      librf/src/channel_v2.inl

+ 78
- 76
librf/src/channel_v2.inl View File

@@ -4,97 +4,91 @@ RESUMEF_NS
{
namespace detail
{
template<class _Ty, class _Opty>
struct channel_impl_v2 : public std::enable_shared_from_this<channel_impl_v2<_Ty, _Opty>>
template<class _Ty, class _Chty>
struct state_channel_t : public state_base_t
{
using value_type = _Ty;
using optional_type = _Opty;

struct state_channel_t : public state_base_t
state_channel_t(_Chty* ch, value_type& val) noexcept
: _channel(ch->shared_from_this())
, _value(&val)
{
state_channel_t(channel_impl_v2* ch) noexcept
: _channel(ch->shared_from_this())
{
}
}

virtual void resume() override
virtual void resume() override
{
coroutine_handle<> handler = _coro;
if (handler)
{
coroutine_handle<> handler = _coro;
if (handler)
{
_coro = nullptr;
_scheduler->del_final(this);
handler.resume();
}
_coro = nullptr;
_scheduler->del_final(this);
handler.resume();
}
}

virtual bool has_handler() const noexcept override
{
return (bool)_coro;
}
virtual bool has_handler() const noexcept override
{
return (bool)_coro;
}

void on_notify()
{
assert(this->_scheduler != nullptr);
if (this->_coro)
this->_scheduler->add_generator(this);
}
void on_notify()
{
assert(this->_scheduler != nullptr);
if (this->_coro)
this->_scheduler->add_generator(this);
}

void on_cancel() noexcept
{
this->_coro = nullptr;
}
void on_cancel() noexcept
{
this->_coro = nullptr;
}

template<class _PromiseT, typename = std::enable_if_t<is_promise_v<_PromiseT>>>
void on_await_suspend(coroutine_handle<_PromiseT> handler) noexcept
{
_PromiseT& promise = handler.promise();
auto* parent_state = promise.get_state();
scheduler_t* sch = parent_state->get_scheduler();
template<class _PromiseT, typename = std::enable_if_t<is_promise_v<_PromiseT>>>
void on_await_suspend(coroutine_handle<_PromiseT> handler) noexcept
{
_PromiseT& promise = handler.promise();
auto* parent_state = promise.get_state();
scheduler_t* sch = parent_state->get_scheduler();

this->_scheduler = sch;
this->_coro = handler;
}
this->_scheduler = sch;
this->_coro = handler;
}

void on_await_resume()
void on_await_resume()
{
if (_error != error_code::none)
{
if (_error != error_code::none)
{
std::rethrow_exception(std::make_exception_ptr(channel_exception{ _error }));
}
std::rethrow_exception(std::make_exception_ptr(channel_exception{ _error }));
}
}

friend channel_impl_v2;
public:
state_channel_t* _next = nullptr;
protected:
std::shared_ptr<channel_impl_v2> _channel;
error_code _error = error_code::none;
};
friend _Chty;
public:
//为浸入式单向链表提供的next指针
state_channel_t* _next = nullptr;
protected:
//co_await产生的临时awaitor会引用state,管理state的生命周期
//state再引用channel
//从而在co_await channel.xxx()这条语句内,awaitor/state/channel均处于正确的生命周期
std::shared_ptr<_Chty> _channel;
protected:
value_type* _value;
error_code _error = error_code::none;
};

struct state_read_t : public state_channel_t
{
state_read_t(channel_impl_v2* ch, optional_type& val) noexcept
: state_channel_t(ch)
, _value(&val)
{
}
protected:
friend channel_impl_v2;
optional_type* _value;
};

struct state_write_t : public state_channel_t
{
state_write_t(channel_impl_v2* ch, value_type& val) noexcept
: state_channel_t(ch)
, _value(&val)
{
}
protected:
friend channel_impl_v2;
value_type* _value;
};

//-----------------------------------------------------------------------------------------------------------------------------------------

template<class _Ty, class _Opty>
struct channel_impl_v2 : public std::enable_shared_from_this<channel_impl_v2<_Ty, _Opty>>
{
using value_type = _Ty;
using optional_type = _Opty;
using this_type = channel_impl_v2<value_type, optional_type>;

using state_read_t = state_channel_t<optional_type, this_type>;
using state_write_t = state_channel_t<value_type, this_type>;

channel_impl_v2(size_t max_counter_);

@@ -130,11 +124,10 @@ namespace detail

//using queue_type = std::conditional_t<USE_RING_QUEUE, ring_queue_spinlock<value_type, false, uint32_t>, std::deque<value_type>>;
using queue_type = std::conditional_t<USE_RING_QUEUE, ring_queue<value_type, false, uint32_t>, std::deque<value_type>>;
using read_queue_type = std::conditional_t<USE_LINK_QUEUE, intrusive_link_queue<state_channel_t>, std::list<state_read_t*>>;
using write_queue_type = std::conditional_t<USE_LINK_QUEUE, intrusive_link_queue<state_channel_t>, std::list<state_write_t*>>;
using read_queue_type = std::conditional_t<USE_LINK_QUEUE, intrusive_link_queue<state_read_t>, std::list<state_read_t*>>;
using write_queue_type = std::conditional_t<USE_LINK_QUEUE, intrusive_link_queue<state_write_t>, std::list<state_write_t*>>;

const size_t _max_counter; //数据队列的容量上限

public:
using lock_type = std::conditional_t<USE_SPINLOCK, spinlock, std::deque<std::recursive_mutex>>;
lock_type _lock; //保证访问本对象是线程安全的
@@ -144,6 +137,10 @@ namespace detail
write_queue_type _write_awakes; //写队列
};



//-----------------------------------------------------------------------------------------------------------------------------------------

template<class _Ty, class _Opty>
channel_impl_v2<_Ty, _Opty>::channel_impl_v2(size_t max_counter_)
: _max_counter(max_counter_)
@@ -347,6 +344,11 @@ namespace detail
}
} //namespace detail




//-----------------------------------------------------------------------------------------------------------------------------------------

inline namespace channel_v2
{
template<class _Ty, bool _Option>

Loading…
Cancel
Save