Browse Source

mutex_v2增加超时等待功能

修改无死锁锁定多个锁的算法,以便于支持mutex_v2
tags/v2.9.7
tearshark 4 years ago
parent
commit
fb1d8d39e5

+ 2
- 2
librf/librf.h View File

@@ -40,10 +40,10 @@
#include "src/def.h"
#include "src/macro_def.inl"
#include "src/spinlock.h"
#include "src/counted_ptr.h"
#include "src/type_traits.inl"
#include "src/type_concept.inl"
#include "src/spinlock.h"
#include "src/state.h"
#include "src/future.h"
#include "src/promise.h"
@@ -65,8 +65,8 @@
#include "src/when.h"
#include "src/_awaker.h"
#include "src/mutex.h"
#include "src/ring_queue.h"
#include "src/intrusive_link_queue.h"
#include "src/channel.h"
#include "src/event.h"
#include "src/mutex.h"

+ 1
- 1
librf/src/def.h View File

@@ -1,6 +1,6 @@
#pragma once
#define LIB_RESUMEF_VERSION 20802 // 2.8.2
#define LIB_RESUMEF_VERSION 20803 // 2.8.3
#if defined(RESUMEF_MODULE_EXPORT)
#define RESUMEF_NS export namespace resumef

+ 0
- 6
librf/src/event_v2.h View File

@@ -38,12 +38,6 @@ RESUMEF_NS
timeout_awaiter wait_until(const std::chrono::time_point<_Clock, _Duration>& tp) const noexcept;


//以下wait_any/wait_all实现,借助when_any/when_all实现。
//其接口尽管兼容了v1版本的event_t,但是,其内部细节并没有兼容
//v1版本的event_t的wait_any,确保只触发了其中之一,或者超时
//而when_any会导致所有的event_t都被触发
//改日有空再补上

template<class _Iter>
struct [[nodiscard]] any_awaiter;


+ 17
- 27
librf/src/event_v2.inl View File

@@ -69,7 +69,6 @@ RESUMEF_NS
virtual bool on_notify(event_v2_impl* eptr) = 0;
virtual bool on_timeout() = 0;
//将自己加入到通知链表里
template<class _PromiseT, typename = std::enable_if_t<traits::is_promise_v<_PromiseT>>>
scheduler_t* on_await_suspend(coroutine_handle<_PromiseT> handler) noexcept
{
@@ -83,8 +82,19 @@ RESUMEF_NS
return sch;
}
//为浸入式单向链表提供的next指针
inline void add_timeout_timer(std::chrono::system_clock::time_point tp)
{
this->_thandler = this->_scheduler->timer()->add_handler(tp,
[st = counted_ptr<state_event_base_t>{ this }](bool canceld)
{
if (!canceld)
st->on_timeout();
});
}
//为侵入式单向链表提供的next指针
//counted_ptr<state_event_base_t> _next = nullptr;
timer_handler _thandler;
};
struct state_event_t : public state_event_base_t
@@ -96,10 +106,6 @@ RESUMEF_NS
virtual void on_cancel() noexcept override;
virtual bool on_notify(event_v2_impl* eptr) override;
virtual bool on_timeout() override;
public:
//typedef spinlock lock_type;
timer_handler _thandler;
protected:
//_value引用awaitor保存的值,这样可以尽可能减少创建state的可能。而不必进入没有state就没有value实体被用于返回。
//在调用on_notify()或on_timeout()任意之一后,置为nullptr。
@@ -118,12 +124,7 @@ RESUMEF_NS
virtual void on_cancel() noexcept override;
virtual bool on_notify(event_v2_impl* eptr) override;
virtual bool on_timeout() override;
public:
//typedef spinlock lock_type;
//为浸入式单向链表提供的next指针
//counted_ptr<state_event_t> _next = nullptr;
timer_handler _thandler;
std::atomic<intptr_t> _counter;
protected:
bool* _value;
@@ -202,7 +203,7 @@ RESUMEF_NS
struct event_t::timeout_awaitor_impl : public _Btype
{
template<class... Args>
timeout_awaitor_impl(clock_type::time_point tp, Args... args)
timeout_awaitor_impl(clock_type::time_point tp, Args&&... args) noexcept(std::is_nothrow_constructible_v<_Btype, Args&&...>)
: _Btype(std::forward<Args>(args)...)
, _tp(tp)
{}
@@ -211,29 +212,18 @@ RESUMEF_NS
{
if (!_Btype::await_suspend(handler))
return false;
_PromiseT& promise = handler.promise();
auto* parent_state = promise.get_state();
scheduler_t* sch = parent_state->get_scheduler();
this->_state->_thandler = sch->timer()->add_handler(_tp, [st = this->_state](bool canceld)
{
if (!canceld)
st->on_timeout();
});
this->_state->add_timeout_timer(_tp);
return true;
}
protected:
clock_type::time_point _tp;
};
struct [[nodiscard]] event_t::timeout_awaiter : event_t::timeout_awaitor_impl<event_t::awaiter>
struct [[nodiscard]] event_t::timeout_awaiter : timeout_awaitor_impl<event_t::awaiter>
{
timeout_awaiter(clock_type::time_point tp, detail::event_v2_impl* evt) noexcept
: timeout_awaitor_impl(tp, evt)
{
}
: timeout_awaitor_impl<event_t::awaiter>(tp, evt)
{}
};
template<class _Rep, class _Period>

