#include #include "librf/librf.h" #include #include "../asio/asio_task.h" #pragma warning(disable : 4834) using namespace asio::ip; using namespace librf; template union uarray { std::array<_Ty, _Size> c; template uarray(Args&... args) { for (auto & v : c) new(&v) _Ty(args...); } ~uarray() { for (auto & v : c) v.~_Ty(); } }; #define BUF_SIZE 1024 std::atomic g_echo_count = 0; future_t<> RunEchoSession(tcp::socket socket) { std::size_t bytes_transferred = 0; std::array buffer; for(;;) { try { bytes_transferred += co_await socket.async_read_some( asio::buffer(buffer.data() + bytes_transferred, buffer.size() - bytes_transferred), asio::rf_task); if (bytes_transferred >= buffer.size()) { co_await asio::async_write(socket, asio::buffer(buffer, buffer.size()), asio::rf_task); bytes_transferred = 0; g_echo_count.fetch_add(1, std::memory_order_release); } } catch (std::exception & e) { std::cerr << e.what() << std::endl; break; } } } template void AcceptConnections(tcp::acceptor & acceptor, uarray & socketes) { try { for (size_t idx = 0; idx < socketes.c.size(); ++idx) { go[&, idx]() -> future_t<> { for (;;) { try { co_await acceptor.async_accept(socketes.c[idx], asio::rf_task); go RunEchoSession(std::move(socketes.c[idx])); } catch (std::exception & e) { std::cerr << e.what() << std::endl; } } }; } } catch (std::exception & e) { std::cerr << e.what() << std::endl; } } void StartPrintEchoCount() { using namespace std::literals; GO { for (;;) { g_echo_count.exchange(0, std::memory_order_release); co_await 1s; std::cout << g_echo_count.load(std::memory_order_acquire) << std::endl; } }; } void RunOneBenchmark(bool bMain) { librf::local_scheduler_t ls; asio::io_service io_service; tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 3456)); uarray socketes(io_service); AcceptConnections(acceptor, socketes); if (bMain) StartPrintEchoCount(); for (;;) { io_service.poll(); this_scheduler()->run_one_batch(); } } void resumable_main_benchmark_asio_server() { std::array thds; for (size_t i = 0; i < thds.size(); ++i) { thds[i] = std::thread(&RunOneBenchmark, i == 0); } for (auto & t : thds) t.join(); } //---------------------------------------------------------------------------------------------------------------------- future_t<> RunPipelineEchoClient(asio::io_service & ios, tcp::resolver::iterator ep) { std::shared_ptr sptr = std::make_shared(ios); try { co_await asio::async_connect(*sptr, ep, asio::rf_task); GO { std::array write_buff_; for (auto & c : write_buff_) c = 'A' + rand() % 52; try { for (;;) { co_await asio::async_write(*sptr, asio::buffer(write_buff_), asio::rf_task); } } catch (std::exception & e) { std::cerr << e.what() << std::endl; } }; GO { try { std::array read_buff_; for (;;) { co_await sptr->async_read_some(asio::buffer(read_buff_), asio::rf_task); } } catch (std::exception & e) { std::cerr << e.what() << std::endl; } }; } catch (std::exception & e) { std::cerr << e.what() << std::endl; } } #if _HAS_CXX17 future_t<> RunPingPongEchoClient(asio::io_service & ios, tcp::resolver::iterator ep) { tcp::socket socket_{ ios }; std::array read_buff_; std::array write_buff_; try { co_await asio::async_connect(socket_, ep, asio::rf_task); for (auto & c : write_buff_) c = 'A' + rand() % 52; for (;;) { co_await when_all( asio::async_write(socket_, asio::buffer(write_buff_), asio::rf_task), socket_.async_read_some(asio::buffer(read_buff_), asio::rf_task) ); } } catch (std::exception & e) { std::cerr << e.what() << std::endl; } } void resumable_main_benchmark_asio_client_with_rf(intptr_t nNum) { nNum = std::max((intptr_t)1, nNum); try { asio::io_service ios; asio::ip::tcp::resolver resolver_(ios); asio::ip::tcp::resolver::query query_("localhost", "3456"); tcp::resolver::iterator iter = resolver_.resolve(query_); for (intptr_t i = 0; i < nNum; ++i) { go RunPingPongEchoClient(ios, iter); } for (;;) { ios.poll(); this_scheduler()->run_one_batch(); } } catch (std::exception & e) { std::cout << e.what() << std::endl; } } #endif class chat_session : public std::enable_shared_from_this { public: chat_session(asio::io_service & ios, tcp::resolver::iterator ep) : socket_(ios) , endpoint_(ep) { } void start() { do_connect(); } private: void do_connect() { auto self = this->shared_from_this(); asio::async_connect(socket_, endpoint_, [this, self](std::error_code ec, tcp::resolver::iterator ) { if (!ec) { for (auto & c : write_buff_) c = 'A' + rand() % 52; do_write(); } else { std::cerr << ec.message() << std::endl; } }); } void do_read() { auto self(shared_from_this()); socket_.async_read_some(asio::buffer(read_buff_), [this, self](const asio::error_code& ec, std::size_t ) { if (!ec) { do_write(); } else { std::cerr << ec.message() << std::endl; } }); } void do_write() { auto self(shared_from_this()); asio::async_write(socket_, asio::buffer(write_buff_), [this, self](std::error_code ec, std::size_t) { if (!ec) { do_read(); } else { std::cerr << ec.message() << std::endl; } }); } tcp::socket socket_; tcp::resolver::iterator endpoint_; std::array read_buff_; std::array write_buff_; }; void resumable_main_benchmark_asio_client_with_callback(intptr_t nNum) { nNum = std::max((intptr_t)1, nNum); try { asio::io_service ios; asio::ip::tcp::resolver resolver_(ios); asio::ip::tcp::resolver::query query_("127.0.0.1", "3456"); tcp::resolver::iterator iter = resolver_.resolve(query_); for (intptr_t i = 0; i < nNum; ++i) { auto chat = std::make_shared(ios, iter); chat->start(); } ios.run(); } catch (std::exception & e) { std::cout << "Exception: " << e.what() << "\n"; } } void resumable_main_benchmark_asio_client(intptr_t nNum) { resumable_main_benchmark_asio_client_with_callback(nNum); } #if LIBRF_TUTORIAL_STAND_ALONE int main(int argc, const char* argv[]) { if (argc > 1) resumable_main_benchmark_asio_client(atoi(argv[1])); else resumable_main_benchmark_asio_server(); return 0; } #endif