Browse Source

删除是否多线程调度的选择宏,现在都是多线程调度。

完善event的文档
tags/v2.9.7
tearshark 4 years ago
parent
commit
2d92dd0dde

+ 0
- 4
CMakeLists.txt View File

option(OPT_USE_CONCEPT "Use conecpt instead of enable_if" OFF) option(OPT_USE_CONCEPT "Use conecpt instead of enable_if" OFF)
endif() endif()
option(OPT_MULT_SCHEDULER "Enable multiple schedulers" ON)
option(OPT_DEBUG_COUNTER "Debug objects count" OFF) option(OPT_DEBUG_COUNTER "Debug objects count" OFF)
option(OPT_KEEP_REAL_SIZE "Keep real size in queue" OFF) option(OPT_KEEP_REAL_SIZE "Keep real size in queue" OFF)
#set(RESUMEF_USE_CUSTOM_SPINLOCK "std::mutex") #set(RESUMEF_USE_CUSTOM_SPINLOCK "std::mutex")
if(OPT_MULT_SCHEDULER)
set(RESUMEF_ENABLE_MULT_SCHEDULER 1)
endif()
if(OPT_INLINE_STATE) if(OPT_INLINE_STATE)
set(RESUMEF_INLINE_STATE 1) set(RESUMEF_INLINE_STATE 1)
endif() endif()

+ 1
- 1
Doxyfile View File

# section is generated. This option has no effect if EXTRACT_ALL is enabled. # section is generated. This option has no effect if EXTRACT_ALL is enabled.
# The default value is: NO. # The default value is: NO.
HIDE_UNDOC_MEMBERS = NO
HIDE_UNDOC_MEMBERS = YES
# If the HIDE_UNDOC_CLASSES tag is set to YES, doxygen will hide all # If the HIDE_UNDOC_CLASSES tag is set to YES, doxygen will hide all
# undocumented classes that are normally visible in the class hierarchy. If set # undocumented classes that are normally visible in the class hierarchy. If set

+ 0
- 4
benchmark/benchmark_asio_echo.cpp View File

