基于C++ Coroutines提案 ‘Stackless Resumable Functions’编写的协程库
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

mutex_v2.cpp 4.2KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. #include "librf/librf.h"
  2. namespace librf
  3. {
  4. namespace detail
  5. {
  6. LIBRF_API void state_mutex_t::resume()
  7. {
  8. coroutine_handle<> handler = _coro;
  9. if (handler)
  10. {
  11. _coro = nullptr;
  12. _scheduler->del_final(this);
  13. handler.resume();
  14. }
  15. }
  16. LIBRF_API bool state_mutex_t::has_handler() const noexcept
  17. {
  18. return (bool)_coro;
  19. }
  20. LIBRF_API state_base_t* state_mutex_t::get_parent() const noexcept
  21. {
  22. return _root;
  23. }
  24. LIBRF_API void state_mutex_t::on_cancel() noexcept
  25. {
  26. mutex_v2_impl** oldValue = _value.load(std::memory_order_acquire);
  27. if (oldValue != nullptr && _value.compare_exchange_strong(oldValue, nullptr, std::memory_order_acq_rel))
  28. {
  29. *oldValue = nullptr;
  30. _thandler.stop();
  31. this->_coro = nullptr;
  32. }
  33. }
  34. LIBRF_API bool state_mutex_t::on_notify(mutex_v2_impl* eptr)
  35. {
  36. assert(eptr != nullptr);
  37. mutex_v2_impl** oldValue = _value.load(std::memory_order_acquire);
  38. if (oldValue != nullptr && _value.compare_exchange_strong(oldValue, nullptr, std::memory_order_acq_rel))
  39. {
  40. *oldValue = eptr;
  41. _thandler.stop();
  42. assert(this->_scheduler != nullptr);
  43. if (this->_coro)
  44. this->_scheduler->add_generator(this);
  45. return true;
  46. }
  47. return false;
  48. }
  49. LIBRF_API bool state_mutex_t::on_timeout()
  50. {
  51. mutex_v2_impl** oldValue = _value.load(std::memory_order_acquire);
  52. if (oldValue != nullptr && _value.compare_exchange_strong(oldValue, nullptr, std::memory_order_acq_rel))
  53. {
  54. *oldValue = nullptr;
  55. _thandler.reset();
  56. assert(this->_scheduler != nullptr);
  57. if (this->_coro)
  58. this->_scheduler->add_generator(this);
  59. return true;
  60. }
  61. return false;
  62. }
  63. LIBRF_API void state_mutex_t::add_timeout_timer(std::chrono::system_clock::time_point tp)
  64. {
  65. this->_thandler = this->_scheduler->timer()->add_handler(tp,
  66. [st = counted_ptr<state_mutex_t>{ this }](bool canceld)
  67. {
  68. if (!canceld)
  69. st->on_timeout();
  70. });
  71. }
  72. LIBRF_API void mutex_v2_impl::lock_until_succeed(void* sch)
  73. {
  74. assert(sch != nullptr);
  75. for (;;)
  76. {
  77. if (try_lock(sch))
  78. break;
  79. std::this_thread::yield();
  80. }
  81. }
  82. LIBRF_API bool mutex_v2_impl::try_lock(void* sch)
  83. {
  84. assert(sch != nullptr);
  85. scoped_lock<detail::mutex_v2_impl::lock_type> lock_(_lock);
  86. return try_lock_lockless(sch);
  87. }
  88. LIBRF_API bool mutex_v2_impl::try_lock_until(clock_type::time_point tp, void* sch)
  89. {
  90. assert(sch != nullptr);
  91. do
  92. {
  93. if (try_lock(sch))
  94. return true;
  95. std::this_thread::yield();
  96. } while (clock_type::now() <= tp);
  97. return false;
  98. }
  99. LIBRF_API bool mutex_v2_impl::try_lock_lockless(void* sch) noexcept
  100. {
  101. assert(sch != nullptr);
  102. void* oldValue = _owner.load(std::memory_order_relaxed);
  103. if (oldValue == nullptr)
  104. {
  105. _owner.store(sch, std::memory_order_relaxed);
  106. _counter.fetch_add(1, std::memory_order_relaxed);
  107. return true;
  108. }
  109. if (oldValue == sch)
  110. {
  111. _counter.fetch_add(1, std::memory_order_relaxed);
  112. return true;
  113. }
  114. return false;
  115. }
  116. LIBRF_API bool mutex_v2_impl::unlock(void* sch)
  117. {
  118. assert(sch != nullptr);
  119. scoped_lock<lock_type> lock_(_lock);
  120. void* oldValue = _owner.load(std::memory_order_relaxed);
  121. if (oldValue == sch)
  122. {
  123. if (_counter.fetch_sub(1, std::memory_order_relaxed) == 1)
  124. {
  125. _owner.store(nullptr, std::memory_order_relaxed);
  126. while (!_wait_awakes.empty())
  127. {
  128. state_mutex_ptr state = _wait_awakes.front();
  129. _wait_awakes.pop_front();
  130. //先将锁定状态转移到新的state上
  131. _owner.store(state->get_root(), std::memory_order_release);
  132. _counter.fetch_add(1, std::memory_order_acq_rel);
  133. if (state->on_notify(this))
  134. break;
  135. //转移状态失败,恢复成空
  136. _owner.store(nullptr, std::memory_order_relaxed);
  137. _counter.fetch_sub(1, std::memory_order_relaxed);
  138. }
  139. }
  140. return true;
  141. }
  142. return false;
  143. }
  144. LIBRF_API void mutex_v2_impl::add_wait_list_lockless(state_mutex_t* state)
  145. {
  146. assert(state != nullptr);
  147. _wait_awakes.push_back(state);
  148. }
  149. }
  150. LIBRF_API mutex_t::mutex_t()
  151. : _mutex(std::make_shared<detail::mutex_v2_impl>())
  152. {
  153. }
  154. LIBRF_API mutex_t::mutex_t(std::adopt_lock_t) noexcept
  155. {
  156. }
  157. LIBRF_API mutex_t::~mutex_t() noexcept
  158. {
  159. }
  160. }