Browse Source

减少state加锁的次数

增加future对yield的测试
tags/v2.9.7
tearshark 4 years ago
parent
commit
fee671711d

+ 2
- 1
README.md View File



目前仅支持: 目前仅支持:


Windows (使用VS2017/VS2019编译)
Windows (使用VS2017/VS2019/clang编译)
Android (使用NDK 20.1 自带的clang编译)




librf有以下特点: librf有以下特点:

+ 6
- 3
benchmark/benchmark_async_mem.cpp View File

volatile size_t globalValue = 0; volatile size_t globalValue = 0;
void resumable_main_benchmark_mem()
void resumable_main_benchmark_mem(bool wait_key)
{ {
using namespace std::chrono; using namespace std::chrono;
} }
resumef::this_scheduler()->run_until_notask(); resumef::this_scheduler()->run_until_notask();
std::cout << "press any key to continue." << std::endl;
(void)_getch();
if (wait_key)
{
std::cout << "press any key to continue." << std::endl;
(void)_getch();
}
} }
//clang : 平均 210字节 //clang : 平均 210字节

+ 1
- 1
librf/src/def.h View File

#pragma once #pragma once
#define LIB_RESUMEF_VERSION 20302 // 2.3.2
#define LIB_RESUMEF_VERSION 20303 // 2.3.3
#if defined(RESUMEF_MODULE_EXPORT) #if defined(RESUMEF_MODULE_EXPORT)
#define RESUMEF_NS export namespace resumef #define RESUMEF_NS export namespace resumef

+ 1
- 0
librf/src/generator.h View File

void set_exception(std::exception_ptr e) void set_exception(std::exception_ptr e)
{ {
(void)e;
std::terminate(); std::terminate();
} }
#ifdef __clang__ #ifdef __clang__

+ 3
- 0
librf/src/promise.h View File

promise_impl_t& operator = (const promise_impl_t&) = delete; promise_impl_t& operator = (const promise_impl_t&) = delete;


auto get_state()->state_type*; auto get_state()->state_type*;

suspend_on_initial initial_suspend() noexcept; suspend_on_initial initial_suspend() noexcept;
suspend_on_final final_suspend() noexcept; suspend_on_final final_suspend() noexcept;
template <typename _Uty>
_Uty&& await_transform(_Uty&& _Whatever);
void set_exception(std::exception_ptr e); void set_exception(std::exception_ptr e);
#ifdef __clang__ #ifdef __clang__
void unhandled_exception(); //If the coroutine ends with an uncaught exception, it performs the following: void unhandled_exception(); //If the coroutine ends with an uncaught exception, it performs the following:

+ 11
- 0
librf/src/promise.inl View File

return {}; return {};
} }


template <typename _Ty>
template <typename _Uty>
_Uty&& promise_impl_t<_Ty>::await_transform(_Uty&& _Whatever)
{
if constexpr (is_future_v<_Uty> || is_awaitable_v<_Uty>)
{
_Whatever._state->set_scheduler(get_state()->get_scheduler());
}
return std::forward<_Uty>(_Whatever);
}

