Browse Source

将state的操作封装在其内部,以便于检查是否加锁而从保证了线程安全

tags/v2.9.7
tearshark 4 years ago
parent
commit
3575506740

+ 1
- 0
librf/librf.h View File

@@ -21,6 +21,7 @@
#include "src/channel.h"
#include "src/scheduler.h"
#include "src/promise.inl"
#include "src/state.inl"
#include "src/sleep.h"
#include "src/awaitable.h"

+ 10
- 33
librf/src/future.h View File

@@ -6,61 +6,38 @@
namespace resumef
{
template<class _Ty>
struct future_impl_t
struct future_t
{
using value_type = _Ty;
using state_type = state_t<value_type>;
using promise_type = promise_t<value_type>;
using future_type = future_t<value_type>;
using lock_type = typename state_type::lock_type;
counted_ptr<state_type> _state;
future_impl_t(counted_ptr<state_type> _st)
future_t(counted_ptr<state_type> _st)
:_state(std::move(_st)) {}
future_impl_t(const future_impl_t&) = default;
future_impl_t(future_impl_t&&) = default;
future_t(const future_t&) = default;
future_t(future_t&&) = default;
future_impl_t& operator = (const future_impl_t&) = default;
future_impl_t& operator = (future_impl_t&&) = default;
future_t& operator = (const future_t&) = default;
future_t& operator = (future_t&&) = default;
bool await_ready()
{
return _state->has_value();
return _state->future_await_ready();
}
template<class _PromiseT, typename = std::enable_if_t<is_promise_v<_PromiseT>>>
void await_suspend(coroutine_handle<_PromiseT> handler)
{
_PromiseT& promise = handler.promise();
scheduler_t* sch = promise._state->_scheduler;
sch->add_await(_state.get(), handler);
_state->future_await_suspend(handler);
}
};
template<class _Ty>
struct future_t : public future_impl_t<_Ty>
{
using future_impl_t::future_impl_t;
using future_impl_t::_state;
value_type await_resume()
{
if (_state->_exception)
std::rethrow_exception(std::move(_state->_exception));
return std::move(_state->_value.value());
}
};
template<>
struct future_t<void> : public future_impl_t<void>
{
using future_impl_t::future_impl_t;
using future_impl_t::_state;
void await_resume()
{
if (_state->_exception)
std::rethrow_exception(std::move(_state->_exception));
return _state->future_await_resume();
}
};
}

+ 67
- 21
librf/src/scheduler.cpp View File

