Browse Source

获得当前协程帮定的调度器

tags/v2.9.7
tearshark 4 years ago
parent
commit
c863f50ff1

+ 2
- 0
librf/librf.h View File

@@ -55,6 +55,8 @@
#include "src/state.inl"
#include "src/switch_scheduler.h"
#include "src/current_scheduler.h"
#include "src/_awaker.h"
#include "src/mutex.h"
#include "src/ring_queue.h"

+ 33
- 0
librf/src/current_scheduler.h View File

@@ -0,0 +1,33 @@
#pragma once
RESUMEF_NS
{
struct get_current_scheduler_awaitor
{
bool await_ready() const noexcept
{
return false;
}
template<class _PromiseT, typename = std::enable_if_t<is_promise_v<_PromiseT>>>
bool await_suspend(coroutine_handle<_PromiseT> handler)
{
_PromiseT& promise = handler.promise();
auto* state = promise.get_state();
this->_scheduler = state->get_scheduler();
return false;
}
scheduler_t* await_resume() const noexcept
{
return _scheduler;
}
private:
scheduler_t* _scheduler = nullptr;
};
inline get_current_scheduler_awaitor get_current_scheduler()
{
return {};
}
}

+ 1
- 1
librf/src/def.h View File

@@ -1,6 +1,6 @@
#pragma once
#define LIB_RESUMEF_VERSION 20500 // 2.5.0
#define LIB_RESUMEF_VERSION 20501 // 2.5.1
#if defined(RESUMEF_MODULE_EXPORT)
#define RESUMEF_NS export namespace resumef

+ 2
- 0
librf/src/macro_def.inl View File

@@ -12,3 +12,5 @@
#define go (*::resumef::this_scheduler()) +
#define GO (*::resumef::this_scheduler()) + [=]()mutable->resumef::future_t<>
#endif

#define current_scheduler() (co_await ::resumef::get_current_scheduler())

+ 4
- 0
librf/src/state.cpp View File