template <typename _Ty> template <typename _Ty>
inline void promise_impl_t<_Ty>::set_exception(std::exception_ptr e) inline void promise_impl_t<_Ty>::set_exception(std::exception_ptr e)
{ {

+ 6
- 39
librf/src/scheduler.cpp View File

void scheduler_t::new_task(task_base_t * task) void scheduler_t::new_task(task_base_t * task)
{ {
state_base_t* sptr = task->get_state(); state_base_t* sptr = task->get_state();
{
scoped_lock<spinlock> __guard(_lock_ready);
this->_ready_task.emplace(sptr, task);
}
//如果是单独的future,没有被co_await过,则handler是nullptr。
sptr->set_scheduler(this); sptr->set_scheduler(this);
if (sptr->has_handler())
this->add_initial(sptr);
}
void scheduler_t::add_initial(state_base_t* sptr)
{
scoped_lock<spinlock, spinlock> __guard(_lock_ready, _lock_running);
_ready_task.try_emplace(sptr, nullptr);
_runing_states.emplace_back(sptr);
}
void scheduler_t::add_await(state_base_t* sptr)
{
if (sptr->is_ready())
{ {
scoped_lock<spinlock> __guard(_lock_running);
_runing_states.emplace_back(sptr);
scoped_lock<spinlock, spinlock> __guard(_lock_ready, _lock_running);
_ready_task.emplace(sptr, task);
} }
}
void scheduler_t::add_ready(state_base_t* sptr)
{
assert(sptr->is_ready());
//如果是单独的future,没有被co_await过,则handler是nullptr。
if (sptr->has_handler()) if (sptr->has_handler())
{ {
scoped_lock<spinlock> __guard(_lock_running);
_runing_states.emplace_back(sptr);
add_generator(sptr);
} }
} }
void scheduler_t::del_final(state_base_t* sptr) void scheduler_t::del_final(state_base_t* sptr)
{ {
{
scoped_lock<spinlock> __guard(_lock_ready);
this->_ready_task.erase(sptr);
}
if (sptr->has_handler())
{
scoped_lock<spinlock> __guard(_lock_running);
_runing_states.emplace_back(sptr);
}
scoped_lock<spinlock> __guard(_lock_ready);
this->_ready_task.erase(sptr);
} }
std::unique_ptr<task_base_t> scheduler_t::del_switch(state_base_t* sptr) std::unique_ptr<task_base_t> scheduler_t::del_switch(state_base_t* sptr)

+ 0
- 3
librf/src/scheduler.h View File

return _timer.get(); return _timer.get();
} }
void add_initial(state_base_t* sptr);
void add_await(state_base_t* sptr);
void add_ready(state_base_t* sptr);
void add_generator(state_base_t* sptr); void add_generator(state_base_t* sptr);
void del_final(state_base_t* sptr); void del_final(state_base_t* sptr);
std::unique_ptr<task_base_t> del_switch(state_base_t* sptr); std::unique_ptr<task_base_t> del_switch(state_base_t* sptr);

+ 17
- 22
librf/src/state.cpp View File

} }
} }
bool state_generator_t::is_ready() const
{
return (bool)_coro && !_coro.done();
}
bool state_generator_t::has_handler() const bool state_generator_t::has_handler() const
{ {
return (bool)_coro; return (bool)_coro;
bool state_future_t::has_handler() const bool state_future_t::has_handler() const
{ {
scoped_lock<lock_type> __guard(_mtx); scoped_lock<lock_type> __guard(_mtx);
return (bool)_coro || _is_initor != initor_type::None;
}
bool state_future_t::is_ready() const
{
scoped_lock<lock_type> __guard(this->_mtx);
return _exception != nullptr || _has_value.load(std::memory_order_acquire) || !_is_awaitor;
return has_handler_skip_lock();
} }
void state_future_t::set_exception(std::exception_ptr e) void state_future_t::set_exception(std::exception_ptr e)
{ {
{
scoped_lock<lock_type> __guard(this->_mtx);
this->_exception = std::move(e);
}
scoped_lock<lock_type> __guard(this->_mtx);
this->_exception = std::move(e);
scheduler_t* sch = this->get_scheduler(); scheduler_t* sch = this->get_scheduler();
if (sch != nullptr) if (sch != nullptr)
sch->add_ready(this);
{
if (this->has_handler_skip_lock())
sch->add_generator(this);
else
sch->del_final(this);
}
} }
bool state_future_t::switch_scheduler_await_suspend(scheduler_t* sch, coroutine_handle<> handler) bool state_future_t::switch_scheduler_await_suspend(scheduler_t* sch, coroutine_handle<> handler)
void state_t<void>::set_value() void state_t<void>::set_value()
{ {
{
scoped_lock<lock_type> __guard(this->_mtx);
this->_has_value.store(true, std::memory_order_release);
}
scoped_lock<lock_type> __guard(this->_mtx);
this->_has_value.store(true, std::memory_order_release);
scheduler_t* sch = this->get_scheduler(); scheduler_t* sch = this->get_scheduler();
if (sch != nullptr) if (sch != nullptr)
sch->add_ready(this);
{
if (this->has_handler_skip_lock())
sch->add_generator(this);
else
sch->del_final(this);
}
} }
} }

+ 15
- 9
librf/src/state.h View File