@@ -63,15 +63,15 @@ namespace resumef
}
scheduler_t::scheduler_t()
: _task()
, _ready_task()
, _timer(std::make_shared<timer_manager>())
: _timer(std::make_shared<timer_manager>())
{
_runing_states.reserve(1024);
_cached_states.reserve(1024);
}
scheduler_t::~scheduler_t()
{
cancel_all_task_();
//cancel_all_task_();
#if RESUMEF_ENABLE_MULT_SCHEDULER
if (th_scheduler_ptr == this)
th_scheduler_ptr = nullptr;
@@ -80,34 +80,78 @@ namespace resumef
void scheduler_t::new_task(task_base_t * task)
{
if (task)
state_base_t* sptr = task->get_state();
{
scoped_lock<spinlock> __guard(_mtx_ready);
scoped_lock<spinlock> __guard(_lock_ready);
this->_ready_task.emplace(sptr, task);
}
//如果是单独的future,没有被co_await过,则handler是nullptr。
if (sptr->get_handler() != nullptr)
this->add_initial(sptr);
else
sptr->set_scheduler(this);
}
void scheduler_t::add_initial(state_base_t* sptr)
{
sptr->set_scheduler(this);
this->_ready_task.push_back(task);
this->add_initial(task->get_state());
scoped_lock<lock_type> __guard(_lock_running);
_runing_states.emplace_back(sptr);
}
void scheduler_t::add_await(state_base_t* sptr, coroutine_handle<> handler)
{
sptr->set_scheduler(this);
sptr->set_handler(handler);
if (sptr->is_ready())
{
scoped_lock<lock_type> __guard(_lock_running);
_runing_states.emplace_back(sptr);
}
}
void scheduler_t::cancel_all_task_()
void scheduler_t::add_ready(state_base_t* sptr)
{
assert(sptr->get_scheduler() == this);
if (sptr->get_handler() != nullptr)
{
scoped_lock<lock_type> __guard(_mtx_task);
this->_task.clear(true);
scoped_lock<lock_type> __guard(_lock_running);
_runing_states.emplace_back(sptr);
}
else
{
scoped_lock<spinlock> __guard(_mtx_ready);
this->_ready_task.clear(true);
scoped_lock<spinlock> __guard(_lock_ready);
this->_ready_task.erase(sptr);
}
}
void scheduler_t::del_final(state_base_t* sptr)
{
assert(sptr->get_scheduler() == this);
scoped_lock<spinlock> __guard(_lock_ready);
this->_ready_task.erase(sptr);
}
/*
void scheduler_t::cancel_all_task_()
{
scoped_lock<spinlock, lock_type> __guard(_lock_ready, _lock_running);
this->_ready_task.clear();
this->_runing_states.clear();
}
void scheduler_t::break_all()
{
cancel_all_task_();
scoped_lock<lock_type> __guard(_mtx_task);
this->_timer->clear();
}
*/
void scheduler_t::run_one_batch()
{
@@ -115,15 +159,17 @@ namespace resumef
if (th_scheduler_ptr == nullptr)
th_scheduler_ptr = this;
#endif
this->_timer->update();
{
scoped_lock<lock_type> __guard(_mtx_task);
scoped_lock<lock_type> __guard(_lock_running);
std::swap(_cached_states, _runing_states);
}
this->_timer->update();
for (state_sptr& sptr : _cached_states)
sptr->resume();
state_vector states = std::move(_runing_states);
for (state_sptr& sptr : states)
sptr->resume();
}
_cached_states.clear();
}
void scheduler_t::run_until_notask()

+ 19
- 38
librf/src/scheduler.h View File

