基于C++ Coroutines提案 ‘Stackless Resumable Functions’编写的协程库
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

test_async_stop_token.cpp 2.3KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. #include <chrono>
  2. #include <iostream>
  3. #include <string>
  4. #include <thread>
  5. #include "librf.h"
  6. using namespace resumef;
  7. using namespace std::chrono;
  8. //token触发停止后,将不再调用cb
  9. template<class _Ctype>
  10. static void callback_get_long_with_stop(stop_token token, int64_t val, _Ctype&& cb)
  11. {
  12. std::thread([val, token = std::move(token), cb = std::forward<_Ctype>(cb)]
  13. {
  14. for (int i = 0; i < 10; ++i)
  15. {
  16. if (token.stop_requested())
  17. return;
  18. std::this_thread::sleep_for(10ms);
  19. }
  20. cb(val * val);
  21. }).detach();
  22. }
  23. //token触发后,设置canceled_exception异常。
  24. static future_t<int64_t> async_get_long_with_stop(stop_token token, int64_t val)
  25. {
  26. awaitable_t<int64_t> awaitable;
  27. //保证stopptr的生存期,与callback_get_long_with_cancel()的回调参数的生存期一致。
  28. //如果token已经被取消,则传入的lambda会立即被调用,则awaitable将不能再set_value
  29. auto stopptr = make_stop_callback(token, [awaitable]
  30. {
  31. if (awaitable)
  32. awaitable.throw_exception(canceled_exception(error_code::stop_requested));
  33. });
  34. if (awaitable) //处理已经被取消的情况
  35. {
  36. callback_get_long_with_stop(token, val, [awaitable, stopptr = std::move(stopptr)](int64_t val)
  37. {
  38. if (awaitable)
  39. awaitable.set_value(val);
  40. });
  41. }
  42. return awaitable.get_future();
  43. }
  44. //如果关联的协程被取消了,则触发canceled_exception异常。
  45. static future_t<int64_t> async_get_long_with_stop(int64_t val)
  46. {
  47. task_t* task = current_task();
  48. co_return co_await async_get_long_with_stop(task->get_stop_token(), val);
  49. }
  50. //测试取消协程
  51. static void test_get_long_with_stop(int64_t val)
  52. {
  53. //异步获取值的协程
  54. task_t* task = GO
  55. {
  56. try
  57. {
  58. int64_t result = co_await async_get_long_with_stop(val);
  59. std::cout << result << std::endl;
  60. }
  61. catch (const std::logic_error& e)
  62. {
  63. std::cout << e.what() << std::endl;
  64. }
  65. };
  66. //task的生命周期只在task代表的协程生存期间存在。
  67. //但通过复制与其关联的stop_source,生存期可以超过task的生存期。
  68. stop_source stops = task->get_stop_source();
  69. //取消上一个协程的延迟协程
  70. GO
  71. {
  72. co_await sleep_for(1ms * (rand() % 300));
  73. stops.request_stop();
  74. };
  75. this_scheduler()->run_until_notask();
  76. }
  77. void resumable_main_stop_token()
  78. {
  79. srand((int)time(nullptr));
  80. for (int i = 0; i < 10; ++i)
  81. test_get_long_with_stop(i);
  82. }