Browse Source

增加切换调度器的功能

版本号升级到2.2.0
tags/v2.9.7
tearshark 4 years ago
parent
commit
f19966d85e

+ 1
- 0
librf/librf.h View File

@@ -40,6 +40,7 @@
#include "src/future.h"
#include "src/promise.h"
#include "src/awaitable.h"
#include "src/switch_scheduler.h"
#include "src/rf_task.h"
#include "src/utils.h"

+ 1
- 1
librf/src/def.h View File

@@ -1,6 +1,6 @@
#pragma once
#define LIB_RESUMEF_VERSION 200103 // 2.1.3
#define LIB_RESUMEF_VERSION 20200 // 2.2.0
#if defined(RESUMEF_MODULE_EXPORT)
#define RESUMEF_NS export namespace resumef

+ 1
- 1
librf/src/promise.inl View File

@@ -78,7 +78,7 @@ RESUMEF_NS


template<class _Ty>
inline void promise_t<_Ty>::return_value(_Ty val)
inline void promise_t<_Ty>::return_value(value_type val)
{
this->get_state()->set_value(std::move(val));
}

+ 5
- 0
librf/src/scheduler.h View File

@@ -56,6 +56,11 @@ RESUMEF_NS
void add_generator(state_base_t* sptr);
void del_final(state_base_t* sptr);
switch_scheduler_t operator co_await()
{
return { this };
}
friend struct task_base;
friend struct local_scheduler;
protected:

+ 23
- 0
librf/src/state.cpp View File

