Browse Source

完善mutex_v2代码

tags/v2.9.7
tearshark 4 years ago
parent
commit
8af94ea0fc
7 changed files with 131 additions and 63 deletions
  1. 13
    7
      README.md
  2. 7
    3
      librf/src/mutex_v2.cpp
  3. 5
    3
      librf/src/mutex_v2.h
  4. 24
    23
      librf/src/mutex_v2.inl
  5. 77
    22
      tutorial/test_async_mutex.cpp
  6. 2
    2
      vs_proj/librf.cpp
  7. 3
    3
      vs_proj/librf.vcxproj

+ 13
- 7
README.md View File

@@ -1,7 +1,12 @@
# librf 2.4
# librf 2.8

### librf - 协程库

2020-03-18 更新:
更新event/mutex/when_all/when_any实现。至此,2.x版本完整恢复1.x版本的所有功能。
版本号提升至 2.8.0。
3.0之前,只打算做修复BUG相关的工作。
3.0的目标,是根据executor的设计,重写scheduler代码。
2020-03-08 更新:
更新channel实现,效率提高了近三倍。
channel的新的实现方法,为event/mutex指明了新的修改方向。
@@ -13,17 +18,18 @@

目前仅支持:

Windows (使用VS2017/VS2019/clang编译)
Windows (使用VS2017/VS2019/clang 9编译)
Android (使用NDK 20.1 自带的clang编译)


librf有以下特点:

* 1.基于C++17提案'Stackless Resumable Functions'编写的非对称stackless协程库,可以以同步的方式编写简单的代码,同时获得异步的性能
* 2.理论上支持海量协程, 创建100万个协程只需使用<430M>物理内存
* 3.提供协程锁(mutex), 定时器, channel等特性, 帮助用户更加容易地编写程序
* 4.可以很好的跟asio,libuv等库结合,能跟现有的callback范式的异步/延迟代码很好的结合
* 5.目前还处于实验状态,不对今后正式的C++ Coroutines支持有任何正式的承诺
* 1.基于C++20提案'Stackless Resumable Functions'编写的非对称stackless协程库,可以以同步的方式编写简单的代码,同时获得异步的性能
* 2.理论上支持海量协程, 创建1000万个协程只需使用<2.2G>物理内存(使用clang编译)
* 3.拥有极小的协程调度,在I7 8100 3.6GHz的CPU上,1000个协程的平均切换开销是32纳秒
* 4.提供协程锁(mutex), 定时器, channel, event等特性, 帮助用户更加容易地编写程序
* 5.可以很好的跟asio, libuv等库结合,能跟现有的callback范式的异步/延迟代码很好的结合
* 6.目前已处于较为完善状态,已经小规模在生产项目中使用。不出意外,2.8以上版本就是C++20 Coroutines对应的版本

* 如果你发现了任何bug、有好的建议、或使用上有不明之处,可以提交到issue,也可以直接联系作者:
email: tearshark@163.net QQ交流群: 296561497

+ 7
- 3
librf/src/mutex_v2.cpp View File

@@ -50,7 +50,7 @@ RESUMEF_NS
}
}

