Browse Source

增加yield()支持

可以获得调用链的根节点
切换协程方法的修改
mutex_t增加try_lock()实现
tags/v2.9.7
tearshark 4 years ago
parent
commit
18ff251233

+ 1
- 0
librf/librf.h View File

@@ -60,6 +60,7 @@
#include "src/switch_scheduler.h"
#include "src/current_scheduler.h"
#include "src/yield.h"
#include "src/sleep.h"
#include "src/when.h"

+ 30
- 1
librf/src/current_scheduler.h View File

@@ -22,7 +22,7 @@ RESUMEF_NS
return _scheduler;
}
private:
scheduler_t* _scheduler = nullptr;
scheduler_t* _scheduler;
};
inline get_current_scheduler_awaitor get_current_scheduler()
@@ -30,4 +30,33 @@ RESUMEF_NS
return {};
}
struct get_root_state_awaitor
{
bool await_ready() const noexcept
{
return false;
}
template<class _PromiseT, typename = std::enable_if_t<traits::is_promise_v<_PromiseT>>>
bool await_suspend(coroutine_handle<_PromiseT> handler)
{
_PromiseT& promise = handler.promise();
auto* parent = promise.get_state();
this->_state = parent->get_root();
return false;
}
state_base_t* await_resume() const noexcept
{
return _state;
}
private:
state_base_t* _state;
};
inline get_root_state_awaitor get_root_state()
{
return {};
}
}

+ 1
- 1
librf/src/def.h View File

@@ -1,6 +1,6 @@
#pragma once
#define LIB_RESUMEF_VERSION 20800 // 2.8.0
#define LIB_RESUMEF_VERSION 20802 // 2.8.2
#if defined(RESUMEF_MODULE_EXPORT)
#define RESUMEF_NS export namespace resumef

+ 1
- 0
librf/src/macro_def.inl View File

@@ -14,3 +14,4 @@
#endif

#define current_scheduler() (co_await ::resumef::get_current_scheduler())
#define root_state() (co_await ::resumef::get_root_state())

+ 11
- 0
librf/src/mutex_v2.cpp View File

@@ -56,6 +56,17 @@ RESUMEF_NS
return try_lock_lockless(sch);
}

bool mutex_v2_impl::try_lock_until(clock_type::time_point tp, void* sch)
{
do
{
if (try_lock(sch))
return true;
std::this_thread::yield();
} while (clock_type::now() <= tp);
return false;
}

