Browse Source

深入理解协程的初始化过程

tags/v2.9.7
tearshark 4 years ago
parent
commit
256dbb8584
5 changed files with 401 additions and 25 deletions
  1. 15
    15
      librf/src/asio_task_1.12.0.inl
  2. 6
    0
      librf/src/future.h
  3. 15
    8
      tutorial/test_async_cb.cpp
  4. 363
    0
      vs_proj/librf.cpp
  5. 2
    2
      vs_proj/librf.vcxproj

+ 15
- 15
librf/src/asio_task_1.12.0.inl View File

@@ -49,7 +49,7 @@ namespace asio {

void operator()() const
{
state_->set_value();
this->state_->set_value();
}
};

@@ -61,9 +61,9 @@ namespace asio {
void operator()(const asio::error_code& ec) const
{
if (!ec)
state_->set_value();
this->state_->set_value();
else
state_->set_exception(std::make_exception_ptr(asio::system_error(ec)));
this->state_->set_exception(std::make_exception_ptr(asio::system_error(ec)));
}
};

@@ -75,9 +75,9 @@ namespace asio {
void operator()(std::exception_ptr ex) const
{
if (!ex)
state_->set_value();
this->state_->set_value();
else
state_->set_exception(ex);
this->state_->set_exception(ex);
}
};

@@ -91,7 +91,7 @@ namespace asio {
template <typename Arg>
void operator()(Arg&& arg) const
{
state_->set_value(std::forward<Arg>(arg));
this->state_->set_value(std::forward<Arg>(arg));
}
};

@@ -104,9 +104,9 @@ namespace asio {
void operator()(const asio::error_code& ec, Arg&& arg) const
{
if (!ec)
state_->set_value(std::forward<Arg>(arg));
this->state_->set_value(std::forward<Arg>(arg));
else
state_->set_exception(std::make_exception_ptr(asio::system_error(ec)));
this->state_->set_exception(std::make_exception_ptr(asio::system_error(ec)));
}
};

@@ -119,9 +119,9 @@ namespace asio {
void operator()(std::exception_ptr ex, Arg&& arg) const
{
if (!ex)
state_->set_value(std::forward<Arg>(arg));
this->state_->set_value(std::forward<Arg>(arg));
else
state_->set_exception(ex);
this->state_->set_exception(ex);
}
};

@@ -135,7 +135,7 @@ namespace asio {
template <typename... Args>
void operator()(Args&&... args) const
{
state_->set_value(std::make_tuple(std::forward<Args>(args)...));
this->state_->set_value(std::make_tuple(std::forward<Args>(args)...));
}
};

@@ -148,9 +148,9 @@ namespace asio {
void operator()(const asio::error_code& ec, Args&&... args) const
{
if (!ec)
state_->set_value(std::make_tuple(std::forward<Args>(args)...));
this->state_->set_value(std::make_tuple(std::forward<Args>(args)...));
else
state_->set_exception(std::make_exception_ptr(asio::system_error(ec)));
this->state_->set_exception(std::make_exception_ptr(asio::system_error(ec)));
}
};

@@ -163,9 +163,9 @@ namespace asio {
void operator()(std::exception_ptr ex, Args&&... args) const
{
if (!ex)
state_->set_value(std::make_tuple(std::forward<Args>(args)...));
this->state_->set_value(std::make_tuple(std::forward<Args>(args)...));
else
state_->set_exception(ex);
this->state_->set_exception(ex);
}
};


+ 6
- 0
librf/src/future.h View File

@@ -310,6 +310,12 @@ namespace resumef
//以上是与编译器生成的resumable function交互的接口
//------------------------------------------------------------------------------------------
//兼容std::promise<>用法
void set_value()
{
_state->set_value();
}
};
using promise_vt = promise_t<void>;

+ 15
- 8
tutorial/test_async_cb.cpp View File

