Browse Source

完善切换调度器的功能

tags/v2.9.7
tearshark 4 years ago
parent
commit
9ae5ee31f3
4 changed files with 89 additions and 12 deletions
  1. 33
    0
      librf/src/scheduler.cpp
  2. 2
    0
      librf/src/scheduler.h
  3. 32
    3
      librf/src/state.cpp
  4. 22
    9
      tutorial/test_async_switch_scheduler.cpp

+ 33
- 0
librf/src/scheduler.cpp View File

@@ -143,6 +143,30 @@ RESUMEF_NS
}
}
std::unique_ptr<task_base_t> scheduler_t::del_switch(state_base_t* sptr)
{
scoped_lock<spinlock> __guard(_lock_ready);
std::unique_ptr<task_base_t> task_ptr;
auto iter = this->_ready_task.find(sptr);
if (iter != this->_ready_task.end())
{
task_ptr = std::move(iter->second);
this->_ready_task.erase(iter);
}
return task_ptr;
}
void scheduler_t::add_switch(std::unique_ptr<task_base_t> task)
{
state_base_t* sptr = task->get_state();
scoped_lock<spinlock> __guard(_lock_ready);
this->_ready_task.emplace(sptr, std::move(task));
}
/*
void scheduler_t::cancel_all_task_()
{
@@ -169,6 +193,9 @@ RESUMEF_NS
{
scoped_lock<lock_type> __guard(_lock_running);
if (_runing_states.empty())
return;
std::swap(_cached_states, _runing_states);
}
@@ -181,13 +208,19 @@ RESUMEF_NS
void scheduler_t::run_until_notask()
{
while (!this->empty())
{
this->run_one_batch();
std::this_thread::yield();
}
}
void scheduler_t::run()
{
for (;;)
{
this->run_one_batch();
std::this_thread::yield();
}
}
scheduler_t scheduler_t::g_scheduler;

+ 2
- 0
librf/src/scheduler.h View File

@@ -55,6 +55,8 @@ RESUMEF_NS
void add_ready(state_base_t* sptr);
void add_generator(state_base_t* sptr);
void del_final(state_base_t* sptr);
std::unique_ptr<task_base_t> del_switch(state_base_t* sptr);
void add_switch(std::unique_ptr<task_base_t> task);
switch_scheduler_t operator co_await()
{

+ 32
- 3
librf/src/state.cpp View File

@@ -64,7 +64,23 @@ RESUMEF_NS
bool state_generator_t::switch_scheduler_await_suspend(scheduler_t* sch, coroutine_handle<>)
{
_scheduler = sch;
assert(sch != nullptr);
if (_scheduler != nullptr)
{
auto task_ptr = _scheduler->del_switch(this);
_scheduler = sch;
if (task_ptr != nullptr)
sch->add_switch(std::move(task_ptr));
}
else
{
_scheduler = sch;
}
sch->add_generator(this);
return true;
}
@@ -127,16 +143,29 @@ RESUMEF_NS
bool state_future_t::switch_scheduler_await_suspend(scheduler_t* sch, coroutine_handle<> handler)
{
assert(sch != nullptr);
scoped_lock<lock_type> __guard(this->_mtx);
_scheduler = sch;
if (_scheduler != nullptr)
{
auto task_ptr = _scheduler->del_switch(this);
_scheduler = sch;
if (task_ptr != nullptr)
sch->add_switch(std::move(task_ptr));
}
else
{
_scheduler = sch;
}
if (_parent != nullptr)
_parent->switch_scheduler_await_suspend(sch, nullptr);
if (handler != nullptr)
{
_coro = handler;
_scheduler->add_generator(this);
sch->add_generator(this);
}
return true;

+ 22
- 9
tutorial/test_async_switch_scheduler.cpp View File

@@ -10,18 +10,26 @@
using namespace resumef;

static scheduler_t* sch_in_main = nullptr;
static scheduler_t* sch_in_thread = nullptr;
static std::atomic<scheduler_t*> sch_in_thread = nullptr;

void run_in_thread(channel_t<bool>& c_done)
{
std::cout << "other thread = " << std::this_thread::get_id() << std::endl;

local_scheduler my_scheduler;
sch_in_thread = this_scheduler();
local_scheduler my_scheduler; //产生本线程唯一的调度器
sch_in_thread = this_scheduler(); //本线程唯一的调度器赋值给sch_in_thread,以便于后续测试直接访问此线程的调度器

c_done << true;
c_done << true; //数据都准备好了,通过channel通知其他协程可以启动后续依赖sch_in_thread变量的协程了

sch_in_thread->run();
//循环直到sch_in_thread为nullptr
for (;;)
{
auto sch = sch_in_thread.load(std::memory_order::acquire);
if (sch == nullptr)
break;
sch->run_one_batch();
std::this_thread::yield();
}
}

template<class _Ctype>
@@ -72,14 +80,19 @@ void resumable_main_switch_scheduler()
channel_t<bool> c_done{ 1 };

std::cout << "main thread = " << std::this_thread::get_id() << std::endl;
std::thread(&run_in_thread, std::ref(c_done)).detach();
std::thread other(&run_in_thread, std::ref(c_done));

GO
{
co_await c_done;
go resumable_get_long(3, c_done);
co_await c_done;
co_await c_done; //第一次等待,等待run_in_thread准备好了
go resumable_get_long(3, c_done); //开启另外一个协程
//co_await resumable_get_long(3, c_done);
co_await c_done; //等待新的协程运行完毕,从而保证主线程的协程不会提早退出
};

sch_in_main->run_until_notask();

//通知另外一个线程退出
sch_in_thread.store(nullptr, std::memory_order_release);
other.join();
}

Loading…
Cancel
Save