基于C++ Coroutines提案 ‘Stackless Resumable Functions’编写的协程库
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

test_async_mutex.cpp 4.2KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. #include <chrono>
  2. #include <iostream>
  3. #include <string>
  4. #include <conio.h>
  5. #include <thread>
  6. #include <deque>
  7. #include "librf.h"
  8. using namespace resumef; // presumably the namespace librf.h declares mutex_t, future_t, yield, ... in -- confirm
  9. using namespace std::chrono; // makes the duration literals used below (50ms, 150ms, ...) available
  10. static mutex_t g_lock; // the single mutex every push/pop test in this file contends for
  11. static intptr_t g_counter = 0; // shared counter: incremented by the "push" tests, decremented by the "pop" tests
  12. static const size_t N = 10; // iterations per push test; test_mutex_pop runs N / 2 iterations with two decrements each
  13. //🔒-50ms-🔒🗝🗝-150ms-|
  14. //-------------.........
  15. static future_t<> test_mutex_pop(size_t idx)
  16. {
  17. for (size_t i = 0; i < N / 2; ++i)
  18. {
  19. {
  20. auto _locker = co_await g_lock.lock(); //_locker析构后,会调用对应的unlock()函数。
  21. --g_counter;
  22. std::cout << "pop :" << g_counter << " on " << idx << std::endl;
  23. co_await 50ms;
  24. auto _locker_2 = co_await g_lock;
  25. --g_counter;
  26. std::cout << "pop :" << g_counter << " on " << idx << std::endl;
  27. }
  28. co_await 150ms;
  29. }
  30. }
  31. //🔒-50ms-🗝-50ms-🔒-50ms-🗝-50ms-|
  32. //---------........---------.......
  33. //方法之一
  34. static future_t<> test_mutex_push(size_t idx)
  35. {
  36. for (size_t i = 0; i < N; ++i)
  37. {
  38. {
  39. auto _locker = co_await g_lock.lock();
  40. ++g_counter;
  41. std::cout << "push:" << g_counter << " on " << idx << std::endl;
  42. co_await 50ms;
  43. }
  44. co_await 50ms;
  45. }
  46. }
  47. static future_t<> test_mutex_try_push(size_t idx)
  48. {
  49. for (size_t i = 0; i < N; ++i)
  50. {
  51. {
  52. while (!co_await g_lock.try_lock())
  53. co_await yield();
  54. ++g_counter;
  55. std::cout << "push:" << g_counter << " on " << idx << std::endl;
  56. co_await 50ms;
  57. co_await g_lock.unlock();
  58. }
  59. co_await 50ms;
  60. }
  61. }
  62. static future_t<> test_mutex_timeout_push(size_t idx)
  63. {
  64. for (size_t i = 0; i < N; ++i)
  65. {
  66. {
  67. while (!co_await g_lock.try_lock_for(10ms))
  68. co_await yield();
  69. ++g_counter;
  70. std::cout << "push:" << g_counter << " on " << idx << std::endl;
  71. co_await 50ms;
  72. co_await g_lock.unlock();
  73. }
  74. co_await 50ms;
  75. }
  76. }
  77. //🔒-50ms-🗝-50ms-🔒-50ms-🗝-50ms-|
  78. //---------........---------.......
  79. static std::thread test_mutex_async_push(size_t idx)
  80. {
  81. return std::thread([=]
  82. {
  83. char provide_unique_address = 0;
  84. for (size_t i = 0; i < N; ++i)
  85. {
  86. if (g_lock.try_lock_for(500ms, &provide_unique_address))
  87. {
  88. //scoped_lock_mutex_t _locker(std::adopt_lock, g_lock, &provide_unique_address);
  89. ++g_counter;
  90. std::cout << "push:" << g_counter << " on " << idx << std::endl;
  91. std::this_thread::sleep_for(50ms);
  92. g_lock.unlock(&provide_unique_address);
  93. }
  94. std::this_thread::sleep_for(50ms);
  95. }
  96. });
  97. }
  98. static void resumable_mutex_synch()
  99. {
  100. go test_mutex_timeout_push(0);
  101. go test_mutex_pop(1);
  102. this_scheduler()->run_until_notask();
  103. std::cout << "result:" << g_counter << std::endl;
  104. }
  105. static void resumable_mutex_async()
  106. {
  107. auto th = test_mutex_async_push(0);
  108. std::this_thread::sleep_for(25ms);
  109. go test_mutex_pop(1);
  110. this_scheduler()->run_until_notask();
  111. th.join();
  112. std::cout << "result:" << g_counter << std::endl;
  113. }
  114. static future_t<> resumable_mutex_range_push(size_t idx, mutex_t a, mutex_t b, mutex_t c)
  115. {
  116. for (int i = 0; i < 10; ++i)
  117. {
  118. auto __lockers = mutex_t::lock(a, b, c);
  119. ++g_counter;
  120. std::cout << "push:" << g_counter << " on " << idx << std::endl;
  121. co_await 5ms;
  122. }
  123. }
  124. static future_t<> resumable_mutex_range_pop(size_t idx, mutex_t a, mutex_t b, mutex_t c)
  125. {
  126. for (int i = 0; i < 10; ++i)
  127. {
  128. auto __lockers = mutex_t::lock(a, b, c);
  129. --g_counter;
  130. std::cout << "pop :" << g_counter << " on " << idx << std::endl;
  131. co_await 5ms;
  132. }
  133. }
  134. static void resumable_mutex_lock_range()
  135. {
  136. mutex_t mtxA, mtxB, mtxC;
  137. //不同的线程里加锁也需要是线程安全的
  138. std::thread push_th([&]
  139. {
  140. local_scheduler __ls__;
  141. go resumable_mutex_range_push(10, mtxA, mtxB, mtxC);
  142. go resumable_mutex_range_push(11, mtxA, mtxC, mtxB);
  143. this_scheduler()->run_until_notask();
  144. });
  145. go resumable_mutex_range_pop(12, mtxC, mtxB, mtxA);
  146. go resumable_mutex_range_pop(13, mtxB, mtxA, mtxC);
  147. this_scheduler()->run_until_notask();
  148. push_th.join();
  149. std::cout << "result:" << g_counter << std::endl;
  150. }
  151. void resumable_main_mutex()
  152. {
  153. resumable_mutex_synch();
  154. std::cout << std::endl;
  155. resumable_mutex_async();
  156. std::cout << std::endl;
  157. resumable_mutex_lock_range();
  158. }