+ 103
- 16
librf/src/mutex_v2.cpp View File

@@ -4,7 +4,7 @@ RESUMEF_NS
{
namespace detail
{
void state_mutex_t::resume()
void state_mutex_base_t::resume()
{
coroutine_handle<> handler = _coro;
if (handler)
@@ -15,28 +15,115 @@ RESUMEF_NS
}
}

bool state_mutex_t::has_handler() const noexcept
bool state_mutex_base_t::has_handler() const noexcept
{
return (bool)_coro;
}
state_base_t* state_mutex_t::get_parent() const noexcept
state_base_t* state_mutex_base_t::get_parent() const noexcept
{
return _root;
}

bool state_mutex_t::on_notify()

void state_mutex_t::on_cancel() noexcept
{
mutex_v2_impl** oldValue = _value.load(std::memory_order_acquire);
if (oldValue != nullptr && _value.compare_exchange_strong(oldValue, nullptr, std::memory_order_acq_rel))
{
*oldValue = nullptr;
_thandler.stop();

this->_coro = nullptr;
}
}

bool state_mutex_t::on_notify(mutex_v2_impl* eptr)
{
mutex_v2_impl** oldValue = _value.load(std::memory_order_acquire);
if (oldValue != nullptr && _value.compare_exchange_strong(oldValue, nullptr, std::memory_order_acq_rel))
{
*oldValue = eptr;
_thandler.stop();

assert(this->_scheduler != nullptr);
if (this->_coro)
this->_scheduler->add_generator(this);

return true;
}
return false;
}

bool state_mutex_t::on_timeout()
{
mutex_v2_impl** oldValue = _value.load(std::memory_order_acquire);
if (oldValue != nullptr && _value.compare_exchange_strong(oldValue, nullptr, std::memory_order_acq_rel))
{
*oldValue = nullptr;
_thandler.reset();

assert(this->_scheduler != nullptr);
if (this->_coro)
this->_scheduler->add_generator(this);

return true;
}
return false;
}


void state_mutex_all_t::on_cancel() noexcept
{
intptr_t oldValue = _counter.load(std::memory_order_acquire);
if (oldValue >= 0 && _counter.compare_exchange_strong(oldValue, -1, std::memory_order_acq_rel))
{
*_value = false;
_thandler.stop();

this->_coro = nullptr;
}
}

bool state_mutex_all_t::on_notify(event_v2_impl*)
{
assert(this->_scheduler != nullptr);
if (this->_coro)
intptr_t oldValue = _counter.load(std::memory_order_acquire);
if (oldValue <= 0) return false;

oldValue = _counter.fetch_add(-1, std::memory_order_acq_rel);
if (oldValue == 1)
{
this->_scheduler->add_generator(this);
*_value = true;
_thandler.stop();

assert(this->_scheduler != nullptr);
if (this->_coro)
this->_scheduler->add_generator(this);

return true;
}

return oldValue >= 1;
}

bool state_mutex_all_t::on_timeout()
{
intptr_t oldValue = _counter.load(std::memory_order_acquire);
if (oldValue >= 0 && _counter.compare_exchange_strong(oldValue, -1, std::memory_order_acq_rel))
{
*_value = false;
_thandler.reset();

assert(this->_scheduler != nullptr);
if (this->_coro)
this->_scheduler->add_generator(this);

return true;
}
return false;
}


void mutex_v2_impl::lock_until_succeed(void* sch)
{
@@ -93,20 +180,20 @@ RESUMEF_NS
{
if (_counter.fetch_sub(1, std::memory_order_relaxed) == 1)
{
if (!_wait_awakes.empty())
_owner.store(nullptr, std::memory_order_relaxed);
while (!_wait_awakes.empty())
{
state_mutex_ptr state = _wait_awakes.front();
_wait_awakes.pop_front();

//锁定状态转移到新的state上
_owner.store(state->get_root(), std::memory_order_relaxed);
_counter.fetch_add(1, std::memory_order_relaxed);
if (state->on_notify(this))
{
//锁定状态转移到新的state上
_owner.store(state->get_root(), std::memory_order_relaxed);
_counter.fetch_add(1, std::memory_order_relaxed);

state->on_notify();
}
else
{
_owner.store(nullptr, std::memory_order_relaxed);
break;
}
}
}


+ 6
- 6
librf/src/mutex_v2.h View File

@@ -22,25 +22,25 @@ RESUMEF_NS
~mutex_t() noexcept;

struct [[nodiscard]] awaiter;
awaiter lock() const noexcept;
awaiter operator co_await() const noexcept;
awaiter/*scoped_lock_mutex_t*/ lock() const noexcept;
awaiter/*scoped_lock_mutex_t*/ operator co_await() const noexcept;

struct [[nodiscard]] try_awaiter;
//co_await try_lock()获得是否加锁成功。此操作无论成功与否都会立即返回。
//如果加锁成功,则需要调用co_await unlock()解锁。或者使用unlock(root_state())解锁。
//如果加锁失败,且要循环尝试加锁,则最好调用co_await yield()让出一次调度。否则,可能造成本调度器死循环。
try_awaiter try_lock() const noexcept;
try_awaiter/*bool*/ try_lock() const noexcept;

//此操作立即返回
struct [[nodiscard]] unlock_awaiter;
unlock_awaiter unlock() const noexcept;
unlock_awaiter/*void*/ unlock() const noexcept;


struct [[nodiscard]] timeout_awaiter;
template <class _Rep, class _Period>
timeout_awaiter try_lock_for(const std::chrono::duration<_Rep, _Period>& dt) const noexcept;
timeout_awaiter/*bool*/ try_lock_for(const std::chrono::duration<_Rep, _Period>& dt) const noexcept;
template <class _Rep, class _Period>
timeout_awaiter try_lock_until(const std::chrono::time_point<_Rep, _Period>& tp) const noexcept;
timeout_awaiter/*bool*/ try_lock_until(const std::chrono::time_point<_Rep, _Period>& tp) const noexcept;


void lock(void* unique_address) const;

+ 87
- 60
librf/src/mutex_v2.inl View File

@@ -4,13 +4,15 @@ RESUMEF_NS
{
namespace detail
{
struct state_mutex_t : public state_base_t
struct state_mutex_base_t : public state_base_t
{
virtual void resume() override;
virtual bool has_handler() const noexcept override;
virtual state_base_t* get_parent() const noexcept override;

bool on_notify();
virtual void on_cancel() noexcept = 0;
virtual bool on_notify(mutex_v2_impl* eptr) = 0;
virtual bool on_timeout() = 0;

inline scheduler_t* get_scheduler() const noexcept
{
@@ -24,12 +26,53 @@ RESUMEF_NS
this->_root = root;
}

private:
inline void add_timeout_timer(std::chrono::system_clock::time_point tp)
{
this->_thandler = this->_scheduler->timer()->add_handler(tp,
[st = counted_ptr<state_mutex_base_t>{ this }](bool canceld)
{
if (!canceld)
st->on_timeout();
});
}

timer_handler _thandler;
protected:
state_base_t* _root;
//friend mutex_v2::mutex_t;
};

//做成递归锁?
struct state_mutex_t : public state_mutex_base_t
{
state_mutex_t(mutex_v2_impl*& val)
: _value(&val)
{}

virtual void on_cancel() noexcept override;
virtual bool on_notify(mutex_v2_impl* eptr) override;
virtual bool on_timeout() override;
public:
timer_handler _thandler;
protected:
std::atomic<mutex_v2_impl**> _value;
};

struct state_mutex_all_t : public state_event_base_t
{
state_mutex_all_t(intptr_t count, bool& val)
: _counter(count)
, _value(&val)
{}

virtual void on_cancel() noexcept override;
virtual bool on_notify(event_v2_impl* eptr) override;
virtual bool on_timeout() override;
public:
timer_handler _thandler;
std::atomic<intptr_t> _counter;
protected:
bool* _value;
};

struct mutex_v2_impl : public std::enable_shared_from_this<mutex_v2_impl>
{
using clock_type = std::chrono::system_clock;
@@ -159,12 +202,18 @@ RESUMEF_NS
_PromiseT& promise = handler.promise();
auto* parent = promise.get_state();
_root = parent->get_root();
if (_root == nullptr)
{
assert(false);
_mutex = nullptr;
return false;
}

scoped_lock<detail::mutex_v2_impl::lock_type> lock_(_mutex->_lock);
if (_mutex->try_lock_lockless(_root))
return false;

_state = new detail::state_mutex_t();
_state = new detail::state_mutex_t(_mutex);
_state->on_await_suspend(handler, parent->get_scheduler(), _root);

_mutex->add_wait_list_lockless(_state.get());
@@ -174,7 +223,7 @@ RESUMEF_NS

scoped_lock_mutex_t await_resume() noexcept
{
mutex_impl_ptr mtx = _root ? _mutex->shared_from_this() : nullptr;
mutex_impl_ptr mtx = _mutex ? _mutex->shared_from_this() : nullptr;
_mutex = nullptr;

return { std::adopt_lock, mtx, _root };
@@ -203,6 +252,14 @@ RESUMEF_NS
{
assert(_mutex != nullptr);
}
~try_awaiter() noexcept(false)
{
assert(_mutex == nullptr);
if (_mutex != nullptr)
{
throw lock_exception(error_code::not_await_lock);
}
}

bool await_ready() noexcept
{
@@ -222,7 +279,9 @@ RESUMEF_NS

bool await_resume() noexcept
{
return _mutex != nullptr;
detail::mutex_v2_impl* mtx = _mutex;
_mutex = nullptr;
return mtx != nullptr;
}
protected:
detail::mutex_v2_impl* _mutex;
@@ -240,6 +299,14 @@ RESUMEF_NS
{
assert(_mutex != nullptr);
}
~unlock_awaiter() noexcept(false)
{
assert(_mutex == nullptr);
if (_mutex != nullptr)
{
throw lock_exception(error_code::not_await_lock);
}
}

bool await_ready() noexcept
{
@@ -258,6 +325,7 @@ RESUMEF_NS

void await_resume() noexcept
{
_mutex = nullptr;
}
protected:
detail::mutex_v2_impl* _mutex;
@@ -270,73 +338,32 @@ RESUMEF_NS



struct [[nodiscard]] mutex_t::timeout_awaiter
{
timeout_awaiter(detail::mutex_v2_impl* mtx, clock_type::time_point tp) noexcept
: _mutex(mtx)
, _tp(tp)
{
assert(_mutex != nullptr);
}

~timeout_awaiter() noexcept(false)
{
assert(_mutex == nullptr);
if (_mutex != nullptr)
{
throw lock_exception(error_code::not_await_lock);
}
}

bool await_ready() noexcept
{
return false;
}

template<class _PromiseT, typename = std::enable_if_t<traits::is_promise_v<_PromiseT>>>
bool await_suspend(coroutine_handle<_PromiseT> handler)
{
_PromiseT& promise = handler.promise();
auto* parent = promise.get_state();
_root = parent->get_root();

scoped_lock<detail::mutex_v2_impl::lock_type> lock_(_mutex->_lock);
if (_mutex->try_lock_lockless(_root))
return false;

_state = new detail::state_mutex_t();
_state->on_await_suspend(handler, parent->get_scheduler(), _root);

_mutex->add_wait_list_lockless(_state.get());

return true;
}
struct [[nodiscard]] mutex_t::timeout_awaiter : public event_t::timeout_awaitor_impl<awaiter>
{
timeout_awaiter(clock_type::time_point tp, detail::mutex_v2_impl * mtx) noexcept
: event_t::timeout_awaitor_impl<mutex_t::awaiter>(tp, mtx)
{}

scoped_lock_mutex_t await_resume() noexcept
bool await_resume() noexcept
{
mutex_impl_ptr mtx = _root ? _mutex->shared_from_this() : nullptr;
_mutex = nullptr;

return { std::adopt_lock, mtx, _root };
detail::mutex_v2_impl* mtx = this->_mutex;
this->_mutex = nullptr;
return mtx != nullptr;
}
protected:
detail::mutex_v2_impl* _mutex;
clock_type::time_point _tp;
counted_ptr<detail::state_mutex_t> _state;
state_base_t* _root = nullptr;
};

template <class _Rep, class _Period>
inline mutex_t::timeout_awaiter mutex_t::try_lock_until(const std::chrono::time_point<_Rep, _Period>& tp) const noexcept
{
return { _mutex.get(), tp };
return { tp, _mutex.get() };
}

template <class _Rep, class _Period>
inline mutex_t::timeout_awaiter mutex_t::try_lock_for(const std::chrono::duration<_Rep, _Period>& dt) const noexcept
{
auto tp = clock_type::now() + std::chrono::duration_cast<clock_type::duration>(dt);
return { _mutex.get(), tp };
return { tp, _mutex.get() };
}



+ 91
- 135
librf/src/spinlock.h View File

@@ -88,172 +88,127 @@ RESUMEF_NS
};
#endif
namespace detail
{
template<class _Ty>
void _Lock_ref(_Ty& _LkN)
{
_LkN.lock();
}
template<class _Ty>
void _Lock_ref(std::reference_wrapper<_Ty> _LkN)
{
_LkN.get().lock();
}
template<class _Ty>
void _Unlock_ref(_Ty& _LkN)
{
_LkN.unlock();
}
template<class _Ty>
void _Unlock_ref(std::reference_wrapper<_Ty> _LkN)
{
_LkN.get().unlock();
}
template<class _Ty>
bool _Try_lock_ref(_Ty& _LkN)
{
return _LkN.try_lock();
}
template<class _Ty>
bool _Try_lock_ref(std::reference_wrapper<_Ty> _LkN)
#if RESUMEF_ENABLE_CONCEPT
template<typename T>
concept _LockAssembleT = requires(T && v)
{
return _LkN.get().try_lock();
}
template<class _Ty>
void _Lock_from_locks(const int _Target, std::vector<_Ty>& _LkN) { // lock _LkN[_Target]
_Lock_ref(_LkN[_Target]);
}
// FUNCTION TEMPLATE _Try_lock_from_locks
template<class _Ty>
bool _Try_lock_from_locks(const int _Target, std::vector<_Ty>& _LkN) { // try to lock _LkN[_Target]
return _Try_lock_ref(_LkN[_Target]);
}
{ v.size() };
{ v[0] };
{ v._Lock_ref(v[0]) } ->void;
{ v._Try_lock_ref(v[0]) } ->bool;
{ v._Unlock_ref(v[0]) } ->void;
{ v._Yield() };
{ v._ReturnValue() };
{ v._ReturnValue(0) };
requires std::is_integral_v<decltype(v.size())>;
};
#else
#define _LockAssembleT typename
#endif
// FUNCTION TEMPLATE _Unlock_locks
template<class _Ty>
void _Unlock_locks(int _First, int _Last, std::vector<_Ty>& _LkN) noexcept /* terminates */ {
for (; _First != _Last; ++_First) {
_Unlock_ref(_LkN[_First]);
struct _LockVectorAssembleT
{
private:
std::vector<_Ty>& _Lks;
public:
_LockVectorAssembleT(std::vector<_Ty>& _LkN)
: _Lks(_LkN)
{}
size_t size() const
{
return _Lks.size();
}
}
// FUNCTION TEMPLATE try_lock
template<class _Ty>
int _Try_lock_range(const int _First, const int _Last, std::vector<_Ty>& _LkN) {
int _Next = _First;
try {
for (; _Next != _Last; ++_Next) {
if (!_Try_lock_from_locks(_Next, _LkN)) { // try_lock failed, backout
_Unlock_locks(_First, _Next, _LkN);
return _Next;
}
}
_Ty& operator[](int _Idx)
{
return _Lks[_Idx];
}
void _Lock_ref(_Ty& _LkN) const
{
_LkN.lock();
}
catch (...) {
_Unlock_locks(_First, _Next, _LkN);
throw;
bool _Try_lock_ref(_Ty& _LkN) const
{
return _LkN.try_lock();
}
return -1;
}
// FUNCTION TEMPLATE lock
template<class _Ty>
int _Lock_attempt(const int _Hard_lock, std::vector<_Ty>& _LkN) {
// attempt to lock 3 or more locks, starting by locking _LkN[_Hard_lock] and trying to lock the rest
_Lock_from_locks(_Hard_lock, _LkN);
int _Failed = -1;
int _Backout_start = _Hard_lock; // that is, unlock _Hard_lock
try {
_Failed = _Try_lock_range(0, _Hard_lock, _LkN);
if (_Failed == -1) {
_Backout_start = 0; // that is, unlock [0, _Hard_lock] if the next throws
_Failed = _Try_lock_range(_Hard_lock + 1, (int)_LkN.size(), _LkN);
if (_Failed == -1) { // we got all the locks
return -1;
}
}
void _Unlock_ref(_Ty& _LkN) const
{
_LkN.unlock();
}
catch (...) {
_Unlock_locks(_Backout_start, _Hard_lock + 1, _LkN);
throw;
void _Yield() const
{
std::this_thread::yield();
}
// we didn't get all the locks, backout
_Unlock_locks(_Backout_start, _Hard_lock + 1, _LkN);
std::this_thread::yield();
return _Failed;
}
void _ReturnValue() const noexcept {}
template<class U>
U _ReturnValue(U v) const noexcept
{
return v;
}
};
template<class _Ty>
void _Lock_nonmember3(std::vector<_Ty>& _LkN) {
// lock 3 or more locks, without deadlock
int _Hard_lock = 0;
while (_Hard_lock != -1) {
_Hard_lock = _Lock_attempt(_Hard_lock, _LkN);
}
}
template <class _Lock0, class _Lock1>
bool _Lock_attempt_small2(_Lock0& _Lk0, _Lock1& _Lk1)
struct _LockVectorAssembleT<std::reference_wrapper<_Ty>>
{
// attempt to lock 2 locks, by first locking _Lk0, and then trying to lock _Lk1 returns whether to try again
_Lock_ref(_Lk0);
try {
if (_Try_lock_ref(_Lk1))
return false;
private:
std::vector<std::reference_wrapper<_Ty>>& _Lks;
public:
_LockVectorAssembleT(std::vector<std::reference_wrapper<_Ty>>& _LkN)
: _Lks(_LkN)
{}
size_t size() const
{
return _Lks.size();
}
catch (...) {
_Unlock_ref(_Lk0);
throw;
std::reference_wrapper<_Ty> operator[](int _Idx)
{
return _Lks[_Idx];
}
_Unlock_ref(_Lk0);
std::this_thread::yield();
return true;
}
template <class _Lock0, class _Lock1>
void _Lock_nonmember2(_Lock0& _Lk0, _Lock1& _Lk1)
{
// lock 2 locks, without deadlock, special case for better codegen and reduced metaprogramming for common case
while (_Lock_attempt_small2(_Lk0, _Lk1) && _Lock_attempt_small2(_Lk1, _Lk0)) { // keep trying
void _Lock_ref(std::reference_wrapper<_Ty> _LkN) const
{
_LkN.get().lock();
}
}
template<class _Ty>
void _Lock_range(std::vector<_Ty>& lockes)
{
if (lockes.size() == 0)
void _Unlock_ref(std::reference_wrapper<_Ty> _LkN) const
{
_LkN.get().unlock();
}
else if (lockes.size() == 1)
bool _Try_lock_ref(std::reference_wrapper<_Ty> _LkN) const
{
_Lock_ref(lockes[0]);
return _LkN.get().try_lock();
}
else if (lockes.size() == 2)
void _Yield() const
{
_Lock_nonmember2(lockes[0], lockes[1]);
std::this_thread::yield();
}
else
void _ReturnValue() const noexcept {}
template<class U>
U _ReturnValue(U v) const noexcept
{
_Lock_nonmember3(lockes);
return v;
}
}
};
}
}
#define LOCK_ASSEMBLE_AWAIT(a) (a)
#define LOCK_ASSEMBLE_RETURN(a) return (a)
#include "without_deadlock_assemble.inl"
#undef LOCK_ASSEMBLE_AWAIT
#undef LOCK_ASSEMBLE_RETURN
RESUMEF_NS
{
template<class _Ty>
class scoped_lock_range { // class with destructor that unlocks mutexes
public:
explicit scoped_lock_range(std::vector<_Ty>& locks_)
: _LkN(locks_)
{
detail::_Lock_range(locks_);
detail::_LockVectorAssembleT<_Ty> LA{ _LkN };
detail::_Lock_range(LA);
}
explicit scoped_lock_range(std::adopt_lock_t, std::vector<_Ty>& locks_)
@@ -263,7 +218,8 @@ RESUMEF_NS
~scoped_lock_range() noexcept
{
detail::_Unlock_locks(0, (int)_LkN.size(), _LkN);
detail::_LockVectorAssembleT<_Ty> LA{ _LkN };
detail::_Unlock_locks(0, (int)_LkN.size(), LA);
}
scoped_lock_range(const scoped_lock_range&) = delete;

+ 2
- 2
librf/src/type_concept.inl View File

@@ -2,9 +2,9 @@
#ifndef RESUMEF_ENABLE_CONCEPT
#ifdef __cpp_lib_concepts
#define RESUMEF_ENABLE_CONCEPT 0
#define RESUMEF_ENABLE_CONCEPT 1
#else
#define RESUMEF_ENABLE_CONCEPT 0
#define RESUMEF_ENABLE_CONCEPT 1
#endif //#ifdef __cpp_lib_concepts
#endif //#ifndef RESUMEF_ENABLE_CONCEPT

+ 134
- 0
librf/src/without_deadlock_assemble.inl View File

@@ -0,0 +1,134 @@

RESUMEF_NS
{
namespace detail
{
// FUNCTION TEMPLATE _Unlock_locks
template<_LockAssembleT _LA>
auto _Unlock_locks(int _First, int _Last, _LA& _LkN) noexcept /* terminates */
->decltype(_LkN._ReturnValue())
{
for (; _First != _Last; ++_First) {
LOCK_ASSEMBLE_AWAIT(_LkN._Unlock_ref(_LkN[_First]));
}
}
// FUNCTION TEMPLATE try_lock
template<_LockAssembleT _LA>
auto _Try_lock_range(const int _First, const int _Last, _LA& _LkN)
->decltype(_LkN._ReturnValue<int>(0))
{
int _Next = _First;
try {
for (; _Next != _Last; ++_Next)
{
if (!LOCK_ASSEMBLE_AWAIT(_LkN._Try_lock_ref(_LkN[_Next])))
{ // try_lock failed, backout
LOCK_ASSEMBLE_AWAIT(_Unlock_locks(_First, _Next, _LkN));
LOCK_ASSEMBLE_RETURN(_Next);
}
}
}
catch (...) {
LOCK_ASSEMBLE_AWAIT(_Unlock_locks(_First, _Next, _LkN));
throw;
}
LOCK_ASSEMBLE_RETURN(-1);
}
// FUNCTION TEMPLATE lock
template<_LockAssembleT _LA>
auto _Lock_attempt(const int _Hard_lock, _LA& _LkN)
->decltype(_LkN._ReturnValue<int>(0))
{
// attempt to lock 3 or more locks, starting by locking _LkN[_Hard_lock] and trying to lock the rest
LOCK_ASSEMBLE_AWAIT(_LkN._Lock_ref(_LkN[_Hard_lock]));
int _Failed = -1;
int _Backout_start = _Hard_lock; // that is, unlock _Hard_lock
try {
_Failed = LOCK_ASSEMBLE_AWAIT(_Try_lock_range(0, _Hard_lock, _LkN));
if (_Failed == -1)
{
_Backout_start = 0; // that is, unlock [0, _Hard_lock] if the next throws
_Failed = LOCK_ASSEMBLE_AWAIT(_Try_lock_range(_Hard_lock + 1, (int)_LkN.size(), _LkN));
if (_Failed == -1) { // we got all the locks
LOCK_ASSEMBLE_RETURN(-1);
}
}
}
catch (...) {
LOCK_ASSEMBLE_AWAIT(_Unlock_locks(_Backout_start, _Hard_lock + 1, _LkN));
throw;
}
// we didn't get all the locks, backout
LOCK_ASSEMBLE_AWAIT(_Unlock_locks(_Backout_start, _Hard_lock + 1, _LkN));
LOCK_ASSEMBLE_AWAIT(_LkN._Yield());
LOCK_ASSEMBLE_RETURN(_Failed);
}
template<_LockAssembleT _LA>
auto _Lock_nonmember3(_LA& _LkN) ->decltype(_LkN._ReturnValue())
{
// lock 3 or more locks, without deadlock
int _Hard_lock = 0;
while (_Hard_lock != -1) {
_Hard_lock = LOCK_ASSEMBLE_AWAIT(_Lock_attempt(_Hard_lock, _LkN));
}
}
template<_LockAssembleT _LA>
auto _Lock_attempt_small2(_LA& _LkN, const int _Idx0, const int _Idx1)
->decltype(_LkN._ReturnValue<bool>(false))
{
// attempt to lock 2 locks, by first locking _Lk0, and then trying to lock _Lk1 returns whether to try again
LOCK_ASSEMBLE_AWAIT(_LkN._Lock_ref(_LkN[_Idx0]));
try {
if (LOCK_ASSEMBLE_AWAIT(_LkN._Try_lock_ref(_LkN[_Idx1])))
LOCK_ASSEMBLE_RETURN(false);
}
catch (...) {
LOCK_ASSEMBLE_AWAIT(_LkN._Unlock_ref(_LkN[_Idx0]));
throw;
}
LOCK_ASSEMBLE_AWAIT(_LkN._Unlock_ref(_LkN[_Idx0]));
LOCK_ASSEMBLE_AWAIT(_LkN._Yield());
LOCK_ASSEMBLE_RETURN(true);
}
template<_LockAssembleT _LA>
auto _Lock_nonmember2(_LA& _LkN) ->decltype(_LkN._ReturnValue())
{
// lock 2 locks, without deadlock, special case for better codegen and reduced metaprogramming for common case
while (LOCK_ASSEMBLE_AWAIT(_Lock_attempt_small2(_LkN, 0, 1)) &&
LOCK_ASSEMBLE_AWAIT(_Lock_attempt_small2(_LkN, 1, 0)))
{ // keep trying
}
}
template<_LockAssembleT _LA>
auto _Lock_range(_LA& lockes) ->decltype(lockes._ReturnValue())
{
if (lockes.size() == 0)
{
}
else if (lockes.size() == 1)
{
LOCK_ASSEMBLE_AWAIT(lockes._Lock_ref(lockes[0]));
}
else if (lockes.size() == 2)
{
LOCK_ASSEMBLE_AWAIT(_Lock_nonmember2(lockes));
}
else
{
LOCK_ASSEMBLE_AWAIT(_Lock_nonmember3(lockes));
}
}
}
}

+ 19
- 1
tutorial/test_async_mutex.cpp View File

@@ -76,6 +76,24 @@ static future_t<> test_mutex_try_push(size_t idx)
}
}
static future_t<> test_mutex_timeout_push(size_t idx)
{
for (size_t i = 0; i < N; ++i)
{
{
while (!co_await g_lock.try_lock_for(10ms))
co_await yield();
++g_counter;
std::cout << "push:" << g_counter << " on " << idx << std::endl;
co_await 50ms;
co_await g_lock.unlock();
}
co_await 50ms;
}
}
//🔒-50ms-🗝-50ms-🔒-50ms-🗝-50ms-|
//---------........---------.......
static std::thread test_mutex_async_push(size_t idx)
@@ -103,7 +121,7 @@ static std::thread test_mutex_async_push(size_t idx)
static void resumable_mutex_synch()
{
go test_mutex_try_push(0);
go test_mutex_timeout_push(0);
go test_mutex_pop(1);
this_scheduler()->run_until_notask();

+ 1
- 0
vs_proj/librf.cpp View File

@@ -43,6 +43,7 @@ int main(int argc, const char* argv[])
//test_ring_queue<resumef::ring_queue_spinlock<int, false, uint32_t>>();
//test_ring_queue<resumef::ring_queue_lockfree<int, uint64_t>>();
resumable_main_event();
resumable_main_mutex();
return 0;

+ 1
- 0
vs_proj/librf.vcxproj View File

@@ -263,6 +263,7 @@
<None Include="..\librf\src\channel_v2.inl" />
<None Include="..\librf\src\event_v2.inl" />
<None Include="..\librf\src\exception.inl" />
<None Include="..\librf\src\without_deadlock_assemble.inl" />
<None Include="..\librf\src\macro_def.inl" />
<None Include="..\librf\src\mutex_v2.inl" />
<None Include="..\librf\src\promise.inl" />

+ 3
- 0
vs_proj/librf.vcxproj.filters View File

@@ -276,5 +276,8 @@
<None Include="..\librf\src\mutex_v2.inl">
<Filter>librf\src</Filter>
</None>
<None Include="..\librf\src\without_deadlock_assemble.inl">
<Filter>librf\src</Filter>
</None>
</ItemGroup>
</Project>

Loading…
Cancel
Save