Parcourir la source

整理多线程多调度器功能。

仍然没能正常调度
tags/v2.9.7
tearshark il y a 6 ans
Parent
révision
6c9f476658

+ 3
- 3
librf/src/future.h Voir le fichier

@@ -28,7 +28,7 @@ namespace resumef
bool await_ready()
{
return _state->_ready;
return _state->ready();
}
void await_suspend(coroutine_handle<> resume_cb)
{
@@ -43,7 +43,7 @@ namespace resumef
//if ready, can get value
bool ready()
{
return _state->_ready;
return _state->ready();
}
auto & get_value()
{
@@ -314,7 +314,7 @@ namespace resumef
bool await_ready()
{
return _state->_ready;
return _state->ready();
}
void await_suspend(coroutine_handle<> resume_cb)
{

+ 1
- 1
librf/src/rf_task.h Voir le fichier

@@ -122,7 +122,7 @@ namespace resumef
{
auto * _state = _future._state.get();
_state->resume();
return !_state->ready() && !_state->_done;
return !_state->ready() && !_state->done();
}
virtual void cancel() override
{

+ 2
- 6
librf/src/state.cpp Voir le fichier

@@ -56,12 +56,10 @@ namespace resumef
{
#if RESUMEF_ENABLE_MULT_SCHEDULER
auto sch_ = this->current_scheduler();
if (sch_ == nullptr)
sch_ = this_scheduler();
#else
auto sch_ = this_scheduler();
#endif
sch_->push_task_internal(new awaitable_task_t<state_base>(this));
if(sch_) sch_->push_task_internal(new awaitable_task_t<state_base>(this));
}
}
@@ -77,12 +75,10 @@ namespace resumef
{
#if RESUMEF_ENABLE_MULT_SCHEDULER
auto sch_ = this->current_scheduler();
if (sch_ == nullptr)
sch_ = this_scheduler();
#else
auto sch_ = this_scheduler();
#endif
sch_->push_task_internal(new awaitable_task_t<state_base>(this));
if (sch_) sch_->push_task_internal(new awaitable_task_t<state_base>(this));
}
}

+ 19
- 6
librf/src/state.h Voir le fichier

@@ -18,19 +18,20 @@ namespace resumef
RF_API void set_value_none_lock();
#if RESUMEF_ENABLE_MULT_SCHEDULER
private:
void * _this_promise = nullptr;
std::atomic<void *> _this_promise = nullptr;
scheduler * _current_scheduler = nullptr;
std::vector<counted_ptr<state_base>> _depend_states;
#endif
public:
protected:
coroutine_handle<> _coro;
std::atomic<intptr_t> _count = 0; // tracks reference count of state object
std::exception_ptr _exception;
bool _ready = false;
bool _cancellation = false;
bool _done = false;
std::atomic<bool> _ready = false;
std::atomic<bool> _cancellation = false;
std::atomic<bool> _done = false;
public:
state_base()
{
#if RESUMEF_DEBUG_COUNTER
@@ -61,9 +62,15 @@ namespace resumef
{
return _ready;
}
bool done() const
{
return _done;
}
void reset_none_lock()
{
scoped_lock<lock_type> __guard(_mtx);
_coro = nullptr;
_ready = false;
}
@@ -82,6 +89,8 @@ namespace resumef
}
void resume()
{
scoped_lock<lock_type> __guard(_mtx);
if (_coro)
{
#if RESUMEF_DEBUG_COUNTER
@@ -102,6 +111,7 @@ namespace resumef
#endif
auto coro = _coro;
_coro = nullptr;
coro();
}
}
@@ -120,7 +130,7 @@ namespace resumef
#if RESUMEF_ENABLE_MULT_SCHEDULER
promise_t<void> * parent_promise() const;
//scheduler * parent_scheduler() const;
scheduler * parent_scheduler() const;
void * this_promise() const
{
@@ -143,6 +153,7 @@ namespace resumef
void await_suspend(coroutine_handle<> resume_cb);
void final_suspend()
{
scoped_lock<lock_type> __guard(_mtx);
_done = true;
}
//以上是通过future_t/promise_t, 与编译器生成的resumable function交互的接口
@@ -191,10 +202,12 @@ namespace resumef
_value = value_type{};
}
#if RESUMEF_ENABLE_MULT_SCHEDULER
promise_t<_Ty> * parent_promise() const
{
return reinterpret_cast<promise_t<_Ty> *>(state_base::parent_promise());
}
#endif
};
template<>

+ 14
- 6
tutorial/test_async_channel_mult_thread.cpp Voir le fichier

@@ -14,6 +14,7 @@ using namespace resumef;
using namespace std::chrono;
static std::mutex cout_mutex;
std::atomic<intptr_t> gcounter = 0;
#define OUTPUT_DEBUG 0
@@ -24,6 +25,7 @@ future_vt test_channel_consumer(const channel_t<std::string> & c, size_t cnt)
try
{
auto val = co_await c.read();
++gcounter;
#if OUTPUT_DEBUG
{
scoped_lock<std::mutex> __lock(cout_mutex);
@@ -68,10 +70,11 @@ void resumable_main_channel_mult_thread()
std::thread write_th([&]
{
//local_scheduler my_scheduler; //2017/11/27日,仍然存在BUG。真多线程下调度,存在有协程无法被调度完成的BUG
local_scheduler my_scheduler; //2017/12/14日,仍然存在BUG。真多线程下调度,存在有协程无法被调度完成的BUG
go test_channel_producer(c, BATCH * N);
#if RESUMEF_ENABLE_MULT_SCHEDULER
this_scheduler()->run_until_notask();
#endif
std::cout << "Write OK\r\n";
});
@@ -82,18 +85,23 @@ void resumable_main_channel_mult_thread()
{
read_th[i] = std::thread([&]
{
//local_scheduler my_scheduler; //2017/11/27日,仍然存在BUG。真多线程下调度,存在有协程无法被调度完成的BUG
local_scheduler my_scheduler; //2017/12/14日,仍然存在BUG。真多线程下调度,存在有协程无法被调度完成的BUG
go test_channel_consumer(c, BATCH);
#if RESUMEF_ENABLE_MULT_SCHEDULER
this_scheduler()->run_until_notask();
#endif
std::cout << "Read OK\r\n";
});
}
#if !RESUMEF_ENABLE_MULT_SCHEDULER
std::this_thread::sleep_for(100ms);
scheduler::g_scheduler.run_until_notask();
#endif
for(auto & th : read_th)
th.join();
write_th.join();
std::cout << "OK" << std::endl;
std::cout << "OK: counter = " << gcounter.load() << std::endl;
_getch();
}

+ 1
- 1
vs_proj/librf.vcxproj Voir le fichier

@@ -100,7 +100,7 @@
<ClCompile>
<WarningLevel>Level3</WarningLevel>
<Optimization>Disabled</Optimization>
<PreprocessorDefinitions>_DEBUG;_CONSOLE;RESUMEF_DEBUG_COUNTER=1;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<PreprocessorDefinitions>_DEBUG;_CONSOLE;RESUMEF_DEBUG_COUNTER=0;RESUMEF_ENABLE_MULT_SCHEDULER=0;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<SDLCheck>true</SDLCheck>
<AdditionalIncludeDirectories>..\librf;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
<AdditionalOptions>/await /std:c++latest </AdditionalOptions>

Chargement…
Annuler
Enregistrer