public: public:
virtual void resume() = 0; virtual void resume() = 0;
virtual bool has_handler() const = 0; virtual bool has_handler() const = 0;
virtual bool is_ready() const = 0;
virtual bool switch_scheduler_await_suspend(scheduler_t* sch, coroutine_handle<> handler) = 0; virtual bool switch_scheduler_await_suspend(scheduler_t* sch, coroutine_handle<> handler) = 0;
void set_scheduler(scheduler_t* sch) void set_scheduler(scheduler_t* sch)
public: public:
virtual void resume() override; virtual void resume() override;
virtual bool has_handler() const override; virtual bool has_handler() const override;
virtual bool is_ready() const override;
virtual bool switch_scheduler_await_suspend(scheduler_t* sch, coroutine_handle<> handler) override; virtual bool switch_scheduler_await_suspend(scheduler_t* sch, coroutine_handle<> handler) override;
void set_initial_suspend(coroutine_handle<> handler) void set_initial_suspend(coroutine_handle<> handler)
virtual void destroy_deallocate() override; virtual void destroy_deallocate() override;
virtual void resume() override; virtual void resume() override;
virtual bool has_handler() const override; virtual bool has_handler() const override;
virtual bool is_ready() const override;
virtual bool switch_scheduler_await_suspend(scheduler_t* sch, coroutine_handle<> handler) override; virtual bool switch_scheduler_await_suspend(scheduler_t* sch, coroutine_handle<> handler) override;
inline bool is_ready() const
{
return _exception != nullptr || _has_value.load(std::memory_order_acquire) || !_is_awaitor;
}
inline bool has_handler_skip_lock() const
{
return (bool)_coro || _is_initor != initor_type::None;
}
scheduler_t* get_scheduler() const
inline scheduler_t* get_scheduler() const
{ {
return _parent ? _parent->get_scheduler() : _scheduler; return _parent ? _parent->get_scheduler() : _scheduler;
} }
state_base_t * get_parent() const
inline state_base_t * get_parent() const
{ {
return _parent; return _parent;
} }
uint32_t get_alloc_size() const
inline uint32_t get_alloc_size() const
{ {
return _alloc_size; return _alloc_size;
} }
void set_exception(std::exception_ptr e); void set_exception(std::exception_ptr e);
template<class _Exp> template<class _Exp>
void throw_exception(_Exp e)
inline void throw_exception(_Exp e)
{ {
set_exception(std::make_exception_ptr(std::move(e))); set_exception(std::make_exception_ptr(std::move(e)));
} }
template<class _PromiseT, typename = std::enable_if_t<is_promise_v<_PromiseT>>> template<class _PromiseT, typename = std::enable_if_t<is_promise_v<_PromiseT>>>
void future_await_suspend(coroutine_handle<_PromiseT> handler); void future_await_suspend(coroutine_handle<_PromiseT> handler);
bool future_await_ready()
inline bool future_await_ready()
{ {
//scoped_lock<lock_type> __guard(this->_mtx); //scoped_lock<lock_type> __guard(this->_mtx);
return _has_value.load(std::memory_order_acquire); return _has_value.load(std::memory_order_acquire);
void promise_final_suspend(coroutine_handle<_PromiseT> handler); void promise_final_suspend(coroutine_handle<_PromiseT> handler);
template<class _Sty> template<class _Sty>
static _Sty* _Alloc_state(bool awaitor)
static inline _Sty* _Alloc_state(bool awaitor)
{ {
_Alloc_char _Al; _Alloc_char _Al;
size_t _Size = sizeof(_Sty); size_t _Size = sizeof(_Sty);

+ 53
- 55
librf/src/state.inl View File

template<class _PromiseT, typename _Enable> template<class _PromiseT, typename _Enable>
void state_future_t::promise_final_suspend(coroutine_handle<_PromiseT> handler) void state_future_t::promise_final_suspend(coroutine_handle<_PromiseT> handler)
{ {
{
scoped_lock<lock_type> __guard(this->_mtx);
scoped_lock<lock_type> __guard(this->_mtx);


this->_initor = handler;
this->_is_initor = initor_type::Final;
}
this->_initor = handler;
this->_is_initor = initor_type::Final;


scheduler_t* sch = this->get_scheduler(); scheduler_t* sch = this->get_scheduler();
assert(sch != nullptr); assert(sch != nullptr);

if (this->has_handler_skip_lock())
sch->add_generator(this);
sch->del_final(this); sch->del_final(this);
} }


auto* parent_state = promise.get_state(); auto* parent_state = promise.get_state();
scheduler_t* sch = parent_state->get_scheduler(); scheduler_t* sch = parent_state->get_scheduler();


{
scoped_lock<lock_type> __guard(this->_mtx);

if (this != parent_state)
{
this->_parent = parent_state;
this->_scheduler = sch;
}
scoped_lock<lock_type> __guard(this->_mtx);


if (!this->_coro)
this->_coro = handler;
if (this != parent_state)
{
this->_parent = parent_state;
this->_scheduler = sch;
} }


if (sch != nullptr)
sch->add_await(this);
if (!this->_coro)
this->_coro = handler;

if (sch != nullptr && this->is_ready())
sch->add_generator(this);
} }


template<class _PromiseT, typename _Enable > template<class _PromiseT, typename _Enable >
{ {
coroutine_handle<_PromiseT> handler = coroutine_handle<_PromiseT>::from_promise(*promise); coroutine_handle<_PromiseT> handler = coroutine_handle<_PromiseT>::from_promise(*promise);


{
scoped_lock<lock_type> __guard(this->_mtx);

if (!handler.done())
{
if (!this->_coro)
this->_coro = handler;
}
scoped_lock<lock_type> __guard(this->_mtx);


this->_has_value.store(true, std::memory_order_release);
if (!handler.done())
{
if (!this->_coro)
this->_coro = handler;
} }


this->_has_value.store(true, std::memory_order_release);

if (!handler.done()) if (!handler.done())
{ {
scheduler_t* sch = this->get_scheduler(); scheduler_t* sch = this->get_scheduler();
{ {
coroutine_handle<_PromiseT> handler = coroutine_handle<_PromiseT>::from_promise(*promise); coroutine_handle<_PromiseT> handler = coroutine_handle<_PromiseT>::from_promise(*promise);


scoped_lock<lock_type> __guard(this->_mtx);

if (!handler.done())
{ {
scoped_lock<lock_type> __guard(this->_mtx);

if (!handler.done())
{
if (this->_coro == nullptr)
this->_coro = handler;
}

if (this->_has_value.load(std::memory_order_acquire))
{
*this->cast_value_ptr() = std::forward<U>(val);
}
else
{
new (this->cast_value_ptr()) value_type(std::forward<U>(val));
this->_has_value.store(true, std::memory_order_release);
}
if (this->_coro == nullptr)
this->_coro = handler;
}

if (this->_has_value.load(std::memory_order_acquire))
{
*this->cast_value_ptr() = std::forward<U>(val);
}
else
{
new (this->cast_value_ptr()) value_type(std::forward<U>(val));
this->_has_value.store(true, std::memory_order_release);
} }


if (!handler.done()) if (!handler.done())
template<typename U> template<typename U>
void state_t<_Ty>::set_value(U&& val) void state_t<_Ty>::set_value(U&& val)
{ {
{
scoped_lock<lock_type> __guard(this->_mtx);
scoped_lock<lock_type> __guard(this->_mtx);


if (this->_has_value.load(std::memory_order_acquire))
{
*this->cast_value_ptr() = std::forward<U>(val);
}
else
{
new (this->cast_value_ptr()) value_type(std::forward<U>(val));
this->_has_value.store(true, std::memory_order_release);
}
if (this->_has_value.load(std::memory_order_acquire))
{
*this->cast_value_ptr() = std::forward<U>(val);
}
else
{
new (this->cast_value_ptr()) value_type(std::forward<U>(val));
this->_has_value.store(true, std::memory_order_release);
} }


scheduler_t* sch = this->get_scheduler(); scheduler_t* sch = this->get_scheduler();
if (sch != nullptr) if (sch != nullptr)
sch->add_ready(this);
{
if (this->has_handler_skip_lock())
sch->add_generator(this);
else
sch->del_final(this);
}
} }
} }



+ 0
- 1
librf/src/type_traits.inl View File



template<class _Ty> template<class _Ty>
constexpr bool is_await_suspend_v = is_future_v<_Ty> constexpr bool is_await_suspend_v = is_future_v<_Ty>
|| is_generator_v<_Ty>
|| is_awaitable_v<_Ty> || is_awaitable_v<_Ty>
|| std::is_same_v<remove_cvref_t<_Ty>, switch_scheduler_t> || std::is_same_v<remove_cvref_t<_Ty>, switch_scheduler_t>
; ;

+ 5
- 1
tutorial/test_async_cb.cpp View File

void resumable_main_cb() void resumable_main_cb()
{ {
std::cout << std::this_thread::get_id() << std::endl;
//由于使用者可能不能明确的区分是resume function返回的awaitor还是awaitable function返回的awaitor
//导致均有可能加入到协程里去调度。
//所以,协程调度器应该需要能处理这种情况。
go async_get_long(3);
resumef::this_scheduler()->run_until_notask();
GO GO
{ {

+ 34
- 0
tutorial/test_async_channel.cpp View File

#include "librf.h" #include "librf.h"
using namespace resumef; using namespace resumef;
using namespace std::chrono;
const size_t MAX_CHANNEL_QUEUE = 5; //0, 1, 5, 10, -1 const size_t MAX_CHANNEL_QUEUE = 5; //0, 1, 5, 10, -1
this_scheduler()->run_until_notask(); this_scheduler()->run_until_notask();
} }
static const int N = 1000000;
void test_channel_performance()
{
channel_t<int> c{1};
go[&]() -> future_t<>
{
for (int i = N - 1; i >= 0; --i)
{
co_await(c << i);
}
};
go[&]() -> future_t<>
{
auto tstart = high_resolution_clock::now();
int i;
do
{
i = co_await c;
} while (i > 0);
auto dt = duration_cast<duration<double>>(high_resolution_clock::now() - tstart).count();
std::cout << "channel w/r " << N << " times, cost time " << dt << "s" << std::endl;
};
this_scheduler()->run_until_notask();
}
void resumable_main_channel() void resumable_main_channel()
{ {
test_channel_read_first(); test_channel_read_first();
std::cout << std::endl; std::cout << std::endl;
test_channel_write_first(); test_channel_write_first();
std::cout << std::endl;
test_channel_performance();
} }

+ 18
- 0
tutorial/test_async_yield_return.cpp View File

co_yield_void; co_yield_void;
} }
auto test_yield_future() -> future_t<int64_t>
{
std::cout << "future 1 will yield return" << std::endl;
co_yield 1;
std::cout << "future 2 will yield return" << std::endl;
co_yield 2;
std::cout << "future 3 will yield return" << std::endl;
co_yield 3;
std::cout << "future 4 will return" << std::endl;
co_return 4;
std::cout << "future 5 will never yield return" << std::endl;
co_yield 5;
}
void resumable_main_yield_return() void resumable_main_yield_return()
{ {
for (int i : test_yield_int()) for (int i : test_yield_int())
go test_yield_void(); go test_yield_void();
this_scheduler()->run_until_notask(); this_scheduler()->run_until_notask();
go test_yield_future();
this_scheduler()->run_until_notask();
} }

+ 4
- 4
vs_proj/librf.cpp View File

extern void resumable_main_layout(); extern void resumable_main_layout();
extern void resumable_main_switch_scheduler(); extern void resumable_main_switch_scheduler();
extern void resumable_main_benchmark_mem();
extern void resumable_main_benchmark_mem(bool wait_key);
extern void benchmark_main_channel_passing_next(); extern void benchmark_main_channel_passing_next();
extern void resumable_main_benchmark_asio_server(); extern void resumable_main_benchmark_asio_server();
extern void resumable_main_benchmark_asio_client(intptr_t nNum); extern void resumable_main_benchmark_asio_client(intptr_t nNum);
{ {
(void)argc; (void)argc;
(void)argv; (void)argv;
resumable_main_layout();
return 0;
//resumable_main_resumable();
//return 0;
//if (argc > 1) //if (argc > 1)
// resumable_main_benchmark_asio_client(atoi(argv[1])); // resumable_main_benchmark_asio_client(atoi(argv[1]));
resumable_main_dynamic_go(); resumable_main_dynamic_go();
resumable_main_multi_thread(); resumable_main_multi_thread();
resumable_main_timer(); resumable_main_timer();
resumable_main_benchmark_mem();
resumable_main_benchmark_mem(false);
resumable_main_mutex(); resumable_main_mutex();
resumable_main_event(); resumable_main_event();
resumable_main_event_timeout(); resumable_main_event_timeout();

Loading…
Cancel
Save