Browse Source

新版本的event实现

tags/v2.9.7
tearshark 4 years ago
parent
commit
a5459300b9

+ 1
- 1
benchmark/benchmark_async_mem.cpp View File

@@ -8,7 +8,7 @@
#include "librf.h"
const size_t N = 10000000;
const size_t LOOP_COUNT = 100;
const size_t LOOP_COUNT = 20;
volatile size_t globalValue = 0;

+ 1
- 1
librf/librf.h View File

@@ -56,11 +56,11 @@
#include "src/switch_scheduler.h"
#include "src/_awaker.h"
#include "src/event.h"
#include "src/mutex.h"
#include "src/ring_queue.h"
#include "src/intrusive_link_queue.h"
#include "src/channel.h"
#include "src/event.h"
#include "src/generator.h"
#include "src/sleep.h"

+ 1
- 1
librf/src/def.h View File

@@ -1,6 +1,6 @@
#pragma once
#define LIB_RESUMEF_VERSION 20401 // 2.4.1
#define LIB_RESUMEF_VERSION 20500 // 2.5.0
#if defined(RESUMEF_MODULE_EXPORT)
#define RESUMEF_NS export namespace resumef

+ 1
- 0
librf/src/event.h View File

@@ -2,3 +2,4 @@
#include "event_v1.h"
#include "event_v2.h"
#include "event_v2.inl"

+ 35
- 122
librf/src/event_v2.cpp View File

@@ -4,93 +4,59 @@ RESUMEF_NS
{
namespace detail
{
void event_v2_impl::notify_all() noexcept
void state_event_t::resume()
{
// Needs to be 'release' so that subsequent 'co_await' has
// visibility of our prior writes.
// Needs to be 'acquire' so that we have visibility of prior
// writes by awaiting coroutines.
void* oldValue = m_state.exchange(this, std::memory_order_acq_rel);
if (oldValue != this)
coroutine_handle<> handler = _coro;
if (handler)
{
// Wasn't already in 'set' state.
// Treat old value as head of a linked-list of waiters
// which we have now acquired and need to resume.
state_event_t* state = static_cast<state_event_t*>(oldValue);
while (state != nullptr)
{
// Read m_next before resuming the coroutine as resuming
// the coroutine will likely destroy the awaiter object.
auto* next = state->m_next;
state->on_notify();
state = next;
}
_coro = nullptr;
_scheduler->del_final(this);
handler.resume();
}
}

void event_v2_impl::notify_one() noexcept
bool state_event_t::has_handler() const noexcept
{
// Needs to be 'release' so that subsequent 'co_await' has
// visibility of our prior writes.
// Needs to be 'acquire' so that we have visibility of prior
// writes by awaiting coroutines.
void* oldValue = m_state.exchange(nullptr, std::memory_order_acq_rel);
if (oldValue != this)
{
// Wasn't already in 'set' state.
// Treat old value as head of a linked-list of waiters
// which we have now acquired and need to resume.
state_event_t* state = static_cast<state_event_t*>(oldValue);
if (state != nullptr)
{
// Read m_next before resuming the coroutine as resuming
// the coroutine will likely destroy the awaiter object.
auto* next = state->m_next;
state->on_notify();

if (next != nullptr)
add_notify_list(next);
}
}
return (bool)_coro;
}

bool event_v2_impl::add_notify_list(state_event_t* state) noexcept
void state_event_t::on_notify()
{
// Try to atomically push this awaiter onto the front of the list.
void* oldValue = m_state.load(std::memory_order_acquire);
do
{
// Resume immediately if already in 'set' state.
if (oldValue == this) return false;

// Update linked list to point at current head.
state->m_next = static_cast<state_event_t*>(oldValue);
*_value = true;

// Finally, try to swap the old list head, inserting this awaiter
// as the new list head.
} while (!m_state.compare_exchange_weak(
oldValue,
state,
std::memory_order_release,
std::memory_order_acquire));
assert(this->_scheduler != nullptr);
if (this->_coro)
this->_scheduler->add_generator(this);
}

return true;
void state_event_t::on_cancel() noexcept
{
*_value = false;
this->_coro = nullptr;
}

void state_event_t::resume()
void event_v2_impl::signal_all() noexcept
{
coroutine_handle<> handler = _coro;
if (handler)
scoped_lock<lock_type> lock_(_lock);

_counter.store(0, std::memory_order_release);

state_event_t* state;
for (; (state = _wait_awakes.try_pop()) != nullptr;)
{
_coro = nullptr;
_scheduler->del_final(this);
handler.resume();
state->on_notify();
}
}

bool state_event_t::has_handler() const noexcept
void event_v2_impl::signal() noexcept
{
return (bool)_coro;
scoped_lock<lock_type> lock_(_lock);

state_event_t* state = _wait_awakes.try_pop();
if (state == nullptr)
_counter.fetch_add(1, std::memory_order_acq_rel);
else
state->on_notify();
}
}

@@ -99,63 +65,10 @@ RESUMEF_NS
event_t::event_t(bool initially)
:_event(std::make_shared<detail::event_v2_impl>(initially))
{

}
}