@@ -16,28 +16,29 @@ namespace resumef
struct scheduler_t : public std::enable_shared_from_this<scheduler_t>
{
using state_sptr = std::shared_ptr<state_base_t>;
using state_sptr = counted_ptr<state_base_t>;
using state_vector = std::vector<state_sptr>;
private:
state_vector _runing_states;
private:
//typedef spinlock lock_type;
typedef std::recursive_mutex lock_type;
using lock_type = std::recursive_mutex;
using task_dictionary_type = std::unordered_map<state_base_t*, task_base_t*>;
mutable lock_type _lock_running;
state_vector _runing_states;
state_vector _cached_states;
mutable spinlock _mtx_ready;
task_list _ready_task;
mutable spinlock _lock_ready;
task_dictionary_type _ready_task;
mutable lock_type _mtx_task;
task_list _task;
timer_mgr_ptr _timer;
timer_mgr_ptr _timer;
RF_API void new_task(task_base_t * task);
void cancel_all_task_();
//void cancel_all_task_();
public:
RF_API void run_one_batch();
RF_API void run_until_notask();
RF_API void run();
//RF_API void break_all();
template<class _Ty, typename = std::enable_if_t<std::is_callable_v<_Ty> || is_future_v<_Ty>>>
inline void operator + (_Ty && t_)
@@ -55,42 +56,22 @@ namespace resumef
inline bool empty() const
{
scoped_lock<spinlock, lock_type> __guard(_mtx_ready, _mtx_task);
return this->_task.empty() && this->_ready_task.empty() && this->_timer->empty();
scoped_lock<spinlock> __guard(_lock_ready);
return this->_ready_task.empty() && this->_timer->empty();
}
RF_API void break_all();
inline timer_manager * timer() const
{
return _timer.get();
}
void add_initial(state_base_t* sptr);
void add_await(state_base_t* sptr, coroutine_handle<> handler);
void add_ready(state_base_t* sptr);
void del_final(state_base_t* sptr);
friend struct task_base;
friend struct local_scheduler;
void add_initial(state_base_t* sptr)
{
sptr->_scheduler = this;
assert(sptr->_coro != nullptr);
_runing_states.emplace_back(sptr);
}
void add_await(state_base_t* sptr, coroutine_handle<> handler)
{
sptr->_scheduler = this;
sptr->_coro = handler;
if (sptr->has_value() || sptr->_exception != nullptr)
_runing_states.emplace_back(sptr);
}
void add_ready(state_base_t* sptr)
{
assert(sptr->_scheduler == this);
if (sptr->_coro != nullptr)
_runing_states.emplace_back(sptr);
}
protected:
RF_API scheduler_t();
public:

+ 36
- 0
librf/src/state.cpp View File

@@ -5,7 +5,43 @@
namespace resumef
{
std::atomic<intptr_t> g_resumef_static_count = {0};
state_base_t::~state_base_t()
{
}
void state_base_t::promise_final_suspend()
{
scoped_lock<lock_type> __guard(this->_mtx);
scheduler_t* _scheduler = this->_scheduler;
if (_scheduler != nullptr)
_scheduler->del_final(this);
}
void state_base_t::set_exception(std::exception_ptr e)
{
scoped_lock<lock_type> __guard(this->_mtx);
this->_exception = std::move(e);
if (this->_scheduler != nullptr)
this->_scheduler->add_ready(this);
}
void state_t<void>::future_await_resume()
{
scoped_lock<lock_type> __guard(this->_mtx);
if (this->_exception)
std::rethrow_exception(std::move(this->_exception));
}
void state_t<void>::set_value()
{
scoped_lock<lock_type> __guard(this->_mtx);
this->_has_value = true;
if (this->_scheduler != nullptr)
this->_scheduler->add_ready(this);
}
}

+ 78
- 4
librf/src/state.h View File

@@ -8,18 +8,38 @@
namespace resumef
{
extern std::atomic<intptr_t> g_resumef_static_count;
struct state_base_t : public counted_t<state_base_t>
{
typedef std::recursive_mutex lock_type;
lock_type _mtx;
scheduler_t * _scheduler = nullptr;
protected:
mutable lock_type _mtx;
scheduler_t* _scheduler = nullptr;
//可能来自协程里的promise产生的,则经过co_await操作后,_coro在初始时不会为nullptr。
//也可能来自awaitable_t,如果
// 一、经过co_await操作后,_coro在初始时不会为nullptr。
// 二、没有co_await操作,直接加入到了调度器里,则_coro在初始时为nullptr。调度器需要特殊处理此种情况。
coroutine_handle<> _coro;
std::exception_ptr _exception;
public:
intptr_t _id;
state_base_t* _parent = nullptr;
state_base_t()
{
_id = ++g_resumef_static_count;
}
RF_API virtual ~state_base_t();
virtual bool has_value() const = 0;
bool is_ready() const
{
return has_value() && _exception != nullptr;
}
void resume()
{
scoped_lock<lock_type> __guard(_mtx);
@@ -28,28 +48,82 @@ namespace resumef
_coro = nullptr;
handler();
}
void set_handler(coroutine_handle<> handler)
{
scoped_lock<lock_type> __guard(_mtx);
assert(_coro == nullptr);
_coro = handler;
}
coroutine_handle<> get_handler() const
{
return _coro;
}
void set_scheduler(scheduler_t* sch)
{
scoped_lock<lock_type> __guard(_mtx);
_scheduler = sch;
}
scheduler_t* get_scheduler() const
{
return _scheduler;
}
void set_exception(std::exception_ptr e);
template<class _PromiseT, typename = std::enable_if_t<is_promise_v<_PromiseT>>>
void future_await_suspend(coroutine_handle<_PromiseT> handler);
void promise_final_suspend();
};
template <typename _Ty>
struct state_t : public state_base_t
{
using state_base_t::lock_type;
using value_type = _Ty;
protected:
std::optional<value_type> _value;
public:
virtual bool has_value() const override
{
scoped_lock<lock_type> __guard(this->_mtx);
return _value.has_value();
}
bool future_await_ready()
{
scoped_lock<lock_type> __guard(this->_mtx);
return _value.has_value();
}
auto future_await_resume() -> value_type;
void set_value(value_type val);
};
template<>
struct state_t<void> : public state_base_t
{
bool _has_value = false;
using state_base_t::lock_type;
protected:
std::atomic<bool> _has_value{ false };
public:
virtual bool has_value() const override
{
return _has_value;
}
bool future_await_ready()
{
return _has_value;
}
void future_await_resume();
void set_value();
};
}

+ 13
- 5
tutorial/test_async_cb.cpp View File

@@ -13,7 +13,8 @@ void callback_get_long(int64_t val, _Ctype&& cb)
using namespace std::chrono;
std::thread([val, cb = std::forward<_Ctype>(cb)]
{
std::this_thread::sleep_for(500ms);
//std::this_thread::sleep_for(500ms);
std::this_thread::sleep_for(10s);
cb(val * val);
}).detach();
}
@@ -29,6 +30,11 @@ auto async_get_long(int64_t val)
return st.get_future();
}
resumef::future_t<> wait_get_long(int64_t val)
{
co_await async_get_long(val);
}
//这种情况下,会生成对应的 frame-context,一个promise_type被内嵌在frame-context里
resumef::future_t<> resumable_get_long(int64_t val)
{
@@ -49,20 +55,22 @@ resumef::future_t<int64_t> loop_get_long(int64_t val)
val = co_await async_get_long(val);
std::cout << val << std::endl;
}
return val;
co_return val;
}
void resumable_main_cb()
{
std::cout << std::this_thread::get_id() << std::endl;
go wait_get_long(3);
resumef::this_scheduler()->run_until_notask();
go []()->resumef::future_t<>
GO
{
auto val = co_await loop_get_long(2);
std::cout << val << std::endl;
};
//resumef::this_scheduler()->run_until_notask();
resumef::this_scheduler()->run_until_notask();
go resumable_get_long(3);
//go resumable_get_long(3);
resumef::this_scheduler()->run_until_notask();
}