bool mutex_v2_impl::try_lock_lockless(void* sch) noexcept
{
void* oldValue = _owner.load(std::memory_order_relaxed);

+ 25
- 4
librf/src/mutex_v2.h View File

@@ -9,7 +9,7 @@ RESUMEF_NS

inline namespace mutex_v2
{
struct scoped_lock_mutex_t;
struct [[nodiscard]] scoped_lock_mutex_t;

//支持递归的锁
struct mutex_t
@@ -22,12 +22,33 @@ RESUMEF_NS
~mutex_t() noexcept;

struct [[nodiscard]] awaiter;

awaiter operator co_await() const noexcept;
awaiter lock() const noexcept;
awaiter operator co_await() const noexcept;

struct [[nodiscard]] try_awaiter;
//co_await try_lock()获得是否加锁成功。此操作无论成功与否都会立即返回。
//如果加锁成功,则需要调用co_await unlock()解锁。或者使用unlock(root_state())解锁。
//如果加锁失败,且要循环尝试加锁,则最好调用co_await yield()让出一次调度。否则,可能造成本调度器死循环。
try_awaiter try_lock() const noexcept;

//此操作立即返回
struct [[nodiscard]] unlock_awaiter;
unlock_awaiter unlock() const noexcept;


struct [[nodiscard]] timeout_awaiter;
template <class _Rep, class _Period>
timeout_awaiter try_lock_for(const std::chrono::duration<_Rep, _Period>& dt) const noexcept;
template <class _Rep, class _Period>
timeout_awaiter try_lock_until(const std::chrono::time_point<_Rep, _Period>& tp) const noexcept;


scoped_lock_mutex_t lock(void* unique_address) const;
void lock(void* unique_address) const;
bool try_lock(void* unique_address) const;
template <class _Rep, class _Period>
bool try_lock_for(const std::chrono::duration<_Rep, _Period>& dt, void* unique_address);
template <class _Rep, class _Period>
bool try_lock_until(const std::chrono::time_point<_Rep, _Period>& tp, void* unique_address);
void unlock(void* unique_address) const;

mutex_t(const mutex_t&) = default;

+ 169
- 3
librf/src/mutex_v2.inl View File

@@ -32,6 +32,8 @@ RESUMEF_NS
//做成递归锁?
struct mutex_v2_impl : public std::enable_shared_from_this<mutex_v2_impl>
{
using clock_type = std::chrono::system_clock;

mutex_v2_impl() {}

inline void* owner() const noexcept
@@ -40,6 +42,7 @@ RESUMEF_NS
}

bool try_lock(void* sch); //内部加锁
bool try_lock_until(clock_type::time_point tp, void* sch); //内部加锁
bool unlock(void* sch); //内部加锁
void lock_until_succeed(void* sch); //内部加锁
public:
@@ -68,10 +71,12 @@ RESUMEF_NS

inline namespace mutex_v2
{
struct scoped_lock_mutex_t
struct [[nodiscard]] scoped_lock_mutex_t
{
typedef std::shared_ptr<detail::mutex_v2_impl> mutex_impl_ptr;

scoped_lock_mutex_t() {}

//此函数,应该在try_lock()获得锁后使用
//或者在协程里,由awaiter使用
scoped_lock_mutex_t(std::adopt_lock_t, mutex_impl_ptr mtx, void* sch)
@@ -111,6 +116,11 @@ RESUMEF_NS
}
}

inline bool is_locked() const noexcept
{
return _mutex != nullptr && _mutex->owner() == _owner;
}

scoped_lock_mutex_t(const scoped_lock_mutex_t&) = delete;
scoped_lock_mutex_t& operator = (const scoped_lock_mutex_t&) = delete;
scoped_lock_mutex_t(scoped_lock_mutex_t&&) = default;
@@ -185,10 +195,154 @@ RESUMEF_NS
return { _mutex.get() };
}

inline scoped_lock_mutex_t mutex_t::lock(void* unique_address) const

struct [[nodiscard]] mutex_t::try_awaiter
{
try_awaiter(detail::mutex_v2_impl* mtx) noexcept
: _mutex(mtx)
{
assert(_mutex != nullptr);
}

bool await_ready() noexcept
{
return false;
}

template<class _PromiseT, typename = std::enable_if_t<traits::is_promise_v<_PromiseT>>>
bool await_suspend(coroutine_handle<_PromiseT> handler)
{
_PromiseT& promise = handler.promise();
auto* parent = promise.get_state();
if (!_mutex->try_lock(parent->get_root()))
_mutex = nullptr;

return false;
}

bool await_resume() noexcept
{
return _mutex != nullptr;
}
protected:
detail::mutex_v2_impl* _mutex;
};

inline mutex_t::try_awaiter mutex_t::try_lock() const noexcept
{
return { _mutex.get() };
}

struct [[nodiscard]] mutex_t::unlock_awaiter
{
unlock_awaiter(detail::mutex_v2_impl* mtx) noexcept
: _mutex(mtx)
{
assert(_mutex != nullptr);
}

bool await_ready() noexcept
{
return false;
}

template<class _PromiseT, typename = std::enable_if_t<traits::is_promise_v<_PromiseT>>>
bool await_suspend(coroutine_handle<_PromiseT> handler)
{
_PromiseT& promise = handler.promise();
auto* parent = promise.get_state();
_mutex->unlock(parent->get_root());

return false;
}

void await_resume() noexcept
{
}
protected:
detail::mutex_v2_impl* _mutex;
};

inline mutex_t::unlock_awaiter mutex_t::unlock() const noexcept
{
return { _mutex.get() };
}



struct [[nodiscard]] mutex_t::timeout_awaiter
{
timeout_awaiter(detail::mutex_v2_impl* mtx, clock_type::time_point tp) noexcept
: _mutex(mtx)
, _tp(tp)
{
assert(_mutex != nullptr);
}

~timeout_awaiter() noexcept(false)
{
assert(_mutex == nullptr);
if (_mutex != nullptr)
{
throw lock_exception(error_code::not_await_lock);
}
}

bool await_ready() noexcept
{
return false;
}

template<class _PromiseT, typename = std::enable_if_t<traits::is_promise_v<_PromiseT>>>
bool await_suspend(coroutine_handle<_PromiseT> handler)
{
_PromiseT& promise = handler.promise();
auto* parent = promise.get_state();
_root = parent->get_root();

scoped_lock<detail::mutex_v2_impl::lock_type> lock_(_mutex->_lock);
if (_mutex->try_lock_lockless(_root))
return false;

_state = new detail::state_mutex_t();
_state->on_await_suspend(handler, parent->get_scheduler(), _root);

_mutex->add_wait_list_lockless(_state.get());

return true;
}

scoped_lock_mutex_t await_resume() noexcept
{
mutex_impl_ptr mtx = _root ? _mutex->shared_from_this() : nullptr;
_mutex = nullptr;

return { std::adopt_lock, mtx, _root };
}
protected:
detail::mutex_v2_impl* _mutex;
clock_type::time_point _tp;
counted_ptr<detail::state_mutex_t> _state;
state_base_t* _root = nullptr;
};

template <class _Rep, class _Period>
inline mutex_t::timeout_awaiter mutex_t::try_lock_until(const std::chrono::time_point<_Rep, _Period>& tp) const noexcept
{
return { _mutex.get(), tp };
}

template <class _Rep, class _Period>
inline mutex_t::timeout_awaiter mutex_t::try_lock_for(const std::chrono::duration<_Rep, _Period>& dt) const noexcept
{
auto tp = clock_type::now() + std::chrono::duration_cast<clock_type::duration>(dt);
return { _mutex.get(), tp };
}


inline void mutex_t::lock(void* unique_address) const
{
_mutex->lock_until_succeed(unique_address);
return { std::adopt_lock, _mutex, unique_address };
}

inline bool mutex_t::try_lock(void* unique_address) const
@@ -196,6 +350,18 @@ RESUMEF_NS
return _mutex->try_lock(unique_address);
}

