Browse Source

协程sleep的重新思考和整理

tags/v2.9.7
tearshark 5 years ago
parent
commit
b1c9962756
9 changed files with 78 additions and 40 deletions
  1. 1
    1
      README.md
  2. 11
    0
      librf/src/def.h
  3. 1
    0
      librf/src/scheduler.cpp
  4. 6
    4
      librf/src/sleep.cpp
  5. 10
    6
      librf/src/sleep.h
  6. 23
    23
      librf/src/state.cpp
  7. 12
    2
      librf/src/state.h
  8. 11
    4
      tutorial/test_async_sleep.cpp
  9. 3
    0
      vs_proj/librf.cpp

+ 1
- 1
README.md View File

librf有以下特点: librf有以下特点:


* 1.基于C++17提案'Stackless Resumable Functions'编写的非对称stackless协程库,可以以同步的方式编写简单的代码,同时获得异步的性能 * 1.基于C++17提案'Stackless Resumable Functions'编写的非对称stackless协程库,可以以同步的方式编写简单的代码,同时获得异步的性能
* 2.理论上支持海量协程, 创建100万个协程只需使用<820M>物理内存
* 2.理论上支持海量协程, 创建100万个协程只需使用<430M>物理内存
* 3.提供协程锁(mutex), 定时器, channel等特性, 帮助用户更加容易地编写程序 * 3.提供协程锁(mutex), 定时器, channel等特性, 帮助用户更加容易地编写程序
* 4.可以很好的跟asio,libuv等库结合,能跟现有的callback范式的异步/延迟代码很好的结合 * 4.可以很好的跟asio,libuv等库结合,能跟现有的callback范式的异步/延迟代码很好的结合
* 5.目前还处于实验状态,不对今后正式的C++ Coroutines支持有任何正式的承诺 * 5.目前还处于实验状态,不对今后正式的C++ Coroutines支持有任何正式的承诺

+ 11
- 0
librf/src/def.h View File

already_acquired, // attempt to get another future already_acquired, // attempt to get another future
unlock_more, // unlock 次数多余lock次数 unlock_more, // unlock 次数多余lock次数
read_before_write, // 0容量的channel,先读后写 read_before_write, // 0容量的channel,先读后写
timer_canceled, // 定时器被意外取消
max__ max__
}; };
} }
}; };
struct timer_canceled_exception : public std::exception
{
error_code _error;
timer_canceled_exception(error_code fe)
: exception(get_error_string(fe, "timer canceled"))
, _error(fe)
{
}
};
struct scheduler; struct scheduler;
struct state_base; struct state_base;

+ 1
- 0
librf/src/scheduler.cpp View File

"already_acquired", "already_acquired",
"unlock_more", "unlock_more",
"read_before_write", "read_before_write",
"timer_canceled",
}; };
static char sz_future_error_buffer[256]; static char sz_future_error_buffer[256];

+ 6
- 4
librf/src/sleep.cpp View File