void async_manual_reset_event::set() noexcept
{
// Needs to be 'release' so that subsequent 'co_await' has
// visibility of our prior writes.
// Needs to be 'acquire' so that we have visibility of prior
// writes by awaiting coroutines.
void* oldValue = m_state.exchange(this, std::memory_order_acq_rel);
if (oldValue != this)
event_t::~event_t()
{
// Wasn't already in 'set' state.
// Treat old value as head of a linked-list of waiters
// which we have now acquired and need to resume.
auto* waiters = static_cast<awaiter*>(oldValue);
while (waiters != nullptr)
{
// Read m_next before resuming the coroutine as resuming
// the coroutine will likely destroy the awaiter object.
auto* next = waiters->m_next;
waiters->m_awaitingCoroutine.resume();
waiters = next;
}
}
}

bool async_manual_reset_event::awaiter::await_suspend(
coroutine_handle<> awaitingCoroutine) noexcept
{
// Special m_state value that indicates the event is in the 'set' state.
const void* const setState = &m_event;

// Remember the handle of the awaiting coroutine.
m_awaitingCoroutine = awaitingCoroutine;

// Try to atomically push this awaiter onto the front of the list.
void* oldValue = m_event.m_state.load(std::memory_order_acquire);
do
{
// Resume immediately if already in 'set' state.
if (oldValue == setState) return false;

// Update linked list to point at current head.
m_next = static_cast<awaiter*>(oldValue);

// Finally, try to swap the old list head, inserting this awaiter
// as the new list head.
} while (!m_event.m_state.compare_exchange_weak(
oldValue,
this,
std::memory_order_release,
std::memory_order_acquire));

// Successfully enqueued. Remain suspended.
return true;
}

}

+ 9
- 180
librf/src/event_v2.h View File

