librf
ring_queue_lockfree.h
#pragma once

#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <memory>
#include <thread>
#include <type_traits>

namespace resumef
{
    //Currently the wrap-around problem of the three index values cannot be solved.
    //If the indices used uint64_t to avoid wrap-around, the queue turned out to be
    //slower than the spinlock<T, false, uint32_t> version in benchmarks.
    //try_pop cannot use move semantics to take the data: the algorithm has to read
    //the value first, and the read may still fail afterwards, in which case another
    //value has to be read instead.
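    //
    //An added sketch of the wrap-around (ABA) hazard mentioned above, illustrative only:
    //
    //   auto expected = m_readIndex.load();   // reader snapshots, say, 3, then stalls
    //   // ...other threads push/pop until m_readIndex wraps back to 3...
    //   m_readIndex.compare_exchange_strong(expected, nextIndex(expected));
    //   // succeeds, although the queue is now on a different "lap" of the ring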
    template<class _Ty, class _Sty = uint32_t>
    struct ring_queue_lockfree
    {
        using value_type = _Ty;
        using size_type = _Sty;
    public:
        ring_queue_lockfree(size_t sz);

        ring_queue_lockfree(const ring_queue_lockfree&) = delete;
        ring_queue_lockfree(ring_queue_lockfree&&) = default;
        ring_queue_lockfree& operator =(const ring_queue_lockfree&) = delete;
        ring_queue_lockfree& operator =(ring_queue_lockfree&&) = default;

        auto size() const noexcept->size_type;
        auto capacity() const noexcept->size_type;
        bool empty() const noexcept;
        bool full() const noexcept;
        template<class U>
        bool try_push(U&& value) noexcept(std::is_nothrow_assignable_v<value_type&, U&&>);
        bool try_pop(value_type& value) noexcept(std::is_nothrow_copy_assignable_v<value_type>);
    private:
        std::unique_ptr<value_type[]> m_bufferPtr;
        size_type m_bufferSize;

        std::atomic<size_type> m_writeIndex;        //Where a new element will be inserted.
        std::atomic<size_type> m_readIndex;         //Where the next element will be extracted from.
        std::atomic<size_type> m_maximumReadIndex;  //Points to the place where the latest "committed" data has been inserted.
        //If it's not the same as m_writeIndex, there are writes pending to be "committed" to the
        //queue: the place for the data was reserved (the index in the array) but the data is not
        //in the queue yet, so a thread trying to read has to wait for those other threads to
        //save the data into the queue.
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
        std::atomic<size_type> m_count;
#endif

        auto countToIndex(size_type a_count) const noexcept->size_type;
        auto nextIndex(size_type a_count) const noexcept->size_type;
    };

    template<class _Ty, class _Sty>
    ring_queue_lockfree<_Ty, _Sty>::ring_queue_lockfree(size_t sz)
        : m_bufferPtr(new value_type[sz + 1])
        , m_bufferSize(static_cast<size_type>(sz + 1))
        , m_writeIndex(0)
        , m_readIndex(0)
        , m_maximumReadIndex(0)
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
        , m_count(0)
#endif
    {
        assert(sz < (std::numeric_limits<size_type>::max)());
    }

    template<class _Ty, class _Sty>
    auto ring_queue_lockfree<_Ty, _Sty>::countToIndex(size_type a_count) const noexcept->size_type
    {
        //Identity mapping: nextIndex() already keeps every index inside [0, m_bufferSize),
        //so no modulo is needed here.
        //return (a_count % m_bufferSize);
        return a_count;
    }

    template<class _Ty, class _Sty>
    auto ring_queue_lockfree<_Ty, _Sty>::nextIndex(size_type a_count) const noexcept->size_type
    {
        //return static_cast<size_type>((a_count + 1));
        return static_cast<size_type>((a_count + 1) % m_bufferSize);
    }
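
    //Worked example (an added illustration): constructed with sz == 7, the buffer
    //holds 8 slots, so nextIndex(6) == 7 and nextIndex(7) == 0 -- indices wrap
    //before ever reaching m_bufferSize, which is why countToIndex() can stay the
    //identity function.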

    template<class _Ty, class _Sty>
    auto ring_queue_lockfree<_Ty, _Sty>::size() const noexcept->size_type
    {
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
        return m_count.load();
#else
        auto currentWriteIndex = m_maximumReadIndex.load(std::memory_order_acquire);
        currentWriteIndex = countToIndex(currentWriteIndex);

        auto currentReadIndex = m_readIndex.load(std::memory_order_acquire);
        currentReadIndex = countToIndex(currentReadIndex);

        if (currentWriteIndex >= currentReadIndex)
            return (currentWriteIndex - currentReadIndex);
        else
            return (m_bufferSize + currentWriteIndex - currentReadIndex);
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    }

    template<class _Ty, class _Sty>
    auto ring_queue_lockfree<_Ty, _Sty>::capacity() const noexcept->size_type
    {
        return m_bufferSize - 1;
    }

    template<class _Ty, class _Sty>
    bool ring_queue_lockfree<_Ty, _Sty>::empty() const noexcept
    {
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
        return m_count.load() == 0;
#else
        auto currentWriteIndex = m_maximumReadIndex.load(std::memory_order_acquire);
        auto currentReadIndex = m_readIndex.load(std::memory_order_acquire);
        return countToIndex(currentWriteIndex) == countToIndex(currentReadIndex);
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    }

    template<class _Ty, class _Sty>
    bool ring_queue_lockfree<_Ty, _Sty>::full() const noexcept
    {
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
        return (m_count.load() == (m_bufferSize - 1));
#else
        auto currentWriteIndex = m_writeIndex.load(std::memory_order_acquire);
        auto currentReadIndex = m_readIndex.load(std::memory_order_acquire);
        return countToIndex(nextIndex(currentWriteIndex)) == countToIndex(currentReadIndex);
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    }
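
    // Note (an added observation): under concurrent use, size(), empty() and
    // full() are only instantaneous snapshots; the answer may already be stale
    // by the time the caller acts on it.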

    template<class _Ty, class _Sty>
    template<class U>
    bool ring_queue_lockfree<_Ty, _Sty>::try_push(U&& value) noexcept(std::is_nothrow_assignable_v<value_type&, U&&>)
    {
        auto currentWriteIndex = m_writeIndex.load(std::memory_order_acquire);

        do
        {
            if (countToIndex(nextIndex(currentWriteIndex)) == countToIndex(m_readIndex.load(std::memory_order_acquire)))
            {
                // the queue is full
                return false;
            }

            // There may be more than one producer. Keep looping until this thread
            // is able to reserve space for the current piece of data.
            //
            // compare_exchange_strong is used because it isn't allowed to fail
            // spuriously. When a compare_exchange operation sits in a loop, the
            // weak version yields better performance on some platforms, but here
            // it would make the loop re-test the "full" condition on every
            // spurious failure as well.
        } while (!m_writeIndex.compare_exchange_strong(currentWriteIndex, nextIndex(currentWriteIndex), std::memory_order_acq_rel));

        // This index is now reserved for this thread. Forward rather than
        // unconditionally move, so lvalue arguments are copied and rvalues moved.
        m_bufferPtr[countToIndex(currentWriteIndex)] = std::forward<U>(value);

        // Update the maximum read index after saving the piece of data. It can't
        // fail if there is only one thread inserting into the queue. It might fail
        // if there is more than one producer thread, because this operation has to
        // be done in the same order as the previous CAS.
        //
        // compare_exchange_weak is used because it is allowed to fail spuriously
        // (act as if *this != expected even when they are equal), and when a
        // compare_exchange operation sits in a loop the weak version yields better
        // performance on some platforms.
        auto savedWriteIndex = currentWriteIndex;
        while (!m_maximumReadIndex.compare_exchange_weak(currentWriteIndex, nextIndex(currentWriteIndex), std::memory_order_acq_rel))
        {
            currentWriteIndex = savedWriteIndex;
            // This is a good place to yield the thread in case there are more
            // software threads than hardware processors, and you have more than
            // one producer thread; compare sched_yield (POSIX.1b).
            std::this_thread::yield();
        }

        // The value was successfully inserted into the queue
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
        m_count.fetch_add(1);
#endif
        return true;
    }
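
    // A worked two-producer interleaving for try_push (an added illustration):
    //   P1 reserves slot 0 (m_writeIndex: 0 -> 1), then P2 reserves slot 1 (1 -> 2).
    //   Even if P2 finishes writing its data first, its publish CAS on
    //   m_maximumReadIndex (expecting 1) keeps failing while that index is still 0,
    //   so P2 spins in the yield loop until P1 publishes 0 -> 1. Consumers thus
    //   never see slot 1 as readable before slot 0 has been committed.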

    template<class _Ty, class _Sty>
    bool ring_queue_lockfree<_Ty, _Sty>::try_pop(value_type& value) noexcept(std::is_nothrow_copy_assignable_v<value_type>)
    {
        auto currentReadIndex = m_readIndex.load(std::memory_order_acquire);

        for (;;)
        {
            auto idx = countToIndex(currentReadIndex);

            // to ensure thread-safety when there is more than one producer
            // thread, a second index is defined (m_maximumReadIndex)
            if (idx == countToIndex(m_maximumReadIndex.load(std::memory_order_acquire)))
            {
                // the queue is empty, or a producer thread has allocated space
                // in the queue but is still waiting to commit the data into it
                return false;
            }

            // Retrieve the data from the queue. This has to be a copy, not a
            // move: if the CAS below fails, the element must remain intact for
            // whichever reader wins, so move-only types are not supported here.
            value = m_bufferPtr[idx];

            // try to perform the CAS operation on the read index now. If we
            // succeed, 'value' already contains what m_readIndex pointed to
            // before we increased it
            if (m_readIndex.compare_exchange_strong(currentReadIndex, nextIndex(currentReadIndex), std::memory_order_acq_rel))
            {
                // The value was retrieved from the queue. Note that the data
                // inside the m_bufferPtr array is neither deleted nor reset
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
                m_count.fetch_sub(1);
#endif
                return true;
            }

            // it failed retrieving the element off the queue. Someone else must
            // have read the element stored at countToIndex(currentReadIndex)
            // before we could perform the CAS operation
        } // keep looping to try again!
    }
}
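
A minimal usage sketch (added for illustration; the single-producer/single-consumer
setup, the int element type, and the capacity are assumptions, not part of librf):

#include "ring_queue_lockfree.h"    // hypothetical include path for the header above
#include <cstdio>
#include <thread>

int main()
{
    resumef::ring_queue_lockfree<int> q(1000);  // room for 1000 elements

    std::thread producer([&q]
    {
        for (int i = 0; i < 1000; ++i)
            while (!q.try_push(i))              // queue full: retry until a slot frees up
                std::this_thread::yield();
    });

    std::thread consumer([&q]
    {
        int v = 0, received = 0;
        while (received < 1000)
        {
            if (q.try_pop(v))                   // empty or not yet committed: retry
                ++received;
            else
                std::this_thread::yield();
        }
        std::printf("drained %d values\n", received);
    });

    producer.join();
    consumer.join();
    return 0;
}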