template <class _Rep, class _Period>
inline bool mutex_t::try_lock_for(const std::chrono::duration<_Rep, _Period>& dt, void* unique_address)
{
return _mutex->try_lock_until(clock_type::now() + std::chrono::duration_cast<clock_type::duration>(dt), unique_address);
}

template <class _Rep, class _Period>
inline bool mutex_t::try_lock_until(const std::chrono::time_point<_Rep, _Period>& tp, void* unique_address)
{
return _mutex->try_lock_until(std::chrono::time_point_cast<clock_type::time_point>(tp), unique_address);
}

inline void mutex_t::unlock(void* unique_address) const
{
_mutex->unlock(unique_address);

+ 3
- 11
librf/src/state.cpp View File

@@ -69,7 +69,7 @@ RESUMEF_NS
return (bool)_coro;
}
bool state_generator_t::switch_scheduler_await_suspend(scheduler_t* sch, coroutine_handle<>)
bool state_generator_t::switch_scheduler_await_suspend(scheduler_t* sch)
{
assert(sch != nullptr);
@@ -88,8 +88,6 @@ RESUMEF_NS
_scheduler = sch;
}
sch->add_generator(this);
return true;
}
@@ -143,7 +141,7 @@ RESUMEF_NS
return has_handler_skip_lock();
}
bool state_future_t::switch_scheduler_await_suspend(scheduler_t* sch, coroutine_handle<> handler)
bool state_future_t::switch_scheduler_await_suspend(scheduler_t* sch)
{
assert(sch != nullptr);
scoped_lock<lock_type> __guard(this->_mtx);
@@ -164,13 +162,7 @@ RESUMEF_NS
}
if (_parent != nullptr)
_parent->switch_scheduler_await_suspend(sch, nullptr);
if (handler)
{
_coro = handler;
sch->add_generator(this);
}
_parent->switch_scheduler_await_suspend(sch);
return true;
}