@@ -4,198 +4,27 @@ RESUMEF_NS
{
namespace detail
{
struct state_event_t;

//仿照cppcoro的event是行不通的。
//虽然cppcoro的event的触发和等待之间是线程安全的,但是并不能实现只触发指定数量。并且多线程触发之间是不安全的。
//所以,还得用锁结构来实现(等待实现,今日不空)。
struct event_v2_impl : public std::enable_shared_from_this<event_v2_impl>
{
event_v2_impl(bool initially = false) noexcept
: m_state(initially ? this : nullptr)
{}

// No copying/moving
event_v2_impl(const event_v2_impl&) = delete;
event_v2_impl(event_v2_impl&&) = delete;
event_v2_impl& operator=(const event_v2_impl&) = delete;
event_v2_impl& operator=(event_v2_impl&&) = delete;

bool is_set() const noexcept
{
return m_state.load(std::memory_order_acquire) == this;
}
void reset() noexcept
{
void* oldValue = this;
m_state.compare_exchange_strong(oldValue, nullptr, std::memory_order_acquire);
}
void notify_all() noexcept; //多线程同时调用notify_one/notify_all是非线程安全的
void notify_one() noexcept; //多线程同时调用notify_one/notify_all是非线程安全的

bool add_notify_list(state_event_t* state) noexcept;
private:
mutable std::atomic<void*> m_state; //event_v2_impl or state_event_t
};

struct state_event_t : public state_base_t
{
state_event_t(event_v2_impl* e) noexcept
{
if (e != nullptr)
m_event = e->shared_from_this();
}

virtual void resume() override;
virtual bool has_handler() const noexcept override;

void on_notify()
{
assert(this->_scheduler != nullptr);
if (this->_coro)
this->_scheduler->add_generator(this);
}

void cancel_timeout()
{
this->_coro = nullptr;
}

//将自己加入到通知链表里
template<class _PromiseT, typename = std::enable_if_t<is_promise_v<_PromiseT>>>
bool event_await_suspend(coroutine_handle<_PromiseT> handler) noexcept
{
_PromiseT& promise = handler.promise();
auto* parent_state = promise.get_state();
scheduler_t* sch = parent_state->get_scheduler();

this->_scheduler = sch;
this->_coro = handler;

return m_event->add_notify_list(this);
}

private:
friend struct event_v2_impl;

std::shared_ptr<event_v2_impl> m_event;
state_event_t* m_next = nullptr;
};
struct event_v2_impl;
}

namespace event_v2
{
struct event_t
{
typedef std::shared_ptr<detail::event_v2_impl> event_impl_ptr;
typedef std::weak_ptr<detail::event_v2_impl> event_impl_wptr;
typedef std::chrono::system_clock clock_type;
using event_impl_ptr = std::shared_ptr<detail::event_v2_impl>;

event_t(bool initially = false);
~event_t();

void notify_all() const noexcept
{
_event->notify_all();
}
void notify_one() const noexcept
{
_event->notify_one();
}
void reset() const noexcept
{
_event->reset();
}
void signal_all() const noexcept;
void signal() const noexcept;
void reset() const noexcept;

struct awaiter
{
awaiter(detail::event_v2_impl* e) noexcept
: _event(e)
{}

bool await_ready() const noexcept
{
return _event->is_set();
}
template<class _PromiseT, typename = std::enable_if_t<is_promise_v<_PromiseT>>>
bool await_suspend(coroutine_handle<_PromiseT> handler) noexcept
{
_state = new detail::state_event_t(_event);
return _state->event_await_suspend(handler);
}
void await_resume() noexcept
{
}
private:
detail::event_v2_impl * _event;
counted_ptr<detail::state_event_t> _state;
};

awaiter operator co_await() const noexcept
{
return { _event.get() };
}
struct [[nodiscard]] awaiter;
awaiter operator co_await() const noexcept;
awaiter wait() const noexcept;
private:
event_impl_ptr _event;
};
}

class async_manual_reset_event
{
public:
async_manual_reset_event(bool initiallySet = false) noexcept
: m_state(initiallySet ? this : nullptr)
{}

// No copying/moving
async_manual_reset_event(const async_manual_reset_event&) = delete;
async_manual_reset_event(async_manual_reset_event&&) = delete;
async_manual_reset_event& operator=(const async_manual_reset_event&) = delete;
async_manual_reset_event& operator=(async_manual_reset_event&&) = delete;

bool is_set() const noexcept
{
return m_state.load(std::memory_order_acquire) == this;
}

struct awaiter;
awaiter operator co_await() const noexcept;

void set() noexcept;
void reset() noexcept
{
void* oldValue = this;
m_state.compare_exchange_strong(oldValue, nullptr, std::memory_order_acquire);
}
private:
friend struct awaiter;

// - 'this' => set state
// - otherwise => not set, head of linked list of awaiter*.
mutable std::atomic<void*> m_state;
};

struct async_manual_reset_event::awaiter
{
awaiter(const async_manual_reset_event& event) noexcept
: m_event(event)
{}

bool await_ready() const noexcept
{
return m_event.is_set();
}
bool await_suspend(coroutine_handle<> awaitingCoroutine) noexcept;
void await_resume() noexcept {}
private:
friend class async_manual_reset_event;

const async_manual_reset_event& m_event;
coroutine_handle<> m_awaitingCoroutine;
awaiter* m_next;
};

inline async_manual_reset_event::awaiter async_manual_reset_event::operator co_await() const noexcept
{
return awaiter{ *this };
}
}

+ 156
- 0
librf/src/event_v2.inl View File

