基于C++ Coroutines提案 ‘Stackless Resumable Functions’编写的协程库
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

test_async_switch_scheduler.cpp 4.5KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. #include <chrono>
  2. #include <iostream>
  3. #include <string>
  4. #include <thread>
  5. #include "librf/librf.h"
  6. using namespace librf;
  7. static scheduler_t* sch_in_main = nullptr;
  8. static std::atomic<scheduler_t*> sch_in_thread = nullptr;
  9. void run_in_thread(channel_t<bool> c_done)
  10. {
  11. local_scheduler_t my_scheduler; //产生本线程唯一的调度器
  12. sch_in_thread = this_scheduler(); //本线程唯一的调度器赋值给sch_in_thread,以便于后续测试直接访问此线程的调度器
  13. (void)c_done.write(true); //数据都准备好了,通过channel通知其他协程可以启动后续依赖sch_in_thread变量的协程了
  14. //循环直到sch_in_thread为nullptr
  15. for (;;)
  16. {
  17. auto sch = sch_in_thread.load(std::memory_order_acquire);
  18. if (sch == nullptr)
  19. break;
  20. sch->run_one_batch();
  21. std::this_thread::yield();
  22. }
  23. }
  24. template<class _Ctype>
  25. static void callback_get_long_switch_scheduler(int64_t val, _Ctype&& cb)
  26. {
  27. using namespace std::chrono;
  28. std::thread([val, cb = std::forward<_Ctype>(cb)]
  29. {
  30. std::this_thread::sleep_for(500ms);
  31. cb(val + 1);
  32. }).detach();
  33. }
  34. //这种情况下,没有生成 frame-context,因此,并没有promise_type被内嵌在frame-context里
  35. static future_t<int64_t> async_get_long_switch_scheduler(int64_t val)
  36. {
  37. awaitable_t<int64_t> awaitable;
  38. callback_get_long_switch_scheduler(val, [awaitable](int64_t result)
  39. {
  40. awaitable.set_value(result);
  41. });
  42. return awaitable.get_future();
  43. }
// In this case a corresponding frame-context is generated, and a promise_type
// is embedded in that frame-context.
//
// Coroutine that prints the current thread id, librf_current_scheduler() and
// `val`, then performs four rounds of:
//   1. co_await via(<scheduler>)  -- targets, in order: sch_in_thread,
//      sch_in_main, sch_in_thread, sch_in_thread
//   2. val = co_await async_get_long_switch_scheduler(val)
//   3. print thread id / scheduler / value again
// and finally writes `true` to c_done.
// NOTE(review): via() presumably moves the coroutine onto the given scheduler;
// confirm against the librf documentation.
static future_t<> resumable_get_long_switch_scheduler(int64_t val, channel_t<bool> c_done)
{
    // State before any via() has been awaited.
    std::cout << "thread = " << std::this_thread::get_id();
    std::cout << ", scheduler = " << librf_current_scheduler();
    std::cout << ", value = " << val << std::endl;

    // Round 1: request the worker thread's scheduler.
    co_await via(sch_in_thread);
    val = co_await async_get_long_switch_scheduler(val);
    std::cout << "thread = " << std::this_thread::get_id();
    std::cout << ", scheduler = " << librf_current_scheduler();
    std::cout << ", value = " << val << std::endl;

    // Round 2: request the main thread's scheduler.
    co_await via(sch_in_main);
    val = co_await async_get_long_switch_scheduler(val);
    std::cout << "thread = " << std::this_thread::get_id();
    std::cout << ", scheduler = " << librf_current_scheduler();
    std::cout << ", value = " << val << std::endl;

    // Round 3: request the worker thread's scheduler again.
    co_await via(sch_in_thread);
    val = co_await async_get_long_switch_scheduler(val);
    std::cout << "thread = " << std::this_thread::get_id();
    std::cout << ", scheduler = " << librf_current_scheduler();
    std::cout << ", value = " << val << std::endl;

    // Round 4: fake switch (requests the same scheduler as the previous via).
    co_await via(sch_in_thread); //fake switch
    val = co_await async_get_long_switch_scheduler(val);
    std::cout << "thread = " << std::this_thread::get_id();
    std::cout << ", scheduler = " << librf_current_scheduler();
    std::cout << ", value = " << val << std::endl;

    // Signal on the channel that this coroutine has finished.
    (void)c_done.write(true);
}
  72. #if defined(__GNUC__)
  73. static future_t<> resumable_main_switch_scheduler_fix_gcc_bugs(std::thread & other, channel_t<bool> c_done)
  74. {
  75. co_await c_done; //第一次等待,等待run_in_thread准备好了
  76. std::cout << "other thread = " << other.get_id();
  77. std::cout << ", sch_in_thread = " << sch_in_thread << std::endl;
  78. go resumable_get_long_switch_scheduler(1, c_done); //开启另外一个协程
  79. //co_await resumable_get_long(3, c_done);
  80. co_await c_done; //等待新的协程运行完毕,从而保证主线程的协程不会提早退出
  81. }
  82. #endif
  83. void resumable_main_switch_scheduler()
  84. {
  85. sch_in_main = this_scheduler();
  86. std::cout << "main thread = " << std::this_thread::get_id();
  87. std::cout << ", scheduler = " << sch_in_main << std::endl;
  88. channel_t<bool> c_done{ 1 };
  89. std::thread other(&run_in_thread, std::ref(c_done));
  90. #if defined(__GNUC__)
  91. go resumable_main_switch_scheduler_fix_gcc_bugs(other, c_done);
  92. #else
  93. go[&other, c_done]()->future_t<>
  94. {
  95. co_await c_done; //第一次等待,等待run_in_thread准备好了
  96. std::cout << "other thread = " << other.get_id();
  97. std::cout << ", sch_in_thread = " << sch_in_thread << std::endl;
  98. go resumable_get_long_switch_scheduler(1, c_done); //开启另外一个协程
  99. //co_await resumable_get_long(3, c_done);
  100. co_await c_done; //等待新的协程运行完毕,从而保证主线程的协程不会提早退出
  101. }; //GCC: internal compiler error: in captures_temporary, at cp/coroutines.cc:2716
  102. #endif
  103. sch_in_main->run_until_notask();
  104. //通知另外一个线程退出
  105. sch_in_thread.store(nullptr, std::memory_order_release);
  106. other.join();
  107. }
  108. #if LIBRF_TUTORIAL_STAND_ALONE
  109. int main()
  110. {
  111. resumable_main_switch_scheduler();
  112. return 0;
  113. }
  114. #endif