+ 6
- 7
librf/src/state.h View File

@@ -55,6 +55,10 @@ RESUMEF_NS
}
return root;
}
scheduler_t* get_scheduler() const
{
return get_root()->_scheduler;
}
};
struct state_generator_t : public state_base_t
@@ -65,7 +69,7 @@ RESUMEF_NS
virtual void resume() override;
virtual bool has_handler() const noexcept override;
bool switch_scheduler_await_suspend(scheduler_t* sch, coroutine_handle<> handler);
bool switch_scheduler_await_suspend(scheduler_t* sch);
void set_initial_suspend(coroutine_handle<> handler)
{
@@ -158,11 +162,6 @@ RESUMEF_NS
return (bool)_coro || _is_initor != initor_type::None;
}
inline scheduler_t* get_scheduler() const
{
return _parent ? _parent->get_scheduler() : _scheduler;
}
inline uint32_t get_alloc_size() const noexcept
{
return _alloc_size;
@@ -176,7 +175,7 @@ RESUMEF_NS
template<class _PromiseT, typename = std::enable_if_t<traits::is_promise_v<_PromiseT>>>
void future_await_suspend(coroutine_handle<_PromiseT> handler);
bool switch_scheduler_await_suspend(scheduler_t* sch, coroutine_handle<> handler);
bool switch_scheduler_await_suspend(scheduler_t* sch);
template<class _PromiseT, typename = std::enable_if_t<traits::is_promise_v<_PromiseT>>>
void promise_initial_suspend(coroutine_handle<_PromiseT> handler);

+ 14
- 2
librf/src/switch_scheduler.h View File

@@ -4,6 +4,11 @@ RESUMEF_NS
{
struct switch_scheduler_awaitor
{
using value_type = void;
using state_type = state_t<value_type>;
using promise_type = promise_t<value_type>;
using lock_type = typename state_type::lock_type;

switch_scheduler_awaitor(scheduler_t* sch)
:_scheduler(sch) {}
switch_scheduler_awaitor(const switch_scheduler_awaitor&) = default;
@@ -21,9 +26,16 @@ RESUMEF_NS
bool await_suspend(coroutine_handle<_PromiseT> handler)
{
_PromiseT& promise = handler.promise();

auto* sptr = promise.get_state();
return sptr->switch_scheduler_await_suspend(_scheduler, handler);
if (sptr->switch_scheduler_await_suspend(_scheduler))
{
counted_ptr<state_t<void>> _state = state_future_t::_Alloc_state<state_type>(true);
_state->set_value();
_state->future_await_suspend(handler);

return true;
}
return false;
}

void await_resume() noexcept

+ 35
- 0
librf/src/yield.h View File

@@ -0,0 +1,35 @@
#pragma once

RESUMEF_NS
{
struct yield_awaitor
{
using value_type = void;
using state_type = state_t<value_type>;
using promise_type = promise_t<value_type>;
using lock_type = typename state_type::lock_type;

bool await_ready() const noexcept
{
return false;
}
template<class _PromiseT, typename = std::enable_if_t<traits::is_promise_v<_PromiseT>>>
bool await_suspend(coroutine_handle<_PromiseT> handler)
{
counted_ptr<state_t<void>> _state = state_future_t::_Alloc_state<state_type>(true);
_state->set_value();
_state->future_await_suspend(handler);

return true;
}
void await_resume() const noexcept
{
}
};

inline yield_awaitor yield()
{
return {};
}

}

+ 26
- 4
tutorial/test_async_mutex.cpp View File

@@ -30,7 +30,7 @@ static future_t<> test_mutex_pop(size_t idx)
co_await 50ms;
auto _locker_2 = co_await g_lock.lock();
auto _locker_2 = co_await g_lock;
--g_counter;
std::cout << "pop :" << g_counter << " on " << idx << std::endl;
@@ -41,12 +41,13 @@ static future_t<> test_mutex_pop(size_t idx)
//🔒-50ms-🗝-50ms-🔒-50ms-🗝-50ms-|
//---------........---------.......
//方法之一
static future_t<> test_mutex_push(size_t idx)
{
for (size_t i = 0; i < N; ++i)
{
{
auto _locker = co_await g_lock;
auto _locker = co_await g_lock.lock();
++g_counter;
std::cout << "push:" << g_counter << " on " << idx << std::endl;
@@ -57,6 +58,24 @@ static future_t<> test_mutex_push(size_t idx)
}
}
static future_t<> test_mutex_try_push(size_t idx)
{
for (size_t i = 0; i < N; ++i)
{
{
while (!co_await g_lock.try_lock())
co_await yield();
++g_counter;
std::cout << "push:" << g_counter << " on " << idx << std::endl;
co_await 50ms;
co_await g_lock.unlock();
}
co_await 50ms;
}
}
//🔒-50ms-🗝-50ms-🔒-50ms-🗝-50ms-|
//---------........---------.......
static std::thread test_mutex_async_push(size_t idx)
@@ -66,12 +85,15 @@ static std::thread test_mutex_async_push(size_t idx)
char provide_unique_address = 0;
for (size_t i = 0; i < N; ++i)
{
if (g_lock.try_lock_for(500ms, &provide_unique_address))
{
auto _locker = g_lock.lock(&provide_unique_address);
//scoped_lock_mutex_t _locker(std::adopt_lock, g_lock, &provide_unique_address);
++g_counter;
std::cout << "push:" << g_counter << " on " << idx << std::endl;
std::this_thread::sleep_for(50ms);
g_lock.unlock(&provide_unique_address);
}
std::this_thread::sleep_for(50ms);
@@ -81,7 +103,7 @@ static std::thread test_mutex_async_push(size_t idx)
static void resumable_mutex_synch()
{
go test_mutex_push(0);
go test_mutex_try_push(0);
go test_mutex_pop(1);
this_scheduler()->run_until_notask();

+ 1
- 2
vs_proj/librf.cpp View File

@@ -43,8 +43,7 @@ int main(int argc, const char* argv[])
//test_ring_queue<resumef::ring_queue_spinlock<int, false, uint32_t>>();
//test_ring_queue<resumef::ring_queue_lockfree<int, uint64_t>>();
resumable_main_channel_mult_thread();
resumable_main_channel();
resumable_main_mutex();
return 0;
//if (argc > 1)

+ 1
- 0
vs_proj/librf.vcxproj View File

@@ -252,6 +252,7 @@
<ClInclude Include="..\librf\src\unix\coroutine.h" />
<ClInclude Include="..\librf\src\when.h" />
<ClInclude Include="..\librf\src\when_v2.h" />
<ClInclude Include="..\librf\src\yield.h" />
<ClInclude Include="..\librf\src\_awaker.h" />
<ClInclude Include="..\tutorial\test_ring_queue.h" />
<ClInclude Include="dcas.h" />

+ 3
- 0
vs_proj/librf.vcxproj.filters View File

@@ -237,6 +237,9 @@
<ClInclude Include="..\librf\src\mutex_v2.h">
<Filter>librf\src</Filter>
</ClInclude>
<ClInclude Include="..\librf\src\yield.h">
<Filter>librf\src</Filter>
</ClInclude>
</ItemGroup>
<ItemGroup>
<None Include="..\librf\src\asio_task_1.12.0.inl">

Loading…
Cancel
Save