1
0
mirror of https://github.com/tearshark/librf.git synced 2024-10-02 00:00:11 +08:00
librf/tutorial/test_async_channel_mult_thread.cpp

115 lines
2.5 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Verifies that channel_t is thread-safe.
#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include "librf.h"
using namespace resumef;
using namespace std::chrono;
// Guards std::cout so that lines printed from different threads do not interleave.
static std::mutex cout_mutex;
// Number of values read from the channel so far, summed over all consumer coroutines.
// Direct-initialized: copy-initialization from an int is ill-formed before C++17
// because std::atomic's copy constructor is deleted.
std::atomic<intptr_t> gcounter{0};
// Set to 1 to print every read/write (with the thread id) and to slow the consumers down.
#define OUTPUT_DEBUG 0
/// Consumer coroutine: performs `cnt` read attempts on channel `c`.
///
/// Every successful read increments the global `gcounter`. A `channel_exception`
/// thrown by a read is caught and its message is printed; that attempt still
/// counts towards `cnt`, so the loop always runs exactly `cnt` iterations.
///
/// @param c    channel to read strings from (taken by value, as in the original).
/// @param cnt  number of read attempts to perform.
future_t<> test_channel_consumer(channel_t<std::string> c, size_t cnt)
{
	for (size_t i = 0; i < cnt; ++i)
	{
		try
		{
			// The value itself is only printed in the OUTPUT_DEBUG build.
			[[maybe_unused]] auto val = co_await c.read();
			++gcounter;
#if OUTPUT_DEBUG
			{
				scoped_lock<std::mutex> cout_guard(cout_mutex);
				std::cout << "R " << val << "@" << std::this_thread::get_id() << std::endl;
			}
#endif
		}
		catch (const channel_exception& e)
		{
			// With MAX_CHANNEL_QUEUE = 0, reading before any write triggers a
			// read_before_write exception.
			scoped_lock<std::mutex> cout_guard(cout_mutex);
			std::cout << e.what() << std::endl;
		}
		// co_await is not allowed inside a catch handler, so the debug delay
		// lives after the try/catch.
#if OUTPUT_DEBUG
		co_await sleep_for(50ms);
#endif
	}
}
/// Producer coroutine: writes the decimal strings "0", "1", ..., "cnt-1" to
/// channel `c`, awaiting each write before issuing the next one.
///
/// @param c    channel to write to (taken by value, as in the original).
/// @param cnt  number of values to write.
future_t<> test_channel_producer(channel_t<std::string> c, size_t cnt)
{
	for (size_t i = 0; i < cnt; ++i)
	{
		co_await c.write(std::to_string(i));
#if OUTPUT_DEBUG
		{
			// Hold the lock only for the duration of the print.
			scoped_lock<std::mutex> cout_guard(cout_mutex);
			std::cout << "W " << i << "@" << std::this_thread::get_id() << std::endl;
		}
#endif
	}
}
// Number of producer threads started by the test.
constexpr size_t WRITE_THREAD = 6;
// Number of consumer threads started by the test.
constexpr size_t READ_THREAD = 6;
// Number of reads performed by each consumer.
constexpr size_t READ_BATCH = 1000000;
// Value passed to the channel_t constructor. Values listed by the original
// author for experimentation: 0, 1, 5, 10, -1.
constexpr size_t MAX_CHANNEL_QUEUE = 5;
void resumable_main_channel_mult_thread()
{
channel_t<std::string> c(MAX_CHANNEL_QUEUE);
std::thread write_th[WRITE_THREAD];
for (size_t i = 0; i < WRITE_THREAD; ++i)
{
write_th[i] = std::thread([&]
{
local_scheduler_t my_scheduler;
go test_channel_producer(c, READ_BATCH * READ_THREAD / WRITE_THREAD);
this_scheduler()->run_until_notask();
{
scoped_lock<std::mutex> __lock(cout_mutex);
std::cout << "Write OK\r\n";
}
});
}
std::this_thread::sleep_for(100ms);
std::thread read_th[READ_THREAD];
for (size_t i = 0; i < READ_THREAD; ++i)
{
read_th[i] = std::thread([&]
{
local_scheduler_t my_scheduler;
go test_channel_consumer(c, READ_BATCH);
this_scheduler()->run_until_notask();
{
scoped_lock<std::mutex> __lock(cout_mutex);
std::cout << "Read OK\r\n";
}
});
}
std::this_thread::sleep_for(100ms);
scheduler_t::g_scheduler.run_until_notask();
for(auto & th : read_th)
th.join();
for (auto& th : write_th)
th.join();
std::cout << "OK: counter = " << gcounter.load() << std::endl;
}