@@ -70,6 +70,8 @@ RESUMEF_NS
if (_scheduler != nullptr)
{
if (_scheduler == sch) return false;
auto task_ptr = _scheduler->del_switch(this);
_scheduler = sch;
@@ -138,6 +140,8 @@ RESUMEF_NS
if (_scheduler != nullptr)
{
if (_scheduler == sch) return false;
auto task_ptr = _scheduler->del_switch(this);
_scheduler = sch;

+ 2
- 2
librf/src/switch_scheduler.h View File

@@ -18,12 +18,12 @@ RESUMEF_NS
}

template<class _PromiseT, typename = std::enable_if_t<is_promise_v<_PromiseT>>>
void await_suspend(coroutine_handle<_PromiseT> handler)
bool await_suspend(coroutine_handle<_PromiseT> handler)
{
_PromiseT& promise = handler.promise();

auto* sptr = promise.get_state();
sptr->switch_scheduler_await_suspend(_scheduler, handler);
return sptr->switch_scheduler_await_suspend(_scheduler, handler);
}

void await_resume() noexcept

+ 41
- 19
tutorial/test_async_switch_scheduler.cpp View File

@@ -14,8 +14,6 @@ static std::atomic<scheduler_t*> sch_in_thread = nullptr;

void run_in_thread(channel_t<bool> c_done)
{
std::cout << "other thread = " << std::this_thread::get_id() << std::endl;

local_scheduler my_scheduler; //产生本线程唯一的调度器
sch_in_thread = this_scheduler(); //本线程唯一的调度器赋值给sch_in_thread,以便于后续测试直接访问此线程的调度器

@@ -33,43 +31,61 @@ void run_in_thread(channel_t<bool> c_done)
}

template<class _Ctype>
static void callback_get_long(int64_t val, _Ctype&& cb)
static void callback_get_long_switch_scheduler(int64_t val, _Ctype&& cb)
{
using namespace std::chrono;
std::thread([val, cb = std::forward<_Ctype>(cb)]
{
std::this_thread::sleep_for(500ms);
cb(val * val);
cb(val + 1);
}).detach();
}

//这种情况下,没有生成 frame-context,因此,并没有promise_type被内嵌在frame-context里
static future_t<int64_t> async_get_long(int64_t val)
static future_t<int64_t> async_get_long_switch_scheduler(int64_t val)
{
awaitable_t<int64_t> awaitable;
callback_get_long(val, [awaitable](int64_t val)
callback_get_long_switch_scheduler(val, [awaitable](int64_t result)
{
awaitable.set_value(val);
awaitable.set_value(result);
});
return awaitable.get_future();
}

//这种情况下,会生成对应的 frame-context,一个promise_type被内嵌在frame-context里
static future_t<> resumable_get_long(int64_t val, channel_t<bool> c_done)
static future_t<> resumable_get_long_switch_scheduler(int64_t val, channel_t<bool> c_done)
{
std::cout << "thread = " << std::this_thread::get_id() << ", value = " << val << std::endl;
std::cout << "thread = " << std::this_thread::get_id();
std::cout << ", scheduler = " << current_scheduler();
std::cout << ", value = " << val << std::endl;

co_await *sch_in_thread;
val = co_await async_get_long(val);
std::cout << "thread = " << std::this_thread::get_id() << ", value = " << val << std::endl;
val = co_await async_get_long_switch_scheduler(val);

std::cout << "thread = " << std::this_thread::get_id();
std::cout << ", scheduler = " << current_scheduler();
std::cout << ", value = " << val << std::endl;

co_await *sch_in_main;
val = co_await async_get_long(val);
std::cout << "thread = " << std::this_thread::get_id() << ", value = " << val << std::endl;
val = co_await async_get_long_switch_scheduler(val);

std::cout << "thread = " << std::this_thread::get_id();
std::cout << ", scheduler = " << current_scheduler();
std::cout << ", value = " << val << std::endl;

co_await *sch_in_thread;
val = co_await async_get_long(val);
std::cout << "thread = " << std::this_thread::get_id() << ", value = " << val << std::endl;
val = co_await async_get_long_switch_scheduler(val);

std::cout << "thread = " << std::this_thread::get_id();
std::cout << ", scheduler = " << current_scheduler();
std::cout << ", value = " << val << std::endl;

co_await *sch_in_thread; //fake switch
val = co_await async_get_long_switch_scheduler(val);

std::cout << "thread = " << std::this_thread::get_id();
std::cout << ", scheduler = " << current_scheduler();
std::cout << ", value = " << val << std::endl;

(void)c_done.write(true);
}
@@ -77,15 +93,21 @@ static future_t<> resumable_get_long(int64_t val, channel_t<bool> c_done)
void resumable_main_switch_scheduler()
{
sch_in_main = this_scheduler();
channel_t<bool> c_done{ 1 };

std::cout << "main thread = " << std::this_thread::get_id() << std::endl;
std::cout << "main thread = " << std::this_thread::get_id();
std::cout << ", scheduler = " << sch_in_main << std::endl;

channel_t<bool> c_done{ 1 };
std::thread other(&run_in_thread, std::ref(c_done));

GO
go[&other, c_done]()->future_t<>
{
co_await c_done; //第一次等待,等待run_in_thread准备好了
go resumable_get_long(3, c_done); //开启另外一个协程
std::cout << "other thread = " << other.get_id();
std::cout << ", sch_in_thread = " << sch_in_thread << std::endl;
go resumable_get_long_switch_scheduler(1, c_done); //开启另外一个协程
//co_await resumable_get_long(3, c_done);
co_await c_done; //等待新的协程运行完毕,从而保证主线程的协程不会提早退出
};

+ 9
- 1
tutorial/test_async_when_all.cpp View File

@@ -12,6 +12,14 @@
using namespace resumef;
template<class... _Fty>
auto when_all2(_Fty&&... f) -> future_t<std::tuple<detail::remove_future_vt<_Fty>...>>
{
using tuple_type = std::tuple<detail::remove_future_vt<_Fty>...>;
co_return co_await when_all(*current_scheduler(), std::forward<_Fty>(f)...);
}
void test_when_any()
{
using namespace std::chrono;
@@ -92,7 +100,7 @@ void test_when_all()
co_await when_all();
std::cout << "when all: zero!" << std::endl << std::endl;
auto ab = co_await when_all(my_sleep("a"), my_sleep_v("b"));
auto ab = co_await when_all2(my_sleep("a"), my_sleep_v("b"));
//ab.1 is std::ignore
std::cout << "when all:" << std::get<0>(ab) << std::endl << std::endl;

+ 3
- 1
vs_proj/librf.cpp View File

@@ -43,7 +43,9 @@ int main(int argc, const char* argv[])
//test_ring_queue<resumef::ring_queue_spinlock<int, false, uint32_t>>();
//test_ring_queue<resumef::ring_queue_lockfree<int, uint64_t>>();
resumable_main_event_v2();
resumable_main_switch_scheduler();
//resumable_main_when_all();
//resumable_main_event_v2();
return 0;
//if (argc > 1)

+ 1
- 0
vs_proj/librf.vcxproj View File

@@ -225,6 +225,7 @@
<ClInclude Include="..\librf\src\channel_v1.h" />
<ClInclude Include="..\librf\src\channel_v2.h" />
<ClInclude Include="..\librf\src\counted_ptr.h" />
<ClInclude Include="..\librf\src\current_scheduler.h" />
<ClInclude Include="..\librf\src\def.h" />
<ClInclude Include="..\librf\src\event.h" />
<ClInclude Include="..\librf\src\event_v1.h" />

+ 3
- 0
vs_proj/librf.vcxproj.filters View File

@@ -225,6 +225,9 @@
<ClInclude Include="..\librf\src\intrusive_link_queue.h">
<Filter>librf\src</Filter>
</ClInclude>
<ClInclude Include="..\librf\src\current_scheduler.h">
<Filter>librf\src</Filter>
</ClInclude>
</ItemGroup>
<ItemGroup>
<None Include="..\librf\src\asio_task_1.12.0.inl">

Loading…
Cancel
Save