
Complete deadlock-free batch locking of multiple mutexes

tags/v2.9.7
tearshark 4 years ago
parent commit e22ae5dc12

librf/src/def.h (+1 -1)

@@ -1,6 +1,6 @@
#pragma once
#define LIB_RESUMEF_VERSION 20900 // 2.9.0
#define LIB_RESUMEF_VERSION 20901 // 2.9.1
#if defined(RESUMEF_MODULE_EXPORT)
#define RESUMEF_NS export namespace resumef

librf/src/mutex_v2.cpp (+25 -13)

@@ -40,6 +40,8 @@ RESUMEF_NS

bool state_mutex_t::on_notify(mutex_v2_impl* eptr)
{
assert(eptr != nullptr);

mutex_v2_impl** oldValue = _value.load(std::memory_order_acquire);
if (oldValue != nullptr && _value.compare_exchange_strong(oldValue, nullptr, std::memory_order_acq_rel))
{
@@ -139,12 +141,16 @@ RESUMEF_NS

bool mutex_v2_impl::try_lock(void* sch)
{
assert(sch != nullptr);

scoped_lock<detail::mutex_v2_impl::lock_type> lock_(_lock);
return try_lock_lockless(sch);
}

bool mutex_v2_impl::try_lock_until(clock_type::time_point tp, void* sch)
{
assert(sch != nullptr);

do
{
if (try_lock(sch))
@@ -156,16 +162,18 @@ RESUMEF_NS

bool mutex_v2_impl::try_lock_lockless(void* sch) noexcept
{
void* oldValue = _owner.load(std::memory_order_relaxed);
assert(sch != nullptr);

void* oldValue = _owner.load(std::memory_order_acquire);
if (oldValue == nullptr)
{
_owner.store(sch, std::memory_order_relaxed);
_counter.fetch_add(1, std::memory_order_relaxed);
_owner.store(sch, std::memory_order_release);
_counter.fetch_add(1, std::memory_order_acq_rel);
return true;
}
if (oldValue == sch)
{
_counter.fetch_add(1, std::memory_order_relaxed);
_counter.fetch_add(1, std::memory_order_acq_rel);
return true;
}
return false;
@@ -173,27 +181,31 @@ RESUMEF_NS

bool mutex_v2_impl::unlock(void* sch)
{
assert(sch != nullptr);

scoped_lock<lock_type> lock_(_lock);

void* oldValue = _owner.load(std::memory_order_relaxed);
void* oldValue = _owner.load(std::memory_order_acquire);
if (oldValue == sch)
{
if (_counter.fetch_sub(1, std::memory_order_relaxed) == 1)
if (_counter.fetch_sub(1, std::memory_order_acquire) == 1)
{
_owner.store(nullptr, std::memory_order_relaxed);
_owner.store(nullptr, std::memory_order_release);
while (!_wait_awakes.empty())
{
state_mutex_ptr state = _wait_awakes.front();
_wait_awakes.pop_front();

//first transfer the locked state to the new state
_owner.store(state->get_root(), std::memory_order_release);
_counter.fetch_add(1, std::memory_order_acq_rel);
if (state->on_notify(this))
{
//transfer the locked state to the new state
_owner.store(state->get_root(), std::memory_order_relaxed);
_counter.fetch_add(1, std::memory_order_relaxed);

break;
}

//transfer failed, restore to the unlocked (empty) state
_owner.store(nullptr, std::memory_order_release);
_counter.fetch_sub(1, std::memory_order_acq_rel);
}
}
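
The unlock() change above reverses the order of hand-off and wake-up: _owner and _counter are first transferred to the next waiting state, and only then is on_notify() called; if the notification fails (for example, the waiter already timed out), the transfer is rolled back and the next waiter is tried. A minimal, self-contained sketch of that transfer-then-notify pattern follows; fake_waiter and handoff_mutex are hypothetical stand-ins for librf's state_mutex_t and mutex_v2_impl, not the library's actual types.

// Hedged sketch: illustrative types only, not part of librf.
#include <atomic>
#include <cassert>
#include <deque>

struct fake_waiter
{
    void* root;                      // stands in for state_mutex_t::get_root()
    bool alive;                      // whether the waiter can still be resumed
    bool notify() { return alive; }  // stands in for state_mutex_t::on_notify()
};

struct handoff_mutex
{
    std::atomic<void*> _owner{ nullptr };
    std::atomic<int> _counter{ 0 };
    std::deque<fake_waiter*> _wait_awakes;

    void unlock_and_handoff()
    {
        _owner.store(nullptr, std::memory_order_release);
        while (!_wait_awakes.empty())
        {
            fake_waiter* state = _wait_awakes.front();
            _wait_awakes.pop_front();

            // first transfer the locked state to the new waiter ...
            _owner.store(state->root, std::memory_order_release);
            _counter.fetch_add(1, std::memory_order_acq_rel);
            if (state->notify())     // ... then try to wake it up
                break;               // success: that waiter now owns the mutex

            // transfer failed: restore the unlocked state, try the next waiter
            _owner.store(nullptr, std::memory_order_release);
            _counter.fetch_sub(1, std::memory_order_acq_rel);
        }
    }
};

int main()
{
    handoff_mutex m;
    fake_waiter timed_out{ &timed_out, false };
    fake_waiter ready{ &ready, true };
    m._wait_awakes = { &timed_out, &ready };
    m.unlock_and_handoff();
    assert(m._owner.load() == &ready && m._counter.load() == 1);
    return 0;
}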


librf/src/mutex_v2.h (+5 -0)

@@ -22,7 +22,9 @@ RESUMEF_NS
mutex_t(std::adopt_lock_t) noexcept;
~mutex_t() noexcept;

struct lock_awaiter;
struct [[nodiscard]] awaiter;

awaiter/*scoped_lock_mutex_t*/ lock() const noexcept;
awaiter/*scoped_lock_mutex_t*/ operator co_await() const noexcept;

@@ -72,6 +74,9 @@ RESUMEF_NS
>
static void unlock(void* unique_address, _Mtxs&... mtxs);

bool is_locked() const;
auto lock(std::defer_lock_t) const noexcept;

mutex_t(const mutex_t&) = default;
mutex_t(mutex_t&&) = default;
mutex_t& operator = (const mutex_t&) = default;

librf/src/mutex_v2.inl (+111 -70)

@@ -81,7 +81,7 @@ RESUMEF_NS

inline void* owner() const noexcept
{
return _owner.load(std::memory_order_relaxed);
return _owner.load(std::memory_order_acquire);
}

bool try_lock(void* sch); //locks internally
@@ -218,23 +218,38 @@ RESUMEF_NS

scoped_lock_mutex_t(const scoped_lock_mutex_t&) = delete;
scoped_lock_mutex_t& operator = (const scoped_lock_mutex_t&) = delete;
scoped_lock_mutex_t(scoped_lock_mutex_t&&) = default;
scoped_lock_mutex_t& operator = (scoped_lock_mutex_t&&) = default;

scoped_lock_mutex_t(scoped_lock_mutex_t&& _Right) noexcept
: _mutex(std::move(_Right._mutex))
, _owner(_Right._owner)
{
assert(_Right._mutex == nullptr);
}

scoped_lock_mutex_t& operator = (scoped_lock_mutex_t&& _Right) noexcept
{
if (this != &_Right)
{
_mutex = std::move(_Right._mutex);
assert(_Right._mutex == nullptr);
_owner = _Right._owner;
}
return *this;
}
private:
mutex_impl_ptr _mutex;
void* _owner;
};


struct [[nodiscard]] mutex_t::awaiter
struct mutex_t::lock_awaiter
{
awaiter(detail::mutex_v2_impl* mtx) noexcept
lock_awaiter(detail::mutex_v2_impl* mtx) noexcept
: _mutex(mtx)
{
assert(_mutex != nullptr);
}

~awaiter() noexcept(false)
~lock_awaiter() noexcept(false)
{
assert(_mutex == nullptr);
if (_mutex != nullptr)
@@ -254,12 +269,8 @@ RESUMEF_NS
_PromiseT& promise = handler.promise();
auto* parent = promise.get_state();
_root = parent->get_root();
if (_root == nullptr)
{
assert(false);
_mutex = nullptr;
return false;
}
assert(_root != nullptr);
assert(_root->get_parent() == nullptr);

scoped_lock<detail::mutex_v2_impl::lock_type> lock_(_mutex->_lock);
if (_mutex->try_lock_lockless(_root))
@@ -272,6 +283,15 @@ RESUMEF_NS

return true;
}
protected:
detail::mutex_v2_impl* _mutex;
counted_ptr<detail::state_mutex_t> _state;
state_base_t* _root = nullptr;
};

struct [[nodiscard]] mutex_t::awaiter : public lock_awaiter
{
using lock_awaiter::lock_awaiter;

scoped_lock_mutex_t await_resume() noexcept
{
@@ -280,10 +300,6 @@ RESUMEF_NS

return { std::adopt_lock, mtx, _root };
}
protected:
detail::mutex_v2_impl* _mutex;
counted_ptr<detail::state_mutex_t> _state;
state_base_t* _root = nullptr;
};

inline mutex_t::awaiter mutex_t::operator co_await() const noexcept
@@ -296,6 +312,24 @@ RESUMEF_NS
return { _mutex.get() };
}

inline auto mutex_t::lock(std::defer_lock_t) const noexcept
{
struct discard_unlock_awaiter : lock_awaiter
{
using lock_awaiter::lock_awaiter;
void await_resume() noexcept
{
_mutex = nullptr;
}
};

return discard_unlock_awaiter{ _mutex.get() };
}

inline bool mutex_t::is_locked() const
{
return _mutex->owner() != nullptr;
}

struct [[nodiscard]] mutex_t::try_awaiter
{
@@ -421,91 +455,58 @@ RESUMEF_NS

inline void mutex_t::lock(void* unique_address) const
{
assert(unique_address != nullptr);
_mutex->lock_until_succeed(unique_address);
}

inline bool mutex_t::try_lock(void* unique_address) const
{
assert(unique_address != nullptr);
return _mutex->try_lock(unique_address);
}

template <class _Rep, class _Period>
inline bool mutex_t::try_lock_for(const std::chrono::duration<_Rep, _Period>& dt, void* unique_address)
{
assert(unique_address != nullptr);
return _mutex->try_lock_until(clock_type::now() + std::chrono::duration_cast<clock_type::duration>(dt), unique_address);
}

template <class _Rep, class _Period>
inline bool mutex_t::try_lock_until(const std::chrono::time_point<_Rep, _Period>& tp, void* unique_address)
{
assert(unique_address != nullptr);
return _mutex->try_lock_until(std::chrono::time_point_cast<clock_type::time_point>(tp), unique_address);
}

inline void mutex_t::unlock(void* unique_address) const
{
assert(unique_address != nullptr);
_mutex->unlock(unique_address);
}

struct [[nodiscard]] scoped_unlock_range_t
{
//This should be used after try_lock() has acquired the lock,
//or, inside a coroutine, by the awaiter
scoped_unlock_range_t(std::vector<mutex_t>&& mtxs, void* sch)
: _mutex(std::move(mtxs))
, _owner(sch)
{}

~scoped_unlock_range_t()
{
if (_owner != nullptr)
{
for(mutex_t& mtx : _mutex)
mtx.unlock(_owner);
}
}

inline void unlock() noexcept
{
if (_owner != nullptr)
{
for (mutex_t& mtx : _mutex)
mtx.unlock(_owner);
_owner = nullptr;
}
}

scoped_unlock_range_t(const scoped_unlock_range_t&) = delete;
scoped_unlock_range_t& operator = (const scoped_unlock_range_t&) = delete;
scoped_unlock_range_t(scoped_unlock_range_t&&) = default;
scoped_unlock_range_t& operator = (scoped_unlock_range_t&&) = default;
private:
std::vector<mutex_t> _mutex;
void* _owner;
};

struct mutex_t::_MutexAwaitAssembleT
{
private:
void* _Address;
public:
std::vector<mutex_t> _Lks;
std::vector<mutex_t> _mutex;
void* _owner;

template<class... _Mtxs>
_MutexAwaitAssembleT(void* unique_address, _Mtxs&... mtxs)
: _Address(unique_address)
, _Lks({ mtxs... })
: _mutex({ mtxs... })
, _owner(unique_address)
{}
size_t size() const
{
return _Lks.size();
return _mutex.size();
}
mutex_t& operator[](int _Idx)
{
return _Lks[_Idx];
return _mutex[_Idx];
}
auto _Lock_ref(mutex_t& _LkN) const
{
return _LkN.lock();
return _LkN.lock(std::defer_lock);
}
auto _Try_lock_ref(mutex_t& _LkN) const
{
@@ -513,32 +514,70 @@ RESUMEF_NS
}
void _Unlock_ref(mutex_t& _LkN) const
{
_LkN.unlock(_Address);
_LkN.unlock(_owner);
}
future_t<> _Yield() const
{
for (int cnt = rand() % (1 + _Lks.size()); cnt >= 0; --cnt)
for (int cnt = rand() % (1 + _mutex.size()); cnt >= 0; --cnt)
{
std::this_thread::yield(); //must also account for running across multiple threads
co_await ::resumef::yield();
}
}
future_t<> _ReturnValue() const;
template<class U>
future_t<U> _ReturnValue(U v) const;
};

struct [[nodiscard]] scoped_unlock_range_t
{
mutex_t::_MutexAwaitAssembleT _MAA;

//This should be used after try_lock() has acquired the lock,
//or, inside a coroutine, by the awaiter
template<class... _Mtxs>
scoped_unlock_range_t(void* unique_address, _Mtxs&&... mtxs)
: _MAA(unique_address, std::forward<_Mtxs>(mtxs)...)
{}

~scoped_unlock_range_t()
{
if (_MAA._owner != nullptr)
{
for(mutex_t& mtx : _MAA._mutex)
mtx.unlock(_MAA._owner);
}
}

inline void unlock() noexcept
{
if (_MAA._owner != nullptr)
{
for (mutex_t& mtx : _MAA._mutex)
mtx.unlock(_MAA._owner);
_MAA._owner = nullptr;
}
}

scoped_unlock_range_t(const scoped_unlock_range_t&) = delete;
scoped_unlock_range_t& operator = (const scoped_unlock_range_t&) = delete;
scoped_unlock_range_t(scoped_unlock_range_t&& _Right) noexcept = default;
scoped_unlock_range_t& operator = (scoped_unlock_range_t&& _Right) noexcept = default;
};

template<class... _Mtxs, typename>
inline future_t<scoped_unlock_range_t> mutex_t::lock(_Mtxs&... mtxs)
{
auto* root = root_state();
_MutexAwaitAssembleT MAA(root, mtxs...);
co_await detail::mutex_lock_await_lock_impl::_Lock_range(MAA);

co_return scoped_unlock_range_t{ std::move(MAA._Lks), root };
scoped_unlock_range_t unlock_guard{ root_state(), mtxs... };
co_await detail::mutex_lock_await_lock_impl::_Lock_range(unlock_guard._MAA);
co_return unlock_guard;
}


template<class... _Mtxs, typename>
inline scoped_unlock_range_t mutex_t::lock(void* unique_address, _Mtxs&... mtxs)
{
assert(unique_address != nullptr);

detail::_MutexAddressAssembleT MAA(unique_address, mtxs...);
detail::scoped_lock_range_lock_impl::_Lock_range(MAA);
@@ -548,6 +587,8 @@ RESUMEF_NS
template<class... _Mtxs, typename>
inline void mutex_t::unlock(void* unique_address, _Mtxs&... mtxs)
{
assert(unique_address != nullptr);

int _Ignored[] = { (mtxs.unlock(unique_address), 0)... };
(void)_Ignored;
}
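
With the changes above, the coroutine overload mutex_t::lock(mtxs...) constructs the scoped_unlock_range_t guard first and then locks through its _MAA member, so the guard releases the mutexes even if locking is abandoned part-way. A hedged usage sketch follows, in the style of tutorial/test_async_mutex.cpp below; the coroutine name, the mutex variables, and the assumption that librf's headers (and <cassert>) are already included are illustrative, not part of this commit.

using namespace resumef;

static future_t<> move_item(mutex_t a, mutex_t b, mutex_t c)
{
    // acquire all three mutexes without risking deadlock;
    // __lockers releases them again when it goes out of scope
    auto __lockers = co_await mutex_t::lock(a, b, c);
    assert(a.is_locked() && b.is_locked() && c.is_locked());

    // ... critical section guarded by a, b and c ...
}

The guard can also release the mutexes early by calling __lockers.unlock().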

librf/src/without_deadlock_assemble.inl (+3 -3)

@@ -40,7 +40,7 @@ struct LOCK_ASSEMBLE_NAME(lock_impl)
->decltype(_LkN._ReturnValue(123))
{
// attempt to lock 3 or more locks, starting by locking _LkN[_Hard_lock] and trying to lock the rest
(void)LOCK_ASSEMBLE_AWAIT(_LkN._Lock_ref(_LkN[_Hard_lock]));
LOCK_ASSEMBLE_AWAIT(_LkN._Lock_ref(_LkN[_Hard_lock]));
int _Failed = -1;
int _Backout_start = _Hard_lock; // that is, unlock _Hard_lock
@@ -82,7 +82,7 @@ struct LOCK_ASSEMBLE_NAME(lock_impl)
->decltype(_LkN._ReturnValue(false))
{
// attempt to lock 2 locks, by first locking _Lk0 and then trying to lock _Lk1; returns whether to try again
(void)LOCK_ASSEMBLE_AWAIT(_LkN._Lock_ref(_LkN[_Idx0]));
LOCK_ASSEMBLE_AWAIT(_LkN._Lock_ref(_LkN[_Idx0]));
try {
if (LOCK_ASSEMBLE_AWAIT(_LkN._Try_lock_ref(_LkN[_Idx1])))
LOCK_ASSEMBLE_RETURN(false);
@@ -116,7 +116,7 @@ struct LOCK_ASSEMBLE_NAME(lock_impl)
}
else if (lockes.size() == 1)
{
(void)LOCK_ASSEMBLE_AWAIT(lockes._Lock_ref(lockes[0]));
LOCK_ASSEMBLE_AWAIT(lockes._Lock_ref(lockes[0]));
}
else if (lockes.size() == 2)
{

tutorial/test_async_mutex.cpp (+16 -10)

@@ -143,27 +143,33 @@ static void resumable_mutex_async()
static future_t<> resumable_mutex_range_push(size_t idx, mutex_t a, mutex_t b, mutex_t c)
{
for (int i = 0; i < 10; ++i)
for (int i = 0; i < 1000001; ++i)
{
auto __lockers = co_await mutex_t::lock(a, b, c);
assert(a.is_locked());
assert(b.is_locked());
assert(c.is_locked());
++g_counter;
std::cout << "push:" << g_counter << " on " << idx << std::endl;
//std::cout << "push:" << g_counter << " on " << idx << std::endl;
co_await 5ms;
//co_await 5ms;
}
}
static future_t<> resumable_mutex_range_pop(size_t idx, mutex_t a, mutex_t b, mutex_t c)
{
for (int i = 0; i < 10; ++i)
for (int i = 0; i < 1000000; ++i)
{
auto __lockers = co_await mutex_t::lock(a, b, c);
assert(a.is_locked());
assert(b.is_locked());
assert(c.is_locked());
--g_counter;
std::cout << "pop :" << g_counter << " on " << idx << std::endl;
//std::cout << "pop :" << g_counter << " on " << idx << std::endl;
co_await 5ms;
//co_await 5ms;
}
}
@@ -193,11 +199,11 @@ static void resumable_mutex_lock_range()
void resumable_main_mutex()
{
resumable_mutex_synch();
std::cout << std::endl;
//resumable_mutex_synch();
//std::cout << std::endl;
resumable_mutex_async();
std::cout << std::endl;
//resumable_mutex_async();
//std::cout << std::endl;
resumable_mutex_lock_range();
}
