基于C++ Coroutines提案 ‘Stackless Resumable Functions’编写的协程库
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

benchmark_asio_echo.cpp 7.2KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388
  1. /*
  2. #include <chrono>
  3. #include <iostream>
  4. #include <string>
  5. #include <conio.h>
  6. #include <thread>
  7. */
  8. #include "librf.h"
  9. #include <asio.hpp>
  10. #include "src/asio_task.h"
  11. #pragma warning(disable : 4834)
  12. using namespace asio;
  13. using namespace asio::ip;
  14. using namespace resumef;
  15. template<class _Ty, size_t _Size>
  16. union uarray
  17. {
  18. std::array<_Ty, _Size> c;
  19. template<class... Args>
  20. uarray(Args&... args)
  21. {
  22. for (auto & v : c)
  23. new(&v) _Ty(args...);
  24. }
  25. ~uarray()
  26. {
  27. for (auto & v : c)
  28. v.~_Ty();
  29. }
  30. };
  31. #define BUF_SIZE 1024
  32. std::atomic<intptr_t> g_echo_count = 0;
  33. future_t<> RunEchoSession(tcp::socket socket)
  34. {
  35. std::size_t bytes_transferred = 0;
  36. std::array<char, BUF_SIZE> buffer;
  37. for(;;)
  38. {
  39. #ifndef __clang__
  40. try
  41. #endif
  42. {
  43. bytes_transferred += co_await socket.async_read_some(asio::buffer(buffer.data() + bytes_transferred, buffer.size() - bytes_transferred), rf_task);
  44. if (bytes_transferred >= buffer.size())
  45. {
  46. co_await asio::async_write(socket, asio::buffer(buffer, buffer.size()), rf_task);
  47. bytes_transferred = 0;
  48. g_echo_count.fetch_add(1, std::memory_order_release);
  49. }
  50. }
  51. #ifndef __clang__
  52. catch (std::exception & e)
  53. {
  54. std::cerr << e.what() << std::endl;
  55. break;
  56. }
  57. #endif
  58. }
  59. }
  60. template<size_t _N>
  61. void AcceptConnections(tcp::acceptor & acceptor, uarray<tcp::socket, _N> & socketes)
  62. {
  63. try
  64. {
  65. for (size_t idx = 0; idx < socketes.c.size(); ++idx)
  66. {
  67. go[&, idx]() -> future_t<>
  68. {
  69. for (;;)
  70. {
  71. #ifndef __clang__
  72. try
  73. #endif
  74. {
  75. co_await acceptor.async_accept(socketes.c[idx], rf_task);
  76. go RunEchoSession(std::move(socketes.c[idx]));
  77. }
  78. #ifndef __clang__
  79. catch (std::exception & e)
  80. {
  81. std::cerr << e.what() << std::endl;
  82. }
  83. #endif
  84. }
  85. };
  86. }
  87. }
  88. catch (std::exception & e)
  89. {
  90. std::cerr << e.what() << std::endl;
  91. }
  92. }
  93. void StartPrintEchoCount()
  94. {
  95. using namespace std::literals;
  96. GO
  97. {
  98. for (;;)
  99. {
  100. g_echo_count.exchange(0, std::memory_order_release);
  101. co_await 1s;
  102. std::cout << g_echo_count.load(std::memory_order_acquire) << std::endl;
  103. }
  104. };
  105. }
  106. void RunOneBenchmark(bool bMain)
  107. {
  108. resumef::local_scheduler ls;
  109. asio::io_service io_service;
  110. tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 3456));
  111. uarray<tcp::socket, 16> socketes(io_service);
  112. AcceptConnections(acceptor, socketes);
  113. if (bMain) StartPrintEchoCount();
  114. for (;;)
  115. {
  116. io_service.poll();
  117. this_scheduler()->run_one_batch();
  118. }
  119. }
  120. void resumable_main_benchmark_asio_server()
  121. {
  122. #if RESUMEF_ENABLE_MULT_SCHEDULER
  123. std::array<std::thread, 2> thds;
  124. for (size_t i = 0; i < thds.size(); ++i)
  125. {
  126. thds[i] = std::thread(&RunOneBenchmark, i == 0);
  127. }
  128. for (auto & t : thds)
  129. t.join();
  130. #else
  131. RunOneBenchmark(true);
  132. #endif
  133. }
  134. //----------------------------------------------------------------------------------------------------------------------
  135. future_t<> RunPipelineEchoClient(asio::io_service & ios, tcp::resolver::iterator ep)
  136. {
  137. std::shared_ptr<tcp::socket> sptr = std::make_shared<tcp::socket>(ios);
  138. #ifndef __clang__
  139. try
  140. #endif
  141. {
  142. co_await asio::async_connect(*sptr, ep, rf_task);
  143. GO
  144. {
  145. std::array<char, BUF_SIZE> write_buff_;
  146. for (auto & c : write_buff_)
  147. c = 'A' + rand() % 52;
  148. #ifndef __clang__
  149. try
  150. #endif
  151. {
  152. for (;;)
  153. {
  154. co_await asio::async_write(*sptr, asio::buffer(write_buff_), rf_task);
  155. }
  156. }
  157. #ifndef __clang__
  158. catch (std::exception & e)
  159. {
  160. std::cerr << e.what() << std::endl;
  161. }
  162. #endif
  163. };
  164. GO
  165. {
  166. #ifndef __clang__
  167. try
  168. #endif
  169. {
  170. std::array<char, BUF_SIZE> read_buff_;
  171. for (;;)
  172. {
  173. co_await sptr->async_read_some(asio::buffer(read_buff_), rf_task);
  174. }
  175. }
  176. #ifndef __clang__
  177. catch (std::exception & e)
  178. {
  179. std::cerr << e.what() << std::endl;
  180. }
  181. #endif
  182. };
  183. }
  184. #ifndef __clang__
  185. catch (std::exception & e)
  186. {
  187. std::cerr << e.what() << std::endl;
  188. }
  189. #endif
  190. }
  191. #if _HAS_CXX17
  192. future_t<> RunPingPongEchoClient(asio::io_service & ios, tcp::resolver::iterator ep)
  193. {
  194. tcp::socket socket_{ ios };
  195. std::array<char, BUF_SIZE> read_buff_;
  196. std::array<char, BUF_SIZE> write_buff_;
  197. #ifndef __clang__
  198. try
  199. #endif
  200. {
  201. co_await asio::async_connect(socket_, ep, rf_task);
  202. for (auto & c : write_buff_)
  203. c = 'A' + rand() % 52;
  204. for (;;)
  205. {
  206. co_await when_all(
  207. asio::async_write(socket_, asio::buffer(write_buff_), rf_task),
  208. socket_.async_read_some(asio::buffer(read_buff_), rf_task)
  209. );
  210. }
  211. }
  212. #ifndef __clang__
  213. catch (std::exception & e)
  214. {
  215. std::cerr << e.what() << std::endl;
  216. }
  217. #endif
  218. }
  219. void resumable_main_benchmark_asio_client_with_rf(intptr_t nNum)
  220. {
  221. nNum = std::max((intptr_t)1, nNum);
  222. try
  223. {
  224. asio::io_service ios;
  225. asio::ip::tcp::resolver resolver_(ios);
  226. asio::ip::tcp::resolver::query query_("localhost", "3456");
  227. tcp::resolver::iterator iter = resolver_.resolve(query_);
  228. for (intptr_t i = 0; i < nNum; ++i)
  229. {
  230. go RunPingPongEchoClient(ios, iter);
  231. }
  232. for (;;)
  233. {
  234. ios.poll();
  235. this_scheduler()->run_one_batch();
  236. }
  237. }
  238. catch (std::exception & e)
  239. {
  240. std::cout << e.what() << std::endl;
  241. }
  242. }
  243. #endif
  244. class chat_session : public std::enable_shared_from_this<chat_session>
  245. {
  246. public:
  247. chat_session(asio::io_service & ios, tcp::resolver::iterator ep)
  248. : socket_(ios)
  249. , endpoint_(ep)
  250. {
  251. }
  252. void start()
  253. {
  254. do_connect();
  255. }
  256. private:
  257. void do_connect()
  258. {
  259. auto self = this->shared_from_this();
  260. asio::async_connect(socket_, endpoint_,
  261. [this, self](std::error_code ec, tcp::resolver::iterator )
  262. {
  263. if (!ec)
  264. {
  265. for (auto & c : write_buff_)
  266. c = 'A' + rand() % 52;
  267. do_write();
  268. }
  269. else
  270. {
  271. std::cerr << ec.message() << std::endl;
  272. }
  273. });
  274. }
  275. void do_read()
  276. {
  277. auto self(shared_from_this());
  278. socket_.async_read_some(asio::buffer(read_buff_),
  279. [this, self](const asio::error_code& ec, std::size_t )
  280. {
  281. if (!ec)
  282. {
  283. do_write();
  284. }
  285. else
  286. {
  287. std::cerr << ec.message() << std::endl;
  288. }
  289. });
  290. }
  291. void do_write()
  292. {
  293. auto self(shared_from_this());
  294. asio::async_write(socket_,
  295. asio::buffer(write_buff_),
  296. [this, self](std::error_code ec, std::size_t)
  297. {
  298. if (!ec)
  299. {
  300. do_read();
  301. }
  302. else
  303. {
  304. std::cerr << ec.message() << std::endl;
  305. }
  306. });
  307. }
  308. tcp::socket socket_;
  309. tcp::resolver::iterator endpoint_;
  310. std::array<char, BUF_SIZE> read_buff_;
  311. std::array<char, BUF_SIZE> write_buff_;
  312. };
  313. void resumable_main_benchmark_asio_client_with_callback(intptr_t nNum)
  314. {
  315. nNum = std::max((intptr_t)1, nNum);
  316. try
  317. {
  318. asio::io_service ios;
  319. asio::ip::tcp::resolver resolver_(ios);
  320. asio::ip::tcp::resolver::query query_("127.0.0.1", "3456");
  321. tcp::resolver::iterator iter = resolver_.resolve(query_);
  322. for (intptr_t i = 0; i < nNum; ++i)
  323. {
  324. auto chat = std::make_shared<chat_session>(ios, iter);
  325. chat->start();
  326. }
  327. ios.run();
  328. }
  329. catch (std::exception & e)
  330. {
  331. std::cout << "Exception: " << e.what() << "\n";
  332. }
  333. }
  334. void resumable_main_benchmark_asio_client(intptr_t nNum)
  335. {
  336. resumable_main_benchmark_asio_client_with_callback(nNum);
  337. }