@@ -61,6 +61,12 @@ RESUMEF_NS
{
return _coro != nullptr;
}
bool state_generator_t::switch_scheduler_await_suspend(scheduler_t* sch, coroutine_handle<>)
{
_scheduler = sch;
return true;
}
void state_future_t::resume()
{
@@ -119,6 +125,23 @@ RESUMEF_NS
sch->add_ready(this);
}
bool state_future_t::switch_scheduler_await_suspend(scheduler_t* sch, coroutine_handle<> handler)
{
scoped_lock<lock_type> __guard(this->_mtx);
_scheduler = sch;
if (_parent != nullptr)
_parent->switch_scheduler_await_suspend(sch, nullptr);
if (handler != nullptr)
{
_coro = handler;
_scheduler->add_generator(this);
}
return true;
}
void state_t<void>::future_await_resume()
{
scoped_lock<lock_type> __guard(this->_mtx);

+ 3
- 0
librf/src/state.h View File

@@ -34,6 +34,7 @@ RESUMEF_NS
virtual void resume() = 0;
virtual bool has_handler() const = 0;
virtual bool is_ready() const = 0;
virtual bool switch_scheduler_await_suspend(scheduler_t* sch, coroutine_handle<> handler) = 0;
void set_scheduler(scheduler_t* sch)
{
@@ -57,6 +58,7 @@ RESUMEF_NS
virtual void resume() override;
virtual bool has_handler() const override;
virtual bool is_ready() const override;
virtual bool switch_scheduler_await_suspend(scheduler_t* sch, coroutine_handle<> handler) override;
static state_generator_t * _Alloc_state(coroutine_handle<> handler)
{
@@ -102,6 +104,7 @@ RESUMEF_NS
virtual void resume() override;
virtual bool has_handler() const override;
virtual bool is_ready() const override;
virtual bool switch_scheduler_await_suspend(scheduler_t* sch, coroutine_handle<> handler) override;
scheduler_t* get_scheduler() const
{

+ 40
- 0
librf/src/switch_scheduler.h View File

@@ -0,0 +1,40 @@
#pragma once

RESUMEF_NS
{
struct switch_scheduler_t
{
switch_scheduler_t(scheduler_t* sch)
:_scheduler(sch) {}
switch_scheduler_t(const switch_scheduler_t&) = default;
switch_scheduler_t(switch_scheduler_t&&) = default;

switch_scheduler_t& operator = (const switch_scheduler_t&) = default;
switch_scheduler_t& operator = (switch_scheduler_t&&) = default;

bool await_ready()
{
return false;
}

template<class _PromiseT, typename = std::enable_if_t<is_promise_v<_PromiseT>>>
void await_suspend(coroutine_handle<_PromiseT> handler)
{
_PromiseT& promise = handler.promise();

auto* sptr = promise.get_state();
sptr->switch_scheduler_await_suspend(_scheduler, handler);
}

void await_resume()
{
}
private:
scheduler_t* _scheduler;
};

inline switch_scheduler_t via(scheduler_t* sch)
{
return { sch };
}
}

+ 10
- 9
tutorial/test_async_cb.cpp View File

@@ -21,7 +21,7 @@ static void callback_get_long(int64_t val, _Ctype&& cb)
}
//这种情况下,没有生成 frame-context,因此,并没有promise_type被内嵌在frame-context里
future_t<int64_t> async_get_long(int64_t val)
static future_t<int64_t> async_get_long(int64_t val)
{
resumef::awaitable_t<int64_t> awaitable;
callback_get_long(val, [awaitable](int64_t val)
@@ -31,24 +31,25 @@ future_t<int64_t> async_get_long(int64_t val)
return awaitable.get_future();
}
future_t<> wait_get_long(int64_t val)
static future_t<int64_t> wait_get_long(int64_t val)
{
co_await async_get_long(val);
co_return co_await async_get_long(val);
}
//这种情况下,会生成对应的 frame-context,一个promise_type被内嵌在frame-context里
future_t<> resumable_get_long(int64_t val)
static future_t<int64_t> resumable_get_long(int64_t val)
{
std::cout << val << std::endl;
val = co_await async_get_long(val);
val = co_await wait_get_long(val);
std::cout << val << std::endl;
val = co_await async_get_long(val);
val = co_await wait_get_long(val);
std::cout << val << std::endl;
val = co_await async_get_long(val);
val = co_await wait_get_long(val);
std::cout << val << std::endl;
co_return val;
}
future_t<int64_t> loop_get_long(int64_t val)
static future_t<int64_t> loop_get_long(int64_t val)
{
std::cout << val << std::endl;
for (int i = 0; i < 5; ++i)
@@ -65,7 +66,7 @@ void resumable_main_cb()
GO
{
auto val = co_await loop_get_long(2);
auto val = co_await resumable_get_long(2);
std::cout << "GO:" << val << std::endl;
};

+ 85
- 0
tutorial/test_async_switch_scheduler.cpp View File

@@ -0,0 +1,85 @@

#include <chrono>
#include <iostream>
#include <string>
#include <conio.h>
#include <thread>

#include "librf.h"

using namespace resumef;

static scheduler_t* sch_in_main = nullptr;
static scheduler_t* sch_in_thread = nullptr;

void run_in_thread(channel_t<bool>& c_done)
{
std::cout << "other thread = " << std::this_thread::get_id() << std::endl;

local_scheduler my_scheduler;
sch_in_thread = this_scheduler();

c_done << true;

sch_in_thread->run();
}

template<class _Ctype>
static void callback_get_long(int64_t val, _Ctype&& cb)
{
using namespace std::chrono;
std::thread([val, cb = std::forward<_Ctype>(cb)]
{
std::this_thread::sleep_for(500ms);
cb(val * val);
}).detach();
}

//这种情况下,没有生成 frame-context,因此,并没有promise_type被内嵌在frame-context里
static future_t<int64_t> async_get_long(int64_t val)
{
resumef::awaitable_t<int64_t> awaitable;
callback_get_long(val, [awaitable](int64_t val)
{
awaitable.set_value(val);
});
return awaitable.get_future();
}

//这种情况下,会生成对应的 frame-context,一个promise_type被内嵌在frame-context里
static future_t<> resumable_get_long(int64_t val, channel_t<bool> & c_done)
{
std::cout << "thread = " << std::this_thread::get_id() << ", value = " << val << std::endl;

co_await via(sch_in_thread);
val = co_await async_get_long(val);
std::cout << "thread = " << std::this_thread::get_id() << ", value = " << val << std::endl;

co_await *sch_in_main;
val = co_await async_get_long(val);
std::cout << "thread = " << std::this_thread::get_id() << ", value = " << val << std::endl;

co_await *sch_in_thread;
val = co_await async_get_long(val);
std::cout << "thread = " << std::this_thread::get_id() << ", value = " << val << std::endl;

c_done << true;
}

void resumable_main_switch_scheduler()
{
sch_in_main = this_scheduler();
channel_t<bool> c_done{ 1 };

std::cout << "main thread = " << std::this_thread::get_id() << std::endl;
std::thread(&run_in_thread, std::ref(c_done)).detach();

GO
{
co_await c_done;
go resumable_get_long(3, c_done);
co_await c_done;
};

sch_in_main->run_until_notask();
}

+ 3
- 1
vs_proj/librf.cpp View File

@@ -23,6 +23,7 @@ extern void resumable_main_multi_thread();
extern void resumable_main_channel_mult_thread();
extern void resumable_main_when_all();
extern void resumable_main_layout();
extern void resumable_main_switch_scheduler();
extern void resumable_main_benchmark_mem();
extern void benchmark_main_channel_passing_next();
@@ -33,7 +34,7 @@ int main(int argc, const char* argv[])
{
(void)argc;
(void)argv;
benchmark_main_channel_passing_next();
resumable_main_switch_scheduler();
//if (argc > 1)
// resumable_main_benchmark_asio_client(atoi(argv[1]));
@@ -59,6 +60,7 @@ int main(int argc, const char* argv[])
//resumable_main_channel_mult_thread();
//resumable_main_sleep();
//resumable_main_when_all();
//resumable_main_switch_scheduler();
//benchmark_main_channel_passing_next();
return 0;

+ 2
- 0
vs_proj/librf.vcxproj View File

@@ -196,6 +196,7 @@
<ClCompile Include="..\tutorial\test_async_routine.cpp" />
<ClCompile Include="..\tutorial\test_async_sleep.cpp" />
<ClCompile Include="..\tutorial\test_async_suspend_always.cpp" />
<ClCompile Include="..\tutorial\test_async_switch_scheduler.cpp" />
<ClCompile Include="..\tutorial\test_async_timer.cpp" />
<ClCompile Include="..\tutorial\test_async_when_all.cpp" />
<ClCompile Include="..\tutorial\test_async_yield_return.cpp" />
@@ -223,6 +224,7 @@
<ClInclude Include="..\librf\src\sleep.h" />
<ClInclude Include="..\librf\src\spinlock.h" />
<ClInclude Include="..\librf\src\state.h" />
<ClInclude Include="..\librf\src\switch_scheduler.h" />
<ClInclude Include="..\librf\src\timer.h" />
<ClInclude Include="..\librf\src\unix\coroutine.h" />
<ClInclude Include="..\librf\src\utils.h" />

+ 6
- 0
vs_proj/librf.vcxproj.filters View File

@@ -112,6 +112,9 @@
<ClCompile Include="..\tutorial\test_async_memory_layout.cpp">
<Filter>tutorial</Filter>
</ClCompile>
<ClCompile Include="..\tutorial\test_async_switch_scheduler.cpp">
<Filter>tutorial</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\librf\librf.h">
@@ -180,6 +183,9 @@
<ClInclude Include="..\librf\librf_macro.h">
<Filter>librf</Filter>
</ClInclude>
<ClInclude Include="..\librf\src\switch_scheduler.h">
<Filter>librf\src</Filter>
</ClInclude>
</ItemGroup>
<ItemGroup>
<None Include="..\librf\src\asio_task_1.12.0.inl">

Loading…
Cancel
Save