+ 0
- 33
vs_proj/librf.cpp View File

@@ -26,39 +26,6 @@ extern void resumable_main_benchmark_mem();
extern void resumable_main_benchmark_asio_server();
extern void resumable_main_benchmark_asio_client(intptr_t nNum);
void async_get_long(int64_t val, std::function<void(int64_t)> cb)
{
using namespace std::chrono;
std::thread([val, cb = std::move(cb)]
{
std::this_thread::sleep_for(10s);
cb(val * val);
}).detach();
}
//这种情况下,没有生成 frame-context,因此,并没有promise_type被内嵌在frame-context里
resumef::future_t<int64_t> co_get_long(int64_t val)
{
resumef::awaitable_t<int64_t > st;
std::cout << "co_get_long@1" << std::endl;
async_get_long(val, [st](int64_t value)
{
std::cout << "co_get_long@2" << std::endl;
st.set_value(value);
});
std::cout << "co_get_long@3" << std::endl;
return st.get_future();
}
//这种情况下,会生成对应的 frame-context,一个promise_type被内嵌在frame-context里
resumef::future_t<> test_librf2()
{
auto f = co_await co_get_long(2);
std::cout << f << std::endl;
}
int main(int argc, const char* argv[])
{
resumable_main_cb();

+ 1
- 0
vs_proj/librf.vcxproj View File

@@ -310,6 +310,7 @@
<None Include="..\librf\src\asio_task_1.10.0.inl" />
<None Include="..\librf\src\asio_task_1.12.0.inl" />
<None Include="..\librf\src\promise.inl" />
<None Include="..\librf\src\state.inl" />
</ItemGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
<ImportGroup Label="ExtensionTargets">

+ 3
- 0
vs_proj/librf.vcxproj.filters View File

@@ -185,5 +185,8 @@
<None Include="..\librf\src\promise.inl">
<Filter>librf\src</Filter>
</None>
<None Include="..\librf\src\state.inl">
<Filter>librf\src</Filter>
</None>
</ItemGroup>
</Project>

Loading…
Cancel
Save