Browse Source

测试channel在多线程多协程调度下的表现

测试结果表明N:M模型的代码还存在BUG
tags/v2.9.7
tearshark 6 years ago
parent
commit
84fe5c4b1a

+ 5
- 2
librf/src/_awaker.h View File

// 返回false表示此事件已经无效,event内部只删除此awaker // 返回false表示此事件已经无效,event内部只删除此awaker
typedef std::function<bool(_Ety * e, _Types...)> callee_type; typedef std::function<bool(_Ety * e, _Types...)> callee_type;
private: private:
spinlock _lock;
//typedef spinlock lock_type;
typedef std::recursive_mutex lock_type;
lock_type _lock;
callee_type _callee; callee_type _callee;
std::atomic<intptr_t> _counter; std::atomic<intptr_t> _counter;
public: public:
bool awake(_Ety * e, intptr_t count_, const _Types&... args) bool awake(_Ety * e, intptr_t count_, const _Types&... args)
{ {
assert(count_ > 0); assert(count_ > 0);
scoped_lock<spinlock> lock_(this->_lock);
scoped_lock<lock_type> lock_(this->_lock);
if ((this->_counter.fetch_sub(count_) - count_) <= 0) if ((this->_counter.fetch_sub(count_) - count_) <= 0)
{ {

+ 25
- 55
librf/src/channel.h View File

typedef std::shared_ptr<channel_write_awaker> channel_write_awaker_ptr; typedef std::shared_ptr<channel_write_awaker> channel_write_awaker_ptr;
typedef std::pair<channel_write_awaker_ptr, _Ty> write_tuple_type; typedef std::pair<channel_write_awaker_ptr, _Ty> write_tuple_type;
private: private:
spinlock _lock; //保证访问本对象是线程安全的
//typedef spinlock lock_type;
typedef std::recursive_mutex lock_type;
lock_type _lock; //保证访问本对象是线程安全的
const size_t _max_counter; //数据队列的容量上限 const size_t _max_counter; //数据队列的容量上限
std::deque<_Ty> _values; //数据队列 std::deque<_Ty> _values; //数据队列
std::list<channel_read_awaker_ptr> _read_awakes; //读队列 std::list<channel_read_awaker_ptr> _read_awakes; //读队列
{ {
return read_(std::make_shared<channel_read_awaker>(std::forward<callee_t>(awaker))); return read_(std::make_shared<channel_read_awaker>(std::forward<callee_t>(awaker)));
} }
template<class callee_t, class = std::enable_if<!std::is_same<std::remove_cv_t<callee_t>, channel_write_awaker_ptr>::value>>
auto write(callee_t && awaker, const _Ty& val)
{
return write_(std::make_shared<channel_write_awaker>(std::forward<callee_t>(awaker)), val);
}
template<class callee_t, class = std::enable_if<!std::is_same<std::remove_cv_t<callee_t>, channel_write_awaker_ptr>::value>>
auto write(callee_t && awaker, _Ty&& val)
template<class callee_t, class _Ty2, class = std::enable_if<!std::is_same<std::remove_cv_t<callee_t>, channel_write_awaker_ptr>::value>>
auto write(callee_t && awaker, _Ty2&& val)
{ {
return write_(std::make_shared<channel_write_awaker>(std::forward<callee_t>(awaker)), std::forward<_Ty>(val));
return write_(std::make_shared<channel_write_awaker>(std::forward<callee_t>(awaker)), std::forward<_Ty2>(val));
} }
//如果已经触发了awaker,则返回true //如果已经触发了awaker,则返回true
//设计目标是线程安全的,实际情况待考察
bool read_(channel_read_awaker_ptr && r_awaker) bool read_(channel_read_awaker_ptr && r_awaker)
{ {
assert(r_awaker); assert(r_awaker);
scoped_lock<spinlock> lock_(this->_lock);
scoped_lock<lock_type> lock_(this->_lock);
bool ret_value; bool ret_value;
if (_values.size() > 0) if (_values.size() > 0)
return ret_value; return ret_value;
} }
void write_(channel_write_awaker_ptr && w_awaker, const _Ty& val)
{
assert(w_awaker);
scoped_lock<spinlock> lock_(this->_lock);
//如果满了,则不添加到数据队列,而是将“写等待”和值,放入“写队列”
bool is_full = _values.size() >= _max_counter;
if (is_full)
_write_awakes.push_back(std::make_pair(std::forward<channel_write_awaker_ptr>(w_awaker), val));
else
_values.push_back(val);
//如果已有读队列,则唤醒一个“读等待”
awake_one_reader_();
//触发 没有放入“写队列”的“写等待”
if (!is_full) w_awaker->awake(this, 1);
}
void write_(channel_write_awaker_ptr && w_awaker, _Ty&& val)
//设计目标是线程安全的,实际情况待考察
template<class _Ty2>
void write_(channel_write_awaker_ptr && w_awaker, _Ty2&& val)
{ {
assert(w_awaker); assert(w_awaker);
scoped_lock<spinlock> lock_(this->_lock);
scoped_lock<lock_type> lock_(this->_lock);
//如果满了,则不添加到数据队列,而是将“写等待”和值,放入“写队列” //如果满了,则不添加到数据队列,而是将“写等待”和值,放入“写队列”
bool is_full = _values.size() >= _max_counter; bool is_full = _values.size() >= _max_counter;
if (is_full) if (is_full)
_write_awakes.push_back(std::make_pair(std::forward<channel_write_awaker_ptr>(w_awaker), std::forward<_Ty>(val)));
_write_awakes.push_back(std::make_pair(std::forward<channel_write_awaker_ptr>(w_awaker), std::forward<_Ty2>(val)));
else else
_values.push_back(std::forward<_Ty>(val));
_values.push_back(std::forward<_Ty2>(val));
//如果已有读队列,则唤醒一个“读等待” //如果已有读队列,则唤醒一个“读等待”
awake_one_reader_(); awake_one_reader_();
} }
private: private:
//只能被write_函数调用,内部不再需要加锁
void awake_one_reader_() void awake_one_reader_()
{ {
//assert(!(_read_awakes.size() >= 0 && _values.size() == 0)); //assert(!(_read_awakes.size() >= 0 && _values.size() == 0));
} }
} }
//只能被read_函数调用,内部不再需要加锁
void awake_one_writer_() void awake_one_writer_()
{ {
for (auto iter = _write_awakes.begin(); iter != _write_awakes.end(); ) for (auto iter = _write_awakes.begin(); iter != _write_awakes.end(); )
} }
awaitable_t<bool> write(_Ty&& val) const
template<class _Ty2>
awaitable_t<bool> write(_Ty2&& val) const
{ {
awaitable_t<bool> awaitable; awaitable_t<bool> awaitable;
st->set_value(chan ? true : false); st->set_value(chan ? true : false);
return true; return true;
}); });
_chan->write_(std::move(awaker), std::forward<_Ty>(val));
_chan->write_(std::move(awaker), std::forward<_Ty2>(val));
return awaitable; return awaitable;
} }
awaitable_t<bool> write(const _Ty& val) const
{
awaitable_t<bool> awaitable;
auto awaker = std::make_shared<channel_write_awaker>(
[st = awaitable._state](channel_impl_type * chan) -> bool
{
st->set_value(chan ? true : false);
return true;
});
_chan->write_(std::move(awaker), val);
return awaitable;
}
awaitable_t<_Ty> read() const awaitable_t<_Ty> read() const
{ {
awaitable_t<_Ty> awaitable; awaitable_t<_Ty> awaitable;
return awaitable; return awaitable;
} }
awaitable_t<bool> operator << (_Ty&& val) const
{
return std::move(write(std::forward<_Ty>(val)));
}
awaitable_t<bool> operator << (const _Ty& val) const
template<class _Ty2>
awaitable_t<bool> operator << (_Ty2&& val) const
{ {
return std::move(write(val));
return std::move(write(std::forward<_Ty2>(val)));
} }
awaitable_t<_Ty> operator co_await () const awaitable_t<_Ty> operator co_await () const
} }
#if _DEBUG #if _DEBUG
//非线程安全,返回的队列也不是线程安全的
const auto & debug_queue() const const auto & debug_queue() const
{ {
return _chan->debug_queue(); return _chan->debug_queue();
}; };
typedef channel_t<bool> semaphore_t;
using semaphore_t = channel_t<bool>;
} }