@@ -0,0 +1,156 @@
#pragma once
RESUMEF_NS
{
namespace detail
{
struct state_event_t;
//仿照cppcoro的event是行不通的。
//虽然cppcoro的event的触发和等待之间是线程安全的,但是并不能实现只触发指定数量。并且多线程触发之间是不安全的。
//所以,还得用锁结构来实现(等待实现,今日不空)。
struct event_v2_impl : public std::enable_shared_from_this<event_v2_impl>
{
event_v2_impl(bool initially) noexcept
: _counter(initially ? 1 : 0)
{}
bool is_signaled() const noexcept
{
return _counter.load(std::memory_order_acquire) > 0;
}
void reset() noexcept
{
_counter.store(0, std::memory_order_release);
}
void signal_all() noexcept;
void signal() noexcept;
public:
static constexpr bool USE_SPINLOCK = true;
using lock_type = std::conditional_t<USE_SPINLOCK, spinlock, std::deque<std::recursive_mutex>>;
using wait_queue_type = intrusive_link_queue<state_event_t>;
friend struct state_event_t;
bool try_wait_one() noexcept
{
if (_counter.fetch_add(-1, std::memory_order_acq_rel) > 0)
return true;
_counter.fetch_add(1);
return false;
}
void add_wait_list(state_event_t* state) noexcept
{
assert(state != nullptr);
_wait_awakes.push_back(state);
}
lock_type _lock; //保证访问本对象是线程安全的
private:
std::atomic<intptr_t> _counter;
wait_queue_type _wait_awakes; //等待队列
// No copying/moving
event_v2_impl(const event_v2_impl&) = delete;
event_v2_impl(event_v2_impl&&) = delete;
event_v2_impl& operator=(const event_v2_impl&) = delete;
event_v2_impl& operator=(event_v2_impl&&) = delete;
};
struct state_event_t : public state_base_t
{
state_event_t(bool& val)
: _value(&val)
{}
virtual void resume() override;
virtual bool has_handler() const noexcept override;
void on_notify();
void on_cancel() noexcept;
//将自己加入到通知链表里
template<class _PromiseT, typename = std::enable_if_t<is_promise_v<_PromiseT>>>
void on_await_suspend(coroutine_handle<_PromiseT> handler) noexcept
{
_PromiseT& promise = handler.promise();
auto* parent_state = promise.get_state();
scheduler_t* sch = parent_state->get_scheduler();
this->_scheduler = sch;
this->_coro = handler;
}
public:
//为浸入式单向链表提供的next指针
state_event_t* _next = nullptr;
bool* _value;
};
}
namespace event_v2
{
struct [[nodiscard]] event_t::awaiter
{
awaiter(event_impl_ptr evt) noexcept
: _event(std::move(evt))
{
}
bool await_ready() noexcept
{
return (_value = _event->try_wait_one());
}
template<class _PromiseT, typename = std::enable_if_t<is_promise_v<_PromiseT>>>
bool await_suspend(coroutine_handle<_PromiseT> handler)
{
scoped_lock<detail::event_v2_impl::lock_type> lock_(_event->_lock);
if (_event->try_wait_one())
return false;
_state = new detail::state_event_t(_value);
_state->on_await_suspend(handler);
_event->add_wait_list(_state.get());
return true;
}
bool await_resume() noexcept
{
return _value;
}
private:
std::shared_ptr<detail::event_v2_impl> _event;
counted_ptr<detail::state_event_t> _state;
bool _value = false;
};
inline void event_t::signal_all() const noexcept
{
_event->signal_all();
}
inline void event_t::signal() const noexcept
{
_event->signal();
}
inline void event_t::reset() const noexcept
{
_event->reset();
}
inline event_t::awaiter event_t::operator co_await() const noexcept
{
return { _event };
}
inline event_t::awaiter event_t::wait() const noexcept
{
return { _event };
}
}
}

+ 6
- 6
tutorial/test_async_event.cpp View File

