
Rewrite the channel implementation

tags/v2.9.7
tearshark 4 years ago
parent
commit
d854bd00f6

+ 4
- 2
README.md

@@ -1,9 +1,11 @@
# librf 2.0
# librf 2.4

### librf - a coroutine library

2020-03-08 update:
Rewrote the channel implementation; efficiency improved nearly threefold.
The new channel implementation also points out a new direction for reworking event/mutex.
2020-02-16 update:

Reworked the scheduler algorithm to apply the features of C++ Coroutines more deeply, aiming for higher scheduling performance.
C++14 is no longer supported.
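
A minimal usage sketch of the rewritten channel, pieced together from the tutorial code touched by this commit (go, channel_t, and run_until_notask all appear below). The read()/write() awaiter factories are assumptions inferred from the read_awaiter/write_awaiter types in channel_v2.h, and a single-scheduler build is assumed:

#include "librf.h"	// assumed umbrella header
using namespace resumef;

future_t<> producer(const channel_t<int>& ch)
{
	for (int i = 0; i < 3; ++i)
		co_await ch.write(i);		// suspends while the ring buffer is full
}

future_t<> consumer(const channel_t<int>& ch)
{
	for (int i = 0; i < 3; ++i)
	{
		int v = co_await ch.read();	// suspends while the buffer is empty
		(void)v;
	}
}

int main()
{
	channel_t<int> ch(2);			// bounded channel with capacity 2
	go producer(ch);
	go consumer(ch);
	scheduler_t::g_scheduler.run_until_notask();
}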


+ 146
- 145
librf/src/channel_v2.h

@@ -1,4 +1,5 @@
#pragma once
#include "intrusive_link_queue.h"

RESUMEF_NS
{
@@ -8,16 +9,20 @@ namespace detail
struct channel_impl_v2 : public std::enable_shared_from_this<channel_impl_v2<_Ty>>
{
using value_type = _Ty;
struct state_read_t;
struct state_write_t;

struct state_channel_t;
using state_read_t = state_channel_t;
using state_write_t = state_channel_t;

channel_impl_v2(size_t max_counter_);

bool try_read(value_type& val);
bool add_read_list(state_read_t* state);
bool try_read_nolock(value_type& val);
void add_read_list_nolock(state_read_t* state);

bool try_write(value_type& val);
bool add_write_list(state_write_t* state);
bool try_write_nolock(value_type& val);
void add_write_list_nolock(state_write_t* state);

size_t capacity() const noexcept
{
@@ -27,7 +32,7 @@ namespace detail
struct state_channel_t : public state_base_t
{
state_channel_t(channel_impl_v2* ch, value_type& val) noexcept
: m_channel(ch->shared_from_this())
: _channel(ch->shared_from_this())
, _value(&val)
{
}
@@ -78,53 +83,24 @@ namespace detail
std::rethrow_exception(std::make_exception_ptr(channel_exception{ _error }));
}
}

public:
state_channel_t* _next = nullptr;
protected:
friend channel_impl_v2;

std::shared_ptr<channel_impl_v2> m_channel;
state_channel_t* m_next = nullptr;
std::shared_ptr<channel_impl_v2> _channel;
value_type* _value;
error_code _error = error_code::none;
};

struct state_read_t : public state_channel_t
{
using state_channel_t::state_channel_t;

//add this awaiter to the notification list
template<class _PromiseT, typename = std::enable_if_t<is_promise_v<_PromiseT>>>
bool on_await_suspend(coroutine_handle<_PromiseT> handler) noexcept
{
state_channel_t::on_await_suspend(handler);
if (!this->m_channel->add_read_list(this))
{
this->_coro = nullptr;
return false;
}
return true;
}
};

struct state_write_t : public state_channel_t
{
using state_channel_t::state_channel_t;

//add this awaiter to the notification list
template<class _PromiseT, typename = std::enable_if_t<is_promise_v<_PromiseT>>>
bool on_await_suspend(coroutine_handle<_PromiseT> handler) noexcept
{
state_channel_t::on_await_suspend(handler);
if (!this->m_channel->add_write_list(this))
{
this->_coro = nullptr;
return false;
}
return true;
}
};
private:
auto try_pop_reader_()->state_read_t*;
auto try_pop_writer_()->state_write_t*;
void awake_one_reader_();
bool awake_one_reader_(value_type& val);

void awake_one_writer_();
bool awake_one_writer_(value_type & val);

channel_impl_v2(const channel_impl_v2&) = delete;
channel_impl_v2(channel_impl_v2&&) = delete;
@@ -133,44 +109,51 @@ namespace detail

static constexpr bool USE_SPINLOCK = true;
static constexpr bool USE_RING_QUEUE = true;
static constexpr bool USE_LINK_QUEUE = true;

using lock_type = std::conditional_t<USE_SPINLOCK, spinlock, std::deque<std::recursive_mutex>>;
//using queue_type = std::conditional_t<USE_RING_QUEUE, ring_queue_spinlock<value_type, false, uint32_t>, std::deque<value_type>>;
using queue_type = std::conditional_t<USE_RING_QUEUE, ring_queue<value_type, false, uint32_t>, std::deque<value_type>>;
using read_queue_type = std::conditional_t<USE_LINK_QUEUE, intrusive_link_queue<state_channel_t>, std::list<state_read_t*>>;
using write_queue_type = std::conditional_t<USE_LINK_QUEUE, intrusive_link_queue<state_channel_t>, std::list<state_write_t*>>;

const size_t _max_counter; //capacity limit of the data queue

public:
using lock_type = std::conditional_t<USE_SPINLOCK, spinlock, std::deque<std::recursive_mutex>>;
lock_type _lock; //guarantees thread-safe access to this object
private:
queue_type _values; //data queue
std::list<state_read_t*> _read_awakes; //read wait list
std::list<state_write_t*> _write_awakes; //write wait list
read_queue_type _read_awakes; //read wait list
write_queue_type _write_awakes; //write wait list
};

template<class _Ty>
channel_impl_v2<_Ty>::channel_impl_v2(size_t max_counter_)
: _max_counter(max_counter_)
, _values(USE_RING_QUEUE ? (std::max)((size_t)1, max_counter_) : 0)
, _values(USE_RING_QUEUE ? max_counter_ : 0)
{
}

template<class _Ty>
inline bool channel_impl_v2<_Ty>::try_read(value_type& val)
{
scoped_lock<lock_type> lock_(this->_lock);
return try_read_nolock(val);
}

template<class _Ty>
bool channel_impl_v2<_Ty>::try_read(value_type& val)
bool channel_impl_v2<_Ty>::try_read_nolock(value_type& val)
{
if constexpr (USE_RING_QUEUE)
{
scoped_lock<lock_type> lock_(this->_lock);

if (_values.try_pop(val))
{
awake_one_writer_();
return true;
}
return false;
}
else
{
scoped_lock<lock_type> lock_(this->_lock);

if (_values.size() > 0)
{
val = std::move(_values.front());
@@ -179,68 +162,38 @@ namespace detail

return true;
}
return false;
}

return awake_one_writer_(val);
}

template<class _Ty>
bool channel_impl_v2<_Ty>::add_read_list(state_read_t* state)
inline void channel_impl_v2<_Ty>::add_read_list_nolock(state_read_t* state)
{
assert(state != nullptr);
_read_awakes.push_back(state);
}

if constexpr (USE_RING_QUEUE)
{
scoped_lock<lock_type> lock_(this->_lock);

if (_values.try_pop(*state->_value))
{
awake_one_writer_();
return false;
}

_read_awakes.push_back(state);
awake_one_writer_();

return true;
}
else
{
scoped_lock<lock_type> lock_(this->_lock);

if (_values.size() > 0)
{
*state->_value = std::move(_values.front());
_values.pop_front();
awake_one_writer_();

return false;
}

_read_awakes.push_back(state);
awake_one_writer_();

return true;
}
template<class _Ty>
inline bool channel_impl_v2<_Ty>::try_write(value_type& val)
{
scoped_lock<lock_type> lock_(this->_lock);
return try_write_nolock(val);
}

template<class _Ty>
bool channel_impl_v2<_Ty>::try_write(value_type& val)
bool channel_impl_v2<_Ty>::try_write_nolock(value_type& val)
{
if constexpr (USE_RING_QUEUE)
{
scoped_lock<lock_type> lock_(this->_lock);

if (_values.try_push(std::move(val)))
{
awake_one_reader_();
return true;
}
return false;
}
else
{
scoped_lock<lock_type> lock_(this->_lock);

if (_values.size() < _max_counter)
{
_values.push_back(std::move(val));
@@ -248,57 +201,62 @@ namespace detail

return true;
}
return false;
}

return awake_one_reader_(val);
}

template<class _Ty>
bool channel_impl_v2<_Ty>::add_write_list(state_write_t* state)
inline void channel_impl_v2<_Ty>::add_write_list_nolock(state_write_t* state)
{
assert(state != nullptr);
_write_awakes.push_back(state);
}

if constexpr (USE_RING_QUEUE)
template<class _Ty>
auto channel_impl_v2<_Ty>::try_pop_reader_()->state_read_t*
{
if constexpr (USE_LINK_QUEUE)
{
scoped_lock<lock_type> lock_(this->_lock);

if (_values.try_push(std::move(*state->_value)))
return reinterpret_cast<state_read_t*>(_read_awakes.try_pop());
}
else
{
if (!_read_awakes.empty())
{
awake_one_reader_();
return false;
state_read_t* state = _read_awakes.front();
_read_awakes.pop_front();
return state;
}
return nullptr;
}
}

_write_awakes.push_back(state);
awake_one_reader_();

return true;
template<class _Ty>
auto channel_impl_v2<_Ty>::try_pop_writer_()->state_write_t*
{
if constexpr (USE_LINK_QUEUE)
{
return reinterpret_cast<state_write_t*>(_write_awakes.try_pop());
}
else
{
scoped_lock<lock_type> lock_(this->_lock);

if (_values.size() < _max_counter)
if (!_write_awakes.empty())
{
_values.push_back(std::move(*state->_value));
awake_one_reader_();

return false;
state_write_t* state = _write_awakes.front();
_write_awakes.pop_front();
return state;
}

_write_awakes.push_back(state);
awake_one_reader_();

return true;
return nullptr;
}
}

template<class _Ty>
void channel_impl_v2<_Ty>::awake_one_reader_()
{
for (auto iter = _read_awakes.begin(); iter != _read_awakes.end(); )
state_read_t* state = try_pop_reader_();
if (state != nullptr)
{
state_read_t* state = *iter;
iter = _read_awakes.erase(iter);

if constexpr (USE_RING_QUEUE)
{
if (!_values.try_pop(*state->_value))
@@ -316,20 +274,31 @@ namespace detail
state->_error = error_code::read_before_write;
}
}

state->on_notify();
}
}

template<class _Ty>
bool channel_impl_v2<_Ty>::awake_one_reader_(value_type& val)
{
state_read_t* state = try_pop_reader_();
if (state != nullptr)
{
*state->_value = std::move(val);

break;
state->on_notify();
return true;
}
return false;
}

template<class _Ty>
void channel_impl_v2<_Ty>::awake_one_writer_()
{
for (auto iter = _write_awakes.begin(); iter != _write_awakes.end(); )
state_write_t* state = try_pop_writer_();
if (state != nullptr)
{
state_write_t* state = std::move(*iter);
iter = _write_awakes.erase(iter);

if constexpr (USE_RING_QUEUE)
{
bool ret = _values.try_push(std::move(*state->_value));
@@ -341,20 +310,35 @@ namespace detail
assert(_values.size() < _max_counter);
_values.push_back(*state->_value);
}

state->on_notify();
}
}

break;
template<class _Ty>
bool channel_impl_v2<_Ty>::awake_one_writer_(value_type& val)
{
state_write_t* writer = try_pop_writer_();
if (writer != nullptr)
{
val = std::move(*writer->_value);

writer->on_notify();
return true;
}
return false;
}

} //namespace detail

inline namespace channel_v2
{
template<class _Ty>
template<class _Ty = bool>
struct channel_t
{
using value_type = _Ty;
using channel_type = detail::channel_impl_v2<value_type>;
using lock_type = typename channel_type::lock_type;

channel_t(size_t max_counter = 0)
:_chan(std::make_shared<channel_type>(max_counter))
@@ -367,7 +351,7 @@ inline namespace channel_v2
return _chan->capacity();
}

struct read_awaiter
struct [[nodiscard]] read_awaiter
{
using state_type = typename channel_type::state_read_t;

@@ -381,21 +365,27 @@ inline namespace channel_v2
_channel->try_read(_value);
}

bool await_ready()
bool await_ready() const noexcept
{
if (_channel->try_read(static_cast<value_type&>(_value)))
{
_channel = nullptr;
return true;
}
return false;
}
template<class _PromiseT, typename = std::enable_if_t<is_promise_v<_PromiseT>>>
bool await_suspend(coroutine_handle<_PromiseT> handler)
{
scoped_lock<lock_type> lock_(_channel->_lock);

if (_channel->try_read_nolock(_value))
{
_channel = nullptr;
return false;
}

_state = new state_type(_channel, static_cast<value_type&>(_value));
_state->on_await_suspend(handler);
_channel->add_read_list_nolock(_state.get());
_channel = nullptr;
return _state->on_await_suspend(handler);

return true;
}
value_type await_resume()
{
@@ -418,7 +408,7 @@ inline namespace channel_v2
return { _chan.get() };
}

struct write_awaiter
struct [[nodiscard]] write_awaiter
{
using state_type = typename channel_type::state_write_t;

@@ -433,21 +423,27 @@ inline namespace channel_v2
_channel->try_write(static_cast<value_type&>(_value));
}

bool await_ready()
bool await_ready() const noexcept
{
if (_channel->try_write(static_cast<value_type&>(_value)))
{
_channel = nullptr;
return true;
}
return false;
}
template<class _PromiseT, typename = std::enable_if_t<is_promise_v<_PromiseT>>>
bool await_suspend(coroutine_handle<_PromiseT> handler)
{
scoped_lock<lock_type> lock_(_channel->_lock);

if (_channel->try_write_nolock(static_cast<value_type&>(_value)))
{
_channel = nullptr;
return false;
}

_state = new state_type(_channel, static_cast<value_type&>(_value));
_state->on_await_suspend(handler);
_channel->add_write_list_nolock(_state.get());
_channel = nullptr;
return _state->on_await_suspend(handler);

return true;
}
void await_resume()
{
@@ -480,6 +476,11 @@ inline namespace channel_v2
std::shared_ptr<channel_type> _chan;
};

//channel_t<void> is not supported
template<>
struct channel_t<void>
{
};

using semaphore_t = channel_t<bool>;
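
The awaiters above follow a two-phase protocol: await_ready takes the fast path through the locking try_read/try_write, while await_suspend re-takes _lock, retries through the *_nolock variants, and only then enqueues the state object, so no value can slip in between the check and the enqueue. A simplified, self-contained sketch of the same pattern, with std::mutex and std::deque standing in for librf's spinlock and intrusive list, and the writer-side wakeup omitted:

#include <coroutine>
#include <deque>
#include <mutex>
#include <optional>

template<class T>
struct tiny_channel
{
	std::mutex lock;
	std::deque<T> values;
	std::deque<std::coroutine_handle<>> readers;

	struct read_awaiter
	{
		tiny_channel* ch;
		std::optional<T> result;

		bool await_ready()
		{	// fast path: try to consume a value without suspending
			std::scoped_lock guard(ch->lock);
			return try_take();
		}
		bool await_suspend(std::coroutine_handle<> h)
		{	// slow path: re-check under the same lock that guards the
			// wait list, so a writer cannot race between check and enqueue
			std::scoped_lock guard(ch->lock);
			if (try_take())
				return false;		// a value arrived meanwhile: do not suspend
			ch->readers.push_back(h);
			return true;
		}
		T await_resume() { return std::move(*result); }

	private:
		bool try_take()
		{
			if (ch->values.empty()) return false;
			result = std::move(ch->values.front());
			ch->values.pop_front();
			return true;
		}
	};

	read_awaiter read() { return { this }; }
};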


+ 2
- 2
librf/src/def.h

@@ -1,6 +1,6 @@
#pragma once
#define LIB_RESUMEF_VERSION 20304 // 2.3.4
#define LIB_RESUMEF_VERSION 20400 // 2.4.0
#if defined(RESUMEF_MODULE_EXPORT)
#define RESUMEF_NS export namespace resumef
@@ -20,7 +20,7 @@ RESUMEF_NS
struct scheduler_t;
template<class _Ty = void>
struct future_t;
struct [[nodiscard]] future_t;
using future_vt [[deprecated]] = future_t<>;
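
With future_t now marked [[nodiscard]], silently dropping a coroutine's future becomes a compiler warning. An illustrative sketch, where work is a hypothetical coroutine:

future_t<> work();

void caller()
{
	work();		// warning: [[nodiscard]] value discarded; the coroutine would never be awaited
	go work();	// intended usage: hand the coroutine to the scheduler
}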

+ 103
- 0
librf/src/intrusive_link_queue.h

@@ -0,0 +1,103 @@
#pragma once

RESUMEF_NS
{
template<class _Node, class _Sty = uint32_t>
struct intrusive_link_queue
{
using node_type = _Node;
using size_type = _Sty;
public:
intrusive_link_queue();

intrusive_link_queue(const intrusive_link_queue&) = delete;
intrusive_link_queue(intrusive_link_queue&&) = default;
intrusive_link_queue& operator =(const intrusive_link_queue&) = delete;
intrusive_link_queue& operator =(intrusive_link_queue&&) = default;

auto size() const noexcept->size_type;
bool empty() const noexcept;
void push_back(node_type* node) noexcept;
auto try_pop() noexcept->node_type*;
private:
node_type* _head;
node_type* _tail;

#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
std::atomic<size_type> m_count;
#endif
};

template<class _Node, class _Sty>
intrusive_link_queue<_Node, _Sty>::intrusive_link_queue()
: _head(nullptr)
, _tail(nullptr)
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
, m_count(0)
#endif
{
}

template<class _Node, class _Sty>
auto intrusive_link_queue<_Node, _Sty>::size() const noexcept->size_type
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
return m_count.load(std::memory_order_acquire);
#else
size_type count = 0;
for (node_type* node = _head; node != nullptr; node = node->_next)
++count;
return count;
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}

template<class _Node, class _Sty>
bool intrusive_link_queue<_Node, _Sty>::empty() const noexcept
{
return _head == nullptr;
}

template<class _Node, class _Sty>
void intrusive_link_queue<_Node, _Sty>::push_back(node_type* node) noexcept
{
assert(node != nullptr);

node->_next = nullptr;
if (_head == nullptr)
{
_head = _tail = node;
}
else
{
_tail->_next = node;
_tail = node;
}

#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
m_count.fetch_add(1, std::memory_order_acq_rel);
#endif
}

template<class _Node, class _Sty>
auto intrusive_link_queue<_Node, _Sty>::try_pop() noexcept->node_type*
{
if (_head == nullptr)
return nullptr;

node_type* node = _head;
_head = node->_next;
node->_next = nullptr;

if (_tail == node)
{
assert(node->_next == nullptr);
_tail = nullptr;
}

#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
m_count.fetch_sub(1, std::memory_order_acq_rel);
#endif

return node;
}
}
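
Any type with a public _next pointer can live in this queue; state_channel_t above exposes exactly that. A hypothetical stand-alone use (the node type and names are illustrative):

#include <cstdio>

struct my_node
{
	my_node* _next = nullptr;	// the link the queue threads through
	int payload = 0;
};

void demo()
{
	resumef::intrusive_link_queue<my_node> q;
	my_node a{ nullptr, 1 }, b{ nullptr, 2 };
	q.push_back(&a);		// no allocation: nodes link through _next
	q.push_back(&b);
	while (my_node* n = q.try_pop())
		std::printf("%d\n", n->payload);	// the caller retains node ownership
}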

+ 136
- 139
librf/src/ring_queue.h

@@ -1,154 +1,151 @@
#pragma once

#include <memory>
#include <atomic>
#include <cassert>
#include <optional>
#include "src/spinlock.h"

//A thread-safe ring queue implemented with a spinlock.
//Supports push and pop from multiple threads concurrently.
//_Option : set to true when the stored type is move-only, or when the value should be destroyed after pop.
//_Sty : integer type used for the element count and the indices; lets callers control the size of the queue struct.
template<class _Ty, bool _Option = false, class _Sty = uint32_t>
struct ring_queue
RESUMEF_NS
{
using value_type = _Ty;
using size_type = _Sty;

static constexpr bool use_option = _Option;
using optional_type = std::conditional_t<use_option, std::optional<value_type>, value_type>;
public:
ring_queue(size_t sz);

ring_queue(const ring_queue&) = delete;
ring_queue(ring_queue&&) = default;
ring_queue& operator =(const ring_queue&) = delete;
ring_queue& operator =(ring_queue&&) = default;

auto size() const noexcept->size_type;
auto capacity() const noexcept->size_type;
bool empty() const noexcept;
bool full() const noexcept;
template<class U>
bool try_push(U&& value) noexcept(std::is_nothrow_move_assignable_v<U>);
bool try_pop(value_type& value) noexcept(std::is_nothrow_move_assignable_v<value_type>);
private:
using container_type = std::conditional_t<std::is_same_v<value_type, bool>, std::unique_ptr<optional_type[]>, std::vector<optional_type>>;
container_type m_bufferPtr;
size_type m_bufferSize;

size_type m_writeIndex;
size_type m_readIndex;
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
std::atomic<size_type> m_count;
#endif

auto nextIndex(size_type a_count) const noexcept->size_type;
};

template<class _Ty, bool _Option, class _Sty>
ring_queue<_Ty, _Option, _Sty>::ring_queue(size_t sz)
: m_bufferSize(static_cast<size_type>(sz + 1))
, m_writeIndex(0)
, m_readIndex(0)
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
, m_count(0)
#endif
{
if constexpr (std::is_same_v<value_type, bool>)
m_bufferPtr = container_type{ new optional_type[sz + 1] };
else
m_bufferPtr.resize(sz + 1);
//A thread-safe ring queue implemented with a spinlock.
//Supports push and pop from multiple threads concurrently.
//_Option : set to true when the stored type is move-only, or when the value should be destroyed after pop.
//_Sty : integer type used for the element count and the indices; lets callers control the size of the queue struct.
template<class _Ty, bool _Option = false, class _Sty = uint32_t>
struct ring_queue
{
using value_type = _Ty;
using size_type = _Sty;

static constexpr bool use_option = _Option;
using optional_type = std::conditional_t<use_option, std::optional<value_type>, value_type>;
public:
ring_queue(size_t sz);

ring_queue(const ring_queue&) = delete;
ring_queue(ring_queue&&) = default;
ring_queue& operator =(const ring_queue&) = delete;
ring_queue& operator =(ring_queue&&) = default;

auto size() const noexcept->size_type;
auto capacity() const noexcept->size_type;
bool empty() const noexcept;
bool full() const noexcept;
template<class U>
bool try_push(U&& value) noexcept(std::is_nothrow_move_assignable_v<U>);
bool try_pop(value_type& value) noexcept(std::is_nothrow_move_assignable_v<value_type>);
private:
using container_type = std::conditional_t<std::is_same_v<value_type, bool>, std::unique_ptr<optional_type[]>, std::vector<optional_type>>;
container_type m_bufferPtr;
size_type m_bufferSize;

size_type m_writeIndex;
size_type m_readIndex;
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
std::atomic<size_type> m_count;
#endif

auto nextIndex(size_type a_count) const noexcept->size_type;
};

template<class _Ty, bool _Option, class _Sty>
ring_queue<_Ty, _Option, _Sty>::ring_queue(size_t sz)
: m_bufferSize(static_cast<size_type>(sz + 1))
, m_writeIndex(0)
, m_readIndex(0)
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
, m_count(0)
#endif
{
if constexpr (std::is_same_v<value_type, bool>)
m_bufferPtr = container_type{ new optional_type[sz + 1] };
else
m_bufferPtr.resize(sz + 1);

assert(sz < (std::numeric_limits<size_type>::max)());
}
assert(sz < (std::numeric_limits<size_type>::max)());
}

template<class _Ty, bool _Option, class _Sty>
auto ring_queue<_Ty, _Option, _Sty>::nextIndex(size_type a_count) const noexcept->size_type
{
return static_cast<size_type>((a_count + 1) % m_bufferSize);
}
template<class _Ty, bool _Option, class _Sty>
auto ring_queue<_Ty, _Option, _Sty>::nextIndex(size_type a_count) const noexcept->size_type
{
return static_cast<size_type>((a_count + 1) % m_bufferSize);
}

template<class _Ty, bool _Option, class _Sty>
auto ring_queue<_Ty, _Option, _Sty>::size() const noexcept->size_type
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
return m_count.load(std::memory_order_acquire);
#else
if (m_writeIndex >= m_readIndex)
return (m_writeIndex - m_readIndex);
else
return (m_bufferSize + m_writeIndex - m_readIndex);
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}

template<class _Ty, bool _Option, class _Sty>
auto ring_queue<_Ty, _Option, _Sty>::capacity() const noexcept->size_type
{
return m_bufferSize - 1;
}
template<class _Ty, bool _Option, class _Sty>
auto ring_queue<_Ty, _Option, _Sty>::size() const noexcept->size_type
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
return m_count.load(std::memory_order_acquire);
#else
if (m_writeIndex >= m_readIndex)
return (m_writeIndex - m_readIndex);
else
return (m_bufferSize + m_writeIndex - m_readIndex);
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}

template<class _Ty, bool _Option, class _Sty>
bool ring_queue<_Ty, _Option, _Sty>::empty() const noexcept
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
return m_count.load(std::memory_order_acquire) == 0;
#else
return m_writeIndex == m_readIndex;
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}

template<class _Ty, bool _Option, class _Sty>
bool ring_queue<_Ty, _Option, _Sty>::full() const noexcept
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
return (m_count.load(std::memory_order_acquire) == (m_bufferSize - 1));
#else
return nextIndex(m_writeIndex) == m_readIndex;
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}

template<class _Ty, bool _Option, class _Sty>
template<class U>
bool ring_queue<_Ty, _Option, _Sty>::try_push(U&& value) noexcept(std::is_nothrow_move_assignable_v<U>)
{
auto nextWriteIndex = nextIndex(m_writeIndex);
if (nextWriteIndex == m_readIndex)
return false;
template<class _Ty, bool _Option, class _Sty>
auto ring_queue<_Ty, _Option, _Sty>::capacity() const noexcept->size_type
{
return m_bufferSize - 1;
}

assert(m_writeIndex < m_bufferSize);
template<class _Ty, bool _Option, class _Sty>
bool ring_queue<_Ty, _Option, _Sty>::empty() const noexcept
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
return m_count.load(std::memory_order_acquire) == 0;
#else
return m_writeIndex == m_readIndex;
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}

template<class _Ty, bool _Option, class _Sty>
bool ring_queue<_Ty, _Option, _Sty>::full() const noexcept
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
return (m_count.load(std::memory_order_acquire) == (m_bufferSize - 1));
#else
return nextIndex(m_writeIndex) == m_readIndex;
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}

m_bufferPtr[m_writeIndex] = std::move(value);
m_writeIndex = nextWriteIndex;
template<class _Ty, bool _Option, class _Sty>
template<class U>
bool ring_queue<_Ty, _Option, _Sty>::try_push(U&& value) noexcept(std::is_nothrow_move_assignable_v<U>)
{
auto nextWriteIndex = nextIndex(m_writeIndex);
if (nextWriteIndex == m_readIndex)
return false;

#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
m_count.fetch_add(1, std::memory_order_acq_rel);
#endif
return true;
}
assert(m_writeIndex < m_bufferSize);

template<class _Ty, bool _Option, class _Sty>
bool ring_queue<_Ty, _Option, _Sty>::try_pop(value_type& value) noexcept(std::is_nothrow_move_assignable_v<value_type>)
{
if (m_readIndex == m_writeIndex)
return false;
m_bufferPtr[m_writeIndex] = std::move(value);
m_writeIndex = nextWriteIndex;

optional_type& ov = m_bufferPtr[m_readIndex];
if constexpr (use_option)
{
value = std::move(ov.value());
ov = std::nullopt;
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
m_count.fetch_add(1, std::memory_order_acq_rel);
#endif
return true;
}
else

template<class _Ty, bool _Option, class _Sty>
bool ring_queue<_Ty, _Option, _Sty>::try_pop(value_type& value) noexcept(std::is_nothrow_move_assignable_v<value_type>)
{
value = std::move(ov);
if (m_readIndex == m_writeIndex)
return false;

optional_type& ov = m_bufferPtr[m_readIndex];
if constexpr (use_option)
{
value = std::move(ov.value());
ov = std::nullopt;
}
else
{
value = std::move(ov);
}

m_readIndex = nextIndex(m_readIndex);

#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
m_count.fetch_sub(1, std::memory_order_acq_rel);
#endif
return true;
}

m_readIndex = nextIndex(m_readIndex);

#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
m_count.fetch_sub(1, std::memory_order_acq_rel);
#endif
return true;
}
}
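
A note on the _Option flag documented above: for move-only element types each slot is wrapped in std::optional, so try_pop can move the value out and clear what remains. A sketch, assuming librf's headers are included:

#include <cassert>
#include <memory>

void demo_move_only()
{
	resumef::ring_queue<std::unique_ptr<int>, true> q(4);	// _Option = true

	q.try_push(std::make_unique<int>(42));

	std::unique_ptr<int> p;
	if (q.try_pop(p))	// moves out of the slot, then resets it to std::nullopt
		assert(*p == 42);
}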

+ 199
- 201
librf/src/ring_queue_lockfree.h

@@ -1,221 +1,219 @@
#pragma once

#include <memory>
#include <atomic>
#include <cassert>
#include <thread>

//The problems caused by wraparound of the three index counters are currently unsolved.
//If the indices use uint64_t to avoid the wraparound problem,
//the queue is actually slower than the spinlock<T, false, uint32_t> version in comparisons.
//pop cannot fetch the data with move semantics: the algorithm must read the value first, and the read may still fail, forcing a retry with another value.
template<class _Ty, class _Sty = uint32_t>
struct ring_queue_lockfree
RESUMEF_NS
{
using value_type = _Ty;
using size_type = _Sty;
public:
ring_queue_lockfree(size_t sz);

ring_queue_lockfree(const ring_queue_lockfree&) = delete;
ring_queue_lockfree(ring_queue_lockfree&&) = default;
ring_queue_lockfree& operator =(const ring_queue_lockfree&) = delete;
ring_queue_lockfree& operator =(ring_queue_lockfree&&) = default;

auto size() const noexcept->size_type;
auto capacity() const noexcept->size_type;
bool empty() const noexcept;
bool full() const noexcept;
template<class U>
bool try_push(U&& value) noexcept(std::is_nothrow_move_constructible_v<U>);
bool try_pop(value_type& value) noexcept(std::is_nothrow_move_constructible_v<value_type>);
private:
std::unique_ptr<value_type[]> m_bufferPtr;
size_type m_bufferSize;

std::atomic<size_type> m_writeIndex; //Where a new element will be inserted.
std::atomic<size_type> m_readIndex; //Where the next element will be extracted from.
std::atomic<size_type> m_maximumReadIndex; //It points to the place where the latest "committed" data has been inserted.
//If it's not the same as writeIndex it means there are writes pending to be "committed" to the queue,
//that means that the place for the data was reserved (the index in the array)
//but the data is still not in the queue,
//so the thread trying to read will have to wait for those other threads to
//save the data into the queue.
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
std::atomic<size_type> m_count;
#endif

auto countToIndex(size_type a_count) const noexcept->size_type;
auto nextIndex(size_type a_count) const noexcept->size_type;
};

template<class _Ty, class _Sty>
ring_queue_lockfree<_Ty, _Sty>::ring_queue_lockfree(size_t sz)
: m_bufferPtr(new value_type[sz + 1])
, m_bufferSize(static_cast<size_type>(sz + 1))
, m_writeIndex(0)
, m_readIndex(0)
, m_maximumReadIndex(0)
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
, m_count(0)
#endif
{
assert(sz < (std::numeric_limits<size_type>::max)());
}

template<class _Ty, class _Sty>
auto ring_queue_lockfree<_Ty, _Sty>::countToIndex(size_type a_count) const noexcept->size_type
{
//return (a_count % m_bufferSize);
return a_count;
}

template<class _Ty, class _Sty>
auto ring_queue_lockfree<_Ty, _Sty>::nextIndex(size_type a_count) const noexcept->size_type
{
//return static_cast<size_type>((a_count + 1));
return static_cast<size_type>((a_count + 1) % m_bufferSize);
}
//The problems caused by wraparound of the three index counters are currently unsolved.
//If the indices use uint64_t to avoid the wraparound problem,
//the queue is actually slower than the spinlock<T, false, uint32_t> version in comparisons.
//pop cannot fetch the data with move semantics: the algorithm must read the value first, and the read may still fail, forcing a retry with another value.
template<class _Ty, class _Sty = uint32_t>
struct ring_queue_lockfree
{
using value_type = _Ty;
using size_type = _Sty;
public:
ring_queue_lockfree(size_t sz);

ring_queue_lockfree(const ring_queue_lockfree&) = delete;
ring_queue_lockfree(ring_queue_lockfree&&) = default;
ring_queue_lockfree& operator =(const ring_queue_lockfree&) = delete;
ring_queue_lockfree& operator =(ring_queue_lockfree&&) = default;

auto size() const noexcept->size_type;
auto capacity() const noexcept->size_type;
bool empty() const noexcept;
bool full() const noexcept;
template<class U>
bool try_push(U&& value) noexcept(std::is_nothrow_move_constructible_v<U>);
bool try_pop(value_type& value) noexcept(std::is_nothrow_move_constructible_v<value_type>);
private:
std::unique_ptr<value_type[]> m_bufferPtr;
size_type m_bufferSize;

std::atomic<size_type> m_writeIndex; //Where a new element will be inserted.
std::atomic<size_type> m_readIndex; //Where the next element will be extracted from.
std::atomic<size_type> m_maximumReadIndex; //It points to the place where the latest "committed" data has been inserted.
//If it's not the same as writeIndex it means there are writes pending to be "committed" to the queue,
//that means that the place for the data was reserved (the index in the array)
//but the data is still not in the queue,
//so the thread trying to read will have to wait for those other threads to
//save the data into the queue.
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
std::atomic<size_type> m_count;
#endif

auto countToIndex(size_type a_count) const noexcept->size_type;
auto nextIndex(size_type a_count) const noexcept->size_type;
};

template<class _Ty, class _Sty>
ring_queue_lockfree<_Ty, _Sty>::ring_queue_lockfree(size_t sz)
: m_bufferPtr(new value_type[sz + 1])
, m_bufferSize(static_cast<size_type>(sz + 1))
, m_writeIndex(0)
, m_readIndex(0)
, m_maximumReadIndex(0)
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
, m_count(0)
#endif
{
assert(sz < (std::numeric_limits<size_type>::max)());
}

template<class _Ty, class _Sty>
auto ring_queue_lockfree<_Ty, _Sty>::size() const noexcept->size_type
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
return m_count.load();
#else
auto currentWriteIndex = m_maximumReadIndex.load(std::memory_order_acquire);
currentWriteIndex = countToIndex(currentWriteIndex);

auto currentReadIndex = m_readIndex.load(std::memory_order_acquire);
currentReadIndex = countToIndex(currentReadIndex);

if (currentWriteIndex >= currentReadIndex)
return (currentWriteIndex - currentReadIndex);
else
return (m_bufferSize + currentWriteIndex - currentReadIndex);
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}

template<class _Ty, class _Sty>
auto ring_queue_lockfree<_Ty, _Sty>::capacity() const noexcept->size_type
{
return m_bufferSize - 1;
}
template<class _Ty, class _Sty>
auto ring_queue_lockfree<_Ty, _Sty>::countToIndex(size_type a_count) const noexcept->size_type
{
//return (a_count % m_bufferSize);
return a_count;
}

template<class _Ty, class _Sty>
bool ring_queue_lockfree<_Ty, _Sty>::empty() const noexcept
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
return m_count.load() == 0;
#else
auto currentWriteIndex = m_maximumReadIndex.load(std::memory_order_acquire);
auto currentReadIndex = m_readIndex.load(std::memory_order_acquire);
return countToIndex(currentWriteIndex) == countToIndex(currentReadIndex);
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}

template<class _Ty, class _Sty>
bool ring_queue_lockfree<_Ty, _Sty>::full() const noexcept
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
return (m_count.load() == (m_bufferSize - 1));
#else
auto currentWriteIndex = m_writeIndex.load(std::memory_order_acquire);
auto currentReadIndex = m_readIndex.load(std::memory_order_acquire);
return countToIndex(nextIndex(currentWriteIndex)) == countToIndex(currentReadIndex);
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}

template<class _Ty, class _Sty>
template<class U>
bool ring_queue_lockfree<_Ty, _Sty>::try_push(U&& value) noexcept(std::is_nothrow_move_constructible_v<U>)
{
auto currentWriteIndex = m_writeIndex.load(std::memory_order_acquire);
template<class _Ty, class _Sty>
auto ring_queue_lockfree<_Ty, _Sty>::nextIndex(size_type a_count) const noexcept->size_type
{
//return static_cast<size_type>((a_count + 1));
return static_cast<size_type>((a_count + 1) % m_bufferSize);
}

do
template<class _Ty, class _Sty>
auto ring_queue_lockfree<_Ty, _Sty>::size() const noexcept->size_type
{
if (countToIndex(nextIndex(currentWriteIndex)) == countToIndex(m_readIndex.load(std::memory_order_acquire)))
{
// the queue is full
return false;
}
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
return m_count.load();
#else
auto currentWriteIndex = m_maximumReadIndex.load(std::memory_order_acquire);
currentWriteIndex = countToIndex(currentWriteIndex);

auto currentReadIndex = m_readIndex.load(std::memory_order_acquire);
currentReadIndex = countToIndex(currentReadIndex);

if (currentWriteIndex >= currentReadIndex)
return (currentWriteIndex - currentReadIndex);
else
return (m_bufferSize + currentWriteIndex - currentReadIndex);
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}

// There is more than one producer. Keep looping till this thread is able
// to allocate space for the current piece of data
//
// using compare_exchange_strong because it isn't allowed to fail spuriously
// When the compare_exchange operation is in a loop the weak version
// will yield better performance on some platforms, but here we'd have to
// load m_writeIndex all over again
} while (!m_writeIndex.compare_exchange_strong(currentWriteIndex, nextIndex(currentWriteIndex), std::memory_order_acq_rel));

// Just made sure this index is reserved for this thread.
m_bufferPtr[countToIndex(currentWriteIndex)] = std::move(value);

// update the maximum read index after saving the piece of data. It can't
// fail if there is only one thread inserting in the queue. It might fail
// if there is more than 1 producer thread because this operation has to
// be done in the same order as the previous CAS
//
// using compare_exchange_weak because they are allowed to fail spuriously
// (act as if *this != expected, even if they are equal), but when the
// compare_exchange operation is in a loop the weak version will yield
// better performance on some platforms.
auto savedWriteIndex = currentWriteIndex;
while (!m_maximumReadIndex.compare_exchange_weak(currentWriteIndex, nextIndex(currentWriteIndex), std::memory_order_acq_rel))
template<class _Ty, class _Sty>
auto ring_queue_lockfree<_Ty, _Sty>::capacity() const noexcept->size_type
{
currentWriteIndex = savedWriteIndex;
// this is a good place to yield the thread in case there are more
// software threads than hardware processors and you have more
// than 1 producer thread
// have a look at sched_yield (POSIX.1b)
std::this_thread::yield();
return m_bufferSize - 1;
}

// The value was successfully inserted into the queue
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
m_count.fetch_add(1);
#endif
return true;
}
template<class _Ty, class _Sty>
bool ring_queue_lockfree<_Ty, _Sty>::empty() const noexcept
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
return m_count.load() == 0;
#else
auto currentWriteIndex = m_maximumReadIndex.load(std::memory_order_acquire);
auto currentReadIndex = m_readIndex.load(std::memory_order_acquire);
return countToIndex(currentWriteIndex) == countToIndex(currentReadIndex);
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}

template<class _Ty, class _Sty>
bool ring_queue_lockfree<_Ty, _Sty>::try_pop(value_type & value) noexcept(std::is_nothrow_move_constructible_v<value_type>)
{
auto currentReadIndex = m_readIndex.load(std::memory_order_acquire);
template<class _Ty, class _Sty>
bool ring_queue_lockfree<_Ty, _Sty>::full() const noexcept
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
return (m_count.load() == (m_bufferSize - 1));
#else
auto currentWriteIndex = m_writeIndex.load(std::memory_order_acquire);
auto currentReadIndex = m_readIndex.load(std::memory_order_acquire);
return countToIndex(nextIndex(currentWriteIndex)) == countToIndex(currentReadIndex);
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}

for(;;)
template<class _Ty, class _Sty>
template<class U>
bool ring_queue_lockfree<_Ty, _Sty>::try_push(U&& value) noexcept(std::is_nothrow_move_constructible_v<U>)
{
auto idx = countToIndex(currentReadIndex);
auto currentWriteIndex = m_writeIndex.load(std::memory_order_acquire);

// to ensure thread-safety when there is more than 1 producer
// thread a second index is defined (m_maximumReadIndex)
if (idx == countToIndex(m_maximumReadIndex.load(std::memory_order_acquire)))
do
{
if (countToIndex(nextIndex(currentWriteIndex)) == countToIndex(m_readIndex.load(std::memory_order_acquire)))
{
// the queue is full
return false;
}

// There is more than one producer. Keep looping till this thread is able
// to allocate space for the current piece of data
//
// using compare_exchange_strong because it isn't allowed to fail spuriously
// When the compare_exchange operation is in a loop the weak version
// will yield better performance on some platforms, but here we'd have to
// load m_writeIndex all over again
} while (!m_writeIndex.compare_exchange_strong(currentWriteIndex, nextIndex(currentWriteIndex), std::memory_order_acq_rel));

// Just made sure this index is reserved for this thread.
m_bufferPtr[countToIndex(currentWriteIndex)] = std::move(value);

// update the maximum read index after saving the piece of data. It can't
// fail if there is only one thread inserting in the queue. It might fail
// if there is more than 1 producer thread because this operation has to
// be done in the same order as the previous CAS
//
// using compare_exchange_weak because they are allowed to fail spuriously
// (act as if *this != expected, even if they are equal), but when the
// compare_exchange operation is in a loop the weak version will yield
// better performance on some platforms.
auto savedWriteIndex = currentWriteIndex;
while (!m_maximumReadIndex.compare_exchange_weak(currentWriteIndex, nextIndex(currentWriteIndex), std::memory_order_acq_rel))
{
// the queue is empty or
// a producer thread has allocated space in the queue but is
// waiting to commit the data into it
return false;
currentWriteIndex = savedWriteIndex;
// this is a good place to yield the thread in case there are more
// software threads than hardware processors and you have more
// than 1 producer thread
// have a look at sched_yield (POSIX.1b)
std::this_thread::yield();
}

// retrieve the data from the queue
value = m_bufferPtr[idx]; //But this copy is not a good fit here. What if the type is move-only?
// The value was successfully inserted into the queue
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
m_count.fetch_add(1);
#endif
return true;
}

// try to perform the CAS operation on the read index now. If we succeed
// a_data already contains what m_readIndex pointed to before we
// increased it
if (m_readIndex.compare_exchange_strong(currentReadIndex, nextIndex(currentReadIndex), std::memory_order_acq_rel))
{
// got here. The value was retrieved from the queue. Note that the
// data inside the m_queue array is not deleted nor reset
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
m_count.fetch_sub(1);
#endif
return true;
}
template<class _Ty, class _Sty>
bool ring_queue_lockfree<_Ty, _Sty>::try_pop(value_type& value) noexcept(std::is_nothrow_move_constructible_v<value_type>)
{
auto currentReadIndex = m_readIndex.load(std::memory_order_acquire);

// it failed retrieving the element off the queue. Someone else must
// have read the element stored at countToIndex(currentReadIndex)
// before we could perform the CAS operation
} // keep looping to try again!
}
for (;;)
{
auto idx = countToIndex(currentReadIndex);

// to ensure thread-safety when there is more than 1 producer
// thread a second index is defined (m_maximumReadIndex)
if (idx == countToIndex(m_maximumReadIndex.load(std::memory_order_acquire)))
{
// the queue is empty or
// a producer thread has allocated space in the queue but is
// waiting to commit the data into it
return false;
}

// retrieve the data from the queue
value = m_bufferPtr[idx]; //But this copy is not a good fit here. What if the type is move-only?

// try to perform the CAS operation on the read index now. If we succeed
// a_data already contains what m_readIndex pointed to before we
// increased it
if (m_readIndex.compare_exchange_strong(currentReadIndex, nextIndex(currentReadIndex), std::memory_order_acq_rel))
{
// got here. The value was retrieved from the queue. Note that the
// data inside the m_queue array is not deleted nor reset
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
m_count.fetch_sub(1);
#endif
return true;
}

// it failed retrieving the element off the queue. Someone else must
// have read the element stored at countToIndex(currentReadIndex)
// before we could perform the CAS operation
} // keep looping to try again!
}
}
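
A small multi-producer exercise of the lock-free queue; mind the caveats in the header comment (pop copies rather than moves, and narrow index types can wrap around under sustained load):

#include <cstdint>
#include <thread>
#include <vector>

void demo_lockfree()
{
	resumef::ring_queue_lockfree<int, uint32_t> q(1024);

	std::vector<std::thread> producers;
	for (int t = 0; t < 4; ++t)
		producers.emplace_back([&q, t] {
			for (int i = 0; i < 1000; ++i)
				while (!q.try_push(t * 1000 + i))	// spin until a slot frees up
					std::this_thread::yield();
		});

	int v = 0, popped = 0;
	while (popped < 4000)		// a single consumer drains all items
		if (q.try_pop(v))
			++popped;

	for (auto& th : producers)
		th.join();
}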

+ 143
- 147
librf/src/ring_queue_spinlock.h

@@ -1,166 +1,162 @@
#pragma once

#include <memory>
#include <atomic>
#include <cassert>
#include <optional>
#include "src/spinlock.h"

//A thread-safe ring queue implemented with a spinlock.
//Supports push and pop from multiple threads concurrently.
//_Option : set to true when the stored type is move-only, or when the value should be destroyed after pop.
//_Sty : integer type used for the element count and the indices; lets callers control the size of the queue struct.
template<class _Ty, bool _Option = false, class _Sty = uint32_t>
struct ring_queue_spinlock
RESUMEF_NS
{
using value_type = _Ty;
using size_type = _Sty;

static constexpr bool use_option = _Option;
using optional_type = std::conditional_t<use_option, std::optional<value_type>, value_type>;
public:
ring_queue_spinlock(size_t sz);

ring_queue_spinlock(const ring_queue_spinlock&) = delete;
ring_queue_spinlock(ring_queue_spinlock&&) = default;
ring_queue_spinlock& operator =(const ring_queue_spinlock&) = delete;
ring_queue_spinlock& operator =(ring_queue_spinlock&&) = default;

auto size() const noexcept->size_type;
auto capacity() const noexcept->size_type;
bool empty() const noexcept;
bool full() const noexcept;
template<class U>
bool try_push(U&& value) noexcept(std::is_nothrow_move_assignable_v<U>);
bool try_pop(value_type& value) noexcept(std::is_nothrow_move_assignable_v<value_type>);
private:
using container_type = std::conditional_t<std::is_same_v<value_type, bool>, std::unique_ptr<optional_type[]>, std::vector<optional_type>>;
container_type m_bufferPtr;
size_type m_bufferSize;

size_type m_writeIndex;
size_type m_readIndex;
mutable resumef::spinlock m_lock;
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
std::atomic<size_type> m_count;
#endif

auto nextIndex(size_type a_count) const noexcept->size_type;
};

template<class _Ty, bool _Option, class _Sty>
ring_queue_spinlock<_Ty, _Option, _Sty>::ring_queue_spinlock(size_t sz)
: m_bufferSize(static_cast<size_type>(sz + 1))
, m_writeIndex(0)
, m_readIndex(0)
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
, m_count(0)
#endif
{
if constexpr (std::is_same_v<value_type, bool>)
m_bufferPtr = container_type{ new optional_type[sz + 1] };
else
m_bufferPtr.resize(sz + 1);
//A thread-safe ring queue implemented with a spinlock.
//Supports push and pop from multiple threads concurrently.
//_Option : set to true when the stored type is move-only, or when the value should be destroyed after pop.
//_Sty : integer type used for the element count and the indices; lets callers control the size of the queue struct.
template<class _Ty, bool _Option = false, class _Sty = uint32_t>
struct ring_queue_spinlock
{
using value_type = _Ty;
using size_type = _Sty;

static constexpr bool use_option = _Option;
using optional_type = std::conditional_t<use_option, std::optional<value_type>, value_type>;
public:
ring_queue_spinlock(size_t sz);

ring_queue_spinlock(const ring_queue_spinlock&) = delete;
ring_queue_spinlock(ring_queue_spinlock&&) = default;
ring_queue_spinlock& operator =(const ring_queue_spinlock&) = delete;
ring_queue_spinlock& operator =(ring_queue_spinlock&&) = default;

auto size() const noexcept->size_type;
auto capacity() const noexcept->size_type;
bool empty() const noexcept;
bool full() const noexcept;
template<class U>
bool try_push(U&& value) noexcept(std::is_nothrow_move_assignable_v<U>);
bool try_pop(value_type& value) noexcept(std::is_nothrow_move_assignable_v<value_type>);
private:
using container_type = std::conditional_t<std::is_same_v<value_type, bool>, std::unique_ptr<optional_type[]>, std::vector<optional_type>>;
container_type m_bufferPtr;
size_type m_bufferSize;

size_type m_writeIndex;
size_type m_readIndex;
mutable resumef::spinlock m_lock;
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
std::atomic<size_type> m_count;
#endif

auto nextIndex(size_type a_count) const noexcept->size_type;
};

template<class _Ty, bool _Option, class _Sty>
ring_queue_spinlock<_Ty, _Option, _Sty>::ring_queue_spinlock(size_t sz)
: m_bufferSize(static_cast<size_type>(sz + 1))
, m_writeIndex(0)
, m_readIndex(0)
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
, m_count(0)
#endif
{
if constexpr (std::is_same_v<value_type, bool>)
m_bufferPtr = container_type{ new optional_type[sz + 1] };
else
m_bufferPtr.resize(sz + 1);
assert(sz < (std::numeric_limits<size_type>::max)());
}
assert(sz < (std::numeric_limits<size_type>::max)());
}

template<class _Ty, bool _Option, class _Sty>
auto ring_queue_spinlock<_Ty, _Option, _Sty>::nextIndex(size_type a_count) const noexcept->size_type
{
return static_cast<size_type>((a_count + 1) % m_bufferSize);
}
template<class _Ty, bool _Option, class _Sty>
auto ring_queue_spinlock<_Ty, _Option, _Sty>::nextIndex(size_type a_count) const noexcept->size_type
{
return static_cast<size_type>((a_count + 1) % m_bufferSize);
}

template<class _Ty, bool _Option, class _Sty>
auto ring_queue_spinlock<_Ty, _Option, _Sty>::size() const noexcept->size_type
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
return m_count.load(std::memory_order_acquire);
#else
std::scoped_lock __guard(this->m_lock);

if (m_writeIndex >= m_readIndex)
return (m_writeIndex - m_readIndex);
else
return (m_bufferSize + m_writeIndex - m_readIndex);
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}

template<class _Ty, bool _Option, class _Sty>
auto ring_queue_spinlock<_Ty, _Option, _Sty>::capacity() const noexcept->size_type
{
return m_bufferSize - 1;
}
template<class _Ty, bool _Option, class _Sty>
auto ring_queue_spinlock<_Ty, _Option, _Sty>::size() const noexcept->size_type
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
return m_count.load(std::memory_order_acquire);
#else
std::scoped_lock __guard(this->m_lock);

if (m_writeIndex >= m_readIndex)
return (m_writeIndex - m_readIndex);
else
return (m_bufferSize + m_writeIndex - m_readIndex);
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}

template<class _Ty, bool _Option, class _Sty>
bool ring_queue_spinlock<_Ty, _Option, _Sty>::empty() const noexcept
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
return m_count.load(std::memory_order_acquire) == 0;
#else
std::scoped_lock __guard(this->m_lock);
template<class _Ty, bool _Option, class _Sty>
auto ring_queue_spinlock<_Ty, _Option, _Sty>::capacity() const noexcept->size_type
{
return m_bufferSize - 1;
}

return m_writeIndex == m_readIndex;
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}
template<class _Ty, bool _Option, class _Sty>
bool ring_queue_spinlock<_Ty, _Option, _Sty>::empty() const noexcept
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
return m_count.load(std::memory_order_acquire) == 0;
#else
std::scoped_lock __guard(this->m_lock);

template<class _Ty, bool _Option, class _Sty>
bool ring_queue_spinlock<_Ty, _Option, _Sty>::full() const noexcept
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
return (m_count.load(std::memory_order_acquire) == (m_bufferSize - 1));
#else
std::scoped_lock __guard(this->m_lock);

return nextIndex(m_writeIndex) == m_readIndex;
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}

template<class _Ty, bool _Option, class _Sty>
template<class U>
bool ring_queue_spinlock<_Ty, _Option, _Sty>::try_push(U&& value) noexcept(std::is_nothrow_move_assignable_v<U>)
{
std::scoped_lock __guard(this->m_lock);
return m_writeIndex == m_readIndex;
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}

auto nextWriteIndex = nextIndex(m_writeIndex);
if (nextWriteIndex == m_readIndex)
return false;
template<class _Ty, bool _Option, class _Sty>
bool ring_queue_spinlock<_Ty, _Option, _Sty>::full() const noexcept
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
return (m_count.load(std::memory_order_acquire) == (m_bufferSize - 1));
#else
std::scoped_lock __guard(this->m_lock);

assert(m_writeIndex < m_bufferSize);
return nextIndex(m_writeIndex) == m_readIndex;
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}

m_bufferPtr[m_writeIndex] = std::move(value);
m_writeIndex = nextWriteIndex;
template<class _Ty, bool _Option, class _Sty>
template<class U>
bool ring_queue_spinlock<_Ty, _Option, _Sty>::try_push(U&& value) noexcept(std::is_nothrow_move_assignable_v<U>)
{
std::scoped_lock __guard(this->m_lock);

#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
m_count.fetch_add(1, std::memory_order_acq_rel);
#endif
return true;
}
auto nextWriteIndex = nextIndex(m_writeIndex);
if (nextWriteIndex == m_readIndex)
return false;

template<class _Ty, bool _Option, class _Sty>
bool ring_queue_spinlock<_Ty, _Option, _Sty>::try_pop(value_type& value) noexcept(std::is_nothrow_move_assignable_v<value_type>)
{
std::scoped_lock __guard(this->m_lock);
assert(m_writeIndex < m_bufferSize);

if (m_readIndex == m_writeIndex)
return false;
m_bufferPtr[m_writeIndex] = std::move(value);
m_writeIndex = nextWriteIndex;

optional_type& ov = m_bufferPtr[m_readIndex];
if constexpr (use_option)
{
value = std::move(ov.value());
ov = std::nullopt;
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
m_count.fetch_add(1, std::memory_order_acq_rel);
#endif
return true;
}
else

template<class _Ty, bool _Option, class _Sty>
bool ring_queue_spinlock<_Ty, _Option, _Sty>::try_pop(value_type& value) noexcept(std::is_nothrow_move_assignable_v<value_type>)
{
value = std::move(ov);
std::scoped_lock __guard(this->m_lock);

if (m_readIndex == m_writeIndex)
return false;

optional_type& ov = m_bufferPtr[m_readIndex];
if constexpr (use_option)
{
value = std::move(ov.value());
ov = std::nullopt;
}
else
{
value = std::move(ov);
}

m_readIndex = nextIndex(m_readIndex);

#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
m_count.fetch_sub(1, std::memory_order_acq_rel);
#endif
return true;
}

m_readIndex = nextIndex(m_readIndex);

#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
m_count.fetch_sub(1, std::memory_order_acq_rel);
#endif
return true;
}

}

+ 14
- 7
librf/src/spinlock.h

@@ -6,7 +6,8 @@ RESUMEF_NS
{
struct spinlock
{
static const size_t MAX_ACTIVE_SPIN = 4000;
static const size_t MAX_ACTIVE_SPIN = 1000;
static const size_t MAX_YIELD_SPIN = 4000;
static const int FREE_VALUE = 0;
static const int LOCKED_VALUE = 1;
@@ -22,23 +23,29 @@ RESUMEF_NS
void lock() noexcept
{
using namespace std::chrono;
int val = FREE_VALUE;
if (!lck.compare_exchange_weak(val, LOCKED_VALUE, std::memory_order_acquire))
if (!lck.compare_exchange_weak(val, LOCKED_VALUE, std::memory_order_acq_rel))
{
#if _DEBUG
//diagnose incorrect usage: a recursive lock attempt
assert(owner_thread_id != std::this_thread::get_id());
#endif
size_t spinCount = 0;
auto dt = 1ms;
auto dt = std::chrono::milliseconds{ 1 };
do
{
while (lck.load(std::memory_order_relaxed) != FREE_VALUE)
{
if (spinCount < MAX_ACTIVE_SPIN)
{
++spinCount;
}
else if (spinCount < MAX_YIELD_SPIN)
{
++spinCount;
std::this_thread::yield();
}
else
{
std::this_thread::sleep_for(dt);
@@ -47,7 +54,7 @@ RESUMEF_NS
}
val = FREE_VALUE;
} while (!lck.compare_exchange_weak(val, LOCKED_VALUE, std::memory_order_acquire));
} while (!lck.compare_exchange_weak(val, LOCKED_VALUE, std::memory_order_acq_rel));
}
#if _DEBUG
@@ -58,7 +65,7 @@ RESUMEF_NS
bool try_lock() noexcept
{
int val = FREE_VALUE;
bool ret = lck.compare_exchange_weak(val, LOCKED_VALUE, std::memory_order_acquire);
bool ret = lck.compare_exchange_weak(val, LOCKED_VALUE, std::memory_order_acq_rel);
#if _DEBUG
if (ret) owner_thread_id = std::this_thread::get_id();

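The reworked lock() now backs off in three phases: busy-spin up to MAX_ACTIVE_SPIN, then spin with yield up to MAX_YIELD_SPIN, then sleep 1ms per retry. As elsewhere in this commit, the lock is normally taken through a scoped guard; a minimal sketch (s_lock is an illustrative name):

resumef::spinlock s_lock;

void critical()
{
	std::scoped_lock guard(s_lock);	// spinlock satisfies BasicLockable
	// keep the critical section short: contenders fall back to
	// yield and then sleep only after thousands of failed spins
}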
+ 24
- 17
tutorial/test_async_channel_mult_thread.cpp

@@ -64,36 +64,41 @@ future_t<> test_channel_producer(const channel_t<std::string> & c, size_t cnt)
}
}
const size_t THREAD = 12;
const size_t BATCH = 10000;
const size_t MAX_CHANNEL_QUEUE = THREAD + 1; //0, 1, 5, 10, -1
const size_t WRITE_THREAD = 6;
const size_t READ_THREAD = 6;
const size_t READ_BATCH = 1000000;
const size_t MAX_CHANNEL_QUEUE = 5; //0, 1, 5, 10, -1
void resumable_main_channel_mult_thread()
{
channel_t<std::string> c(MAX_CHANNEL_QUEUE);
std::thread write_th([&]
std::thread write_th[WRITE_THREAD];
for (size_t i = 0; i < WRITE_THREAD; ++i)
{
local_scheduler my_scheduler; //2017/12/14: a BUG remains. Under true multithreaded scheduling, some coroutines may never be scheduled to completion
go test_channel_producer(c, BATCH * THREAD);
write_th[i] = std::thread([&]
{
local_scheduler my_scheduler;
go test_channel_producer(c, READ_BATCH * READ_THREAD / WRITE_THREAD);
#if RESUMEF_ENABLE_MULT_SCHEDULER
this_scheduler()->run_until_notask();
this_scheduler()->run_until_notask();
#endif
{
scoped_lock<std::mutex> __lock(cout_mutex);
std::cout << "Write OK\r\n";
}
});
{
scoped_lock<std::mutex> __lock(cout_mutex);
std::cout << "Write OK\r\n";
}
});
}
std::this_thread::sleep_for(100ms);
std::thread read_th[THREAD];
for (size_t i = 0; i < THREAD; ++i)
std::thread read_th[READ_THREAD];
for (size_t i = 0; i < READ_THREAD; ++i)
{
read_th[i] = std::thread([&]
{
local_scheduler my_scheduler; //2017/12/14: a BUG remains. Under true multithreaded scheduling, some coroutines may never be scheduled to completion
go test_channel_consumer(c, BATCH);
local_scheduler my_scheduler;
go test_channel_consumer(c, READ_BATCH);
#if RESUMEF_ENABLE_MULT_SCHEDULER
this_scheduler()->run_until_notask();
#endif
@@ -108,9 +113,11 @@ void resumable_main_channel_mult_thread()
std::this_thread::sleep_for(100ms);
scheduler_t::g_scheduler.run_until_notask();
#endif
for(auto & th : read_th)
th.join();
write_th.join();
for (auto& th : write_th)
th.join();
std::cout << "OK: counter = " << gcounter.load() << std::endl;
}

+ 3
- 3
vs_proj/librf.cpp

@@ -39,9 +39,9 @@ int main(int argc, const char* argv[])
(void)argc;
(void)argv;
//test_ring_queue_simple<ring_queue_single_thread<int>>();
//test_ring_queue<ring_queue_spinlock<int, false, uint32_t>>();
//test_ring_queue<ring_queue_lockfree<int, uint64_t>>();
//test_ring_queue_simple<resumef::ring_queue<int>>();
//test_ring_queue<resumef::ring_queue_spinlock<int, false, uint32_t>>();
//test_ring_queue<resumef::ring_queue_lockfree<int, uint64_t>>();
resumable_main_channel();
resumable_main_channel_mult_thread();

+ 3
- 2
vs_proj/librf.vcxproj

@@ -40,13 +40,13 @@
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'" Label="Configuration">
<ConfigurationType>Application</ConfigurationType>
<PlatformToolset>ClangCL</PlatformToolset>
<PlatformToolset>v142</PlatformToolset>
<UseDebugLibraries>true</UseDebugLibraries>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'" Label="Configuration">
<ConfigurationType>Application</ConfigurationType>
<UseDebugLibraries>false</UseDebugLibraries>
<PlatformToolset>ClangCL</PlatformToolset>
<PlatformToolset>v142</PlatformToolset>
<WholeProgramOptimization>true</WholeProgramOptimization>
<CharacterSet>NotSet</CharacterSet>
</PropertyGroup>
@@ -231,6 +231,7 @@
<ClInclude Include="..\librf\src\event_v2.h" />
<ClInclude Include="..\librf\src\future.h" />
<ClInclude Include="..\librf\src\generator.h" />
<ClInclude Include="..\librf\src\intrusive_link_queue.h" />
<ClInclude Include="..\librf\src\promise.h" />
<ClInclude Include="..\librf\src\mutex.h" />
<ClInclude Include="..\librf\src\rf_task.h" />

+ 3
- 0
vs_proj/librf.vcxproj.filters

@@ -222,6 +222,9 @@
<ClInclude Include="..\librf\src\ring_queue.h">
<Filter>librf\src</Filter>
</ClInclude>
<ClInclude Include="..\librf\src\intrusive_link_queue.h">
<Filter>librf\src</Filter>
</ClInclude>
</ItemGroup>
<ItemGroup>
<None Include="..\librf\src\asio_task_1.12.0.inl">
