基于C++ Coroutines提案 ‘Stackless Resumable Functions’编写的协程库
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

test_async_mutex.cpp 4.8KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. #include <chrono>
  2. #include <iostream>
  3. #include <string>
  4. #include <thread>
  5. #include <deque>
  6. #include "librf/librf.h"
  7. using namespace librf;
  8. using namespace std::chrono;
  9. static mutex_t g_lock;
  10. static intptr_t g_counter = 0;
  11. static const size_t N = 10;
  12. //🔒-50ms-🔒🗝🗝-150ms-|
  13. //-------------.........
  14. static future_t<> test_mutex_pop(size_t idx)
  15. {
  16. for (size_t i = 0; i < N / 2; ++i)
  17. {
  18. {
  19. batch_unlock_t _locker = co_await g_lock.lock(); //_locker析构后,会调用对应的unlock()函数。
  20. --g_counter;
  21. std::cout << "pop :" << g_counter << " on " << idx << std::endl;
  22. co_await 50ms;
  23. batch_unlock_t _locker_2 = co_await g_lock;
  24. --g_counter;
  25. std::cout << "pop :" << g_counter << " on " << idx << std::endl;
  26. }
  27. co_await 150ms;
  28. }
  29. }
  30. #ifndef __clang__
  31. //🔒-50ms-🗝-50ms-🔒-50ms-🗝-50ms-|
  32. //---------........---------.......
  33. //方法之一
  34. static future_t<> test_mutex_push(size_t idx)
  35. {
  36. for (size_t i = 0; i < N; ++i)
  37. {
  38. {
  39. batch_unlock_t _locker = co_await g_lock.lock();
  40. ++g_counter;
  41. std::cout << "push:" << g_counter << " on " << idx << std::endl;
  42. co_await 50ms;
  43. }
  44. co_await 50ms;
  45. }
  46. }
  47. static future_t<> test_mutex_try_push(size_t idx)
  48. {
  49. for (size_t i = 0; i < N; ++i)
  50. {
  51. {
  52. for (;;)
  53. {
  54. auto result = co_await g_lock.try_lock();
  55. if (result) break;
  56. co_await yield();
  57. }
  58. ++g_counter;
  59. std::cout << "push:" << g_counter << " on " << idx << std::endl;
  60. co_await 50ms;
  61. co_await g_lock.unlock();
  62. }
  63. co_await 50ms;
  64. }
  65. }
  66. #endif
  67. static future_t<> test_mutex_timeout_push(size_t idx)
  68. {
  69. for (size_t i = 0; i < N; ++i)
  70. {
  71. {
  72. for (;;)
  73. {
  74. auto result = co_await g_lock.try_lock_for(10ms);
  75. if (result) break;
  76. co_await yield();
  77. }
  78. ++g_counter;
  79. std::cout << "push:" << g_counter << " on " << idx << std::endl;
  80. co_await 50ms;
  81. co_await g_lock.unlock();
  82. }
  83. co_await 50ms;
  84. }
  85. }
  86. //🔒-50ms-🗝-50ms-🔒-50ms-🗝-50ms-|
  87. //---------........---------.......
  88. static std::thread test_mutex_async_push(size_t idx)
  89. {
  90. return std::thread([=]
  91. {
  92. char provide_unique_address = 0;
  93. for (size_t i = 0; i < N; ++i)
  94. {
  95. if (g_lock.try_lock_for(500ms, &provide_unique_address))
  96. {
  97. batch_unlock_t _locker(std::adopt_lock, &provide_unique_address, g_lock);
  98. ++g_counter;
  99. std::cout << "push:" << g_counter << " on " << idx << std::endl;
  100. std::this_thread::sleep_for(50ms);
  101. //g_lock.unlock(&provide_unique_address);
  102. }
  103. std::this_thread::sleep_for(50ms);
  104. }
  105. });
  106. }
  107. static void resumable_mutex_synch()
  108. {
  109. go test_mutex_timeout_push(0);
  110. go test_mutex_pop(1);
  111. this_scheduler()->run_until_notask();
  112. std::cout << "result:" << g_counter << std::endl;
  113. }
  114. static void resumable_mutex_async()
  115. {
  116. auto th = test_mutex_async_push(0);
  117. std::this_thread::sleep_for(25ms);
  118. go test_mutex_pop(1);
  119. this_scheduler()->run_until_notask();
  120. th.join();
  121. std::cout << "result:" << g_counter << std::endl;
  122. }
  123. static future_t<> resumable_mutex_range_push(size_t idx, mutex_t a, mutex_t b, mutex_t c)
  124. {
  125. for (int i = 0; i < 10000; ++i)
  126. {
  127. batch_unlock_t __lockers = co_await mutex_t::lock(a, b, c);
  128. assert(a.is_locked());
  129. assert(b.is_locked());
  130. assert(c.is_locked());
  131. ++g_counter;
  132. //std::cout << "push:" << g_counter << " on " << idx << std::endl;
  133. //co_await 5ms;
  134. }
  135. }
  136. static future_t<> resumable_mutex_range_pop(size_t idx, mutex_t a, mutex_t b, mutex_t c)
  137. {
  138. for (int i = 0; i < 10000; ++i)
  139. {
  140. batch_unlock_t __lockers = co_await mutex_t::lock(a, b, c);
  141. assert(a.is_locked());
  142. assert(b.is_locked());
  143. assert(c.is_locked());
  144. --g_counter;
  145. //std::cout << "pop :" << g_counter << " on " << idx << std::endl;
  146. //co_await 5ms;
  147. }
  148. }
  149. static void resumable_mutex_lock_range()
  150. {
  151. mutex_t mtxA, mtxB, mtxC;
  152. //不同的线程里加锁也需要是线程安全的
  153. std::thread push_th([&]
  154. {
  155. local_scheduler_t __ls__;
  156. go resumable_mutex_range_push(10, mtxA, mtxB, mtxC);
  157. go resumable_mutex_range_push(11, mtxA, mtxC, mtxB);
  158. this_scheduler()->run_until_notask();
  159. });
  160. go resumable_mutex_range_pop(12, mtxC, mtxB, mtxA);
  161. go resumable_mutex_range_pop(13, mtxB, mtxA, mtxC);
  162. this_scheduler()->run_until_notask();
  163. push_th.join();
  164. std::cout << "result:" << g_counter << std::endl;
  165. }
  166. void resumable_main_mutex()
  167. {
  168. std::cout << "begin resumable_mutex_synch()" << std::endl;
  169. resumable_mutex_synch();
  170. std::cout << std::endl;
  171. std::cout << "begin resumable_mutex_async()" << std::endl;
  172. resumable_mutex_async();
  173. std::cout << std::endl;
  174. std::cout << "begin resumable_mutex_lock_range()" << std::endl;
  175. resumable_mutex_lock_range();
  176. }
  177. #if LIBRF_TUTORIAL_STAND_ALONE
  178. int main()
  179. {
  180. resumable_main_mutex();
  181. return 0;
  182. }
  183. #endif