Bläddra i källkod

使用宏来开关是否支持多线程多调度器

tags/v2.9.7
tearshark 6 år sedan
förälder
incheckning
9212bfcdf7

+ 4
- 4
librf/src/channel.h Visa fil

@@ -9,7 +9,7 @@ namespace resumef
template<class _Ty>
struct channel_impl : public std::enable_shared_from_this<channel_impl<_Ty>>
{
typedef _awaker<channel_impl<_Ty>, _Ty *, future_error> channel_read_awaker;
typedef _awaker<channel_impl<_Ty>, _Ty *, error_code> channel_read_awaker;
typedef std::shared_ptr<channel_read_awaker> channel_read_awaker_ptr;
typedef _awaker<channel_impl<_Ty>> channel_write_awaker;
@@ -63,7 +63,7 @@ namespace resumef
auto val = std::move(_values.front());
_values.pop_front();
r_awaker->awake(this, 1, &val, future_error::none);
r_awaker->awake(this, 1, &val, error_code::none);
ret_value = true;
}
else
@@ -111,7 +111,7 @@ namespace resumef
auto r_awaker = *iter;
iter = _read_awakes.erase(iter);
if (r_awaker->awake(this, 1, _values.size() ? &_values.front() : nullptr, future_error::read_before_write))
if (r_awaker->awake(this, 1, _values.size() ? &_values.front() : nullptr, error_code::read_before_write))
{
if(_values.size()) _values.pop_front();
@@ -186,7 +186,7 @@ namespace resumef
awaitable_t<_Ty> awaitable;
auto awaker = std::make_shared<channel_read_awaker>(
[st = awaitable._state](channel_impl_type *, _Ty * val, future_error fe) -> bool
[st = awaitable._state](channel_impl_type *, _Ty * val, error_code fe) -> bool
{
if(val)
st->set_value(std::move(*val));

+ 9
- 16
librf/src/def.h Visa fil

@@ -64,7 +64,7 @@ namespace resumef
#define _coro_promise_ptr(T) _coro_promise_ptr__<resumef::promise_t<T> >(_coro_frame_ptr())
enum struct future_error
enum struct error_code
{
none,
not_ready, // get_value called when value not available
@@ -75,13 +75,13 @@ namespace resumef
max__
};
const char * get_error_string(future_error fe, const char * classname);
const char * get_error_string(error_code fe, const char * classname);
//const char * future_error_string[size_t(future_error::max__)];
struct future_exception : std::exception
{
future_error _error;
future_exception(future_error fe)
error_code _error;
future_exception(error_code fe)
: exception(get_error_string(fe, "future_exception"))
, _error(fe)
{
@@ -90,8 +90,8 @@ namespace resumef
struct lock_exception : std::exception
{
future_error _error;
lock_exception(future_error fe)
error_code _error;
lock_exception(error_code fe)
: exception(get_error_string(fe, "lock_exception"))
, _error(fe)
{
@@ -100,8 +100,8 @@ namespace resumef
struct channel_exception : std::exception
{
future_error _error;
channel_exception(future_error fe)
error_code _error;
channel_exception(error_code fe)
: exception(get_error_string(fe, "channel_exception"))
, _error(fe)
{
@@ -112,14 +112,7 @@ namespace resumef
struct state_base;
//获得当前线程下的调度器
extern scheduler * this_scheduler();
//获得当前线程下,正在由调度器调度的协程
//extern state_base * this_coroutine();
//namespace detail
//{
// extern state_base * current_coroutine();
//}
scheduler * this_scheduler();
}
#define co_yield_void co_yield nullptr

+ 8
- 2
librf/src/future.h Visa fil

@@ -155,19 +155,25 @@ namespace resumef
promise_impl_t()
: _state(make_counted<state_type>())
{
#if RESUMEF_ENABLE_MULT_SCHEDULER
_state->this_promise(this);
#endif
}
promise_impl_t(promise_impl_t&& _Right)
: _state(std::move(_Right._state))
{
#if RESUMEF_ENABLE_MULT_SCHEDULER
_state->this_promise(this);
#endif
}
promise_impl_t & operator = (promise_impl_t&& _Right)
{
if (this != _Right)
{
_state = std::move(_Right._state);
#if RESUMEF_ENABLE_MULT_SCHEDULER
_state->this_promise(this);
#endif
}
return *this;
}
@@ -326,13 +332,13 @@ namespace resumef
using awaitable_vt = awaitable_t<void>;
#if RESUMEF_ENABLE_MULT_SCHEDULER
inline promise_t<void> * state_base::parent_promise() const
{
if (_coro) return _coro_promise_ptr__<promise_t<void>>(_coro.address());
return nullptr;
}
/*
inline scheduler * state_base::parent_scheduler() const
{
auto promise_ = parent_promise();
@@ -340,7 +346,7 @@ namespace resumef
return promise_->_state->current_scheduler();
return nullptr;
}
*/
#endif
}

+ 6
- 0
librf/src/rf_task.h Visa fil

@@ -19,7 +19,9 @@ namespace resumef
virtual bool go_next(scheduler *) = 0;
virtual void cancel() = 0;
virtual void * get_id() = 0;
#if RESUMEF_ENABLE_MULT_SCHEDULER
virtual void bind(scheduler *) = 0;
#endif
};
//----------------------------------------------------------------------------------------------
@@ -76,10 +78,12 @@ namespace resumef
{
return nullptr;
}
#if RESUMEF_ENABLE_MULT_SCHEDULER
virtual void bind(scheduler *) override
{
}
#endif
};
template<class _Ty>
@@ -128,11 +132,13 @@ namespace resumef
{
return _future._state.get();
}
#if RESUMEF_ENABLE_MULT_SCHEDULER
virtual void bind(scheduler * schdler) override
{
auto * _state = _future._state.get();
_state->current_scheduler(schdler);
}
#endif
};
//----------------------------------------------------------------------------------------------

+ 14
- 17
librf/src/scheduler.cpp Visa fil

@@ -11,7 +11,7 @@ std::atomic<intptr_t> g_resumef_evtctx_count = 0;
namespace resumef
{
static const char * future_error_string[(size_t)future_error::max__]
static const char * future_error_string[(size_t)error_code::max__]
{
"none",
"not_ready",
@@ -22,7 +22,7 @@ namespace resumef
static char sz_future_error_buffer[256];
const char * get_error_string(future_error fe, const char * classname)
const char * get_error_string(error_code fe, const char * classname)
{
if (classname)
{
@@ -32,6 +32,7 @@ namespace resumef
return future_error_string[(size_t)(fe)];
}
#if RESUMEF_ENABLE_MULT_SCHEDULER
thread_local scheduler * th_scheduler_ptr = nullptr;
//获得当前线程下的调度器
@@ -39,34 +40,26 @@ namespace resumef
{
return th_scheduler_ptr ? th_scheduler_ptr : &scheduler::g_scheduler;
}
#endif
//获得当前线程下,正在由调度器调度的协程
/*
namespace detail
{
state_base * current_coroutine()
{
scheduler * schdler = this_scheduler();
if (schdler->current_state)
return schdler->current_state;
return schdler->top_state();
}
}
*/
local_scheduler::local_scheduler()
{
#if RESUMEF_ENABLE_MULT_SCHEDULER
if (th_scheduler_ptr == nullptr)
{
_scheduler_ptr = new scheduler;
th_scheduler_ptr = _scheduler_ptr;
}
#endif
}
local_scheduler::~local_scheduler()
{
#if RESUMEF_ENABLE_MULT_SCHEDULER
if (th_scheduler_ptr == _scheduler_ptr)
th_scheduler_ptr = nullptr;
delete _scheduler_ptr;
#endif
}
scheduler::scheduler()
@@ -79,9 +72,10 @@ namespace resumef
scheduler::~scheduler()
{
cancel_all_task_();
#if RESUMEF_ENABLE_MULT_SCHEDULER
if (th_scheduler_ptr == this)
th_scheduler_ptr = nullptr;
#endif
}
void scheduler::new_task(task_base * task)
@@ -89,7 +83,9 @@ namespace resumef
if (task)
{
scoped_lock<std::recursive_mutex> __guard(_mtx_ready);
#if RESUMEF_ENABLE_MULT_SCHEDULER
task->bind(this);
#endif
this->_ready_task.push_back(task);
}
}
@@ -126,9 +122,10 @@ namespace resumef
void scheduler::run_one_batch()
{
#if RESUMEF_ENABLE_MULT_SCHEDULER
if (th_scheduler_ptr == nullptr)
th_scheduler_ptr = this;
#endif
{
scoped_lock<std::recursive_mutex> __guard(_mtx_task);

+ 9
- 0
librf/src/scheduler.h Visa fil

@@ -86,10 +86,19 @@ namespace resumef
local_scheduler & operator = (local_scheduler && right_) = delete;
local_scheduler(const local_scheduler &) = delete;
local_scheduler & operator = (const local_scheduler &) = delete;
#if RESUMEF_ENABLE_MULT_SCHEDULER
private:
scheduler * _scheduler_ptr;
#endif
};
//--------------------------------------------------------------------------------------------------
#if !RESUMEF_ENABLE_MULT_SCHEDULER
//获得当前线程下的调度器
inline scheduler * this_scheduler()
{
return &scheduler::g_scheduler;
}
#endif
#if !defined(_DISABLE_RESUMEF_GO_MACRO)
#define go (*::resumef::this_scheduler()) +

+ 16
- 12
librf/src/state.cpp Visa fil

@@ -33,9 +33,11 @@ namespace resumef
{
return _state.get();
}
#if RESUMEF_ENABLE_MULT_SCHEDULER
virtual void bind(scheduler * ) override
{
}
#endif
};
state_base::~state_base()
@@ -52,12 +54,13 @@ namespace resumef

if (_coro)
{
// auto sch_ = this->current_scheduler();
auto sch_ = this_scheduler();
/*
#if RESUMEF_ENABLE_MULT_SCHEDULER
auto sch_ = this->current_scheduler();
if (sch_ == nullptr)
sch_ = this_scheduler();
*/
#else
auto sch_ = this_scheduler();
#endif
sch_->push_task_internal(new awaitable_task_t<state_base>(this));
}
}
@@ -72,12 +75,13 @@ namespace resumef

if (_coro)
{
// auto sch_ = this->current_scheduler();
auto sch_ = this_scheduler();
/*
#if RESUMEF_ENABLE_MULT_SCHEDULER
auto sch_ = this->current_scheduler();
if (sch_ == nullptr)
sch_ = this_scheduler();
*/
#else
auto sch_ = this_scheduler();
#endif
sch_->push_task_internal(new awaitable_task_t<state_base>(this));
}
}
@@ -88,7 +92,7 @@ namespace resumef
_coro = resume_cb;
/*
#if RESUMEF_ENABLE_MULT_SCHEDULER
if (_current_scheduler == nullptr)
{
auto * promise_ = this->parent_promise();
@@ -107,12 +111,12 @@ namespace resumef
stptr->current_scheduler(_current_scheduler);
_depend_states.clear();
}
*/
#endif
}
#if RESUMEF_ENABLE_MULT_SCHEDULER
void state_base::current_scheduler(scheduler * sch_)
{
/*
scoped_lock<lock_type> __guard(_mtx);
_current_scheduler = sch_;
@@ -120,6 +124,6 @@ namespace resumef
for (auto & stptr : _depend_states)
stptr->current_scheduler(sch_);
_depend_states.clear();
*/
}
#endif
}

+ 16
- 6
librf/src/state.h Visa fil

@@ -16,10 +16,12 @@ namespace resumef
typedef std::recursive_mutex lock_type;
lock_type _mtx; //for value, _exception
RF_API void set_value_none_lock();
#if RESUMEF_ENABLE_MULT_SCHEDULER
private:
void * _this_promise = nullptr;
scheduler * _current_scheduler = nullptr;
std::vector<counted_ptr<state_base>> _depend_states;
#endif
public:
coroutine_handle<> _coro;
std::atomic<intptr_t> _count = 0; // tracks reference count of state object
@@ -84,13 +86,17 @@ namespace resumef
{
#if RESUMEF_DEBUG_COUNTER
{
scoped_lock<lock_type> __lock(g_resumef_cout_mutex);
scoped_lock<std::mutex> __lock(g_resumef_cout_mutex);
std::cout
<< "coro=" << _coro.address()
<< ",thread=" << std::this_thread::get_id()
#if RESUMEF_ENABLE_MULT_SCHEDULER
std::cout << "scheduler=" << current_scheduler()
<< ",coro=" << _coro.address()
<< ",scheduler=" << current_scheduler()
<< ",this_promise=" << this_promise()
<< ",parent_promise=" << parent_promise()
<< ",thread=" << std::this_thread::get_id()
#endif
<< std::endl;
}
#endif
@@ -112,6 +118,7 @@ namespace resumef
delete this;
}
#if RESUMEF_ENABLE_MULT_SCHEDULER
promise_t<void> * parent_promise() const;
//scheduler * parent_scheduler() const;
@@ -129,6 +136,7 @@ namespace resumef
return _current_scheduler;
}
void current_scheduler(scheduler * sch_);
#endif
//------------------------------------------------------------------------------------------
//以下是通过future_t/promise_t, 与编译器生成的resumable function交互的接口
@@ -173,7 +181,7 @@ namespace resumef
scoped_lock<lock_type> __guard(_mtx);
if (!_ready)
throw future_exception{ future_error::not_ready };
throw future_exception{ error_code::not_ready };
return _value;
}
void reset()
@@ -209,7 +217,7 @@ namespace resumef
scoped_lock<lock_type> __guard(_mtx);
if (!_ready)
throw future_exception{ future_error::not_ready };
throw future_exception{ error_code::not_ready };
}
void reset()
{
@@ -218,10 +226,12 @@ namespace resumef
reset_none_lock();
}
#if RESUMEF_ENABLE_MULT_SCHEDULER
promise_t<void> * parent_promise() const
{
return reinterpret_cast<promise_t<void> *>(state_base::parent_promise());
}
#endif
};
}