@@ -10,7 +10,7 @@
using namespace resumef;
//非协程的逻辑线程,或异步代码,可以通过event_t通知到协程,并且不会阻塞协程所在的线程。
std::thread async_set_event(const event_t & e, std::chrono::milliseconds dt)
static std::thread async_set_event(const event_t & e, std::chrono::milliseconds dt)
{
return std::thread([=]
{
@@ -20,7 +20,7 @@ std::thread async_set_event(const event_t & e, std::chrono::milliseconds dt)
}
future_t<> resumable_wait_event(const event_t & e)
static future_t<> resumable_wait_event(const event_t & e)
{
using namespace std::chrono;
@@ -30,7 +30,7 @@ future_t<> resumable_wait_event(const event_t & e)
std::cout << "event signal!" << std::endl;
}
void test_wait_one()
static void test_wait_one()
{
using namespace std::chrono;
@@ -63,7 +63,7 @@ void test_wait_one()
}
}
void test_wait_any()
static void test_wait_any()
{
using namespace std::chrono;
@@ -92,7 +92,7 @@ void test_wait_any()
tt.join();
}
void test_wait_all()
static void test_wait_all()
{
using namespace std::chrono;
@@ -120,7 +120,7 @@ void test_wait_all()
tt.join();
}
void test_wait_all_timeout()
static void test_wait_all_timeout()
{
using namespace std::chrono;

+ 11
- 9
tutorial/test_async_event_v2.cpp View File

@@ -10,32 +10,34 @@
using namespace resumef;

//非协程的逻辑线程,或异步代码,可以通过event_t通知到协程,并且不会阻塞协程所在的线程。
std::thread async_set_event_all(const event_v2::event_t & e, std::chrono::milliseconds dt)
static std::thread async_set_event_all(const event_v2::event_t & e, std::chrono::milliseconds dt)
{
return std::thread([=]
{
std::this_thread::sleep_for(dt);
e.notify_all();
e.signal_all();
});
}

std::thread async_set_event_one(const event_v2::event_t& e, std::chrono::milliseconds dt)
static std::thread async_set_event_one(event_v2::event_t e, std::chrono::milliseconds dt)
{
return std::thread([=]
{
std::this_thread::sleep_for(dt);
e.notify_one();
e.signal();
});
}


future_t<> resumable_wait_event(const event_v2::event_t & e, int idx)
static future_t<> resumable_wait_event(event_v2::event_t e, int idx)
{
co_await e;
std::cout << "[" << idx << "]event signal!" << std::endl;
if (co_await e)
std::cout << "[" << idx << "]event signal!" << std::endl;
else
std::cout << "time out!" << std::endl;
}

void test_notify_all()
static void test_notify_all()
{
using namespace std::chrono;

@@ -53,7 +55,7 @@ void test_notify_all()
}

//目前还没法测试在多线程调度下,是否线程安全
void test_notify_one()
static void test_notify_one()
{
using namespace std::chrono;


+ 2
- 0
tutorial/test_async_resumable.cpp View File

@@ -54,6 +54,8 @@ void resumable_switch(intptr_t coro, size_t idx)
void resumable_main_resumable()
{
resumable_switch(1, 99);
resumable_switch(1, 0);
resumable_switch(10, 0);
resumable_switch(100, 0);

+ 2
- 8
vs_proj/librf.cpp View File

@@ -43,14 +43,8 @@ int main(int argc, const char* argv[])
//test_ring_queue<resumef::ring_queue_spinlock<int, false, uint32_t>>();
//test_ring_queue<resumef::ring_queue_lockfree<int, uint64_t>>();
resumable_main_resumable();
/*
resumable_main_channel();
resumable_main_channel_mult_thread();
resumable_main_switch_scheduler();
benchmark_main_channel_passing_next();
resumable_main_event_v2();
return 0;
*/
//if (argc > 1)
// resumable_main_benchmark_asio_client(atoi(argv[1]));
@@ -71,7 +65,7 @@ int main(int argc, const char* argv[])
resumable_main_benchmark_mem(false);
resumable_main_mutex();
resumable_main_event();
//resumable_main_event_v2();
resumable_main_event_v2();
resumable_main_event_timeout();
resumable_main_channel();
resumable_main_channel_mult_thread();

+ 3
- 2
vs_proj/librf.vcxproj View File

@@ -40,13 +40,13 @@
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'" Label="Configuration">
<ConfigurationType>Application</ConfigurationType>
<PlatformToolset>ClangCL</PlatformToolset>
<PlatformToolset>v142</PlatformToolset>
<UseDebugLibraries>true</UseDebugLibraries>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'" Label="Configuration">
<ConfigurationType>Application</ConfigurationType>
<UseDebugLibraries>false</UseDebugLibraries>
<PlatformToolset>ClangCL</PlatformToolset>
<PlatformToolset>v142</PlatformToolset>
<WholeProgramOptimization>true</WholeProgramOptimization>
<CharacterSet>NotSet</CharacterSet>
</PropertyGroup>
@@ -256,6 +256,7 @@
<None Include="..\librf\src\asio_task_1.10.0.inl" />
<None Include="..\librf\src\asio_task_1.12.0.inl" />
<None Include="..\librf\src\channel_v2.inl" />
<None Include="..\librf\src\event_v2.inl" />
<None Include="..\librf\src\exception.inl" />
<None Include="..\librf\src\macro_def.inl" />
<None Include="..\librf\src\promise.inl" />

+ 3
- 0
vs_proj/librf.vcxproj.filters View File

@@ -252,5 +252,8 @@
<None Include="..\librf\src\channel_v2.inl">
<Filter>librf\src</Filter>
</None>
<None Include="..\librf\src\event_v2.inl">
<Filter>librf\src</Filter>
</None>
</ItemGroup>
</Project>

Loading…
Cancel
Save