namespace resumef namespace resumef
{ {
future_t<bool> sleep_until_(const std::chrono::system_clock::time_point& tp_, scheduler & scheduler_)
future_vt sleep_until_(const std::chrono::system_clock::time_point& tp_, scheduler & scheduler_)
{ {
promise_t<bool> awaitable;
promise_vt awaitable;
scheduler_.timer()->add(tp_, scheduler_.timer()->add(tp_,
[st = awaitable._state](bool cancellation_requested) [st = awaitable._state](bool cancellation_requested)
{ {
st->set_value(cancellation_requested);
if (cancellation_requested)
st->throw_exception(timer_canceled_exception{ error_code::timer_canceled });
else
st->set_value();
}); });
return awaitable.get_future(); return awaitable.get_future();

+ 10
- 6
librf/src/sleep.h View File

//协程的定时器
//如果定时器被取消了,则会抛 timer_canceled_exception 异常
//不使用co_await而单独sleep_for/sleep_until,是错误的用法,并不能达到等待的目的。而仅仅是添加了一个无效的定时器,造成无必要的内存使用
//
#pragma once #pragma once
namespace resumef namespace resumef
{ {
struct scheduler; struct scheduler;
RF_API future_t<bool> sleep_until_(const std::chrono::system_clock::time_point& tp_, scheduler & scheduler_);
RF_API future_vt sleep_until_(const std::chrono::system_clock::time_point& tp_, scheduler & scheduler_);
inline future_t<bool> sleep_for_(const std::chrono::system_clock::duration& dt_, scheduler & scheduler_)
inline future_vt sleep_for_(const std::chrono::system_clock::duration& dt_, scheduler & scheduler_)
{ {
return std::move(sleep_until_(std::chrono::system_clock::now() + dt_, scheduler_)); return std::move(sleep_until_(std::chrono::system_clock::now() + dt_, scheduler_));
} }
template<class _Rep, class _Period> template<class _Rep, class _Period>
future_t<bool> sleep_for(const std::chrono::duration<_Rep, _Period>& dt_, scheduler & scheduler_)
future_vt sleep_for(const std::chrono::duration<_Rep, _Period>& dt_, scheduler & scheduler_)
{ {
return std::move(sleep_for_(std::chrono::duration_cast<std::chrono::system_clock::duration>(dt_), scheduler_)); return std::move(sleep_for_(std::chrono::duration_cast<std::chrono::system_clock::duration>(dt_), scheduler_));
} }
template<class _Rep, class _Period> template<class _Rep, class _Period>
future_t<bool> sleep_for(const std::chrono::duration<_Rep, _Period>& dt_)
future_vt sleep_for(const std::chrono::duration<_Rep, _Period>& dt_)
{ {
return std::move(sleep_for_(std::chrono::duration_cast<std::chrono::system_clock::duration>(dt_), *this_scheduler())); return std::move(sleep_for_(std::chrono::duration_cast<std::chrono::system_clock::duration>(dt_), *this_scheduler()));
} }
template<class _Clock, class _Duration = typename _Clock::duration> template<class _Clock, class _Duration = typename _Clock::duration>
future_t<bool> sleep_until(const std::chrono::time_point<_Clock, _Duration>& tp_, scheduler & scheduler_)
future_vt sleep_until(const std::chrono::time_point<_Clock, _Duration>& tp_, scheduler & scheduler_)
{ {
return std::move(sleep_until_(std::chrono::time_point_cast<std::chrono::system_clock::duration>(tp_), scheduler_)); return std::move(sleep_until_(std::chrono::time_point_cast<std::chrono::system_clock::duration>(tp_), scheduler_));
} }
template<class _Clock, class _Duration> template<class _Clock, class _Duration>
future_t<bool> sleep_until(const std::chrono::time_point<_Clock, _Duration>& tp_)
future_vt sleep_until(const std::chrono::time_point<_Clock, _Duration>& tp_)
{ {
return std::move(sleep_until_(std::chrono::time_point_cast<std::chrono::system_clock::duration>(tp_), *this_scheduler())); return std::move(sleep_until_(std::chrono::time_point_cast<std::chrono::system_clock::duration>(tp_), *this_scheduler()));
} }

+ 23
- 23
librf/src/state.cpp View File

#endif #endif
} }
void state_base::set_value_none_lock()
{
// Set all members first as calling coroutine may reset stuff here.
_ready = true;
if (_coro)
{
void state_base::set_value_none_lock()
{
// Set all members first as calling coroutine may reset stuff here.
_ready = true;
if (_coro)
{
#if RESUMEF_ENABLE_MULT_SCHEDULER #if RESUMEF_ENABLE_MULT_SCHEDULER
auto sch_ = this->current_scheduler();
#else
auto sch_ = this_scheduler();
#endif
if(sch_) sch_->push_task_internal(new awaitable_task_t<state_base>(this));
}
}
auto sch_ = this->current_scheduler();
#else
auto sch_ = this_scheduler();
#endif
if(sch_) sch_->push_task_internal(new awaitable_task_t<state_base>(this));
}
}
void state_base::set_exception(std::exception_ptr && e_) void state_base::set_exception(std::exception_ptr && e_)
{ {
scoped_lock<lock_type> __guard(_mtx); scoped_lock<lock_type> __guard(_mtx);
_exception = std::move(e_); _exception = std::move(e_);
// Set all members first as calling coroutine may reset stuff here.
_ready = true;
if (_coro)
{
// Set all members first as calling coroutine may reset stuff here.
_ready = true;
if (_coro)
{
#if RESUMEF_ENABLE_MULT_SCHEDULER #if RESUMEF_ENABLE_MULT_SCHEDULER
auto sch_ = this->current_scheduler();
auto sch_ = this->current_scheduler();
#else #else
auto sch_ = this_scheduler();
auto sch_ = this_scheduler();
#endif #endif
if (sch_) sch_->push_task_internal(new awaitable_task_t<state_base>(this));
}
if (sch_) sch_->push_task_internal(new awaitable_task_t<state_base>(this));
}
} }
void state_base::await_suspend(coroutine_handle<> resume_cb) void state_base::await_suspend(coroutine_handle<> resume_cb)

+ 12
- 2
librf/src/state.h View File

std::atomic<bool> _done = false; std::atomic<bool> _done = false;
public: public:
state_base()
state_base() noexcept
{ {
#if RESUMEF_DEBUG_COUNTER #if RESUMEF_DEBUG_COUNTER
++g_resumef_state_count; ++g_resumef_state_count;
_value = std::forward<value_type>(t); _value = std::forward<value_type>(t);
state_base::set_value_none_lock(); state_base::set_value_none_lock();
} }
_Ty & get_value()
value_type & set_value__()
{
scoped_lock<lock_type> __guard(_mtx);
state_base::set_value_none_lock();
return _value;
}
value_type & get_value()
{ {
scoped_lock<lock_type> __guard(_mtx); scoped_lock<lock_type> __guard(_mtx);
throw future_exception{ error_code::not_ready }; throw future_exception{ error_code::not_ready };
return _value; return _value;
} }
void reset() void reset()
{ {
scoped_lock<lock_type> __guard(_mtx); scoped_lock<lock_type> __guard(_mtx);

+ 11
- 4
tutorial/test_async_sleep.cpp View File

{ {
using namespace std::chrono; using namespace std::chrono;
resumef::sleep_for(100ms); //incorrect!!!
co_await resumef::sleep_for(100ms); co_await resumef::sleep_for(100ms);
std::cout << "timer after 100ms." << std::endl; std::cout << "timer after 100ms." << std::endl;
if (co_await resumef::sleep_until(system_clock::now() + 200ms))
std::cout << "timer canceled." << std::endl;
else
try
{
co_await resumef::sleep_until(system_clock::now() + 200ms);
std::cout << "timer after 200ms." << std::endl; std::cout << "timer after 200ms." << std::endl;
}
catch (resumef::timer_canceled_exception)
{
std::cout << "timer canceled." << std::endl;
}
} }
void test_wait_all_events_with_signal_by_sleep() void test_wait_all_events_with_signal_by_sleep()
}; };
srand((int)time(nullptr)); srand((int)time(nullptr));
for(size_t i=0; i<_countof(evts); ++i)
for (size_t i = 0; i < _countof(evts); ++i)
{ {
go[&, i]() -> future_vt go[&, i]() -> future_vt
{ {

+ 3
- 0
vs_proj/librf.cpp View File

int main(int argc, const char * argv[]) int main(int argc, const char * argv[])
{ {
resumable_main_sleep();
//resumable_main_resumable(); //resumable_main_resumable();
/*
resumable_main_when_all(); resumable_main_when_all();
resumable_main_multi_thread(); resumable_main_multi_thread();
resumable_main_yield_return(); resumable_main_yield_return();
resumable_main_channel(); resumable_main_channel();
resumable_main_cb(); resumable_main_cb();
resumable_main_exception(); resumable_main_exception();
*/
return 0; return 0;
} }

Loading…
Cancel
Save