+ 7
- 7
tutorial/test_async_channel_mult_thread.cpp Visa fil

@@ -66,33 +66,33 @@ void resumable_main_channel_mult_thread()
{
channel_t<std::string> c(MAX_CHANNEL_QUEUE);
std::thread wth([&]
std::thread write_th([&]
{
//local_scheduler my_scheduler; //2017/11/27日,仍然存在BUG。真多线程下调度,存在有协程无法被调度完成的BUG
go test_channel_producer(c, BATCH * N);
this_scheduler()->run_until_notask();
std::cout << "Write OK" << std::endl;
std::cout << "Write OK\r\n";
});
//std::this_thread::sleep_for(100ms);
std::thread rth[N];
std::thread read_th[N];
for (size_t i = 0; i < N; ++i)
{
rth[i] = std::thread([&]
read_th[i] = std::thread([&]
{
//local_scheduler my_scheduler; //2017/11/27日,仍然存在BUG。真多线程下调度,存在有协程无法被调度完成的BUG
go test_channel_consumer(c, BATCH);
this_scheduler()->run_until_notask();
std::cout << "Read OK" << std::endl;
std::cout << "Read OK\r\n";
});
}
for(auto & th : rth)
for(auto & th : read_th)
th.join();
wth.join();
write_th.join();
std::cout << "OK" << std::endl;
_getch();

+ 1
- 1
vs_proj/librf.vcxproj Visa fil

@@ -100,7 +100,7 @@
<ClCompile>
<WarningLevel>Level3</WarningLevel>
<Optimization>Disabled</Optimization>
<PreprocessorDefinitions>_DEBUG;_CONSOLE;RESUMEF_DEBUG_COUNTER=0;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<PreprocessorDefinitions>_DEBUG;_CONSOLE;RESUMEF_DEBUG_COUNTER=1;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<SDLCheck>true</SDLCheck>
<AdditionalIncludeDirectories>..\librf;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
<AdditionalOptions>/await /std:c++latest </AdditionalOptions>

Laddar…
Avbryt
Spara