基于C++ Coroutines提案 ‘Stackless Resumable Functions’编写的协程库
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

scheduler.cpp 4.4KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. #include "librf/librf.h"
  2. #if RESUMEF_DEBUG_COUNTER
  3. std::mutex g_resumef_cout_mutex;
  4. std::atomic<intptr_t> g_resumef_state_count = 0;
  5. std::atomic<intptr_t> g_resumef_task_count = 0;
  6. std::atomic<intptr_t> g_resumef_evtctx_count = 0;
  7. std::atomic<intptr_t> g_resumef_state_id = 0;
  8. #endif
  9. namespace librf
  10. {
  11. const char * future_error_string[(size_t)error_code::max__]
  12. {
  13. "none",
  14. "not_ready",
  15. "already_acquired",
  16. "unlock_more",
  17. "read_before_write",
  18. "timer_canceled",
  19. "not_await_lock",
  20. "stop_requested",
  21. };
  22. thread_local char sz_future_error_buffer[256];
  23. LIBRF_API const char * get_error_string(error_code fe, const char * classname)
  24. {
  25. if (classname)
  26. {
  27. #if defined(__clang__) || defined(__GNUC__)
  28. #define sprintf_s sprintf
  29. #endif
  30. sprintf_s(sz_future_error_buffer, "%s, code=%s", classname, future_error_string[(size_t)(fe)]);
  31. return sz_future_error_buffer;
  32. }
  33. return future_error_string[(size_t)(fe)];
  34. }
  35. thread_local scheduler_t * th_scheduler_ptr = nullptr;
  36. //获得当前线程下的调度器
  37. LIBRF_API scheduler_t * this_scheduler()
  38. {
  39. return th_scheduler_ptr ? th_scheduler_ptr : &scheduler_t::g_scheduler;
  40. }
  41. LIBRF_API local_scheduler_t::local_scheduler_t()
  42. {
  43. if (th_scheduler_ptr == nullptr)
  44. {
  45. _scheduler_ptr = new scheduler_t;
  46. th_scheduler_ptr = _scheduler_ptr;
  47. }
  48. else
  49. {
  50. _scheduler_ptr = nullptr;
  51. }
  52. }
  53. LIBRF_API local_scheduler_t::local_scheduler_t(scheduler_t& sch) noexcept
  54. {
  55. if (th_scheduler_ptr == nullptr)
  56. {
  57. th_scheduler_ptr = &sch;
  58. }
  59. _scheduler_ptr = nullptr;
  60. }
  61. LIBRF_API local_scheduler_t::~local_scheduler_t()
  62. {
  63. if (th_scheduler_ptr == _scheduler_ptr)
  64. th_scheduler_ptr = nullptr;
  65. delete _scheduler_ptr;
  66. }
  67. LIBRF_API scheduler_t::scheduler_t()
  68. : _timer(std::make_shared<timer_manager>())
  69. {
  70. _runing_states.reserve(1024);
  71. _cached_states.reserve(1024);
  72. if (th_scheduler_ptr == nullptr)
  73. th_scheduler_ptr = this;
  74. }
  75. LIBRF_API scheduler_t::~scheduler_t()
  76. {
  77. //cancel_all_task_();
  78. if (th_scheduler_ptr == this)
  79. th_scheduler_ptr = nullptr;
  80. }
  81. LIBRF_API task_t* scheduler_t::new_task(task_t * task)
  82. {
  83. state_base_t* sptr = task->_state.get();
  84. sptr->set_scheduler(this);
  85. {
  86. #if !RESUMEF_DISABLE_MULT_THREAD
  87. scoped_lock<spinlock> __guard(_lock_ready);
  88. #endif
  89. _ready_task.emplace(sptr, task);
  90. }
  91. //如果是单独的future,没有被co_await过,则handler是nullptr。
  92. if (sptr->has_handler())
  93. {
  94. add_generator(sptr);
  95. }
  96. return task;
  97. }
  98. LIBRF_API std::unique_ptr<task_t> scheduler_t::del_switch(state_base_t* sptr)
  99. {
  100. #if !RESUMEF_DISABLE_MULT_THREAD
  101. scoped_lock<spinlock> __guard(_lock_ready);
  102. #endif
  103. std::unique_ptr<task_t> task_ptr;
  104. auto iter = this->_ready_task.find(sptr);
  105. if (iter != this->_ready_task.end())
  106. {
  107. task_ptr = std::exchange(iter->second, nullptr);
  108. this->_ready_task.erase(iter);
  109. }
  110. return task_ptr;
  111. }
  112. LIBRF_API void scheduler_t::request_stop_all_if_possible()
  113. {
  114. scoped_lock<spinlock> __guard(_lock_ready);
  115. for (auto& kv : this->_ready_task)
  116. kv.second->request_stop_if_possible();
  117. //this->_ready_task.clear();
  118. this->_timer->clear();
  119. }
  120. /*
  121. void scheduler_t::cancel_all_task_()
  122. {
  123. scoped_lock<spinlock, spinlock> __guard(_lock_ready, _lock_running);
  124. this->_ready_task.clear();
  125. this->_runing_states.clear();
  126. }
  127. void scheduler_t::break_all()
  128. {
  129. cancel_all_task_();
  130. this->_timer->clear();
  131. }
  132. */
  133. LIBRF_API bool scheduler_t::run_one_batch()
  134. {
  135. this->_timer->update();
  136. {
  137. #if !RESUMEF_DISABLE_MULT_THREAD
  138. scoped_lock<spinlock> __guard(_lock_running);
  139. #endif
  140. if (likely(_runing_states.empty()))
  141. return false;
  142. std::swap(_cached_states, _runing_states);
  143. }
  144. for (state_sptr& sptr : _cached_states)
  145. sptr->resume();
  146. _cached_states.clear();
  147. return true;
  148. }
  149. LIBRF_API void scheduler_t::run_until_notask()
  150. {
  151. for(;;)
  152. {
  153. //介于网上有人做评测,导致单协程切换数据很难看,那就注释掉吧。
  154. //std::this_thread::yield();
  155. if (likely(this->run_one_batch())) continue; //当前运行了一个state,则认为还可能有任务未完成
  156. {
  157. #if !RESUMEF_DISABLE_MULT_THREAD
  158. scoped_lock<spinlock> __guard(_lock_ready);
  159. #endif
  160. if (likely(!_ready_task.empty())) continue; //当前还存在task,则必然还有任务未完成
  161. }
  162. if (unlikely(!_timer->empty())) continue; //定时器不为空,也需要等待定时器触发
  163. break;
  164. };
  165. }
  166. LIBRF_API scheduler_t scheduler_t::g_scheduler;
  167. }