bool mutex_v2_impl::try_lock(void* sch) noexcept
bool mutex_v2_impl::try_lock(void* sch)
{
scoped_lock<detail::mutex_v2_impl::lock_type> lock_(_lock);
return try_lock_lockless(sch);
@@ -59,11 +59,15 @@ RESUMEF_NS
bool mutex_v2_impl::try_lock_lockless(void* sch) noexcept
{
void* oldValue = _owner.load(std::memory_order_relaxed);
if (oldValue == nullptr || oldValue == sch)
if (oldValue == nullptr)
{
_owner.store(sch, std::memory_order_relaxed);
_counter.fetch_add(1, std::memory_order_relaxed);

return true;
}
if (oldValue == sch)
{
_counter.fetch_add(1, std::memory_order_relaxed);
return true;
}
return false;

+ 5
- 3
librf/src/mutex_v2.h View File

@@ -11,6 +11,7 @@ RESUMEF_NS
{
struct scoped_lock_mutex_t;

//支持递归的锁
struct mutex_t
{
typedef std::shared_ptr<detail::mutex_v2_impl> mutex_impl_ptr;
@@ -22,11 +23,12 @@ RESUMEF_NS

struct [[nodiscard]] awaiter;

awaiter operator co_await() const noexcept;
awaiter lock() const noexcept;

scoped_lock_mutex_t lock(scheduler_t* sch) const noexcept;
bool try_lock(scheduler_t* sch) const noexcept;
void unlock(scheduler_t* sch) const noexcept;
scoped_lock_mutex_t lock(void* unique_address) const;
bool try_lock(void* unique_address) const;
void unlock(void* unique_address) const;

mutex_t(const mutex_t&) = default;
mutex_t(mutex_t&&) = default;

+ 24
- 23
librf/src/mutex_v2.inl View File

@@ -39,7 +39,7 @@ RESUMEF_NS
return _owner.load(std::memory_order_relaxed);
}

bool try_lock(void* sch) noexcept; //内部加锁
bool try_lock(void* sch); //内部加锁
bool unlock(void* sch); //内部加锁
void lock_until_succeed(void* sch); //内部加锁
public:
@@ -74,26 +74,26 @@ RESUMEF_NS

//此函数,应该在try_lock()获得锁后使用
//或者在协程里,由awaiter使用
scoped_lock_mutex_t(std::adopt_lock_t, detail::mutex_v2_impl* mtx, void* sch)
: _mutex(mtx->shared_from_this())
scoped_lock_mutex_t(std::adopt_lock_t, mutex_impl_ptr mtx, void* sch)
: _mutex(std::move(mtx))
, _owner(sch)
{}

//此函数,适合在非协程里使用
scoped_lock_mutex_t(detail::mutex_v2_impl* mtx, void* sch)
: _mutex(mtx->shared_from_this())
scoped_lock_mutex_t(mutex_impl_ptr mtx, void* sch)
: _mutex(std::move(mtx))
, _owner(sch)
{
if (sch != nullptr)
if (_mutex != nullptr)
_mutex->lock_until_succeed(sch);
}


scoped_lock_mutex_t(std::adopt_lock_t, const mutex_t& mtx, void* sch)
: scoped_lock_mutex_t(std::adopt_lock, mtx._mutex.get(), sch)
: scoped_lock_mutex_t(std::adopt_lock, mtx._mutex, sch)
{}
scoped_lock_mutex_t(const mutex_t& mtx, void* sch)
: scoped_lock_mutex_t(mtx._mutex.get(), sch)
: scoped_lock_mutex_t(mtx._mutex, sch)
{}

~scoped_lock_mutex_t()
@@ -123,9 +123,10 @@ RESUMEF_NS

struct [[nodiscard]] mutex_t::awaiter
{
awaiter(detail::mutex_v2_impl* evt) noexcept
: _mutex(evt)
awaiter(detail::mutex_v2_impl* mtx) noexcept
: _mutex(mtx)
{
assert(_mutex != nullptr);
}

~awaiter() noexcept(false)
@@ -163,7 +164,7 @@ RESUMEF_NS

scoped_lock_mutex_t await_resume() noexcept
{
detail::mutex_v2_impl* mtx = _mutex;
mutex_impl_ptr mtx = _root ? _mutex->shared_from_this() : nullptr;
_mutex = nullptr;

return { std::adopt_lock, mtx, _root };
@@ -174,30 +175,30 @@ RESUMEF_NS
state_base_t* _root = nullptr;
};

inline mutex_t::awaiter mutex_t::lock() const noexcept
inline mutex_t::awaiter mutex_t::operator co_await() const noexcept
{
return { _mutex.get() };
}

inline scoped_lock_mutex_t mutex_t::lock(scheduler_t* sch) const noexcept
inline mutex_t::awaiter mutex_t::lock() const noexcept
{
if (sch != nullptr)
_mutex->lock_until_succeed(sch);
return { _mutex.get() };
}

return { std::adopt_lock, _mutex.get(), sch };
inline scoped_lock_mutex_t mutex_t::lock(void* unique_address) const
{
_mutex->lock_until_succeed(unique_address);
return { std::adopt_lock, _mutex, unique_address };
}

inline bool mutex_t::try_lock(scheduler_t* sch) const noexcept
inline bool mutex_t::try_lock(void* unique_address) const
{
if (sch == nullptr)
return false;
return _mutex->try_lock(sch);
return _mutex->try_lock(unique_address);
}

inline void mutex_t::unlock(scheduler_t* sch) const noexcept
inline void mutex_t::unlock(void* unique_address) const
{
assert(sch != nullptr);
_mutex->unlock(sch);
_mutex->unlock(unique_address);
}
}
}

+ 77
- 22
tutorial/test_async_mutex.cpp View File

@@ -9,47 +9,102 @@
#include "librf.h"
using namespace resumef;
using namespace std::chrono;
mutex_t g_lock;
std::deque<size_t> g_queue;
static mutex_t g_lock;
static intptr_t g_counter = 0;
future_t<> test_mutex_pop(size_t idx)
{
using namespace std::chrono;
static const size_t N = 10;
for (size_t i = 0; i < 10; ++i)
//🔒-50ms-🔒🗝🗝-150ms-|
//-------------.........
static future_t<> test_mutex_pop(size_t idx)
{
for (size_t i = 0; i < N / 2; ++i)
{
auto _locker = co_await g_lock.lock();
if (g_queue.size() > 0)
{
size_t val = g_queue.front();
g_queue.pop_front();
auto _locker = co_await g_lock.lock(); //_locker析构后,会调用对应的unlock()函数。
--g_counter;
std::cout << "pop :" << g_counter << " on " << idx << std::endl;
std::cout << val << " on " << idx << std::endl;
co_await 50ms;
auto _locker_2 = co_await g_lock.lock();
--g_counter;
std::cout << "pop :" << g_counter << " on " << idx << std::endl;
}
co_await sleep_for(500ms);
co_await 150ms;
}
}
future_t<> test_mutex_push(size_t idx)
//🔒-50ms-🗝-50ms-🔒-50ms-🗝-50ms-|
//---------........---------.......
static future_t<> test_mutex_push(size_t idx)
{
using namespace std::chrono;
for (size_t i = 0; i < 10; ++i)
for (size_t i = 0; i < N; ++i)
{
auto _locker = co_await g_lock.lock();
g_queue.push_back(i);
std::cout << i << " on " << idx << std::endl;
{
auto _locker = co_await g_lock;
co_await sleep_for(500ms);
++g_counter;
std::cout << "push:" << g_counter << " on " << idx << std::endl;
co_await 50ms;
}
co_await 50ms;
}
}
void resumable_main_mutex()
//🔒-50ms-🗝-50ms-🔒-50ms-🗝-50ms-|
//---------........---------.......
static std::thread test_mutex_async_push(size_t idx)
{
return std::thread([=]
{
char provide_unique_address = 0;
for (size_t i = 0; i < N; ++i)
{
{
auto _locker = g_lock.lock(&provide_unique_address);
++g_counter;
std::cout << "push:" << g_counter << " on " << idx << std::endl;
std::this_thread::sleep_for(50ms);
}
std::this_thread::sleep_for(50ms);
}
});
}
static void resumable_mutex_synch()
{
go test_mutex_push(0);
go test_mutex_pop(1);
this_scheduler()->run_until_notask();
std::cout << "result:" << g_counter << std::endl;
}
static void resumable_mutex_async()
{
auto th = test_mutex_async_push(0);
std::this_thread::sleep_for(25ms);
go test_mutex_pop(1);
this_scheduler()->run_until_notask();
th.join();
std::cout << "result:" << g_counter << std::endl;
}
void resumable_main_mutex()
{
resumable_mutex_synch();
std::cout << std::endl;
resumable_mutex_async();
}

+ 2
- 2
vs_proj/librf.cpp View File

@@ -47,8 +47,8 @@ int main(int argc, const char* argv[])
//resumable_main_event();
//resumable_main_event_timeout();
//resumable_main_sleep();
resumable_main_mutex();
return 0;
resumable_main_resumable();
//return 0;
//if (argc > 1)
// resumable_main_benchmark_asio_client(atoi(argv[1]));

+ 3
- 3
vs_proj/librf.vcxproj View File

@@ -23,7 +23,7 @@
<ProjectGuid>{C1D4A6BD-592F-4E48-8178-7C87219BF80E}</ProjectGuid>
<Keyword>Win32Proj</Keyword>
<RootNamespace>librf</RootNamespace>
<WindowsTargetPlatformVersion>10.0</WindowsTargetPlatformVersion>
<WindowsTargetPlatformVersion>10.0.18362.0</WindowsTargetPlatformVersion>
</PropertyGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.Default.props" />
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'" Label="Configuration">
@@ -46,7 +46,7 @@
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'" Label="Configuration">
<ConfigurationType>Application</ConfigurationType>
<UseDebugLibraries>false</UseDebugLibraries>
<PlatformToolset>v142</PlatformToolset>
<PlatformToolset>v141</PlatformToolset>
<WholeProgramOptimization>true</WholeProgramOptimization>
<CharacterSet>NotSet</CharacterSet>
</PropertyGroup>
@@ -154,7 +154,7 @@
<AdditionalOptions>/await</AdditionalOptions>
<FavorSizeOrSpeed>Size</FavorSizeOrSpeed>
<StringPooling>true</StringPooling>
<LanguageStandard>stdcpplatest</LanguageStandard>
<LanguageStandard>stdcpp17</LanguageStandard>
<MultiProcessorCompilation>true</MultiProcessorCompilation>
<SDLCheck>
</SDLCheck>

Loading…
Cancel
Save