librf
channel_v1.h
1 #pragma once
2 
3 namespace resumef
4 {
5  namespace detail
6  {
7  template<class _Ty>
8  struct channel_impl : public std::enable_shared_from_this<channel_impl<_Ty>>
9  {
10  typedef _awaker<channel_impl<_Ty>, _Ty*, error_code> channel_read_awaker;
11  typedef std::shared_ptr<channel_read_awaker> channel_read_awaker_ptr;
12 
13  typedef _awaker<channel_impl<_Ty>> channel_write_awaker;
14  typedef std::shared_ptr<channel_write_awaker> channel_write_awaker_ptr;
15  typedef std::pair<channel_write_awaker_ptr, _Ty> write_tuple_type;
16  private:
17  //typedef spinlock lock_type;
18  typedef std::recursive_mutex lock_type;
19 
20  lock_type _lock; //保证访问本对象是线程安全的
21  const size_t _max_counter; //数据队列的容量上限
22  std::deque<_Ty> _values; //数据队列
23  std::list<channel_read_awaker_ptr> _read_awakes; //读队列
24  std::list<write_tuple_type> _write_awakes; //写队列
25  public:
26  channel_impl(size_t max_counter_)
27  :_max_counter(max_counter_)
28  {
29  }
30 
31 #if _DEBUG
32  const std::deque<_Ty>& debug_queue() const
33  {
34  return _values;
35  }
36 #endif
37 
38  template<class callee_t, class = std::enable_if<!std::is_same<std::remove_cv_t<callee_t>, channel_read_awaker_ptr>::value>>
39  decltype(auto) read(callee_t&& awaker)
40  {
41  return read_(std::make_shared<channel_read_awaker>(std::forward<callee_t>(awaker)));
42  }
43  template<class callee_t, class _Ty2, class = std::enable_if<!std::is_same<std::remove_cv_t<callee_t>, channel_write_awaker_ptr>::value>>
44  decltype(auto) write(callee_t&& awaker, _Ty2&& val)
45  {
46  return write_(std::make_shared<channel_write_awaker>(std::forward<callee_t>(awaker)), std::forward<_Ty2>(val));
47  }
48 
49  //如果已经触发了awaker,则返回true
50  //设计目标是线程安全的,实际情况待考察
51  bool read_(channel_read_awaker_ptr&& r_awaker)
52  {
53  assert(r_awaker);
54 
55  scoped_lock<lock_type> lock_(this->_lock);
56 
57  bool ret_value;
58  if (_values.size() > 0)
59  {
60  //如果数据队列有数据,则可以直接读数据
61  auto val = std::move(_values.front());
62  _values.pop_front();
63 
64  r_awaker->awake(this, 1, &val, error_code::none);
65  ret_value = true;
66  }
67  else
68  {
69  //否则,将“读等待”放入“读队列”
70  _read_awakes.push_back(r_awaker);
71  ret_value = false;
72  }
73 
74  //如果已有写队列,则唤醒一个“写等待”
75  awake_one_writer_();
76 
77  return ret_value;
78  }
79 
80  //设计目标是线程安全的,实际情况待考察
81  template<class _Ty2>
82  void write_(channel_write_awaker_ptr&& w_awaker, _Ty2&& val)
83  {
84  assert(w_awaker);
85  scoped_lock<lock_type> lock_(this->_lock);
86 
87  //如果满了,则不添加到数据队列,而是将“写等待”和值,放入“写队列”
88  bool is_full = _values.size() >= _max_counter;
89  if (is_full)
90  _write_awakes.push_back(std::make_pair(std::forward<channel_write_awaker_ptr>(w_awaker), std::forward<_Ty2>(val)));
91  else
92  _values.push_back(std::forward<_Ty2>(val));
93 
94  //如果已有读队列,则唤醒一个“读等待”
95  awake_one_reader_();
96 
97  //触发 没有放入“写队列”的“写等待”
98  if (!is_full) w_awaker->awake(this, 1);
99  }
100 
101  private:
102  //只能被write_函数调用,内部不再需要加锁
103  void awake_one_reader_()
104  {
105  //assert(!(_read_awakes.size() >= 0 && _values.size() == 0));
106 
107  for (auto iter = _read_awakes.begin(); iter != _read_awakes.end(); )
108  {
109  auto r_awaker = *iter;
110  iter = _read_awakes.erase(iter);
111 
112  if (r_awaker->awake(this, 1, _values.size() ? &_values.front() : nullptr, error_code::read_before_write))
113  {
114  if (_values.size()) _values.pop_front();
115 
116  //唤醒一个“读等待”后,尝试唤醒一个“写等待”,以处理“数据队列”满后的“写等待”
117  awake_one_writer_();
118  break;
119  }
120  }
121  }
122 
123  //只能被read_函数调用,内部不再需要加锁
124  void awake_one_writer_()
125  {
126  for (auto iter = _write_awakes.begin(); iter != _write_awakes.end(); )
127  {
128  auto w_awaker = std::move(*iter);
129  iter = _write_awakes.erase(iter);
130 
131  if (w_awaker.first->awake(this, 1))
132  {
133  //一个“写等待”唤醒后,将“写等待”绑定的值,放入“数据队列”
134  _values.push_back(std::move(w_awaker.second));
135  break;
136  }
137  }
138  }
139 
140  size_t capacity() const noexcept
141  {
142  return _max_counter;
143  }
144 
145  channel_impl(const channel_impl&) = delete;
146  channel_impl(channel_impl&&) = delete;
147  channel_impl& operator = (const channel_impl&) = delete;
148  channel_impl& operator = (channel_impl&&) = delete;
149  };
150  } //namespace detail
151 
152 namespace channel_v1
153 {
154 
155  template<class _Ty>
156  struct channel_t
157  {
158  typedef detail::channel_impl<_Ty> channel_impl_type;
159  typedef typename channel_impl_type::channel_read_awaker channel_read_awaker;
160  typedef typename channel_impl_type::channel_write_awaker channel_write_awaker;
161 
162  typedef std::shared_ptr<channel_impl_type> channel_impl_ptr;
163  typedef std::weak_ptr<channel_impl_type> channel_impl_wptr;
164  typedef std::chrono::system_clock clock_type;
165  private:
166  channel_impl_ptr _chan;
167  public:
168  channel_t(size_t max_counter = 0)
169  :_chan(std::make_shared<channel_impl_type>(max_counter))
170  {
171 
172  }
173 
174  template<class _Ty2>
175  future_t<bool> write(_Ty2&& val) const
176  {
177  awaitable_t<bool> awaitable;
178 
179  auto awaker = std::make_shared<channel_write_awaker>(
180  [st = awaitable._state](channel_impl_type* chan) -> bool
181  {
182  st->set_value(chan ? true : false);
183  return true;
184  });
185  _chan->write_(std::move(awaker), std::forward<_Ty2>(val));
186 
187  return awaitable.get_future();
188  }
189 
190  future_t<_Ty> read() const
191  {
192  awaitable_t<_Ty> awaitable;
193 
194  auto awaker = std::make_shared<channel_read_awaker>(
195  [st = awaitable._state](channel_impl_type*, _Ty* val, error_code fe) -> bool
196  {
197  if (val)
198  st->set_value(std::move(*val));
199  else
200  st->throw_exception(channel_exception{ fe });
201 
202  return true;
203  });
204  _chan->read_(std::move(awaker));
205 
206  return awaitable.get_future();
207  }
208 
209  template<class _Ty2>
210  future_t<bool> operator << (_Ty2&& val) const
211  {
212  return std::move(write(std::forward<_Ty2>(val)));
213  }
214 
215  future_t<_Ty> operator co_await () const
216  {
217  return read();
218  }
219 
220 #if _DEBUG
221  //非线程安全,返回的队列也不是线程安全的
222  const auto& debug_queue() const
223  {
224  return _chan->debug_queue();
225  }
226 #endif
227 
228  size_t capacity() const noexcept
229  {
230  return _chan->capacity();
231  }
232 
233  channel_t(const channel_t&) = default;
234  channel_t(channel_t&&) = default;
235  channel_t& operator = (const channel_t&) = default;
236  channel_t& operator = (channel_t&&) = default;
237  };
238 
239 
240  using semaphore_t = channel_t<bool>;
241 
242 } //namespace v1
243 } //namespace resumef