void resumable_main_benchmark_asio_server() void resumable_main_benchmark_asio_server()
{ {
#if RESUMEF_ENABLE_MULT_SCHEDULER
std::array<std::thread, 2> thds; std::array<std::thread, 2> thds;
for (size_t i = 0; i < thds.size(); ++i) for (size_t i = 0; i < thds.size(); ++i)
{ {
for (auto & t : thds) for (auto & t : thds)
t.join(); t.join();
#else
RunOneBenchmark(true);
#endif
} }
//---------------------------------------------------------------------------------------------------------------------- //----------------------------------------------------------------------------------------------------------------------

+ 0
- 4
config.h.in View File

#pragma once #pragma once
#ifndef RESUMEF_ENABLE_MULT_SCHEDULER
#cmakedefine RESUMEF_ENABLE_MULT_SCHEDULER @RESUMEF_ENABLE_MULT_SCHEDULER@
#endif //RESUMEF_ENABLE_MULT_SCHEDULER
#ifndef RESUMEF_INLINE_STATE #ifndef RESUMEF_INLINE_STATE
#if defined(__clang__) || defined(_MSC_VER) #if defined(__clang__) || defined(_MSC_VER)
#cmakedefine RESUMEF_INLINE_STATE @RESUMEF_INLINE_STATE@ #cmakedefine RESUMEF_INLINE_STATE @RESUMEF_INLINE_STATE@

+ 2
- 2
librf/src/channel_v2.h View File



/** /**
* @brief 在协程中从channel_t里读取一个数据。 * @brief 在协程中从channel_t里读取一个数据。
* @see 参考{read}()函数
* @see 参考read()函数
*/ */
read_awaiter operator co_await() const noexcept; read_awaiter operator co_await() const noexcept;




/** /**
* @brief 在协程中向channel_t里写入一个数据。 * @brief 在协程中向channel_t里写入一个数据。
* @see 参考{write}()函数
* @see 参考write()函数
*/ */
template<class U template<class U
#ifndef DOXYGEN_SKIP_PROPERTY #ifndef DOXYGEN_SKIP_PROPERTY

+ 0
- 4
librf/src/config.h View File

#pragma once #pragma once
#ifndef RESUMEF_ENABLE_MULT_SCHEDULER
#define RESUMEF_ENABLE_MULT_SCHEDULER 1
#endif //RESUMEF_ENABLE_MULT_SCHEDULER
#ifndef RESUMEF_INLINE_STATE #ifndef RESUMEF_INLINE_STATE
#if defined(__clang__) || defined(_MSC_VER) #if defined(__clang__) || defined(_MSC_VER)
#define RESUMEF_INLINE_STATE 1 #define RESUMEF_INLINE_STATE 1

+ 8
- 2
librf/src/def.h View File

#pragma once #pragma once
#define LIB_RESUMEF_VERSION 20905 // 2.9.5
#define LIB_RESUMEF_VERSION 20906 // 2.9.6
namespace resumef namespace resumef
{ {
template<class... _Mutexes> template<class... _Mutexes>
using scoped_lock = std::scoped_lock<_Mutexes...>; using scoped_lock = std::scoped_lock<_Mutexes...>;
/**
* @brief 版本号。
*/
constexpr size_t _Version = LIB_RESUMEF_VERSION; constexpr size_t _Version = LIB_RESUMEF_VERSION;
//获得当前线程下的调度器
/**
* @brief 获得当前线程下的调度器。
*/
scheduler_t* this_scheduler(); scheduler_t* this_scheduler();
} }
#ifndef DOXYGEN_SKIP_PROPERTY #ifndef DOXYGEN_SKIP_PROPERTY

+ 71
- 4
librf/src/event_v2.h View File

{ {
#endif //DOXYGEN_SKIP_PROPERTY #endif //DOXYGEN_SKIP_PROPERTY


/**
* @brief 用于协程的事件。
* @details 用于同步不同线程里运行的协程。
*/
struct event_t struct event_t
{ {
using event_impl_ptr = std::shared_ptr<detail::event_v2_impl>; using event_impl_ptr = std::shared_ptr<detail::event_v2_impl>;
using clock_type = std::chrono::system_clock; using clock_type = std::chrono::system_clock;


/**
* @brief 构造一个事件。
* @param initially 初始是否触发一次信号。
*/
event_t(bool initially = false); event_t(bool initially = false);

/**
* @brief 构造一个无效的事件。
* @details 如果用于后续保存另外一个事件,则应当使用此构造函数,便于节省一次不必要的内部初始化。
*/
event_t(std::adopt_lock_t); event_t(std::adopt_lock_t);

/**
* @brief 采用shared_ptr<>来保存内部的事件实现。故不必担心正在被等待的协程,因为事件提前销毁而出现异常。
*/
~event_t(); ~event_t();


/**
* @brief 向所有正在等待的协程触发一次信号。
* @attention 非协程中也可以使用。
*/
void signal_all() const noexcept; void signal_all() const noexcept;

/**
* @brief 触发一次信号。
* @details 如果有正在等待的协程,则最先等待的协程会被唤醒。\n
* 如果没有正在等待的协程,则信号触发次数加一。之后有协程调用wait(),则会直接返回。
* @attention 非协程中也可以使用。
*/
void signal() const noexcept; void signal() const noexcept;

/**
* @brief 重置信号。
* @attention 非协程中也可以使用。
*/
void reset() const noexcept; void reset() const noexcept;


struct [[nodiscard]] awaiter; struct [[nodiscard]] awaiter;


/**
* @brief 在协程中等待信号触发。
* @see 等同于co_await wait()。
* @attention 只能在协程中调用。
*/
awaiter operator co_await() const noexcept; awaiter operator co_await() const noexcept;

/**
* @brief 在协程中等待信号触发。
* @details 如果信号已经触发,则立即返回true。\n
* 否则,当前协程被阻塞,直到信号被触发后唤醒。
* 消耗一次信号触发次数。
* @retval bool [co_await] 返回是否等到了信号
* @attention 只能在协程中调用。
*/
awaiter wait() const noexcept; awaiter wait() const noexcept;


template<class _Btype> template<class _Btype>


struct [[nodiscard]] timeout_awaiter; struct [[nodiscard]] timeout_awaiter;


/**
* @brief 在协程中等待信号触发,直到超时。
* @details 如果信号已经触发,则立即返回true。\n
* 否则,当前协程被阻塞,直到信号被触发后,或者超时后唤醒。
* 如果等到了信号,则消耗一次信号触发次数。
* @param dt 超时时长
* @retval bool [co_await] 等到了信号返回true,超时了返回false。
* @attention 只能在协程中调用。
*/
template<class _Rep, class _Period> template<class _Rep, class _Period>
timeout_awaiter wait_for(const std::chrono::duration<_Rep, _Period>& dt) const noexcept; timeout_awaiter wait_for(const std::chrono::duration<_Rep, _Period>& dt) const noexcept;

/**
* @brief 在协程中等待信号触发,直到超时。
* @details 如果信号已经触发,则立即返回true。\n
* 否则,当前协程被阻塞,直到信号被触发后,或者超时后唤醒。
* 如果等到了信号,则消耗一次信号触发次数。
* @param tp 超时时刻
* @retval bool [co_await] 等到了信号返回true,超时了返回false。
* @attention 只能在协程中调用。
*/
template<class _Clock, class _Duration> template<class _Clock, class _Duration>
timeout_awaiter wait_until(const std::chrono::time_point<_Clock, _Duration>& tp) const noexcept; timeout_awaiter wait_until(const std::chrono::time_point<_Clock, _Duration>& tp) const noexcept;


#ifndef DOXYGEN_SKIP_PROPERTY #ifndef DOXYGEN_SKIP_PROPERTY
RESUMEF_REQUIRES(_ContainerOfT<_Cont, event_t>) RESUMEF_REQUIRES(_ContainerOfT<_Cont, event_t>)
#endif //DOXYGEN_SKIP_PROPERTY #endif //DOXYGEN_SKIP_PROPERTY
static auto wait_any(_Cont& cnt_)
static auto wait_any(const _Cont& cnt_)
->any_awaiter<decltype(std::begin(cnt_))>; ->any_awaiter<decltype(std::begin(cnt_))>;


template<class _Iter> template<class _Iter>
#ifndef DOXYGEN_SKIP_PROPERTY #ifndef DOXYGEN_SKIP_PROPERTY
RESUMEF_REQUIRES(_ContainerOfT<_Cont, event_t>) RESUMEF_REQUIRES(_ContainerOfT<_Cont, event_t>)
#endif //DOXYGEN_SKIP_PROPERTY #endif //DOXYGEN_SKIP_PROPERTY
static auto wait_any_for(const std::chrono::duration<_Rep, _Period>& dt, _Cont& cnt_)
static auto wait_any_for(const std::chrono::duration<_Rep, _Period>& dt, const _Cont& cnt_)
->timeout_any_awaiter<decltype(std::begin(cnt_))>; ->timeout_any_awaiter<decltype(std::begin(cnt_))>;




#ifndef DOXYGEN_SKIP_PROPERTY #ifndef DOXYGEN_SKIP_PROPERTY
RESUMEF_REQUIRES(_ContainerOfT<_Cont, event_t>) RESUMEF_REQUIRES(_ContainerOfT<_Cont, event_t>)
#endif //DOXYGEN_SKIP_PROPERTY #endif //DOXYGEN_SKIP_PROPERTY
static auto wait_all(_Cont& cnt_)
static auto wait_all(const _Cont& cnt_)
->all_awaiter<decltype(std::begin(cnt_))>; ->all_awaiter<decltype(std::begin(cnt_))>;



template<class _Iter> template<class _Iter>
struct [[nodiscard]] timeout_all_awaiter; struct [[nodiscard]] timeout_all_awaiter;


#ifndef DOXYGEN_SKIP_PROPERTY #ifndef DOXYGEN_SKIP_PROPERTY
RESUMEF_REQUIRES(_ContainerOfT<_Cont, event_t>) RESUMEF_REQUIRES(_ContainerOfT<_Cont, event_t>)
#endif //DOXYGEN_SKIP_PROPERTY #endif //DOXYGEN_SKIP_PROPERTY
static auto wait_all_for(const std::chrono::duration<_Rep, _Period>& dt, _Cont& cnt_)
static auto wait_all_for(const std::chrono::duration<_Rep, _Period>& dt, const _Cont& cnt_)
->timeout_all_awaiter<decltype(std::begin(cnt_))>; ->timeout_all_awaiter<decltype(std::begin(cnt_))>;


event_t(const event_t&) = default; event_t(const event_t&) = default;

+ 5
- 6
librf/src/event_v2.inl View File

template<class _Cont COMMA_RESUMEF_ENABLE_IF_TYPENAME()> template<class _Cont COMMA_RESUMEF_ENABLE_IF_TYPENAME()>
RESUMEF_REQUIRES(_ContainerOfT<_Cont, event_t>) RESUMEF_REQUIRES(_ContainerOfT<_Cont, event_t>)
auto event_t::wait_any(_Cont& cnt_) ->event_t::any_awaiter<decltype(std::begin(cnt_))>
auto event_t::wait_any(const _Cont& cnt_) ->event_t::any_awaiter<decltype(std::begin(cnt_))>
{ {
return { std::begin(cnt_), std::end(cnt_) }; return { std::begin(cnt_), std::end(cnt_) };
} }
template<class _Rep, class _Period, class _Cont COMMA_RESUMEF_ENABLE_IF_TYPENAME()> template<class _Rep, class _Period, class _Cont COMMA_RESUMEF_ENABLE_IF_TYPENAME()>
RESUMEF_REQUIRES(_ContainerOfT<_Cont, event_t>) RESUMEF_REQUIRES(_ContainerOfT<_Cont, event_t>)
auto event_t::wait_any_for(const std::chrono::duration<_Rep, _Period>& dt, _Cont& cnt_)
auto event_t::wait_any_for(const std::chrono::duration<_Rep, _Period>& dt, const _Cont& cnt_)
->event_t::timeout_any_awaiter<decltype(std::begin(cnt_))> ->event_t::timeout_any_awaiter<decltype(std::begin(cnt_))>
{ {
clock_type::time_point tp = clock_type::now() + std::chrono::duration_cast<clock_type::duration>(dt); clock_type::time_point tp = clock_type::now() + std::chrono::duration_cast<clock_type::duration>(dt);
template<class _Iter COMMA_RESUMEF_ENABLE_IF_TYPENAME()> template<class _Iter COMMA_RESUMEF_ENABLE_IF_TYPENAME()>
RESUMEF_REQUIRES(_IteratorOfT<_Iter, event_t>) RESUMEF_REQUIRES(_IteratorOfT<_Iter, event_t>)
auto event_t::wait_all(_Iter begin_, _Iter end_) ->event_t::all_awaiter<_Iter>
auto event_t::wait_all(_Iter begin_, _Iter end_) ->all_awaiter<_Iter>
{ {
return { begin_, end_ }; return { begin_, end_ };
} }
template<class _Cont COMMA_RESUMEF_ENABLE_IF_TYPENAME()> template<class _Cont COMMA_RESUMEF_ENABLE_IF_TYPENAME()>
RESUMEF_REQUIRES(_ContainerOfT<_Cont, event_t>) RESUMEF_REQUIRES(_ContainerOfT<_Cont, event_t>)
auto event_t::wait_all(_Cont& cnt_) ->event_t::all_awaiter<decltype(std::begin(cnt_))>
auto event_t::wait_all(const _Cont& cnt_) ->all_awaiter<decltype(std::begin(cnt_))>
{ {
return { std::begin(cnt_), std::end(cnt_) }; return { std::begin(cnt_), std::end(cnt_) };
} }
template<class _Iter> template<class _Iter>
struct [[nodiscard]] event_t::timeout_all_awaiter : timeout_awaitor_impl<all_awaiter<_Iter>> struct [[nodiscard]] event_t::timeout_all_awaiter : timeout_awaitor_impl<all_awaiter<_Iter>>
{ {
template<class _Rep, class _Period, class _Cont COMMA_RESUMEF_ENABLE_IF_TYPENAME()> template<class _Rep, class _Period, class _Cont COMMA_RESUMEF_ENABLE_IF_TYPENAME()>
RESUMEF_REQUIRES(_ContainerOfT<_Cont, event_t>) RESUMEF_REQUIRES(_ContainerOfT<_Cont, event_t>)
auto event_t::wait_all_for(const std::chrono::duration<_Rep, _Period>& dt, _Cont& cnt_)
auto event_t::wait_all_for(const std::chrono::duration<_Rep, _Period>& dt, const _Cont& cnt_)
->event_t::timeout_all_awaiter<decltype(std::begin(cnt_))> ->event_t::timeout_all_awaiter<decltype(std::begin(cnt_))>
{ {
clock_type::time_point tp = clock_type::now() + std::chrono::duration_cast<clock_type::duration>(dt); clock_type::time_point tp = clock_type::now() + std::chrono::duration_cast<clock_type::duration>(dt);

+ 1
- 1
librf/src/mutex_v2.h View File

/** /**
* @brief 在协程中加锁。 * @brief 在协程中加锁。
* @see 等同调用 co_await {lock}()。
* @see 等同调用 co_await lock()。
* @return [co_await] batch_unlock_t * @return [co_await] batch_unlock_t
*/ */
awaiter/*batch_unlock_t*/ operator co_await() const noexcept; awaiter/*batch_unlock_t*/ operator co_await() const noexcept;

+ 1
- 1
librf/src/rf_task.h View File

* @details 每启动一个新的协程,则对应一个协程任务类。\n * @details 每启动一个新的协程,则对应一个协程任务类。\n
* 一方面,task_t<>用于标记协程是否执行完毕;\n * 一方面,task_t<>用于标记协程是否执行完毕;\n
* 另一方面,对于通过函数对象(functor/lambda)启动的协程,有很大概率,此协程的内部变量,依赖此函数对象的生存期。\n * 另一方面,对于通过函数对象(functor/lambda)启动的协程,有很大概率,此协程的内部变量,依赖此函数对象的生存期。\n
* tast_t<>的针对函数对象的特化版本,会持有此函数对象的拷贝,从而保证协程内部变量的生存期。从而减少外部使用协程函数对象的工作量。\n
* tast_t<>的针对函数对象的特化版本,会持有此函数对象的拷贝,从而保证协程内部变量的生存期。这便于减少外部使用协程函数对象的工作量。\n
* 如果不希望task_t<>持有此函数对象,则通过调用此函数对象来启动协程,即:\n * 如果不希望task_t<>持有此函数对象,则通过调用此函数对象来启动协程,即:\n
* go functor; \n * go functor; \n
* 替换为\n * 替换为\n

+ 0
- 21
librf/src/scheduler.cpp View File

return future_error_string[(size_t)(fe)]; return future_error_string[(size_t)(fe)];
} }
#if RESUMEF_ENABLE_MULT_SCHEDULER
thread_local scheduler_t * th_scheduler_ptr = nullptr; thread_local scheduler_t * th_scheduler_ptr = nullptr;
//获得当前线程下的调度器 //获得当前线程下的调度器
{ {
return th_scheduler_ptr ? th_scheduler_ptr : &scheduler_t::g_scheduler; return th_scheduler_ptr ? th_scheduler_ptr : &scheduler_t::g_scheduler;
} }
#endif
local_scheduler::local_scheduler() local_scheduler::local_scheduler()
{ {
#if RESUMEF_ENABLE_MULT_SCHEDULER
if (th_scheduler_ptr == nullptr) if (th_scheduler_ptr == nullptr)
{ {
_scheduler_ptr = new scheduler_t; _scheduler_ptr = new scheduler_t;
{ {
_scheduler_ptr = nullptr; _scheduler_ptr = nullptr;
} }
#endif
} }
local_scheduler::local_scheduler(scheduler_t& sch) local_scheduler::local_scheduler(scheduler_t& sch)
{ {
#if RESUMEF_ENABLE_MULT_SCHEDULER
if (th_scheduler_ptr == nullptr) if (th_scheduler_ptr == nullptr)
{ {
th_scheduler_ptr = &sch; th_scheduler_ptr = &sch;
} }
_scheduler_ptr = nullptr; _scheduler_ptr = nullptr;
#endif
} }
local_scheduler::~local_scheduler() local_scheduler::~local_scheduler()
{ {
#if RESUMEF_ENABLE_MULT_SCHEDULER
if (th_scheduler_ptr == _scheduler_ptr) if (th_scheduler_ptr == _scheduler_ptr)
th_scheduler_ptr = nullptr; th_scheduler_ptr = nullptr;
delete _scheduler_ptr; delete _scheduler_ptr;
#endif
} }
scheduler_t::scheduler_t() scheduler_t::scheduler_t()
_runing_states.reserve(1024); _runing_states.reserve(1024);
_cached_states.reserve(1024); _cached_states.reserve(1024);
#if RESUMEF_ENABLE_MULT_SCHEDULER
if (th_scheduler_ptr == nullptr) if (th_scheduler_ptr == nullptr)
th_scheduler_ptr = this; th_scheduler_ptr = this;
#endif
} }
scheduler_t::~scheduler_t() scheduler_t::~scheduler_t()
{ {
//cancel_all_task_(); //cancel_all_task_();
#if RESUMEF_ENABLE_MULT_SCHEDULER
if (th_scheduler_ptr == this) if (th_scheduler_ptr == this)
th_scheduler_ptr = nullptr; th_scheduler_ptr = nullptr;
#endif
} }
void scheduler_t::new_task(task_base_t * task) void scheduler_t::new_task(task_base_t * task)
} }
} }
void scheduler_t::run()
{
for (;;)
{
this->run_one_batch();
std::this_thread::yield();
}
}
scheduler_t scheduler_t::g_scheduler; scheduler_t scheduler_t::g_scheduler;
} }

+ 55
- 16
librf/src/scheduler.h View File

namespace resumef namespace resumef
{ {
/**
* @brief 协程调度器。
* @details librf的设计原则之一,就是要将协程绑定在固定的调度器里执行。
* 通过控制调度器运行的线程和时机,从而控制协程所在的线程和运行时机。
*/
struct scheduler_t : public std::enable_shared_from_this<scheduler_t> struct scheduler_t : public std::enable_shared_from_this<scheduler_t>
{ {
private: private:
void new_task(task_base_t* task); void new_task(task_base_t* task);
//void cancel_all_task_(); //void cancel_all_task_();
public: public:
/**
* @brief 运行一批准备妥当的协程。
* @details 这是协程调度器提供的主要接口。同一个调度器非线程安全,不可重入。\n
* 调用者要保证此函数始终在同一个线程里调用。
*/
void run_one_batch(); void run_one_batch();
/**
* @brief 循环运行所有的协程,直到所有协程都运行完成。
* @details 通常用于测试代码。
*/
void run_until_notask(); void run_until_notask();
void run();
//void break_all(); //void break_all();
template<class _Ty
/**
* @brief 将一个协程加入到调度器里开始运行。
* @details 推荐使用go或者GO这两个宏来启动协程。\n
* go用于启动future_t<>/generator_t<>;\n
* GO用于启动一个所有变量按值捕获的lambda。
* @param coro 协程对象。future_t<>,generator_t<>,或者一个调用后返回future_t<>/generator_t<>的函数对象。
*/
template<class _Ty
#ifndef DOXYGEN_SKIP_PROPERTY #ifndef DOXYGEN_SKIP_PROPERTY
COMMA_RESUMEF_ENABLE_IF(traits::is_callable_v<_Ty> || traits::is_future_v<_Ty> || traits::is_generator_v<_Ty>) COMMA_RESUMEF_ENABLE_IF(traits::is_callable_v<_Ty> || traits::is_future_v<_Ty> || traits::is_generator_v<_Ty>)
#endif //DOXYGEN_SKIP_PROPERTY #endif //DOXYGEN_SKIP_PROPERTY
#ifndef DOXYGEN_SKIP_PROPERTY #ifndef DOXYGEN_SKIP_PROPERTY
RESUMEF_REQUIRES(traits::is_callable_v<_Ty> || traits::is_future_v<_Ty> || traits::is_generator_v<_Ty>) RESUMEF_REQUIRES(traits::is_callable_v<_Ty> || traits::is_future_v<_Ty> || traits::is_generator_v<_Ty>)
#endif //DOXYGEN_SKIP_PROPERTY #endif //DOXYGEN_SKIP_PROPERTY
void operator + (_Ty&& t_)
void operator + (_Ty&& coro)
{ {
if constexpr (traits::is_callable_v<_Ty>) if constexpr (traits::is_callable_v<_Ty>)
new_task(new ctx_task_t<_Ty>(t_));
new_task(new ctx_task_t<_Ty>(coro));
else else
new_task(new task_t<_Ty>(t_));
new_task(new task_t<_Ty>(coro));
} }
/**
* @brief 判断所有协程是否运行完毕。
* @retval bool 以下条件全部满足,返回true:\n
* 1、所有协程运行完毕\n
* 2、没有正在准备执行的state\n
* 3、定时管理器的empty()返回true。
*/
bool empty() const bool empty() const
{ {
scoped_lock<spinlock, spinlock> __guard(_lock_ready, _lock_running); scoped_lock<spinlock, spinlock> __guard(_lock_ready, _lock_running);
return _ready_task.empty() && _runing_states.empty() && _timer->empty(); return _ready_task.empty() && _runing_states.empty() && _timer->empty();
} }
/**
* @brief 获得定时管理器。
*/
timer_manager* timer() const noexcept timer_manager* timer() const noexcept
{ {
return _timer.get(); return _timer.get();
#endif //DOXYGEN_SKIP_PROPERTY #endif //DOXYGEN_SKIP_PROPERTY
}; };
/**
* @brief 创建一个线程相关的调度器。
* @details 如果线程之前已经创建了调度器,则第一个调度器会跟线程绑定,此后local_scheduler不会创建更多的调度器。\n
* 否则,local_scheduler会创建一个调度器,并绑定到创建local_scheduler的线程上。\n
* 如果local_scheduler成功创建了一个调度器,则在local_scheduler生命周期结束后,会销毁创建的调度器,并解绑线程。\n
* 典型用法,是在非主线程里,开始运行协程之前,申明一个local_scheduler的局部变量。
*/
struct local_scheduler struct local_scheduler
{ {
/**
* @brief 尽可能的创建一个线程相关的调度器。
*/
local_scheduler(); local_scheduler();
/**
* @brief 将指定的调度器绑定到当前线程上。
*/
local_scheduler(scheduler_t & sch); local_scheduler(scheduler_t & sch);
/**
* @brief 如果当前线程绑定的调度器由local_scheduler所创建,则会销毁调度器,并解绑线程。
*/
~local_scheduler(); ~local_scheduler();
local_scheduler(local_scheduler&& right_) = delete; local_scheduler(local_scheduler&& right_) = delete;
local_scheduler& operator = (local_scheduler&& right_) = delete; local_scheduler& operator = (local_scheduler&& right_) = delete;
local_scheduler(const local_scheduler&) = delete; local_scheduler(const local_scheduler&) = delete;
local_scheduler& operator = (const local_scheduler&) = delete; local_scheduler& operator = (const local_scheduler&) = delete;
#if RESUMEF_ENABLE_MULT_SCHEDULER
private: private:
scheduler_t* _scheduler_ptr; scheduler_t* _scheduler_ptr;
#endif
}; };
#if !RESUMEF_ENABLE_MULT_SCHEDULER
//获得当前线程下的调度器
inline scheduler_t* this_scheduler()
{
return &scheduler_t::g_scheduler;
}
#endif
} }

+ 11
- 11
librf/src/switch_scheduler.h View File

#pragma once
锘�#pragma once


namespace resumef namespace resumef
{ {
scheduler_t* _scheduler; scheduler_t* _scheduler;
}; };


//由于跟when_all/when_any混用的时候,在clang上编译失败:
//clang把scheduler_t判断成一个is_awaitable,且放弃选择when_all/any(scheduler_t& sch, ...)版本
//故放弃这种用法
//鐢变簬璺焪hen_all/when_any娣风敤鐨勬椂鍊欙紝鍦╟lang涓婄紪璇戝け璐ワ細
//clang鎶妔cheduler_t鍒ゆ柇鎴愪竴涓猧s_awaitable锛屼笖鏀惧純閫夋嫨when_all/any(scheduler_t& sch, ...)鐗堟湰
//鏁呮斁寮冭繖绉嶇敤娉�
//inline switch_scheduler_awaitor operator co_await(scheduler_t& sch) noexcept //inline switch_scheduler_awaitor operator co_await(scheduler_t& sch) noexcept
//{ //{
// return { &sch }; // return { &sch };
//} //}


/** /**
* @fn 将本协程切换到指定调度器上运行。
* @details 由于调度器必然在某个线程里运行,故达到了切换到特定线程里运行的目的。\n
* 如果指定的协程就是本协程的调度器,则协程不暂停直接运行接下来的代码。
* 如果指定的协程不是本协程的调度器,则协程暂停后放入到目的协程的调度队列,等待下一次运行。
* @param sch 将要运行此后代码的协程
* @fn 灏嗘湰鍗忕▼鍒囨崲鍒版寚瀹氳皟搴﹀櫒涓婅繍琛屻€�
* @details 鐢变簬璋冨害鍣ㄥ繀鐒跺湪鏌愪釜绾跨▼閲岃繍琛岋紝鏁呰揪鍒颁簡鍒囨崲鍒扮壒瀹氱嚎绋嬮噷杩愯�鐨勭洰鐨勩€俓n
* 濡傛灉鎸囧畾鐨勫崗绋嬪氨鏄�湰鍗忕▼鐨勮皟搴﹀櫒锛屽垯鍗忕▼涓嶆殏鍋滅洿鎺ヨ繍琛屾帴涓嬫潵鐨勪唬鐮併€�
* 濡傛灉鎸囧畾鐨勫崗绋嬩笉鏄�湰鍗忕▼鐨勮皟搴﹀櫒锛屽垯鍗忕▼鏆傚仠鍚庢斁鍏ュ埌鐩�殑鍗忕▼鐨勮皟搴﹂槦鍒楋紝绛夊緟涓嬩竴娆¤繍琛屻€�
* @param sch 灏嗚�杩愯�姝ゅ悗浠g爜鐨勫崗绋�
*/ */
inline switch_scheduler_awaitor via(scheduler_t& sch) noexcept inline switch_scheduler_awaitor via(scheduler_t& sch) noexcept
{ {
} }


/** /**
* @fn 将本协程切换到指定调度器上运行。
* @see 参考{via}(scheduler_t&)版本。
* @fn 灏嗘湰鍗忕▼鍒囨崲鍒版寚瀹氳皟搴﹀櫒涓婅繍琛屻€�
* @see 鍙傝€� via(scheduler_t&)鐗堟湰銆�
*/ */
inline switch_scheduler_awaitor via(scheduler_t* sch) noexcept inline switch_scheduler_awaitor via(scheduler_t* sch) noexcept
{ {

+ 2
- 2
librf/src/type_traits.inl View File

> >
> >
: std::conjunction< : std::conjunction<
is_iterator<decltype(std::begin(std::declval<_Ty>()))>,
std::is_same<_Ety&, decltype(*std::begin(std::declval<_Ty>()))>
is_iterator<decltype(std::begin(std::declval<_Ty>()))>
, std::is_same<_Ety, remove_cvref_t<decltype(*std::begin(std::declval<_Ty>()))>>
> {}; > {};
template<class _Ty, size_t _Size> template<class _Ty, size_t _Size>
struct is_container_of<_Ty[_Size], _Ty> : std::true_type {}; struct is_container_of<_Ty[_Size], _Ty> : std::true_type {};

+ 1
- 2
test_librf.cpp View File

//test_ring_queue<resumef::ring_queue_spinlock<int, false, uint32_t>>(); //test_ring_queue<resumef::ring_queue_spinlock<int, false, uint32_t>>();
//test_ring_queue<resumef::ring_queue_lockfree<int, uint64_t>>(); //test_ring_queue<resumef::ring_queue_lockfree<int, uint64_t>>();


resumable_main_channel();
resumable_main_channel_mult_thread();
resumable_main_event();
return 0; return 0;


//if (argc > 1) //if (argc > 1)

+ 2
- 6
tutorial/test_async_channel_mult_thread.cpp View File

{ {
local_scheduler my_scheduler; local_scheduler my_scheduler;
go test_channel_producer(c, READ_BATCH * READ_THREAD / WRITE_THREAD); go test_channel_producer(c, READ_BATCH * READ_THREAD / WRITE_THREAD);
#if RESUMEF_ENABLE_MULT_SCHEDULER
this_scheduler()->run_until_notask(); this_scheduler()->run_until_notask();
#endif
{ {
scoped_lock<std::mutex> __lock(cout_mutex); scoped_lock<std::mutex> __lock(cout_mutex);
std::cout << "Write OK\r\n"; std::cout << "Write OK\r\n";
{ {
local_scheduler my_scheduler; local_scheduler my_scheduler;
go test_channel_consumer(c, READ_BATCH); go test_channel_consumer(c, READ_BATCH);
#if RESUMEF_ENABLE_MULT_SCHEDULER
this_scheduler()->run_until_notask(); this_scheduler()->run_until_notask();
#endif
{ {
scoped_lock<std::mutex> __lock(cout_mutex); scoped_lock<std::mutex> __lock(cout_mutex);
std::cout << "Read OK\r\n"; std::cout << "Read OK\r\n";
}); });
} }
#if !RESUMEF_ENABLE_MULT_SCHEDULER
std::this_thread::sleep_for(100ms); std::this_thread::sleep_for(100ms);
scheduler_t::g_scheduler.run_until_notask(); scheduler_t::g_scheduler.run_until_notask();
#endif
for(auto & th : read_th) for(auto & th : read_th)
th.join(); th.join();

+ 30
- 0
tutorial/test_async_event.cpp View File

} }
} }
static void test_wait_three()
{
using namespace std::chrono;
event_t evt1, evt2, evt3;
go[&]() -> future_t<>
{
if (co_await event_t::wait_all(std::initializer_list{ evt1, evt2, evt3 }))
std::cout << "all event signal!" << std::endl;
else
std::cout << "time out!" << std::endl;
};
std::vector<std::thread> vtt;
srand((int)time(nullptr));
vtt.emplace_back(async_set_event(evt1, 1ms * (500 + rand() % 1000)));
vtt.emplace_back(async_set_event(evt2, 1ms * (500 + rand() % 1000)));
vtt.emplace_back(async_set_event(evt3, 1ms * (500 + rand() % 1000)));
this_scheduler()->run_until_notask();
for (auto& tt : vtt)
tt.join();
}
static void test_wait_any() static void test_wait_any()
{ {
using namespace std::chrono; using namespace std::chrono;
test_wait_one(); test_wait_one();
std::cout << std::endl; std::cout << std::endl;
test_wait_three();
std::cout << std::endl;
test_wait_any(); test_wait_any();
std::cout << std::endl; std::cout << std::endl;

Loading…
Cancel
Save