+ 8
- 6
librf/src/event.cpp View File

#include "event.h" #include "event.h"
#include <assert.h> #include <assert.h>
#include <mutex>
#include "scheduler.h" #include "scheduler.h"
namespace resumef namespace resumef
void event_impl::signal() void event_impl::signal()
{ {
scoped_lock<spinlock> lock_(this->_lock);
scoped_lock<lock_type> lock_(this->_lock);
++this->_counter; ++this->_counter;
void event_impl::reset() void event_impl::reset()
{ {
scoped_lock<spinlock> lock_(this->_lock);
scoped_lock<lock_type> lock_(this->_lock);
this->_awakes.clear(); this->_awakes.clear();
this->_counter = 0; this->_counter = 0;
{ {
assert(awaker); assert(awaker);
scoped_lock<spinlock> lock_(this->_lock);
scoped_lock<lock_type> lock_(this->_lock);
if (this->_counter > 0) if (this->_counter > 0)
{ {
struct event_t::wait_all_ctx struct event_t::wait_all_ctx
{ {
//typedef spinlock lock_type;
typedef std::recursive_mutex lock_type;
counted_ptr<state_t<bool>> st; counted_ptr<state_t<bool>> st;
std::vector<event_impl_ptr> evts; std::vector<event_impl_ptr> evts;
std::vector<event_impl_ptr> evts_waited; std::vector<event_impl_ptr> evts_waited;
timer_handler th; timer_handler th;
spinlock _lock;
lock_type _lock;
wait_all_ctx() wait_all_ctx()
{ {
bool awake(detail::event_impl * eptr) bool awake(detail::event_impl * eptr)
{ {
scoped_lock<spinlock> lock_(this->_lock);
scoped_lock<lock_type> lock_(this->_lock);
//如果st为nullptr,则说明之前已经返回过值了。本环境无效了。 //如果st为nullptr,则说明之前已经返回过值了。本环境无效了。
if (!st.get()) if (!st.get())

+ 4
- 1
librf/src/event.h View File

struct event_impl : public std::enable_shared_from_this<event_impl> struct event_impl : public std::enable_shared_from_this<event_impl>
{ {
private: private:
//typedef spinlock lock_type;
typedef std::recursive_mutex lock_type;
std::list<event_awaker_ptr> _awakes; std::list<event_awaker_ptr> _awakes;
intptr_t _counter; intptr_t _counter;
spinlock _lock;
lock_type _lock;
public: public:
RF_API event_impl(intptr_t initial_counter_); RF_API event_impl(intptr_t initial_counter_);

+ 3
- 3
librf/src/mutex.cpp View File

void mutex_impl::unlock() void mutex_impl::unlock()
{ {
scoped_lock<spinlock> lock_(this->_lock);
scoped_lock<lock_type> lock_(this->_lock);
if (_owner != nullptr) if (_owner != nullptr)
{ {
{ {
assert(awaker); assert(awaker);
scoped_lock<spinlock> lock_(this->_lock);
scoped_lock<lock_type> lock_(this->_lock);
if (_owner == nullptr) if (_owner == nullptr)
{ {
{ {
assert(awaker); assert(awaker);
scoped_lock<spinlock> lock_(this->_lock);
scoped_lock<lock_type> lock_(this->_lock);
if (_owner == nullptr) if (_owner == nullptr)
{ {

+ 4
- 1
librf/src/mutex.h View File

struct mutex_impl : public std::enable_shared_from_this<mutex_impl> struct mutex_impl : public std::enable_shared_from_this<mutex_impl>
{ {
private: private:
//typedef spinlock lock_type;
typedef std::recursive_mutex lock_type;
std::list<mutex_awaker_ptr> _awakes; std::list<mutex_awaker_ptr> _awakes;
mutex_awaker_ptr _owner; mutex_awaker_ptr _owner;
spinlock _lock;
lock_type _lock;
public: public:
RF_API mutex_impl(); RF_API mutex_impl();

+ 2
- 2
librf/src/rf_task.h View File

template<class _Ty> template<class _Ty>
struct task_t; struct task_t;
//co_task接受的是一个experimental::generator<_Ty>类型,是调用一个支持异步的函数后返回的结果
//task_t接受的是一个experimental::generator<_Ty>类型,是调用一个支持异步的函数后返回的结果
template<class _Ty> template<class _Ty>
struct task_t<std::experimental::generator<_Ty> > : public task_base struct task_t<std::experimental::generator<_Ty> > : public task_base
{ {
//---------------------------------------------------------------------------------------------- //----------------------------------------------------------------------------------------------
//co_task_with_ctx接受的是一个'函数对象'
//ctx_task_t接受的是一个'函数对象'
//这个'函数对象'被调用后,返回generator<_Ty>/future_t<_Ty>类型 //这个'函数对象'被调用后,返回generator<_Ty>/future_t<_Ty>类型
//然后'函数对象'作为异步执行的上下文状态保存起来 //然后'函数对象'作为异步执行的上下文状态保存起来
template<class _Ctx> template<class _Ctx>

+ 13
- 12
librf/src/scheduler.cpp View File

void scheduler::cancel_all_task_() void scheduler::cancel_all_task_()
{ {
{ {
scoped_lock<std::recursive_mutex> __guard(_mtx_ready);
for (auto task : this->_ready_task)
scoped_lock<std::recursive_mutex> __guard(_mtx_task);
for (auto task : this->_task)
{ {
task->cancel(); task->cancel();
delete task; delete task;
} }
this->_ready_task.clear();
this->_task.clear();
} }
{ {
scoped_lock<std::recursive_mutex> __guard(_mtx_task);
for (auto task : this->_task)
scoped_lock<std::recursive_mutex> __guard(_mtx_ready);
for (auto task : this->_ready_task)
{ {
task->cancel(); task->cancel();
delete task; delete task;
} }
this->_task.clear();
this->_ready_task.clear();
} }
} }
iter = this->_task.erase(iter); iter = this->_task.erase(iter);
delete task; delete task;
} }
}
{
scoped_lock<std::recursive_mutex> __guard(_mtx_ready);
if (this->_ready_task.size() > 0)
{ {
this->_task.insert(this->_task.end(), this->_ready_task.begin(), this->_ready_task.end());
this->_ready_task.clear();
scoped_lock<std::recursive_mutex> __guard(_mtx_ready);
if (this->_ready_task.size() > 0)
{
this->_task.insert(this->_task.end(), this->_ready_task.begin(), this->_ready_task.end());
this->_ready_task.clear();
}
} }
} }
} }

+ 5
- 3
librf/src/state.cpp View File

if (sch_ == nullptr) if (sch_ == nullptr)
sch_ = this_scheduler(); sch_ = this_scheduler();
*/ */
sch_->push_task_internal(new awaitable_task_t<state_base>(this));
if (sch_)
sch_->push_task_internal(new awaitable_task_t<state_base>(this));
} }
} }
void state_base::set_exception(std::exception_ptr && e_) void state_base::set_exception(std::exception_ptr && e_)
{ {
scoped_lock<std::mutex> __guard(_mtx);
scoped_lock<lock_type> __guard(_mtx);
_exception = std::move(e_); _exception = std::move(e_);
// Set all members first as calling coroutine may reset stuff here. // Set all members first as calling coroutine may reset stuff here.
if (sch_ == nullptr) if (sch_ == nullptr)
sch_ = this_scheduler(); sch_ = this_scheduler();
*/ */
sch_->push_task_internal(new awaitable_task_t<state_base>(this));
if (sch_)
sch_->push_task_internal(new awaitable_task_t<state_base>(this));
} }
} }

+ 11
- 10
librf/src/state.h View File

struct state_base struct state_base
{ {
protected: protected:
std::mutex _mtx; //for value, _exception
typedef std::recursive_mutex lock_type;
lock_type _mtx; //for value, _exception
RF_API void set_value_none_lock(); RF_API void set_value_none_lock();
private: private:
void * _this_promise = nullptr; void * _this_promise = nullptr;
void cancel() void cancel()
{ {
scoped_lock<std::mutex> __guard(_mtx);
scoped_lock<lock_type> __guard(_mtx);
_cancellation = true; _cancellation = true;
_coro = nullptr; _coro = nullptr;
{ {
#if RESUMEF_DEBUG_COUNTER #if RESUMEF_DEBUG_COUNTER
{ {
scoped_lock<std::mutex> __lock(g_resumef_cout_mutex);
scoped_lock<lock_type> __lock(g_resumef_cout_mutex);
std::cout << "scheduler=" << current_scheduler() std::cout << "scheduler=" << current_scheduler()
<< ",coro=" << _coro.address() << ",coro=" << _coro.address()
void set_value(const value_type& t) void set_value(const value_type& t)
{ {
scoped_lock<std::mutex> __guard(_mtx);
scoped_lock<lock_type> __guard(_mtx);
_value = t; _value = t;
state_base::set_value_none_lock(); state_base::set_value_none_lock();
} }
void set_value(value_type&& t) void set_value(value_type&& t)
{ {
scoped_lock<std::mutex> __guard(_mtx);
scoped_lock<lock_type> __guard(_mtx);
_value = std::forward<value_type>(t); _value = std::forward<value_type>(t);
state_base::set_value_none_lock(); state_base::set_value_none_lock();
} }
_Ty & get_value() _Ty & get_value()
{ {
scoped_lock<std::mutex> __guard(_mtx);
scoped_lock<lock_type> __guard(_mtx);
if (!_ready) if (!_ready)
throw future_exception{ future_error::not_ready }; throw future_exception{ future_error::not_ready };
} }
void reset() void reset()
{ {
scoped_lock<std::mutex> __guard(_mtx);
scoped_lock<lock_type> __guard(_mtx);
state_base::reset_none_lock(); state_base::reset_none_lock();
_value = value_type{}; _value = value_type{};
} }
void set_value() void set_value()
{ {
scoped_lock<std::mutex> __guard(_mtx);
scoped_lock<lock_type> __guard(_mtx);
set_value_none_lock(); set_value_none_lock();
} }
void get_value() void get_value()
{ {
scoped_lock<std::mutex> __guard(_mtx);
scoped_lock<lock_type> __guard(_mtx);
if (!_ready) if (!_ready)
throw future_exception{ future_error::not_ready }; throw future_exception{ future_error::not_ready };
} }
void reset() void reset()
{ {
scoped_lock<std::mutex> __guard(_mtx);
scoped_lock<lock_type> __guard(_mtx);
reset_none_lock(); reset_none_lock();
} }

+ 9
- 9
tutorial/test_async_channel.cpp View File

using namespace resumef; using namespace resumef;
const size_t MAX_CHANNEL_QUEUE = 0; //0, 1, 5, 10, -1
const size_t MAX_CHANNEL_QUEUE = 5; //0, 1, 5, 10, -1
future_vt test_channel_read(const channel_t<size_t> & c)
future_vt test_channel_read(const channel_t<std::string> & c)
{ {
using namespace std::chrono; using namespace std::chrono;
{ {
try try
{ {
//auto val = co_await c.read();
auto val = co_await c; //第二种从channel读出数据的方法。利用重载operator co_await(),而不是c是一个awaitable_t。
auto val = co_await c.read();
//auto val = co_await c; //第二种从channel读出数据的方法。利用重载operator co_await(),而不是c是一个awaitable_t。
std::cout << val << ":"; std::cout << val << ":";
#if _DEBUG #if _DEBUG
} }
} }
future_vt test_channel_write(const channel_t<size_t> & c)
future_vt test_channel_write(const channel_t<std::string> & c)
{ {
using namespace std::chrono; using namespace std::chrono;
for (size_t i = 0; i < 10; ++i) for (size_t i = 0; i < 10; ++i)
{ {
//co_await c.write(i);
co_await (c << i); //第二种写入数据到channel的方法。因为优先级关系,需要将'c << i'括起来
co_await c.write(std::to_string(i));
//co_await (c << std::to_string(i)); //第二种写入数据到channel的方法。因为优先级关系,需要将'c << i'括起来
std::cout << "<" << i << ">:"; std::cout << "<" << i << ">:";
#if _DEBUG #if _DEBUG
void test_channel_read_first() void test_channel_read_first()
{ {
channel_t<size_t> c(MAX_CHANNEL_QUEUE);
channel_t<std::string> c(MAX_CHANNEL_QUEUE);
go test_channel_read(c); go test_channel_read(c);
go test_channel_write(c); go test_channel_write(c);
void test_channel_write_first() void test_channel_write_first()
{ {
channel_t<size_t> c(MAX_CHANNEL_QUEUE);
channel_t<std::string> c(MAX_CHANNEL_QUEUE);
go test_channel_write(c); go test_channel_write(c);
go test_channel_read(c); go test_channel_read(c);

+ 189
- 0
tutorial/test_async_channel_mult_thread.cpp View File

//验证channel是否线程安全
#include <chrono>
#include <iostream>
#include <string>
#include <conio.h>
#include <thread>
#include <deque>
#include <mutex>
#include "librf.h"
using namespace resumef;
using namespace std::chrono;
static std::mutex cout_mutex;
#define OUTPUT_DEBUG 0
future_vt test_channel_consumer(const channel_t<std::string> & c, size_t cnt)
{
for (size_t i = 0; i < cnt; ++i)
{
try
{
auto val = co_await c.read();
#if OUTPUT_DEBUG
{
scoped_lock<std::mutex> __lock(cout_mutex);
std::cout << "R " << val << "@" << std::this_thread::get_id() << std::endl;
}
#endif
}
catch (channel_exception e)
{
//MAX_CHANNEL_QUEUE=0,并且先读后写,会触发read_before_write异常
scoped_lock<std::mutex> __lock(cout_mutex);
std::cout << e.what() << std::endl;
}
#if OUTPUT_DEBUG
co_await sleep_for(50ms);
#endif
}
}
future_vt test_channel_producer(const channel_t<std::string> & c, size_t cnt)
{
for (size_t i = 0; i < cnt; ++i)
{
co_await c.write(std::to_string(i));
#if OUTPUT_DEBUG
{
scoped_lock<std::mutex> __lock(cout_mutex);
std::cout << "W " << i << "@" << std::this_thread::get_id() << std::endl;
}
#endif
}
}
const size_t N = 8;
const size_t BATCH = 1000000;
const size_t MAX_CHANNEL_QUEUE = N + 1; //0, 1, 5, 10, -1
void resumable_main_channel_mult_thread()
{
channel_t<std::string> c(MAX_CHANNEL_QUEUE);
std::thread wth([&]
{
//local_scheduler my_scheduler; //2017/11/27日,仍然存在BUG。真多线程下调度,存在有协程无法被调度完成的BUG
go test_channel_producer(c, BATCH * N);
this_scheduler()->run_until_notask();
std::cout << "Write OK" << std::endl;
});
//std::this_thread::sleep_for(100ms);
std::thread rth[N];
for (size_t i = 0; i < N; ++i)
{
rth[i] = std::thread([&]
{
//local_scheduler my_scheduler; //2017/11/27日,仍然存在BUG。真多线程下调度,存在有协程无法被调度完成的BUG
go test_channel_consumer(c, BATCH);
this_scheduler()->run_until_notask();
std::cout << "Read OK" << std::endl;
});
}
for(auto & th : rth)
th.join();
wth.join();
std::cout << "OK" << std::endl;
_getch();
}
//----------------------------------------------------------------------------------------------------------------------
/*
const size_t POOL_COUNT = 8;
//这是一个重度计算任务,只能单开线程来避免主线程被阻塞
auto async_heavy_computing_tasks(int64_t val)
{
using namespace std::chrono;
awaitable_t<int64_t> awaitable;
std::thread([val, st = awaitable._state]
{
std::this_thread::sleep_for(500ms);
st->set_value(val * val);
}).detach();
return awaitable;
}
future_vt heavy_computing_sequential(int64_t val)
{
for (size_t i = 0; i < 3; ++i)
{
{
scoped_lock<std::mutex> __lock(cout_mutex);
std::cout << val << " @" << std::this_thread::get_id() << std::endl;
}
val = co_await async_heavy_computing_tasks(val);
}
{
scoped_lock<std::mutex> __lock(cout_mutex);
std::cout << val << " @" << std::this_thread::get_id() << std::endl;
}
}
void test_use_single_thread(int64_t val)
{
//使用local_scheduler来申明一个绑定到本线程的调度器 my_scheduler
//后续在本线程运行的协程,通过this_scheduler()获得my_scheduler的地址
//从而将这些协程的所有操作都绑定到my_scheduler里面去调度
//实现一个协程始终绑定到一个线程的目的
//在同一个线程里,申明多个local_scheduler会怎么样?
//----我也不知道
//如果不申明my_scheduler,则this_scheduler()获得默认主调度器的地址
local_scheduler my_scheduler;
{
scoped_lock<std::mutex> __lock(cout_mutex);
std::cout << "running in thread @" << std::this_thread::get_id() << std::endl;
}
go heavy_computing_sequential(val);
this_scheduler()->run_until_notask();
}
const size_t N = 2;
void test_use_multi_thread()
{
std::thread th_array[N];
for (size_t i = 0; i < N; ++i)
th_array[i] = std::thread(&test_use_single_thread, 4 + i);
test_use_single_thread(3);
for (auto & th : th_array)
th.join();
}
void resumable_main_multi_thread()
{
std::cout << "test_use_single_thread @" << std::this_thread::get_id() << std::endl << std::endl;
test_use_single_thread(2);
std::cout << std::endl;
std::cout << "test_use_multi_thread @" << std::this_thread::get_id() << std::endl << std::endl;
test_use_multi_thread();
//运行主调度器里面的协程
//但本范例不应该有协程存在,仅演示不要忽略了主调度器
scheduler::g_scheduler.run_until_notask();
}
void resumable_main_channel_mult_thread()
{
}
*/

+ 6
- 6
tutorial/test_async_multi_thread.cpp View File

#include "librf.h" #include "librf.h"
using namespace resumef; using namespace resumef;
std::mutex cout_mutex;
static std::mutex cout_mutex;
//这是一个重度计算任务,只能单开线程来避免主线程被阻塞 //这是一个重度计算任务,只能单开线程来避免主线程被阻塞
auto async_heavy_computing_tasks(int64_t val) auto async_heavy_computing_tasks(int64_t val)
} }
} }
void test_use_single_thread()
void test_use_single_thread(int64_t val)
{ {
//使用local_scheduler来申明一个绑定到本线程的调度器 my_scheduler //使用local_scheduler来申明一个绑定到本线程的调度器 my_scheduler
//后续在本线程运行的协程,通过this_scheduler()获得my_scheduler的地址 //后续在本线程运行的协程,通过this_scheduler()获得my_scheduler的地址
scoped_lock<std::mutex> __lock(cout_mutex); scoped_lock<std::mutex> __lock(cout_mutex);
std::cout << "running in thread @" << std::this_thread::get_id() << std::endl; std::cout << "running in thread @" << std::this_thread::get_id() << std::endl;
} }
go heavy_computing_sequential(2);
go heavy_computing_sequential(val);
this_scheduler()->run_until_notask(); this_scheduler()->run_until_notask();
} }
{ {
std::thread th_array[N]; std::thread th_array[N];
for (size_t i = 0; i < N; ++i) for (size_t i = 0; i < N; ++i)
th_array[i] = std::thread(&test_use_single_thread);
th_array[i] = std::thread(&test_use_single_thread, 4 + i);
test_use_single_thread();
test_use_single_thread(3);
for (auto & th : th_array) for (auto & th : th_array)
th.join(); th.join();
void resumable_main_multi_thread() void resumable_main_multi_thread()
{ {
std::cout << "test_use_single_thread @" << std::this_thread::get_id() << std::endl << std::endl; std::cout << "test_use_single_thread @" << std::this_thread::get_id() << std::endl << std::endl;
test_use_single_thread();
test_use_single_thread(2);
std::cout << std::endl; std::cout << std::endl;
std::cout << "test_use_multi_thread @" << std::this_thread::get_id() << std::endl << std::endl; std::cout << "test_use_multi_thread @" << std::this_thread::get_id() << std::endl << std::endl;

+ 0
- 1
tutorial/test_async_mutex.cpp View File

#include <conio.h> #include <conio.h>
#include <thread> #include <thread>
#include <deque> #include <deque>
#include <mutex>
#include "librf.h" #include "librf.h"

+ 3
- 1
vs_proj/librf.cpp View File

extern void resumable_main_channel(); extern void resumable_main_channel();
extern void resumable_main_cb(); extern void resumable_main_cb();
extern void resumable_main_multi_thread(); extern void resumable_main_multi_thread();
extern void resumable_main_channel_mult_thread();
extern void resumable_main_benchmark_mem(); extern void resumable_main_benchmark_mem();
int main(int argc, const char * argv[]) int main(int argc, const char * argv[])
{ {
resumable_main_multi_thread();
resumable_main_channel_mult_thread();
//resumable_main_multi_thread();
return 0; return 0;
resumable_main_yield_return(); resumable_main_yield_return();

+ 4
- 1
vs_proj/librf.sln View File

 
Microsoft Visual Studio Solution File, Format Version 12.00 Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15 # Visual Studio 15
VisualStudioVersion = 15.0.26730.15
VisualStudioVersion = 15.0.27004.2006
MinimumVisualStudioVersion = 10.0.40219.1 MinimumVisualStudioVersion = 10.0.40219.1
Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "librf", "librf.vcxproj", "{C1D4A6BD-592F-4E48-8178-7C87219BF80E}" Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "librf", "librf.vcxproj", "{C1D4A6BD-592F-4E48-8178-7C87219BF80E}"
EndProject EndProject
Global Global
GlobalSection(Performance) = preSolution
HasPerformanceSessions = true
EndGlobalSection
GlobalSection(SolutionConfigurationPlatforms) = preSolution GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|x64 = Debug|x64 Debug|x64 = Debug|x64
Debug|x86 = Debug|x86 Debug|x86 = Debug|x86

+ 16
- 3
vs_proj/librf.vcxproj View File

<ClCompile> <ClCompile>
<WarningLevel>Level3</WarningLevel> <WarningLevel>Level3</WarningLevel>
<Optimization>Disabled</Optimization> <Optimization>Disabled</Optimization>
<PreprocessorDefinitions>_DEBUG;_CONSOLE;RESUMEF_DEBUG_COUNTER=1;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<PreprocessorDefinitions>_DEBUG;_CONSOLE;RESUMEF_DEBUG_COUNTER=0;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<SDLCheck>true</SDLCheck> <SDLCheck>true</SDLCheck>
<AdditionalIncludeDirectories>..\librf;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories> <AdditionalIncludeDirectories>..\librf;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
<AdditionalOptions>/await /std:c++latest </AdditionalOptions> <AdditionalOptions>/await /std:c++latest </AdditionalOptions>
<EnableCOMDATFolding>true</EnableCOMDATFolding> <EnableCOMDATFolding>true</EnableCOMDATFolding>
<OptimizeReferences>true</OptimizeReferences> <OptimizeReferences>true</OptimizeReferences>
<GenerateDebugInformation>true</GenerateDebugInformation> <GenerateDebugInformation>true</GenerateDebugInformation>
<Profile>true</Profile>
</Link> </Link>
</ItemDefinitionGroup> </ItemDefinitionGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'"> <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'">
<FunctionLevelLinking>true</FunctionLevelLinking> <FunctionLevelLinking>true</FunctionLevelLinking>
<IntrinsicFunctions>true</IntrinsicFunctions> <IntrinsicFunctions>true</IntrinsicFunctions>
<PreprocessorDefinitions>NDEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions> <PreprocessorDefinitions>NDEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<SDLCheck>true</SDLCheck>
<SDLCheck>
</SDLCheck>
<AdditionalIncludeDirectories>..\librf;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories> <AdditionalIncludeDirectories>..\librf;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
<AdditionalOptions>/await /std:c++latest </AdditionalOptions>
<AdditionalOptions>/await </AdditionalOptions>
<ExceptionHandling>Sync</ExceptionHandling>
<InlineFunctionExpansion>AnySuitable</InlineFunctionExpansion>
<FavorSizeOrSpeed>Speed</FavorSizeOrSpeed>
<OmitFramePointers>true</OmitFramePointers>
<EnableFiberSafeOptimizations>true</EnableFiberSafeOptimizations>
<StringPooling>true</StringPooling>
<BufferSecurityCheck>false</BufferSecurityCheck>
<LanguageStandard>stdcpplatest</LanguageStandard>
</ClCompile> </ClCompile>
<Link> <Link>
<SubSystem>Console</SubSystem> <SubSystem>Console</SubSystem>
<EnableCOMDATFolding>true</EnableCOMDATFolding> <EnableCOMDATFolding>true</EnableCOMDATFolding>
<OptimizeReferences>true</OptimizeReferences> <OptimizeReferences>true</OptimizeReferences>
<GenerateDebugInformation>true</GenerateDebugInformation> <GenerateDebugInformation>true</GenerateDebugInformation>
<LinkTimeCodeGeneration>UseLinkTimeCodeGeneration</LinkTimeCodeGeneration>
<Profile>true</Profile>
</Link> </Link>
</ItemDefinitionGroup> </ItemDefinitionGroup>
<ItemGroup> <ItemGroup>
<ClCompile Include="..\librf\src\timer.cpp" /> <ClCompile Include="..\librf\src\timer.cpp" />
<ClCompile Include="..\tutorial\test_async_cb.cpp" /> <ClCompile Include="..\tutorial\test_async_cb.cpp" />
<ClCompile Include="..\tutorial\test_async_channel.cpp" /> <ClCompile Include="..\tutorial\test_async_channel.cpp" />
<ClCompile Include="..\tutorial\test_async_channel_mult_thread.cpp" />
<ClCompile Include="..\tutorial\test_async_dynamic_go.cpp" /> <ClCompile Include="..\tutorial\test_async_dynamic_go.cpp" />
<ClCompile Include="..\tutorial\test_async_event.cpp" /> <ClCompile Include="..\tutorial\test_async_event.cpp" />
<ClCompile Include="..\tutorial\test_async_event_timeout.cpp" /> <ClCompile Include="..\tutorial\test_async_event_timeout.cpp" />

+ 3
- 0
vs_proj/librf.vcxproj.filters View File

<ClCompile Include="..\tutorial\test_async_multi_thread.cpp"> <ClCompile Include="..\tutorial\test_async_multi_thread.cpp">
<Filter>tutorial</Filter> <Filter>tutorial</Filter>
</ClCompile> </ClCompile>
<ClCompile Include="..\tutorial\test_async_channel_mult_thread.cpp">
<Filter>tutorial</Filter>
</ClCompile>
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ClInclude Include="..\librf\librf.h"> <ClInclude Include="..\librf\librf.h">

Loading…
Cancel
Save