基于C++ Coroutines提案 ‘Stackless Resumable Functions’编写的协程库
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

test_async_stop_token.cpp 2.9KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. #include <chrono>
  2. #include <iostream>
  3. #include <string>
  4. #include <thread>
  5. #include "librf/librf.h"
  6. using namespace librf;
  7. using namespace std::chrono;
  8. //_Ctype签名:void(bool, int64_t)
  9. template<class _Ctype, typename=std::enable_if_t<std::is_invocable_v<_Ctype, bool, int64_t>>>
  10. static void callback_get_long_with_stop(stop_token token, int64_t val, _Ctype&& cb)
  11. {
  12. std::thread([val, token = std::move(token), cb = std::forward<_Ctype>(cb)]
  13. {
  14. for (int i = 0; i < 10; ++i)
  15. {
  16. if (token.stop_requested())
  17. {
  18. cb(false, 0);
  19. return;
  20. }
  21. std::this_thread::sleep_for(10ms);
  22. }
  23. //有可能未检测到token的停止要求
  24. //如果使用stop_callback来停止,则务必保证检测到的退出要求是唯一的,且线程安全的
  25. //否则,多次调用cb,会导致协程在半退出状态下,外部的awaitable_t管理的state获取跟root出现错误。
  26. cb(true, val * val);
  27. }).detach();
  28. }
  29. //token触发后,设置canceled_exception异常。
  30. static future_t<int64_t> async_get_long_with_stop(stop_token token, int64_t val)
  31. {
  32. awaitable_t<int64_t> awaitable;
  33. //在这里通过stop_callback来处理退出,并将退出转化为error_code::stop_requested异常。
  34. //则必然会存在线程竞争问题,导致协程提前于callback_get_long_with_stop的回调之前而退出。
  35. //同时,callback_get_long_with_stop还未必一定能检测到退出要求----毕竟只是一个要求,而不是强制。
  36. callback_get_long_with_stop(token, val, [awaitable](bool ok, int64_t val)
  37. {
  38. if (ok)
  39. awaitable.set_value(val);
  40. else
  41. awaitable.throw_exception(canceled_exception{error_code::stop_requested});
  42. });
  43. return awaitable.get_future();
  44. }
  45. //如果关联的协程被取消了,则触发canceled_exception异常。
  46. static future_t<int64_t> async_get_long_with_stop(int64_t val)
  47. {
  48. task_t* task = librf_current_task();
  49. co_return co_await async_get_long_with_stop(task->get_stop_token(), val);
  50. }
  51. //测试取消协程
  52. static void test_get_long_with_stop(int64_t val)
  53. {
  54. //异步获取值的协程
  55. task_t* task = GO
  56. {
  57. try
  58. {
  59. int64_t result = co_await async_get_long_with_stop(val);
  60. std::cout << result << std::endl;
  61. }
  62. catch (const std::logic_error& e)
  63. {
  64. std::cout << e.what() << std::endl;
  65. }
  66. };
  67. //task的生命周期只在task代表的协程生存期间存在。
  68. //但通过复制与其关联的stop_source,生存期可以超过task的生存期。
  69. stop_source stops = task->get_stop_source();
  70. //取消上一个协程的延迟协程
  71. GO
  72. {
  73. co_await sleep_for(1ms * (rand() % 300));
  74. stops.request_stop();
  75. };
  76. this_scheduler()->run_until_notask();
  77. }
  78. void resumable_main_stop_token()
  79. {
  80. srand((int)time(nullptr));
  81. for (int i = 0; i < 10; ++i)
  82. test_get_long_with_stop(i);
  83. std::cout << "OK - stop_token!" << std::endl;
  84. }
  85. #if LIBRF_TUTORIAL_STAND_ALONE
  86. int main()
  87. {
  88. resumable_main_stop_token();
  89. return 0;
  90. }
  91. #endif