Procházet zdrojové kódy

深入ASIO的pingpong测试

根据VS的静态分析改善代码
tags/v2.9.7
tearshark před 5 roky
rodič
revize
fc2d08af7c

+ 150
- 15
benchmark/benchmark_asio_echo.cpp Zobrazit soubor

@@ -39,7 +39,7 @@ union uarray
#define BUF_SIZE 1024
intptr_t g_echo_count = 0;
std::atomic<intptr_t> g_echo_count = 0;
future_vt RunEchoSession(tcp::socket socket)
{
@@ -55,7 +55,7 @@ future_vt RunEchoSession(tcp::socket socket)
co_await asio::async_write(socket, asio::buffer(buffer, buffer.size()), rf_task);
bytes_transferred = 0;
++g_echo_count;
g_echo_count.fetch_add(1, std::memory_order_release);
}
}
catch (std::exception & e)
@@ -82,7 +82,7 @@ void AcceptConnections(tcp::acceptor & acceptor, uarray<tcp::socket, _N> & socke
co_await acceptor.async_accept(socketes.c[idx], rf_task);
go RunEchoSession(std::move(socketes.c[idx]));
}
catch (std::exception e)
catch (std::exception & e)
{
std::cerr << e.what() << std::endl;
}
@@ -96,25 +96,31 @@ void AcceptConnections(tcp::acceptor & acceptor, uarray<tcp::socket, _N> & socke
}
}
void resumable_main_benchmark_asio_server()
void StartPrintEchoCount()
{
using namespace std::literals;
asio::io_service io_service;
tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 3456));
uarray<tcp::socket, 128> socketes(acceptor.get_io_service());
AcceptConnections(acceptor, socketes);
GO
{
for (;;)
{
g_echo_count = 0;
g_echo_count.exchange(0, std::memory_order_release);
co_await 1s;
std::cout << g_echo_count << std::endl;
}
std::cout << g_echo_count.load(std::memory_order_acquire) << std::endl;
}
};
}
void RunOneBenchmark(bool bMain)
{
resumef::local_scheduler ls;
asio::io_service io_service;
tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 3456));
uarray<tcp::socket, 16> socketes(acceptor.get_io_service());
AcceptConnections(acceptor, socketes);
if (bMain) StartPrintEchoCount();
for (;;)
{
@@ -123,6 +129,130 @@ void resumable_main_benchmark_asio_server()
}
}
void resumable_main_benchmark_asio_server()
{
#if RESUMEF_ENABLE_MULT_SCHEDULER
std::array<std::thread, 2> thds;
for (size_t i = 0; i < thds.size(); ++i)
{
thds[i] = std::thread(&RunOneBenchmark, i == 0);
}
for (auto & t : thds)
t.join();
#else
RunOneBenchmark(true);
#endif
}
//----------------------------------------------------------------------------------------------------------------------
future_vt RunPipelineEchoClient(asio::io_service & ios, tcp::resolver::iterator ep)
{
std::shared_ptr<tcp::socket> sptr = std::make_shared<tcp::socket>(ios);
try
{
co_await asio::async_connect(*sptr, ep, rf_task);
GO
{
std::array<char, BUF_SIZE> write_buff_;
for (auto & c : write_buff_)
c = 'A' + rand() % 52;
try
{
for (;;)
{
co_await asio::async_write(*sptr, asio::buffer(write_buff_), rf_task);
}
}
catch (std::exception & e)
{
std::cerr << e.what() << std::endl;
}
};
GO
{
try
{
std::array<char, BUF_SIZE> read_buff_;
for (;;)
{
co_await sptr->async_read_some(asio::buffer(read_buff_), rf_task);
}
}
catch (std::exception & e)
{
std::cerr << e.what() << std::endl;
}
};
}
catch (std::exception & e)
{
std::cerr << e.what() << std::endl;
}
}
future_vt RunPingPongEchoClient(asio::io_service & ios, tcp::resolver::iterator ep)
{
tcp::socket socket_{ ios };
std::array<char, BUF_SIZE> read_buff_;
std::array<char, BUF_SIZE> write_buff_;
try
{
co_await asio::async_connect(socket_, ep, rf_task);
for (auto & c : write_buff_)
c = 'A' + rand() % 52;
for (;;)
{
co_await when_all(
asio::async_write(socket_, asio::buffer(write_buff_), rf_task),
socket_.async_read_some(asio::buffer(read_buff_), rf_task)
);
}
}
catch (std::exception & e)
{
std::cerr << e.what() << std::endl;
}
}
void resumable_main_benchmark_asio_client_with_rf(intptr_t nNum)
{
nNum = std::max((intptr_t)1, nNum);
try
{
asio::io_service ios;
asio::ip::tcp::resolver resolver_(ios);
asio::ip::tcp::resolver::query query_("localhost", "3456");
tcp::resolver::iterator iter = resolver_.resolve(query_);
for (intptr_t i = 0; i < nNum; ++i)
{
go RunPingPongEchoClient(ios, iter);
}
for (;;)
{
ios.poll();
this_scheduler()->run_one_batch();
}
}
catch (std::exception & e)
{
std::cout << e.what() << std::endl;
}
}
class chat_session : public std::enable_shared_from_this<chat_session>
{
public:
@@ -162,7 +292,7 @@ private:
{
auto self(shared_from_this());
socket_.async_read_some(asio::buffer(read_buff_),
[this, self](const asio::error_code& ec, std::size_t size)
[this, self](const asio::error_code& ec, std::size_t )
{
if (!ec)
{
@@ -200,7 +330,7 @@ private:
std::array<char, BUF_SIZE> write_buff_;
};
void resumable_main_benchmark_asio_client(intptr_t nNum)
void resumable_main_benchmark_asio_client_with_callback(intptr_t nNum)
{
nNum = std::max((intptr_t)1, nNum);
@@ -224,4 +354,9 @@ void resumable_main_benchmark_asio_client(intptr_t nNum)
{
std::cout << "Exception: " << e.what() << "\n";
}
}
void resumable_main_benchmark_asio_client(intptr_t nNum)
{
resumable_main_benchmark_asio_client_with_callback(nNum);
}

+ 1
- 1
benchmark/benchmark_async_mem.cpp Zobrazit soubor

@@ -32,5 +32,5 @@ void resumable_main_benchmark_mem()
}
resumef::this_scheduler()->run_until_notask();
_getch();
(void)_getch();
}

+ 7
- 7
librf/src/event.cpp Zobrazit soubor

@@ -93,8 +93,8 @@ namespace resumef
});
_event->wait_(awaker);
this_scheduler()->timer()->add(tp,
[awaker](bool bValue)
(void)this_scheduler()->timer()->add(tp,
[awaker](bool )
{
awaker->awake(nullptr, 1);
});
@@ -208,8 +208,8 @@ namespace resumef
e->wait_(awaker);
}
this_scheduler()->timer()->add(tp,
[awaker](bool bValue)
(void)this_scheduler()->timer()->add(tp,
[awaker](bool )
{
awaker->awake(nullptr, 1);
});
@@ -324,8 +324,8 @@ namespace resumef
promise_t<bool> awaitable;
if (evts.size() <= 0)
{
this_scheduler()->timer()->add_handler(tp,
[st = awaitable._state](bool bValue)
(void)this_scheduler()->timer()->add_handler(tp,
[st = awaitable._state](bool )
{
st->set_value(false);
});
@@ -337,7 +337,7 @@ namespace resumef
ctx->evts_waited.reserve(evts.size());
ctx->evts = std::move(evts);
ctx->th = std::move(this_scheduler()->timer()->add_handler(tp,
[ctx](bool bValue)
[ctx](bool )
{
ctx->awake(nullptr);
}));

+ 3
- 2
librf/src/generator.h Zobrazit soubor

@@ -204,12 +204,13 @@ namespace experimental {
generator &operator=(generator const &) = delete;
generator(generator &&right_) : _Coro(right_._Coro)
generator(generator &&right_) noexcept
: _Coro(right_._Coro)
{
right_._Coro = nullptr;
}
generator &operator=(generator &&right_)
generator &operator=(generator &&right_) noexcept
{
if (&right_ != this) {
_Coro = right_._Coro;

+ 2
- 2
librf/src/mutex.cpp Zobrazit soubor

@@ -115,8 +115,8 @@ namespace resumef
});
_locker->lock_(awaker);
this_scheduler()->timer()->add(tp,
[awaker](bool bValue)
(void)this_scheduler()->timer()->add(tp,
[awaker](bool )
{
awaker->awake(nullptr, 1);
});

+ 1
- 1
librf/src/rf_task.h Zobrazit soubor

@@ -120,7 +120,7 @@ namespace resumef
//返回true,表示任务还未完成,后续还需要继续执行
//否则,任务从调度器里删除
virtual bool go_next(scheduler * schdler) override
virtual bool go_next(scheduler * ) override
{
auto * _state = _future._state.get();
_state->resume();

+ 1
- 1
librf/src/sleep.cpp Zobrazit soubor

@@ -8,7 +8,7 @@ namespace resumef
{
promise_vt awaitable;
scheduler_.timer()->add(tp_,
(void)scheduler_.timer()->add(tp_,
[st = awaitable._state](bool cancellation_requested)
{
if (cancellation_requested)

+ 1
- 1
librf/src/state.cpp Zobrazit soubor

@@ -20,7 +20,7 @@ namespace resumef
{
return !_state->ready();
}
virtual bool go_next(scheduler * schdler) override
virtual bool go_next(scheduler * ) override
{
_state->resume();
return false;

+ 4
- 4
librf/src/when.h Zobrazit soubor

@@ -91,7 +91,7 @@ namespace resumef
, _f(std::move(f))
, _val(v)
{}
when_all_functor(when_all_functor &&) = default;
when_all_functor(when_all_functor &&) noexcept = default;
when_all_functor & operator = (const when_all_functor &) = default;
when_all_functor & operator = (when_all_functor &&) = default;
@@ -112,7 +112,7 @@ namespace resumef
};
template<class _Tup, size_t _Idx>
inline void when_all_one__(scheduler & s, const when_impl_ptr & e, _Tup & t)
inline void when_all_one__(scheduler & , const when_impl_ptr & , _Tup & )
{
}
@@ -202,7 +202,7 @@ namespace resumef
{
assert(idx >= 0);
}
when_any_functor(when_any_functor &&) = default;
when_any_functor(when_any_functor &&) noexcept = default;
when_any_functor & operator = (const when_any_functor &) = default;
when_any_functor & operator = (when_any_functor &&) = default;
@@ -238,7 +238,7 @@ namespace resumef
};
template<intptr_t _Idx>
inline void when_any_one__(scheduler & s, const when_impl_ptr & e, const when_any_result_ptr & t)
inline void when_any_one__(scheduler & , const when_impl_ptr & , const when_any_result_ptr & )
{
}

+ 1
- 1
tutorial/test_async_channel_mult_thread.cpp Zobrazit soubor

@@ -103,5 +103,5 @@ void resumable_main_channel_mult_thread()
write_th.join();
std::cout << "OK: counter = " << gcounter.load() << std::endl;
_getch();
(void)_getch();
}

+ 4
- 0
tutorial/test_async_event_timeout.cpp Zobrazit soubor

@@ -63,6 +63,8 @@ void test_wait_timeout_any_invalid()
{
intptr_t idx = co_await event_t::wait_any_for(500ms, std::begin(evts), std::end(evts));
assert(idx < 0);
(void)idx;
std::cout << "invalid wait!" << std::endl;
};
this_scheduler()->run_until_notask();
@@ -118,6 +120,8 @@ void test_wait_timeout_all_invalid()
{
bool result = co_await event_t::wait_all_for(500ms, std::begin(evts), std::end(evts));
assert(!result);
(void)result;
std::cout << "invalid wait!" << std::endl;
};
this_scheduler()->run_until_notask();

+ 3
- 0
vs_proj/librf.sln Zobrazit soubor

@@ -31,4 +31,7 @@ Global
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {401D9E59-A4B3-4CA3-9696-B7D2D14D90FD}
EndGlobalSection
GlobalSection(Performance) = preSolution
HasPerformanceSessions = true
EndGlobalSection
EndGlobal

+ 21
- 9
vs_proj/librf.vcxproj Zobrazit soubor

@@ -81,12 +81,14 @@
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'">
<LinkIncremental>false</LinkIncremental>
<CodeAnalysisRuleSet>NativeRecommendedRules.ruleset</CodeAnalysisRuleSet>
<RunCodeAnalysis>false</RunCodeAnalysis>
</PropertyGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
<ClCompile>
<WarningLevel>Level3</WarningLevel>
<Optimization>Disabled</Optimization>
<PreprocessorDefinitions>WIN32;_DEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<PreprocessorDefinitions>WIN32;_DEBUG;_CONSOLE;ASIO_STANDALONE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<SDLCheck>true</SDLCheck>
<AdditionalIncludeDirectories>..\librf;..\..\asio-1.10.6\include;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
<AdditionalOptions>/await</AdditionalOptions>
@@ -102,7 +104,7 @@
<ClCompile>
<WarningLevel>Level3</WarningLevel>
<Optimization>Disabled</Optimization>
<PreprocessorDefinitions>_DEBUG;_CONSOLE;ASIO_STANDALONE;RESUMEF_DEBUG_COUNTER=0;RESUMEF_ENABLE_MULT_SCHEDULER=0;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<PreprocessorDefinitions>_DEBUG;_CONSOLE;ASIO_STANDALONE;RESUMEF_DEBUG_COUNTER=0;RESUMEF_ENABLE_MULT_SCHEDULER=1;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<SDLCheck>true</SDLCheck>
<AdditionalIncludeDirectories>..\librf;..\..\asio-1.10.6\include;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
<AdditionalOptions>/await</AdditionalOptions>
@@ -121,31 +123,37 @@
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
<ClCompile>
<WarningLevel>Level3</WarningLevel>
<Optimization>MaxSpeed</Optimization>
<Optimization>Full</Optimization>
<FunctionLevelLinking>true</FunctionLevelLinking>
<IntrinsicFunctions>true</IntrinsicFunctions>
<PreprocessorDefinitions>WIN32;NDEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<SDLCheck>true</SDLCheck>
<PreprocessorDefinitions>WIN32;NDEBUG;_CONSOLE;ASIO_STANDALONE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<SDLCheck>
</SDLCheck>
<AdditionalIncludeDirectories>..\librf;..\..\asio-1.10.6\include;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
<AdditionalOptions>/await</AdditionalOptions>
<MultiProcessorCompilation>true</MultiProcessorCompilation>
<LanguageStandard>stdcpplatest</LanguageStandard>
<BufferSecurityCheck>false</BufferSecurityCheck>
<InlineFunctionExpansion>AnySuitable</InlineFunctionExpansion>
<EnableFiberSafeOptimizations>true</EnableFiberSafeOptimizations>
<StringPooling>true</StringPooling>
</ClCompile>
<Link>
<SubSystem>Console</SubSystem>
<EnableCOMDATFolding>true</EnableCOMDATFolding>
<OptimizeReferences>true</OptimizeReferences>
<GenerateDebugInformation>true</GenerateDebugInformation>
<Profile>true</Profile>
<Profile>
</Profile>
</Link>
</ItemDefinitionGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'">
<ClCompile>
<WarningLevel>Level3</WarningLevel>
<WarningLevel>Level4</WarningLevel>
<Optimization>MaxSpeed</Optimization>
<FunctionLevelLinking>true</FunctionLevelLinking>
<IntrinsicFunctions>true</IntrinsicFunctions>
<PreprocessorDefinitions>NDEBUG;_CONSOLE;ASIO_STANDALONE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<PreprocessorDefinitions>NDEBUG;_CONSOLE;ASIO_STANDALONE;RESUMEF_ENABLE_MULT_SCHEDULER=0;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<AdditionalIncludeDirectories>..\librf;..\..\asio-1.10.6\include;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
<AdditionalOptions>/await</AdditionalOptions>
<ExceptionHandling>Sync</ExceptionHandling>
@@ -157,6 +165,9 @@
<LanguageStandard>stdcpplatest</LanguageStandard>
<BufferSecurityCheck>false</BufferSecurityCheck>
<MultiProcessorCompilation>true</MultiProcessorCompilation>
<SDLCheck>
</SDLCheck>
<EnablePREfast>false</EnablePREfast>
</ClCompile>
<Link>
<SubSystem>Console</SubSystem>
@@ -164,7 +175,8 @@
<OptimizeReferences>true</OptimizeReferences>
<GenerateDebugInformation>true</GenerateDebugInformation>
<LinkTimeCodeGeneration>UseLinkTimeCodeGeneration</LinkTimeCodeGeneration>
<Profile>true</Profile>
<Profile>
</Profile>
</Link>
</ItemDefinitionGroup>
<ItemGroup>

Načítá se…
Zrušit
Uložit