@@ -7,22 +7,29 @@
#include "librf.h"
auto async_get_long(int64_t val)
template<class _Ctype>
void callback_get_long(int64_t val, _Ctype&& cb)
{
using namespace std::chrono;
std::thread([val, cb = std::forward<_Ctype>(cb)]
{
std::this_thread::sleep_for(500ms);
cb(val * val);
}).detach();
}
//这种情况下,没有生成 frame-context,因此,并没有promise_type被内嵌在frame-context里
auto async_get_long(int64_t val)
{
resumef::promise_t<int64_t> awaitable;
std::thread([val, st = awaitable._state]
callback_get_long(val, [st = awaitable._state](int64_t val)
{
std::this_thread::sleep_for(500ms);
st->set_value(val * val);
}).detach();
st->set_value(val);
});
return awaitable.get_future();
}
//这种情况下,会生成对应的 frame-context,一个promise_type被内嵌在frame-context里
resumef::future_vt resumable_get_long(int64_t val)
{
std::cout << val << std::endl;

+ 363
- 0
vs_proj/librf.cpp View File

@@ -1,5 +1,8 @@

#include "librf.h"
#include <experimental/resumable>
#include <experimental/generator>
#include <optional>
extern void resumable_main_yield_return();
extern void resumable_main_timer();
@@ -23,8 +26,368 @@ extern void resumable_main_benchmark_mem();
extern void resumable_main_benchmark_asio_server();
extern void resumable_main_benchmark_asio_client(intptr_t nNum);
namespace coro = std::experimental;
namespace librf2
{
struct scheduler_t;
template<class _Ty = void>
struct future_t;
template<class _Ty = void>
struct promise_t;
template<class _PromiseT>
struct is_promise : std::false_type {};
template<class _Ty>
struct is_promise<promise_t<_Ty>> : std::true_type {};
template<class _Ty>
_INLINE_VAR constexpr bool is_promise_v = is_promise<_Ty>::value;
struct state_base_t
{
scheduler_t* _scheduler = nullptr;
coro::coroutine_handle<> _coro;
std::exception_ptr _exception;
virtual ~state_base_t() {}
virtual bool has_value() const = 0;
void resume()
{
auto handler = _coro;
_coro = nullptr;
handler();
}
};
template<class _Ty = void>
struct state_t : public state_base_t
{
using value_type = _Ty;
std::optional<value_type> _value;
virtual bool has_value() const override
{
return _value.has_value();
}
};
template<>
struct state_t<void> : public state_base_t
{
bool _has_value = false;
virtual bool has_value() const override
{
return _has_value;
}
};
struct scheduler_t
{
using state_sptr = std::shared_ptr<state_base_t>;
using state_vector = std::vector<state_sptr>;
private:
state_vector _runing_states;
public:
void add_initial(state_sptr sptr)
{
sptr->_scheduler = this;
assert(sptr->_coro != nullptr);
_runing_states.emplace_back(std::move(sptr));
}
void add_await(state_sptr sptr, coro::coroutine_handle<> handler)
{
sptr->_scheduler = this;
sptr->_coro = handler;
if (sptr->has_value() || sptr->_exception != nullptr)
_runing_states.emplace_back(std::move(sptr));
}
void add_ready(state_sptr sptr)
{
assert(sptr->_scheduler == this);
if (sptr->_coro != nullptr)
_runing_states.emplace_back(std::move(sptr));
}
void run()
{
for (;;)
{
state_vector states = std::move(_runing_states);
for (state_sptr& sptr : states)
sptr->resume();
}
}
};
template<class _Ty>
struct future_t
{
using value_type = _Ty;
using state_type = state_t<value_type>;
using promise_type = promise_t<value_type>;
using future_type = future_t<value_type>;
std::shared_ptr<state_type> _state;
future_t(std::shared_ptr<state_type> _st)
:_state(std::move(_st)) {}
future_t(const future_t&) = default;
future_t(future_t&&) = default;
future_t& operator = (const future_t&) = default;
future_t& operator = (future_t&&) = default;
bool await_ready()
{
return _state->has_value();
}
template<class _PromiseT, typename = std::enable_if_t<is_promise_v<_PromiseT>>>
void await_suspend(coro::coroutine_handle<_PromiseT> handler)
{
_PromiseT& promise = handler.promise();
scheduler_t * sch = promise._state->_scheduler;
sch->add_await(_state, handler);
}
value_type await_resume()
{
if (_state->_exception)
std::rethrow_exception(std::move(_state->_exception));
return std::move(_state->_value.value());
}
void resume() const
{
auto coro_handle = _state->_coro;
_state->_coro = nullptr;
coro_handle();
}
};
struct suspend_on_initial
{
std::shared_ptr<state_base_t> _state;
bool await_ready() noexcept
{
return false;
}
void await_suspend(coro::coroutine_handle<> handler) noexcept
{
_state->_coro = handler;
}
void await_resume() noexcept
{
}
};
template<class _Ty>
struct promise_impl_t
{
using value_type = _Ty;
using state_type = state_t<value_type>;
using promise_type = promise_t<value_type>;
using future_type = future_t<value_type>;
std::shared_ptr<state_type> _state = std::make_shared<state_type>();
promise_impl_t() {}
promise_impl_t(const promise_impl_t&) = delete;
promise_impl_t(promise_impl_t&&) = delete;
promise_impl_t& operator = (const promise_impl_t&) = delete;
promise_impl_t& operator = (promise_impl_t&&) = delete;
auto initial_suspend() noexcept
{
return suspend_on_initial{ _state };
}
auto final_suspend() noexcept
{
return coro::suspend_never{};
}
void set_exception(std::exception_ptr e)
{
_state->_exception = std::move(e);
if (_state->_scheduler != nullptr)
_state->_scheduler->add_ready(_state);
}
future_t<value_type> get_return_object()
{
return { _state };
}
void cancellation_requested()
{
}
};
template<class _Ty>
struct promise_t : public promise_impl_t<_Ty>
{
void return_value(value_type val)
{
_state->_value = std::move(val);
if (_state->_scheduler != nullptr)
_state->_scheduler->add_ready(_state);
}
void yield_value(value_type val)
{
_state->_value = std::move(val);
if (_state->_scheduler != nullptr)
_state->_scheduler->add_ready(_state);
}
};
template<>
struct promise_t<void> : public promise_impl_t<void>
{
void return_void()
{
_state->_has_value = true;
if (_state->_scheduler != nullptr)
_state->_scheduler->add_ready(_state);
}
void yield_value()
{
_state->_has_value = true;
if (_state->_scheduler != nullptr)
_state->_scheduler->add_ready(_state);
}
};
template<class _Ty>
struct awaitable_t
{
using value_type = _Ty;
using state_type = state_t<value_type>;
using future_type = future_t<value_type>;
private:
mutable std::shared_ptr<state_type> _state = std::make_shared<state_type>();
public:
awaitable_t() {}
awaitable_t(const awaitable_t&) = default;
awaitable_t(awaitable_t&&) = default;
awaitable_t& operator = (const awaitable_t&) = default;
awaitable_t& operator = (awaitable_t&&) = default;
void set_value(value_type value) const
{
_state->_value = std::move(value);
if (_state->_scheduler != nullptr)
_state->_scheduler->add_ready(_state);
_state = nullptr;
}
void set_exception(std::exception_ptr e)
{
_state->_exception = std::move(e);
if (_state->_scheduler != nullptr)
_state->_scheduler->add_ready(_state);
_state = nullptr;
}
future_type get_future()
{
return future_type{ _state };
}
};
template<>
struct awaitable_t<void>
{
using value_type = void;
using state_type = state_t<>;
using future_type = future_t<>;
mutable std::shared_ptr<state_type> _state = std::make_shared<state_type>();
awaitable_t() {}
awaitable_t(const awaitable_t&) = default;
awaitable_t(awaitable_t&&) = default;
awaitable_t& operator = (const awaitable_t&) = default;
awaitable_t& operator = (awaitable_t&&) = default;
void set_value() const
{
_state->_has_value = true;
if (_state->_scheduler != nullptr)
_state->_scheduler->add_ready(_state);
_state = nullptr;
}
void set_exception(std::exception_ptr e)
{
_state->_exception = std::move(e);
if (_state->_scheduler != nullptr)
_state->_scheduler->add_ready(_state);
_state = nullptr;
}
future_type get_future()
{
return future_type{ _state };
}
};
}
void async_get_long(int64_t val, std::function<void(int64_t)> cb)
{
using namespace std::chrono;
std::thread([val, cb = std::move(cb)]
{
std::this_thread::sleep_for(10s);
cb(val * val);
}).detach();
}
//这种情况下,没有生成 frame-context,因此,并没有promise_type被内嵌在frame-context里
librf2::future_t<int64_t> co_get_long(int64_t val)
{
librf2::awaitable_t<int64_t > st;
std::cout << "co_get_long@1" << std::endl;
async_get_long(val, [st](int64_t value)
{
std::cout << "co_get_long@2" << std::endl;
st.set_value(value);
});
std::cout << "co_get_long@3" << std::endl;
return st.get_future();
}
//这种情况下,会生成对应的 frame-context,一个promise_type被内嵌在frame-context里
librf2::future_t<> test_librf2()
{
auto f = co_await co_get_long(2);
std::cout << f << std::endl;
}
int main(int argc, const char* argv[])
{
librf2::scheduler_t sch;
librf2::future_t<> ft = test_librf2();
sch.add_initial(ft._state);
sch.run();
resumable_main_resumable();
//resumable_main_benchmark_mem();
/*

+ 2
- 2
vs_proj/librf.vcxproj View File

@@ -29,7 +29,7 @@
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'" Label="Configuration">
<ConfigurationType>Application</ConfigurationType>
<UseDebugLibraries>true</UseDebugLibraries>
<PlatformToolset>v142</PlatformToolset>
<PlatformToolset>v141</PlatformToolset>
<CharacterSet>NotSet</CharacterSet>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'" Label="Configuration">
@@ -42,7 +42,7 @@
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'" Label="Configuration">
<ConfigurationType>Application</ConfigurationType>
<UseDebugLibraries>true</UseDebugLibraries>
<PlatformToolset>v140</PlatformToolset>
<PlatformToolset>v141</PlatformToolset>
<CharacterSet>NotSet</CharacterSet>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'" Label="Configuration">

Loading…
Cancel
Save