Browse Source

channel并没有很好的履行optional选项。改正之。

tags/v2.9.7
tearshark 4 years ago
parent
commit
c8ce2be4d7

+ 7
- 7
benchmark/benchmark_channel_passing_next.cpp View File



void benchmark_main_channel_passing_next() void benchmark_main_channel_passing_next()
{ {
int_channel_ptr head = std::make_shared<channel_t<intptr_t>>(1);
int_channel_ptr in = head;
int_channel_ptr tail = nullptr;
channel_t<intptr_t> head{1};
channel_t<intptr_t> in = head;
channel_t<intptr_t> tail{0};


for (int i = 0; i < MaxNum; ++i) for (int i = 0; i < MaxNum; ++i)
{ {
tail = std::make_shared<channel_t<intptr_t>>(1);
go passing_next(*in, *tail);
tail = channel_t<intptr_t>{ 1 };
go passing_next(in, tail);
in = tail; in = tail;
} }


{ {
auto tstart = high_resolution_clock::now(); auto tstart = high_resolution_clock::now();


co_await (*head << 0);
intptr_t value = co_await *tail;
co_await (head << 0);
intptr_t value = co_await tail;


auto dt = duration_cast<duration<double>>(high_resolution_clock::now() - tstart).count(); auto dt = duration_cast<duration<double>>(high_resolution_clock::now() - tstart).count();
std::cout << value << " cost time " << dt << "s" << std::endl; std::cout << value << " cost time " << dt << "s" << std::endl;

+ 2
- 2
librf/src/channel_v2.h View File

#ifndef DOXYGEN_SKIP_PROPERTY #ifndef DOXYGEN_SKIP_PROPERTY
namespace detail namespace detail
{ {
template<class _Ty, class _Opty>
template<class _Ty, bool _Optional>
struct channel_impl_v2; struct channel_impl_v2;
} //namespace detail } //namespace detail


static constexpr bool optimization_for_multithreading = _OptimizationThread; static constexpr bool optimization_for_multithreading = _OptimizationThread;


using optional_type = std::conditional_t<use_optional, std::optional<value_type>, value_type>; using optional_type = std::conditional_t<use_optional, std::optional<value_type>, value_type>;
using channel_type = detail::channel_impl_v2<value_type, optional_type>;
using channel_type = detail::channel_impl_v2<value_type, use_optional>;
using lock_type = typename channel_type::lock_type; using lock_type = typename channel_type::lock_type;


channel_t(const channel_t&) = default; channel_t(const channel_t&) = default;

+ 31
- 32
librf/src/channel_v2.inl View File



//----------------------------------------------------------------------------------------------------------------------------------------- //-----------------------------------------------------------------------------------------------------------------------------------------


template<class _Ty, class _Opty>
struct channel_impl_v2 : public std::enable_shared_from_this<channel_impl_v2<_Ty, _Opty>>
template<class _Ty, bool _Optional>
struct channel_impl_v2 : public std::enable_shared_from_this<channel_impl_v2<_Ty, _Optional>>
{ {
using value_type = _Ty; using value_type = _Ty;
using optional_type = _Opty;
using this_type = channel_impl_v2<value_type, optional_type>;
using optional_type = std::conditional_t<_Optional, std::optional<value_type>, value_type>;
using this_type = channel_impl_v2<value_type, _Optional>;


using state_read_t = state_channel_t<optional_type, this_type>; using state_read_t = state_channel_t<optional_type, this_type>;
using state_write_t = state_channel_t<value_type, this_type>; using state_write_t = state_channel_t<value_type, this_type>;
static constexpr bool USE_RING_QUEUE = true; static constexpr bool USE_RING_QUEUE = true;
static constexpr bool USE_LINK_QUEUE = true; static constexpr bool USE_LINK_QUEUE = true;


//using queue_type = std::conditional_t<USE_RING_QUEUE, ring_queue_spinlock<value_type, false, uint32_t>, std::deque<value_type>>;
using queue_type = std::conditional_t<USE_RING_QUEUE, ring_queue<value_type, false, uint32_t>, std::deque<value_type>>;
using queue_type = std::conditional_t<USE_RING_QUEUE, ring_queue<value_type, _Optional, uint32_t>, std::deque<value_type>>;
using read_queue_type = std::conditional_t<USE_LINK_QUEUE, intrusive_link_queue<state_read_t>, std::list<state_read_t*>>; using read_queue_type = std::conditional_t<USE_LINK_QUEUE, intrusive_link_queue<state_read_t>, std::list<state_read_t*>>;
using write_queue_type = std::conditional_t<USE_LINK_QUEUE, intrusive_link_queue<state_write_t>, std::list<state_write_t*>>; using write_queue_type = std::conditional_t<USE_LINK_QUEUE, intrusive_link_queue<state_write_t>, std::list<state_write_t*>>;




//----------------------------------------------------------------------------------------------------------------------------------------- //-----------------------------------------------------------------------------------------------------------------------------------------


template<class _Ty, class _Opty>
channel_impl_v2<_Ty, _Opty>::channel_impl_v2(size_t cache_size)
template<class _Ty, bool _Optional>
channel_impl_v2<_Ty, _Optional>::channel_impl_v2(size_t cache_size)
: _max_counter(cache_size) : _max_counter(cache_size)
, _values(USE_RING_QUEUE ? cache_size : 0) , _values(USE_RING_QUEUE ? cache_size : 0)
{ {
} }


template<class _Ty, class _Opty>
inline bool channel_impl_v2<_Ty, _Opty>::try_read(optional_type& val)
template<class _Ty, bool _Optional>
inline bool channel_impl_v2<_Ty, _Optional>::try_read(optional_type& val)
{ {
scoped_lock<lock_type> lock_(this->_lock); scoped_lock<lock_type> lock_(this->_lock);
return try_read_nolock(val); return try_read_nolock(val);
} }


template<class _Ty, class _Opty>
bool channel_impl_v2<_Ty, _Opty>::try_read_nolock(optional_type& val)
template<class _Ty, bool _Optional>
bool channel_impl_v2<_Ty, _Optional>::try_read_nolock(optional_type& val)
{ {
if constexpr (USE_RING_QUEUE) if constexpr (USE_RING_QUEUE)
{ {
return awake_one_writer_(val); return awake_one_writer_(val);
} }


template<class _Ty, class _Opty>
inline void channel_impl_v2<_Ty, _Opty>::add_read_list_nolock(state_read_t* state)
template<class _Ty, bool _Optional>
inline void channel_impl_v2<_Ty, _Optional>::add_read_list_nolock(state_read_t* state)
{ {
assert(state != nullptr); assert(state != nullptr);
_read_awakes.push_back(state); _read_awakes.push_back(state);
} }


template<class _Ty, class _Opty>
inline bool channel_impl_v2<_Ty, _Opty>::try_write(value_type& val)
template<class _Ty, bool _Optional>
inline bool channel_impl_v2<_Ty, _Optional>::try_write(value_type& val)
{ {
scoped_lock<lock_type> lock_(this->_lock); scoped_lock<lock_type> lock_(this->_lock);
return try_write_nolock(val); return try_write_nolock(val);
} }


template<class _Ty, class _Opty>
bool channel_impl_v2<_Ty, _Opty>::try_write_nolock(value_type& val)
template<class _Ty, bool _Optional>
bool channel_impl_v2<_Ty, _Optional>::try_write_nolock(value_type& val)
{ {
if constexpr (USE_RING_QUEUE) if constexpr (USE_RING_QUEUE)
{ {
return awake_one_reader_(val); return awake_one_reader_(val);
} }


template<class _Ty, class _Opty>
inline void channel_impl_v2<_Ty, _Opty>::add_write_list_nolock(state_write_t* state)
template<class _Ty, bool _Optional>
inline void channel_impl_v2<_Ty, _Optional>::add_write_list_nolock(state_write_t* state)
{ {
assert(state != nullptr); assert(state != nullptr);
_write_awakes.push_back(state); _write_awakes.push_back(state);
} }


template<class _Ty, class _Opty>
auto channel_impl_v2<_Ty, _Opty>::try_pop_reader_()->state_read_t*
template<class _Ty, bool _Optional>
auto channel_impl_v2<_Ty, _Optional>::try_pop_reader_()->state_read_t*
{ {
if constexpr (USE_LINK_QUEUE) if constexpr (USE_LINK_QUEUE)
{ {
} }
} }


template<class _Ty, class _Opty>
auto channel_impl_v2<_Ty, _Opty>::try_pop_writer_()->state_write_t*
template<class _Ty, bool _Optional>
auto channel_impl_v2<_Ty, _Optional>::try_pop_writer_()->state_write_t*
{ {
if constexpr (USE_LINK_QUEUE) if constexpr (USE_LINK_QUEUE)
{ {
} }
} }


template<class _Ty, class _Opty>
void channel_impl_v2<_Ty, _Opty>::awake_one_reader_()
template<class _Ty, bool _Optional>
void channel_impl_v2<_Ty, _Optional>::awake_one_reader_()
{ {
state_read_t* state = try_pop_reader_(); state_read_t* state = try_pop_reader_();
if (state != nullptr) if (state != nullptr)
} }
} }


template<class _Ty, class _Opty>
bool channel_impl_v2<_Ty, _Opty>::awake_one_reader_(value_type& val)
template<class _Ty, bool _Optional>
bool channel_impl_v2<_Ty, _Optional>::awake_one_reader_(value_type& val)
{ {
state_read_t* state = try_pop_reader_(); state_read_t* state = try_pop_reader_();
if (state != nullptr) if (state != nullptr)
return false; return false;
} }


template<class _Ty, class _Opty>
void channel_impl_v2<_Ty, _Opty>::awake_one_writer_()
template<class _Ty, bool _Optional>
void channel_impl_v2<_Ty, _Optional>::awake_one_writer_()
{ {
state_write_t* state = try_pop_writer_(); state_write_t* state = try_pop_writer_();
if (state != nullptr) if (state != nullptr)
} }
} }


template<class _Ty, class _Opty>
bool channel_impl_v2<_Ty, _Opty>::awake_one_writer_(optional_type& val)
template<class _Ty, bool _Optional>
bool channel_impl_v2<_Ty, _Optional>::awake_one_writer_(optional_type& val)
{ {
state_write_t* writer = try_pop_writer_(); state_write_t* writer = try_pop_writer_();
if (writer != nullptr) if (writer != nullptr)

+ 1
- 1
librf/src/scheduler.cpp View File

auto iter = this->_ready_task.find(sptr); auto iter = this->_ready_task.find(sptr);
if (iter != this->_ready_task.end()) if (iter != this->_ready_task.end())
{ {
task_ptr = std::move(iter->second);
task_ptr = std::exchange(iter->second, nullptr);
this->_ready_task.erase(iter); this->_ready_task.erase(iter);
} }

+ 1
- 1
test_librf.cpp View File

//test_ring_queue<resumef::ring_queue_spinlock<int, false, uint32_t>>(); //test_ring_queue<resumef::ring_queue_spinlock<int, false, uint32_t>>();
//test_ring_queue<resumef::ring_queue_lockfree<int, uint64_t>>(); //test_ring_queue<resumef::ring_queue_lockfree<int, uint64_t>>();


//resumable_main_event();
//resumable_main_channel();
//return 0; //return 0;


//if (argc > 1) //if (argc > 1)

+ 1
- 1
tutorial/test_async_channel.cpp View File

}; };
//如果channel缓存的元素不能凭空产生,或者产生代价较大,则推荐第二个模板参数使用true。从而减小不必要的开销。 //如果channel缓存的元素不能凭空产生,或者产生代价较大,则推荐第二个模板参数使用true。从而减小不必要的开销。
using string_channel_t = channel_t<move_only_type<std::string>, false, true>;
using string_channel_t = channel_t<move_only_type<std::string>>;
//channel其实内部引用了一个channel实现体,故可以支持复制拷贝操作 //channel其实内部引用了一个channel实现体,故可以支持复制拷贝操作
future_t<> test_channel_read(string_channel_t c) future_t<> test_channel_read(string_channel_t c)

Loading…
Cancel
Save