Browse Source

Optimize the channel implementation: use a ring queue as the channel's data buffer

tags/v2.9.7
tearshark 4 years ago
parent
commit
ac11ce9f6f

+ 3
- 0
librf/librf.h View File

@@ -16,6 +16,7 @@
#pragma once
#include <type_traits>
#include <atomic>
#include <chrono>
#include <array>
@@ -57,6 +58,8 @@
#include "src/_awaker.h"
#include "src/event.h"
#include "src/mutex.h"
#include "src/ring_queue.h"
#include "src/ring_queue_spinlock.h"
#include "src/channel.h"
#include "src/generator.h"

+ 2
- 2
librf/src/_awaker.h View File

@@ -16,8 +16,8 @@ RESUMEF_NS
// Returning false indicates this event is no longer valid; the event then only removes this awaker internally
typedef std::function<bool(_Ety * e, _Types...)> callee_type;
private:
//typedef spinlock lock_type;
typedef std::recursive_mutex lock_type;
typedef spinlock lock_type;
//typedef std::recursive_mutex lock_type;
lock_type _lock;
callee_type _callee;

+ 2
- 226
librf/src/channel.h View File

@@ -1,228 +1,4 @@
#pragma once
RESUMEF_NS
{
namespace detail
{
// Channel implementation (v1 design): a bounded value queue plus explicit
// queues of pending readers/writers, all guarded by one recursive mutex.
template<class _Ty>
struct channel_impl : public std::enable_shared_from_this<channel_impl<_Ty>>
{
	// Awaker invoked when a value (or an error) becomes available to a reader.
	typedef _awaker<channel_impl<_Ty>, _Ty *, error_code> channel_read_awaker;
	typedef std::shared_ptr<channel_read_awaker> channel_read_awaker_ptr;
	// Awaker invoked when a pending write is finally accepted.
	typedef _awaker<channel_impl<_Ty>> channel_write_awaker;
	typedef std::shared_ptr<channel_write_awaker> channel_write_awaker_ptr;
	// A parked write: the awaker plus the value it wants to enqueue.
	typedef std::pair<channel_write_awaker_ptr, _Ty> write_tuple_type;
private:
	//typedef spinlock lock_type;
	typedef std::recursive_mutex lock_type;
	lock_type _lock;                                  // guards all members of this object (thread safety)
	const size_t _max_counter;                        // capacity limit of the buffered value queue
	std::deque<_Ty> _values;                          // buffered values
	std::list<channel_read_awaker_ptr> _read_awakes;  // readers waiting for a value
	std::list<write_tuple_type> _write_awakes;        // writers waiting for free space
public:
	channel_impl(size_t max_counter_)
		:_max_counter(max_counter_)
	{
	}
#if _DEBUG
	// Debug-only inspection; neither this call nor the returned queue is thread safe.
	const std::deque<_Ty> & debug_queue() const
	{
		return _values;
	}
#endif
	// Convenience overloads wrapping a plain callable into the awaker type.
	template<class callee_t, class = std::enable_if<!std::is_same<std::remove_cv_t<callee_t>, channel_read_awaker_ptr>::value>>
	decltype(auto) read(callee_t && awaker)
	{
		return read_(std::make_shared<channel_read_awaker>(std::forward<callee_t>(awaker)));
	}
	template<class callee_t, class _Ty2, class = std::enable_if<!std::is_same<std::remove_cv_t<callee_t>, channel_write_awaker_ptr>::value>>
	decltype(auto) write(callee_t && awaker, _Ty2&& val)
	{
		return write_(std::make_shared<channel_write_awaker>(std::forward<callee_t>(awaker)), std::forward<_Ty2>(val));
	}
	// Returns true if the awaker was triggered immediately (a value was ready).
	// Intended to be thread safe; actual guarantees still under review (author's note).
	bool read_(channel_read_awaker_ptr && r_awaker)
	{
		assert(r_awaker);
		scoped_lock<lock_type> lock_(this->_lock);
		bool ret_value;
		if (_values.size() > 0)
		{
			// A buffered value is available: consume it right away.
			auto val = std::move(_values.front());
			_values.pop_front();
			r_awaker->awake(this, 1, &val, error_code::none);
			ret_value = true;
		}
		else
		{
			// Otherwise park this reader in the read-wait queue.
			_read_awakes.push_back(r_awaker);
			ret_value = false;
		}
		// A slot may have been freed: wake one parked writer, if any.
		awake_one_writer_();
		return ret_value;
	}
	// Intended to be thread safe; actual guarantees still under review (author's note).
	template<class _Ty2>
	void write_(channel_write_awaker_ptr && w_awaker, _Ty2&& val)
	{
		assert(w_awaker);
		scoped_lock<lock_type> lock_(this->_lock);
		// When full, park the write (awaker + value) instead of enqueuing the value.
		bool is_full = _values.size() >= _max_counter;
		if (is_full)
			_write_awakes.push_back(std::make_pair(std::forward<channel_write_awaker_ptr>(w_awaker), std::forward<_Ty2>(val)));
		else
			_values.push_back(std::forward<_Ty2>(val));
		// Wake one parked reader, if any.
		awake_one_reader_();
		// The write completed immediately (it was not parked): trigger its awaker now.
		if(!is_full) w_awaker->awake(this, 1);
	}
private:
	// Only called from write_; the lock is already held.
	void awake_one_reader_()
	{
		//assert(!(_read_awakes.size() >= 0 && _values.size() == 0));
		for (auto iter = _read_awakes.begin(); iter != _read_awakes.end(); )
		{
			auto r_awaker = *iter;
			iter = _read_awakes.erase(iter);
			if (r_awaker->awake(this, 1, _values.size() ? &_values.front() : nullptr, error_code::read_before_write))
			{
				if(_values.size()) _values.pop_front();
				// After waking a reader, try to wake a writer too, to drain
				// writes that were parked because the value queue was full.
				awake_one_writer_();
				break;
			}
		}
	}
	// Only called from read_; the lock is already held.
	void awake_one_writer_()
	{
		for (auto iter = _write_awakes.begin(); iter != _write_awakes.end(); )
		{
			auto w_awaker = std::move(*iter);
			iter = _write_awakes.erase(iter);
			if (w_awaker.first->awake(this, 1))
			{
				// Once a parked writer is woken, move its bound value into the queue.
				_values.push_back(std::move(w_awaker.second));
				break;
			}
		}
	}
	channel_impl(const channel_impl &) = delete;
	channel_impl(channel_impl &&) = delete;
	channel_impl & operator = (const channel_impl &) = delete;
	channel_impl & operator = (channel_impl &&) = delete;
};
}
// User-facing channel handle (v1): wraps a shared channel_impl and exposes
// future-based read/write operations usable from coroutines.
template<class _Ty>
struct channel_t
{
	typedef detail::channel_impl<_Ty> channel_impl_type;
	typedef typename channel_impl_type::channel_read_awaker channel_read_awaker;
	typedef typename channel_impl_type::channel_write_awaker channel_write_awaker;
	typedef std::shared_ptr<channel_impl_type> channel_impl_ptr;
	typedef std::weak_ptr<channel_impl_type> channel_impl_wptr;
	typedef std::chrono::system_clock clock_type;
private:
	channel_impl_ptr _chan;   // shared so copies of channel_t refer to the same channel
public:
	// max_counter is the buffer capacity of the underlying channel.
	channel_t(size_t max_counter = 0)
		:_chan(std::make_shared<channel_impl_type>(max_counter))
	{
	}
	// Writes a value; the returned future resolves to true once the write
	// has been accepted by the channel.
	template<class _Ty2>
	future_t<bool> write(_Ty2&& val) const
	{
		awaitable_t<bool> awaitable;
		auto awaker = std::make_shared<channel_write_awaker>(
			[st = awaitable._state](channel_impl_type * chan) -> bool
			{
				st->set_value(chan ? true : false);
				return true;
			});
		_chan->write_(std::move(awaker), std::forward<_Ty2>(val));
		return awaitable.get_future();
	}
	// Reads a value; the returned future resolves to the value, or throws
	// channel_exception when the read completed with an error code.
	future_t<_Ty> read() const
	{
		awaitable_t<_Ty> awaitable;
		auto awaker = std::make_shared<channel_read_awaker>(
			[st = awaitable._state](channel_impl_type *, _Ty * val, error_code fe) -> bool
			{
				if(val)
					st->set_value(std::move(*val));
				else
					st->throw_exception(channel_exception{ fe });
				return true;
			});
		_chan->read_(std::move(awaker));
		return awaitable.get_future();
	}
	// Stream-style alias for write().
	template<class _Ty2>
	future_t<bool> operator << (_Ty2&& val) const
	{
		return std::move(write(std::forward<_Ty2>(val)));
	}
	// co_await on the channel itself reads one value.
	future_t<_Ty> operator co_await () const
	{
		return read();
	}
#if _DEBUG
	// Not thread safe; the returned queue is not thread safe either.
	const auto & debug_queue() const
	{
		return _chan->debug_queue();
	}
#endif
	channel_t(const channel_t &) = default;
	channel_t(channel_t &&) = default;
	channel_t & operator = (const channel_t &) = default;
	channel_t & operator = (channel_t &&) = default;
};
using semaphore_t = channel_t<bool>;
}
#include "channel_v1.h"
#include "channel_v2.h"

+ 243
- 0
librf/src/channel_v1.h View File

@@ -0,0 +1,243 @@
#pragma once

RESUMEF_NS
{
namespace detail
{
// Channel implementation (v1, preserved as channel_v1): a bounded value
// queue plus explicit queues of pending readers/writers, all guarded by one
// recursive mutex.
template<class _Ty>
struct channel_impl : public std::enable_shared_from_this<channel_impl<_Ty>>
{
	// Awaker invoked when a value (or an error) becomes available to a reader.
	typedef _awaker<channel_impl<_Ty>, _Ty*, error_code> channel_read_awaker;
	typedef std::shared_ptr<channel_read_awaker> channel_read_awaker_ptr;

	// Awaker invoked when a pending write is finally accepted.
	typedef _awaker<channel_impl<_Ty>> channel_write_awaker;
	typedef std::shared_ptr<channel_write_awaker> channel_write_awaker_ptr;
	// A parked write: the awaker plus the value it wants to enqueue.
	typedef std::pair<channel_write_awaker_ptr, _Ty> write_tuple_type;
private:
	//typedef spinlock lock_type;
	typedef std::recursive_mutex lock_type;

	lock_type _lock;                                  // guards all members of this object (thread safety)
	const size_t _max_counter;                        // capacity limit of the buffered value queue
	std::deque<_Ty> _values;                          // buffered values
	std::list<channel_read_awaker_ptr> _read_awakes;  // readers waiting for a value
	std::list<write_tuple_type> _write_awakes;        // writers waiting for free space
public:
	channel_impl(size_t max_counter_)
		:_max_counter(max_counter_)
	{
	}

#if _DEBUG
	// Debug-only inspection; neither this call nor the returned queue is thread safe.
	const std::deque<_Ty>& debug_queue() const
	{
		return _values;
	}
#endif

	// Convenience overloads wrapping a plain callable into the awaker type.
	template<class callee_t, class = std::enable_if<!std::is_same<std::remove_cv_t<callee_t>, channel_read_awaker_ptr>::value>>
	decltype(auto) read(callee_t&& awaker)
	{
		return read_(std::make_shared<channel_read_awaker>(std::forward<callee_t>(awaker)));
	}
	template<class callee_t, class _Ty2, class = std::enable_if<!std::is_same<std::remove_cv_t<callee_t>, channel_write_awaker_ptr>::value>>
	decltype(auto) write(callee_t&& awaker, _Ty2&& val)
	{
		return write_(std::make_shared<channel_write_awaker>(std::forward<callee_t>(awaker)), std::forward<_Ty2>(val));
	}

	// Returns true if the awaker was triggered immediately (a value was ready).
	// Intended to be thread safe; actual guarantees still under review (author's note).
	bool read_(channel_read_awaker_ptr&& r_awaker)
	{
		assert(r_awaker);

		scoped_lock<lock_type> lock_(this->_lock);

		bool ret_value;
		if (_values.size() > 0)
		{
			// A buffered value is available: consume it right away.
			auto val = std::move(_values.front());
			_values.pop_front();

			r_awaker->awake(this, 1, &val, error_code::none);
			ret_value = true;
		}
		else
		{
			// Otherwise park this reader in the read-wait queue.
			_read_awakes.push_back(r_awaker);
			ret_value = false;
		}

		// A slot may have been freed: wake one parked writer, if any.
		awake_one_writer_();

		return ret_value;
	}

	// Intended to be thread safe; actual guarantees still under review (author's note).
	template<class _Ty2>
	void write_(channel_write_awaker_ptr&& w_awaker, _Ty2&& val)
	{
		assert(w_awaker);
		scoped_lock<lock_type> lock_(this->_lock);

		// When full, park the write (awaker + value) instead of enqueuing the value.
		bool is_full = _values.size() >= _max_counter;
		if (is_full)
			_write_awakes.push_back(std::make_pair(std::forward<channel_write_awaker_ptr>(w_awaker), std::forward<_Ty2>(val)));
		else
			_values.push_back(std::forward<_Ty2>(val));

		// Wake one parked reader, if any.
		awake_one_reader_();

		// The write completed immediately (it was not parked): trigger its awaker now.
		if (!is_full) w_awaker->awake(this, 1);
	}

private:
	// Only called from write_; the lock is already held.
	void awake_one_reader_()
	{
		//assert(!(_read_awakes.size() >= 0 && _values.size() == 0));

		for (auto iter = _read_awakes.begin(); iter != _read_awakes.end(); )
		{
			auto r_awaker = *iter;
			iter = _read_awakes.erase(iter);

			if (r_awaker->awake(this, 1, _values.size() ? &_values.front() : nullptr, error_code::read_before_write))
			{
				if (_values.size()) _values.pop_front();

				// After waking a reader, try to wake a writer too, to drain
				// writes that were parked because the value queue was full.
				awake_one_writer_();
				break;
			}
		}
	}

	// Only called from read_; the lock is already held.
	void awake_one_writer_()
	{
		for (auto iter = _write_awakes.begin(); iter != _write_awakes.end(); )
		{
			auto w_awaker = std::move(*iter);
			iter = _write_awakes.erase(iter);

			if (w_awaker.first->awake(this, 1))
			{
				// Once a parked writer is woken, move its bound value into the queue.
				_values.push_back(std::move(w_awaker.second));
				break;
			}
		}
	}

	// Configured capacity of the value buffer.
	size_t capacity() const noexcept
	{
		return _max_counter;
	}

	channel_impl(const channel_impl&) = delete;
	channel_impl(channel_impl&&) = delete;
	channel_impl& operator = (const channel_impl&) = delete;
	channel_impl& operator = (channel_impl&&) = delete;
};
} //namespace detail

namespace channel_v1
{

// User-facing channel handle (v1 namespace): wraps a shared channel_impl and
// exposes future-based read/write operations usable from coroutines.
template<class _Ty>
struct channel_t
{
	typedef detail::channel_impl<_Ty> channel_impl_type;
	typedef typename channel_impl_type::channel_read_awaker channel_read_awaker;
	typedef typename channel_impl_type::channel_write_awaker channel_write_awaker;

	typedef std::shared_ptr<channel_impl_type> channel_impl_ptr;
	typedef std::weak_ptr<channel_impl_type> channel_impl_wptr;
	typedef std::chrono::system_clock clock_type;
private:
	channel_impl_ptr _chan;   // shared so copies of channel_t refer to the same channel
public:
	// max_counter is the buffer capacity of the underlying channel.
	channel_t(size_t max_counter = 0)
		:_chan(std::make_shared<channel_impl_type>(max_counter))
	{

	}

	// Writes a value; the returned future resolves to true once the write
	// has been accepted by the channel.
	template<class _Ty2>
	future_t<bool> write(_Ty2&& val) const
	{
		awaitable_t<bool> awaitable;

		auto awaker = std::make_shared<channel_write_awaker>(
			[st = awaitable._state](channel_impl_type* chan) -> bool
			{
				st->set_value(chan ? true : false);
				return true;
			});
		_chan->write_(std::move(awaker), std::forward<_Ty2>(val));

		return awaitable.get_future();
	}

	// Reads a value; the returned future resolves to the value, or throws
	// channel_exception when the read completed with an error code.
	future_t<_Ty> read() const
	{
		awaitable_t<_Ty> awaitable;

		auto awaker = std::make_shared<channel_read_awaker>(
			[st = awaitable._state](channel_impl_type*, _Ty* val, error_code fe) -> bool
			{
				if (val)
					st->set_value(std::move(*val));
				else
					st->throw_exception(channel_exception{ fe });

				return true;
			});
		_chan->read_(std::move(awaker));

		return awaitable.get_future();
	}

	// Stream-style alias for write().
	template<class _Ty2>
	future_t<bool> operator << (_Ty2&& val) const
	{
		return std::move(write(std::forward<_Ty2>(val)));
	}

	// co_await on the channel itself reads one value.
	future_t<_Ty> operator co_await () const
	{
		return read();
	}

#if _DEBUG
	// Not thread safe; the returned queue is not thread safe either.
	const auto& debug_queue() const
	{
		return _chan->debug_queue();
	}
#endif

	// Configured capacity of the underlying value buffer.
	size_t capacity() const noexcept
	{
		return _chan->capacity();
	}

	channel_t(const channel_t&) = default;
	channel_t(channel_t&&) = default;
	channel_t& operator = (const channel_t&) = default;
	channel_t& operator = (channel_t&&) = default;
};


using semaphore_t = channel_t<bool>;

} //namespace v1
} //RESUMEF_NS

+ 487
- 0
librf/src/channel_v2.h View File

@@ -0,0 +1,487 @@
#pragma once

RESUMEF_NS
{
namespace detail
{
// Channel implementation (v2): buffered values live in a ring queue (or a
// std::deque, compile-time selectable) and suspended readers/writers are
// tracked as raw state pointers whose lifetime is owned by the awaiters.
template<class _Ty>
struct channel_impl_v2 : public std::enable_shared_from_this<channel_impl_v2<_Ty>>
{
	using value_type = _Ty;
	struct state_read_t;
	struct state_write_t;

	channel_impl_v2(size_t max_counter_);

	// Fast paths: succeed immediately or return false without suspending.
	bool try_read(value_type& val);
	// Parks a reader; returns false when a value became available instead.
	bool add_read_list(state_read_t* state);

	bool try_write(value_type& val);
	// Parks a writer; returns false when the value was accepted instead.
	bool add_write_list(state_write_t* state);

	// Configured capacity of the value buffer.
	size_t capacity() const noexcept
	{
		return _max_counter;
	}

	// Shared coroutine-state base for both read waits and write waits.
	struct state_channel_t : public state_base_t
	{
		// Holds a shared_ptr so the channel outlives any outstanding wait.
		state_channel_t(channel_impl_v2* ch, value_type& val) noexcept
			: m_channel(ch->shared_from_this())
			, _value(&val)
		{
		}

		virtual void resume() override
		{
			coroutine_handle<> handler = _coro;
			if (handler)
			{
				_coro = nullptr;
				_scheduler->del_final(this);
				handler.resume();
			}
		}

		virtual bool has_handler() const noexcept override
		{
			return (bool)_coro;
		}

		// Hands this state to the scheduler so the waiting coroutine resumes.
		void on_notify()
		{
			assert(this->_scheduler != nullptr);
			if (this->_coro)
				this->_scheduler->add_generator(this);
		}

		void on_cancel() noexcept
		{
			this->_coro = nullptr;
		}

		// Records the awaiting coroutine and the scheduler it runs on.
		template<class _PromiseT, typename = std::enable_if_t<is_promise_v<_PromiseT>>>
		void on_await_suspend(coroutine_handle<_PromiseT> handler) noexcept
		{
			_PromiseT& promise = handler.promise();
			auto* parent_state = promise.get_state();
			scheduler_t* sch = parent_state->get_scheduler();

			this->_scheduler = sch;
			this->_coro = handler;
		}

		// Throws channel_exception if the wait finished with an error code.
		void on_await_resume()
		{
			if (_error != error_code::none)
			{
				std::rethrow_exception(std::make_exception_ptr(channel_exception{ _error }));
			}
		}
	protected:
		friend channel_impl_v2;

		std::shared_ptr<channel_impl_v2> m_channel;
		state_channel_t* m_next = nullptr;   // NOTE(review): unused in the visible code — intrusive-list leftover? confirm
		value_type* _value;                  // where the transferred value is read from / written into
		error_code _error = error_code::none;
	};

	struct state_read_t : public state_channel_t
	{
		using state_channel_t::state_channel_t;

		// Adds itself to the channel's reader list. Returns false (meaning:
		// do not suspend) when a value was already available.
		template<class _PromiseT, typename = std::enable_if_t<is_promise_v<_PromiseT>>>
		bool on_await_suspend(coroutine_handle<_PromiseT> handler) noexcept
		{
			state_channel_t::on_await_suspend(handler);
			if (!this->m_channel->add_read_list(this))
			{
				this->_coro = nullptr;
				return false;
			}
			return true;
		}
	};

	struct state_write_t : public state_channel_t
	{
		using state_channel_t::state_channel_t;

		// Adds itself to the channel's writer list. Returns false (meaning:
		// do not suspend) when the value was accepted immediately.
		template<class _PromiseT, typename = std::enable_if_t<is_promise_v<_PromiseT>>>
		bool on_await_suspend(coroutine_handle<_PromiseT> handler) noexcept
		{
			state_channel_t::on_await_suspend(handler);
			if (!this->m_channel->add_write_list(this))
			{
				this->_coro = nullptr;
				return false;
			}
			return true;
		}
	};
private:
	void awake_one_reader_();
	void awake_one_writer_();

	channel_impl_v2(const channel_impl_v2&) = delete;
	channel_impl_v2(channel_impl_v2&&) = delete;
	channel_impl_v2& operator = (const channel_impl_v2&) = delete;
	channel_impl_v2& operator = (channel_impl_v2&&) = delete;

	// Compile-time switches to benchmark the alternative lock/queue backends.
	static constexpr bool USE_SPINLOCK = true;
	static constexpr bool USE_RING_QUEUE = true;

	// NOTE(review): the non-spinlock alternative is std::deque<std::recursive_mutex>,
	// not std::recursive_mutex — looks wrong, but the branch is dead while
	// USE_SPINLOCK is true; confirm before flipping the switch.
	using lock_type = std::conditional_t<USE_SPINLOCK, spinlock, std::deque<std::recursive_mutex>>;
	//using queue_type = std::conditional_t<USE_RING_QUEUE, ring_queue_spinlock<value_type, false, uint32_t>, std::deque<value_type>>;
	using queue_type = std::conditional_t<USE_RING_QUEUE, ring_queue<value_type, false, uint32_t>, std::deque<value_type>>;

	const size_t _max_counter;               // capacity limit of the value buffer

	lock_type _lock;                         // guards the queues below
	queue_type _values;                      // buffered values
	std::list<state_read_t*> _read_awakes;   // parked readers
	std::list<state_write_t*> _write_awakes; // parked writers
};

// When the ring-queue backend is used, the backing buffer must hold at least
// one element even for a "capacity 0" channel; the deque backend starts empty.
template<class _Ty>
channel_impl_v2<_Ty>::channel_impl_v2(size_t max_counter_)
	: _max_counter(max_counter_)
	, _values(USE_RING_QUEUE ? (std::max)((size_t)1, max_counter_) : 0)
{
}

// Pops one buffered value into val under the lock. On success also wakes one
// parked writer (a buffer slot was just freed). Returns false when empty.
template<class _Ty>
bool channel_impl_v2<_Ty>::try_read(value_type& val)
{
	if constexpr (USE_RING_QUEUE)
	{
		scoped_lock<lock_type> lock_(this->_lock);

		if (_values.try_pop(val))
		{
			awake_one_writer_();
			return true;
		}
		return false;
	}
	else
	{
		scoped_lock<lock_type> lock_(this->_lock);

		if (_values.size() > 0)
		{
			val = std::move(_values.front());
			_values.pop_front();
			awake_one_writer_();

			return true;
		}
		return false;
	}
}

// Called from state_read_t::on_await_suspend. Retries the read under the
// lock; if a value is available now, it is delivered into *state->_value and
// false is returned (the coroutine must not suspend). Otherwise the state is
// parked in the reader list and true is returned. Either way one parked
// writer is given a chance to run.
template<class _Ty>
bool channel_impl_v2<_Ty>::add_read_list(state_read_t* state)
{
	assert(state != nullptr);

	if constexpr (USE_RING_QUEUE)
	{
		scoped_lock<lock_type> lock_(this->_lock);

		if (_values.try_pop(*state->_value))
		{
			awake_one_writer_();
			return false;
		}

		_read_awakes.push_back(state);
		awake_one_writer_();

		return true;
	}
	else
	{
		scoped_lock<lock_type> lock_(this->_lock);

		if (_values.size() > 0)
		{
			*state->_value = std::move(_values.front());
			_values.pop_front();
			awake_one_writer_();

			return false;
		}

		_read_awakes.push_back(state);
		awake_one_writer_();

		return true;
	}
}

// Pushes val into the buffer under the lock (moving from val on success).
// On success also wakes one parked reader. Returns false when full.
template<class _Ty>
bool channel_impl_v2<_Ty>::try_write(value_type& val)
{
	if constexpr (USE_RING_QUEUE)
	{
		scoped_lock<lock_type> lock_(this->_lock);

		if (_values.try_push(std::move(val)))
		{
			awake_one_reader_();
			return true;
		}
		return false;
	}
	else
	{
		scoped_lock<lock_type> lock_(this->_lock);

		if (_values.size() < _max_counter)
		{
			_values.push_back(std::move(val));
			awake_one_reader_();

			return true;
		}
		return false;
	}
}

// Called from state_write_t::on_await_suspend. Retries the write under the
// lock; if the buffer accepts the value (moved from *state->_value), false is
// returned (the coroutine must not suspend). Otherwise the state is parked in
// the writer list and true is returned. Either way one parked reader is given
// a chance to run.
template<class _Ty>
bool channel_impl_v2<_Ty>::add_write_list(state_write_t* state)
{
	assert(state != nullptr);

	if constexpr (USE_RING_QUEUE)
	{
		scoped_lock<lock_type> lock_(this->_lock);

		if (_values.try_push(std::move(*state->_value)))
		{
			awake_one_reader_();
			return false;
		}

		_write_awakes.push_back(state);
		awake_one_reader_();

		return true;
	}
	else
	{
		scoped_lock<lock_type> lock_(this->_lock);

		if (_values.size() < _max_counter)
		{
			_values.push_back(std::move(*state->_value));
			awake_one_reader_();

			return false;
		}

		_write_awakes.push_back(state);
		awake_one_reader_();

		return true;
	}
}

// Wakes at most one parked reader (the loop body always breaks after the
// first entry). Must be called with _lock held. If no value is available the
// reader is resumed with error_code::read_before_write instead of a value.
template<class _Ty>
void channel_impl_v2<_Ty>::awake_one_reader_()
{
	for (auto iter = _read_awakes.begin(); iter != _read_awakes.end(); )
	{
		state_read_t* state = *iter;
		iter = _read_awakes.erase(iter);

		if constexpr (USE_RING_QUEUE)
		{
			if (!_values.try_pop(*state->_value))
				state->_error = error_code::read_before_write;
		}
		else
		{
			if (_values.size() > 0)
			{
				*state->_value = std::move(_values.front());
				_values.pop_front();
			}
			else
			{
				state->_error = error_code::read_before_write;
			}
		}
		// Schedule the reader's coroutine for resumption.
		state->on_notify();

		break;
	}
}

// Wakes at most one parked writer (the loop body always breaks after the
// first entry) and transfers its pending value into the buffer. Must be
// called with _lock held; asserts the buffer has room for the value.
template<class _Ty>
void channel_impl_v2<_Ty>::awake_one_writer_()
{
	for (auto iter = _write_awakes.begin(); iter != _write_awakes.end(); )
	{
		state_write_t* state = std::move(*iter);
		iter = _write_awakes.erase(iter);

		if constexpr (USE_RING_QUEUE)
		{
			bool ret = _values.try_push(std::move(*state->_value));
			(void)ret;
			assert(ret);
		}
		else
		{
			assert(_values.size() < _max_counter);
			_values.push_back(*state->_value);
		}
		// Schedule the writer's coroutine for resumption.
		state->on_notify();

		break;
	}
}
} //namespace detail

inline namespace channel_v2
{
// User-facing channel handle (v2): wraps a shared channel_impl_v2 and exposes
// awaitable read/write operations. Both awaiters also work outside of a
// coroutine: if never awaited, their destructors fall back to try_read/try_write.
template<class _Ty>
struct channel_t
{
	using value_type = _Ty;
	using channel_type = detail::channel_impl_v2<value_type>;

	// max_counter is the buffer capacity of the underlying channel.
	channel_t(size_t max_counter = 0)
		:_chan(std::make_shared<channel_type>(max_counter))
	{

	}

	// Configured capacity of the underlying value buffer.
	size_t capacity() const noexcept
	{
		return _chan->capacity();
	}

	// Awaitable returned by read()/co_await: tries the fast path in
	// await_ready, otherwise parks a state_read_t inside the channel.
	struct read_awaiter
	{
		using state_type = typename channel_type::state_read_t;

		read_awaiter(channel_type* ch) noexcept
			: _channel(ch)
		{}

		~read_awaiter()
		{
			// Support use outside of a coroutine: if the awaiter was never
			// awaited, still attempt a direct (non-suspending) read.
			if (_channel != nullptr)
				_channel->try_read(_value);
		}

		bool await_ready()
		{
			if (_channel->try_read(static_cast<value_type&>(_value)))
			{
				_channel = nullptr;
				return true;
			}
			return false;
		}
		template<class _PromiseT, typename = std::enable_if_t<is_promise_v<_PromiseT>>>
		bool await_suspend(coroutine_handle<_PromiseT> handler)
		{
			_state = new state_type(_channel, static_cast<value_type&>(_value));
			_channel = nullptr;
			return _state->on_await_suspend(handler);
		}
		value_type await_resume()
		{
			// Rethrows as channel_exception when the read ended with an error.
			if (_state.get() != nullptr)
				_state->on_await_resume();
			return std::move(_value);
		}
	private:
		channel_type* _channel;
		counted_ptr<state_type> _state;
		mutable value_type _value;
	};

	read_awaiter operator co_await() const noexcept
	{
		return { _chan.get() };
	}
	read_awaiter read() const noexcept
	{
		return { _chan.get() };
	}

	// Awaitable returned by write()/operator<<.
	struct write_awaiter
	{
		using state_type = typename channel_type::state_write_t;

		// FIX: noexcept must depend on the move being non-throwing, not merely
		// on it existing — std::is_move_constructible_v is nearly always true
		// and would promise noexcept even for a throwing move (terminate()).
		write_awaiter(channel_type* ch, value_type val) noexcept(std::is_nothrow_move_constructible_v<value_type>)
			: _channel(ch)
			, _value(std::move(val))
		{}

		~write_awaiter()
		{
			// Support use outside of a coroutine: if the awaiter was never
			// awaited, still attempt a direct (non-suspending) write.
			if (_channel != nullptr)
				_channel->try_write(static_cast<value_type&>(_value));
		}

		bool await_ready()
		{
			if (_channel->try_write(static_cast<value_type&>(_value)))
			{
				_channel = nullptr;
				return true;
			}
			return false;
		}
		template<class _PromiseT, typename = std::enable_if_t<is_promise_v<_PromiseT>>>
		bool await_suspend(coroutine_handle<_PromiseT> handler)
		{
			_state = new state_type(_channel, static_cast<value_type&>(_value));
			_channel = nullptr;
			return _state->on_await_suspend(handler);
		}
		void await_resume()
		{
			// Rethrows as channel_exception when the write ended with an error.
			if (_state.get() != nullptr)
				_state->on_await_resume();
		}
	private:
		channel_type* _channel;
		counted_ptr<state_type> _state;
		mutable value_type _value;
	};

	// FIX: forward instead of move — with std::move an lvalue argument was
	// silently moved-from; forwarding copies lvalues and still moves rvalues.
	template<class U>
	write_awaiter write(U&& val) const noexcept(std::is_nothrow_constructible_v<value_type, U&&>)
	{
		return write_awaiter{ _chan.get(), std::forward<U>(val) };
	}

	template<class U>
	write_awaiter operator << (U&& val) const noexcept(std::is_nothrow_constructible_v<value_type, U&&>)
	{
		return write_awaiter{ _chan.get(), std::forward<U>(val) };
	}

	channel_t(const channel_t&) = default;
	channel_t(channel_t&&) = default;
	channel_t& operator = (const channel_t&) = default;
	channel_t& operator = (channel_t&&) = default;
private:
	std::shared_ptr<channel_type> _chan;
};


using semaphore_t = channel_t<bool>;

} //namespace v2
} //RESUMEF_NS

+ 1
- 1
librf/src/event_v1.cpp View File

@@ -59,7 +59,7 @@ RESUMEF_NS

}

inline namespace v1
inline namespace event_v1
{
event_t::event_t(intptr_t initial_counter_)
: _event(std::make_shared<detail::event_impl>(initial_counter_))

+ 1
- 1
librf/src/event_v1.h View File

@@ -40,7 +40,7 @@ RESUMEF_NS
};
}

inline namespace v1
inline namespace event_v1
{

//提供一种在协程和非协程之间同步的手段。

+ 1
- 13
librf/src/event_v2.cpp View File

@@ -77,18 +77,6 @@ RESUMEF_NS
return true;
}

void state_event_t::destroy_deallocate()
{
size_t _Size = sizeof(state_event_t);
#if RESUMEF_DEBUG_COUNTER
std::cout << "destroy_deallocate, size=" << _Size << std::endl;
#endif
this->~state_event_t();

_Alloc_char _Al;
return _Al.deallocate(reinterpret_cast<char*>(this), _Size);
}

void state_event_t::resume()
{
coroutine_handle<> handler = _coro;
@@ -106,7 +94,7 @@ RESUMEF_NS
}
}

namespace v2
namespace event_v2
{
event_t::event_t(bool initially)
:_event(std::make_shared<detail::event_v2_impl>(initially))

+ 13
- 19
librf/src/event_v2.h View File

@@ -40,6 +40,12 @@ RESUMEF_NS

struct state_event_t : public state_base_t
{
state_event_t(event_v2_impl* e) noexcept
{
if (e != nullptr)
m_event = e->shared_from_this();
}

virtual void resume() override;
virtual bool has_handler() const noexcept override;

@@ -50,6 +56,11 @@ RESUMEF_NS
this->_scheduler->add_generator(this);
}

void cancel_timeout()
{
this->_coro = nullptr;
}

//将自己加入到通知链表里
template<class _PromiseT, typename = std::enable_if_t<is_promise_v<_PromiseT>>>
bool event_await_suspend(coroutine_handle<_PromiseT> handler) noexcept
@@ -64,32 +75,15 @@ RESUMEF_NS
return m_event->add_notify_list(this);
}

static state_event_t* _Alloc_state(event_v2_impl * e)
{
_Alloc_char _Al;
size_t _Size = sizeof(state_event_t);
#if RESUMEF_DEBUG_COUNTER
std::cout << "state_event_t::alloc, size=" << sizeof(state_event_t) << std::endl;
#endif
char* _Ptr = _Al.allocate(_Size);
return new(_Ptr) state_event_t(e);
}
private:
friend struct event_v2_impl;

state_event_t(event_v2_impl * e) noexcept
{
if (e != nullptr)
m_event = e->shared_from_this();
}
std::shared_ptr<event_v2_impl> m_event;
state_event_t* m_next = nullptr;

virtual void destroy_deallocate() override;
};
}

namespace v2
namespace event_v2
{
struct event_t
{
@@ -125,7 +119,7 @@ RESUMEF_NS
template<class _PromiseT, typename = std::enable_if_t<is_promise_v<_PromiseT>>>
bool await_suspend(coroutine_handle<_PromiseT> handler) noexcept
{
_state = detail::state_event_t::_Alloc_state(_event);
_state = new detail::state_event_t(_event);
return _state->event_await_suspend(handler);
}
void await_resume() noexcept

+ 1
- 0
librf/src/promise.inl View File

@@ -63,6 +63,7 @@ RESUMEF_NS
{
_Whatever._state->set_scheduler(get_state()->get_scheduler());
}

return std::forward<_Uty>(_Whatever);
}


+ 154
- 0
librf/src/ring_queue.h View File

@@ -0,0 +1,154 @@
#pragma once

#include <atomic>
#include <cassert>
#include <cstdint>
#include <limits>
#include <memory>
#include <optional>
#include <type_traits>
#include <vector>
#include "src/spinlock.h"

//A ring (circular) queue intended to be guarded by an external spin lock
//(see ring_queue_spinlock). NOTE(review): by itself this structure performs
//no synchronization, so the original claim of concurrent push/pop safety
//holds only when every caller locks — confirm call sites.
//_Option : set to true when the stored type is move-only, or when a value
//          must be destroyed immediately after it is popped.
//_Sty : integer type used for sizes and indices; lets the user control the
//       footprint of the queue structure.
template<class _Ty, bool _Option = false, class _Sty = uint32_t>
struct ring_queue
{
	using value_type = _Ty;
	using size_type = _Sty;

	static constexpr bool use_option = _Option;
	//With _Option the slot wraps the value in std::optional so pop can reset it.
	using optional_type = std::conditional_t<use_option, std::optional<value_type>, value_type>;
public:
	ring_queue(size_t sz);

	ring_queue(const ring_queue&) = delete;
	ring_queue(ring_queue&&) = default;
	ring_queue& operator =(const ring_queue&) = delete;
	ring_queue& operator =(ring_queue&&) = default;

	auto size() const noexcept->size_type;
	auto capacity() const noexcept->size_type;
	bool empty() const noexcept;
	bool full() const noexcept;
	template<class U>
	bool try_push(U&& value) noexcept(std::is_nothrow_move_assignable_v<U>);
	bool try_pop(value_type& value) noexcept(std::is_nothrow_move_assignable_v<value_type>);
private:
	//std::vector<bool> is a packed proxy container, so bool falls back to a raw array.
	using container_type = std::conditional_t<std::is_same_v<value_type, bool>, std::unique_ptr<optional_type[]>, std::vector<optional_type>>;
	container_type m_bufferPtr;   //storage for sz + 1 slots (one slot stays unused to tell full from empty)
	size_type m_bufferSize;       //number of slots, i.e. capacity + 1

	size_type m_writeIndex;       //slot the next push writes
	size_type m_readIndex;        //slot the next pop reads
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
	std::atomic<size_type> m_count;   //exact element count, maintained on push/pop
#endif

	auto nextIndex(size_type a_count) const noexcept->size_type;
};

template<class _Ty, bool _Option, class _Sty>
ring_queue<_Ty, _Option, _Sty>::ring_queue(size_t sz)
	: m_bufferSize(static_cast<size_type>(sz + 1))
	, m_writeIndex(0)
	, m_readIndex(0)
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
	, m_count(0)
#endif
{
	//Validate before allocating: sz + 1 must be representable in size_type.
	assert(sz < (std::numeric_limits<size_type>::max)());

	if constexpr (std::is_same_v<value_type, bool>)
		m_bufferPtr = container_type{ new optional_type[sz + 1] };
	else
		m_bufferPtr.resize(sz + 1);
}

//Advances a slot index by one, wrapping at the end of the buffer.
template<class _Ty, bool _Option, class _Sty>
auto ring_queue<_Ty, _Option, _Sty>::nextIndex(size_type a_count) const noexcept->size_type
{
	return static_cast<size_type>((a_count + 1) % m_bufferSize);
}

//Number of elements currently stored.
template<class _Ty, bool _Option, class _Sty>
auto ring_queue<_Ty, _Option, _Sty>::size() const noexcept->size_type
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
	return m_count.load(std::memory_order_acquire);
#else
	if (m_writeIndex >= m_readIndex)
		return (m_writeIndex - m_readIndex);
	else
		return (m_bufferSize + m_writeIndex - m_readIndex);
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}

//Maximum number of elements (one slot is reserved to distinguish full/empty).
template<class _Ty, bool _Option, class _Sty>
auto ring_queue<_Ty, _Option, _Sty>::capacity() const noexcept->size_type
{
	return m_bufferSize - 1;
}

template<class _Ty, bool _Option, class _Sty>
bool ring_queue<_Ty, _Option, _Sty>::empty() const noexcept
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
	return m_count.load(std::memory_order_acquire) == 0;
#else
	return m_writeIndex == m_readIndex;
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}

template<class _Ty, bool _Option, class _Sty>
bool ring_queue<_Ty, _Option, _Sty>::full() const noexcept
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
	return (m_count.load(std::memory_order_acquire) == (m_bufferSize - 1));
#else
	return nextIndex(m_writeIndex) == m_readIndex;
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}

//Stores a value at the write slot; returns false when the queue is full.
template<class _Ty, bool _Option, class _Sty>
template<class U>
bool ring_queue<_Ty, _Option, _Sty>::try_push(U&& value) noexcept(std::is_nothrow_move_assignable_v<U>)
{
	auto nextWriteIndex = nextIndex(m_writeIndex);
	if (nextWriteIndex == m_readIndex)
		return false;

	assert(m_writeIndex < m_bufferSize);

	//FIX: forward, not move — with std::move an lvalue argument was silently
	//moved-from; forwarding copies lvalues and still moves rvalues.
	m_bufferPtr[m_writeIndex] = std::forward<U>(value);
	m_writeIndex = nextWriteIndex;

#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
	m_count.fetch_add(1, std::memory_order_acq_rel);
#endif
	return true;
}

//Moves the oldest value into `value`; returns false when the queue is empty.
template<class _Ty, bool _Option, class _Sty>
bool ring_queue<_Ty, _Option, _Sty>::try_pop(value_type& value) noexcept(std::is_nothrow_move_assignable_v<value_type>)
{
	if (m_readIndex == m_writeIndex)
		return false;

	optional_type& ov = m_bufferPtr[m_readIndex];
	if constexpr (use_option)
	{
		value = std::move(ov.value());
		//Destroy the stored value immediately (the point of _Option).
		ov = std::nullopt;
	}
	else
	{
		value = std::move(ov);
	}

	m_readIndex = nextIndex(m_readIndex);

#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
	m_count.fetch_sub(1, std::memory_order_acq_rel);
#endif
	return true;
}

+ 221
- 0
librf/src/ring_queue_lockfree.h View File

@@ -0,0 +1,221 @@
#pragma once

#include <memory>
#include <atomic>
#include <cassert>
#include <thread>

//NOTE (translated from the original Chinese): wrap-around of the three index
//counters is an unsolved problem here. Widening the indices to uint64_t
//avoids the wrap-around, but then benchmarks slower than the
//spinlock<T, false, uint32_t> variant.
//pop cannot use move semantics to fetch the value: the algorithm must read
//the value first, and the read may still fail, in which case a different
//value has to be fetched instead.
template<class _Ty, class _Sty = uint32_t>
struct ring_queue_lockfree
{
	using value_type = _Ty;
	using size_type = _Sty;
public:
	ring_queue_lockfree(size_t sz);

	ring_queue_lockfree(const ring_queue_lockfree&) = delete;
	ring_queue_lockfree(ring_queue_lockfree&&) = default;
	ring_queue_lockfree& operator =(const ring_queue_lockfree&) = delete;
	ring_queue_lockfree& operator =(ring_queue_lockfree&&) = default;

	auto size() const noexcept->size_type;
	auto capacity() const noexcept->size_type;
	bool empty() const noexcept;
	bool full() const noexcept;
	template<class U>
	bool try_push(U&& value) noexcept(std::is_nothrow_move_constructible_v<U>);
	bool try_pop(value_type& value) noexcept(std::is_nothrow_move_constructible_v<value_type>);
private:
	std::unique_ptr<value_type[]> m_bufferPtr;   //storage for sz + 1 slots
	size_type m_bufferSize;                      //slot count == capacity + 1

	std::atomic<size_type> m_writeIndex;         //Where a new element will be inserted to.
	std::atomic<size_type> m_readIndex;          //Where the next element where be extracted from.
	std::atomic<size_type> m_maximumReadIndex;   //It points to the place where the latest "commited" data has been inserted.
	                                             //If it's not the same as writeIndex it means there are writes pending to be "commited" to the queue,
	                                             //that means that the place for the data was reserved (the index in the array)
	                                             //but the data is still not in the queue,
	                                             //so the thread trying to read will have to wait for those other threads to
	                                             //save the data into the queue.
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
	std::atomic<size_type> m_count;              //exact element count (optional feature)
#endif

	auto countToIndex(size_type a_count) const noexcept->size_type;
	auto nextIndex(size_type a_count) const noexcept->size_type;
};

// Allocates sz + 1 slots (one slot is reserved to distinguish full from
// empty) and starts with all index counters at zero.
template<class _Ty, class _Sty>
ring_queue_lockfree<_Ty, _Sty>::ring_queue_lockfree(size_t sz)
	: m_bufferPtr(new value_type[sz + 1])
	, m_bufferSize(static_cast<size_type>(sz + 1))
	, m_writeIndex(0)
	, m_readIndex(0)
	, m_maximumReadIndex(0)
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
	, m_count(0)
#endif
{
	// sz + 1 must be representable in size_type.
	assert(sz < (std::numeric_limits<size_type>::max)());
}

// Maps a counter value to a buffer slot. Currently the identity: counters
// are kept pre-wrapped by nextIndex(), so no modulo is needed here (the
// commented-out line shows the alternative scheme with free-running counters).
template<class _Ty, class _Sty>
auto ring_queue_lockfree<_Ty, _Sty>::countToIndex(size_type a_count) const noexcept->size_type
{
	//return (a_count % m_bufferSize);
	return a_count;
}

// Advances a slot index by one, wrapping around at the end of the buffer.
template<class _Ty, class _Sty>
auto ring_queue_lockfree<_Ty, _Sty>::nextIndex(size_type a_count) const noexcept->size_type
{
	const size_type bumped = static_cast<size_type>(a_count + 1);
	return static_cast<size_type>(bumped % m_bufferSize);
}

// Number of committed (readable) elements. Computed from two independent
// atomic loads, so under concurrent push/pop the result is approximate.
template<class _Ty, class _Sty>
auto ring_queue_lockfree<_Ty, _Sty>::size() const noexcept->size_type
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
	return m_count.load();
#else
	auto currentWriteIndex = m_maximumReadIndex.load(std::memory_order_acquire);
	currentWriteIndex = countToIndex(currentWriteIndex);

	auto currentReadIndex = m_readIndex.load(std::memory_order_acquire);
	currentReadIndex = countToIndex(currentReadIndex);

	if (currentWriteIndex >= currentReadIndex)
		return (currentWriteIndex - currentReadIndex);
	else
		return (m_bufferSize + currentWriteIndex - currentReadIndex);
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}

// Maximum number of storable elements; one slot is sacrificed so that a full
// queue can be distinguished from an empty one.
template<class _Ty, class _Sty>
auto ring_queue_lockfree<_Ty, _Sty>::capacity() const noexcept->size_type
{
	const size_type slot_count = m_bufferSize;
	return static_cast<size_type>(slot_count - 1);
}

// True when no committed element is readable. Uses m_maximumReadIndex (not
// m_writeIndex) so reserved-but-uncommitted writes still count as empty.
template<class _Ty, class _Sty>
bool ring_queue_lockfree<_Ty, _Sty>::empty() const noexcept
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
	return m_count.load() == 0;
#else
	auto currentWriteIndex = m_maximumReadIndex.load(std::memory_order_acquire);
	auto currentReadIndex = m_readIndex.load(std::memory_order_acquire);
	return countToIndex(currentWriteIndex) == countToIndex(currentReadIndex);
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}

// True when pushing one more element would fail.
// Uses m_writeIndex (reserved slots), so a slot that is reserved but not
// yet committed already makes the queue report full.
template<class _Ty, class _Sty>
bool ring_queue_lockfree<_Ty, _Sty>::full() const noexcept
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    return (m_count.load() == (m_bufferSize - 1));
#else
    auto currentWriteIndex = m_writeIndex.load(std::memory_order_acquire);
    auto currentReadIndex = m_readIndex.load(std::memory_order_acquire);
    return countToIndex(nextIndex(currentWriteIndex)) == countToIndex(currentReadIndex);
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}

// Attempts to push one element; returns false (without blocking) when the
// queue is full. Safe for multiple concurrent producers: a producer first
// reserves a slot by CAS on m_writeIndex, stores the data, then publishes
// it by advancing m_maximumReadIndex in the same order as the reservations.
template<class _Ty, class _Sty>
template<class U>
bool ring_queue_lockfree<_Ty, _Sty>::try_push(U&& value) noexcept(std::is_nothrow_move_constructible_v<U>)
{
    auto currentWriteIndex = m_writeIndex.load(std::memory_order_acquire);

    do
    {
        if (countToIndex(nextIndex(currentWriteIndex)) == countToIndex(m_readIndex.load(std::memory_order_acquire)))
        {
            // the queue is full
            return false;
        }

        // There is more than one producer. Keep looping till this thread is able
        // to allocate space for current piece of data
        //
        // using compare_exchange_strong because it isn't allowed to fail spuriously
        // When the compare_exchange operation is in a loop the weak version
        // will yield better performance on some platforms, but here we'd have to
        // load m_writeIndex all over again
    } while (!m_writeIndex.compare_exchange_strong(currentWriteIndex, nextIndex(currentWriteIndex), std::memory_order_acq_rel));

    // Just made sure this index is reserved for this thread.
    // FIX: 'value' is a forwarding reference, so std::move() here would also
    // move from a caller's lvalue and silently gut it. std::forward preserves
    // the caller's value category: lvalues are copied, rvalues are moved.
    m_bufferPtr[countToIndex(currentWriteIndex)] = std::forward<U>(value);

    // update the maximum read index after saving the piece of data. It can't
    // fail if there is only one thread inserting in the queue. It might fail
    // if there is more than 1 producer thread because this operation has to
    // be done in the same order as the previous CAS
    //
    // using compare_exchange_weak because they are allowed to fail spuriously
    // (act as if *this != expected, even if they are equal), but when the
    // compare_exchange operation is in a loop the weak version will yield
    // better performance on some platforms.
    auto savedWriteIndex = currentWriteIndex;
    while (!m_maximumReadIndex.compare_exchange_weak(currentWriteIndex, nextIndex(currentWriteIndex), std::memory_order_acq_rel))
    {
        // compare_exchange overwrote 'currentWriteIndex' with the observed
        // value; restore our reserved index before retrying.
        currentWriteIndex = savedWriteIndex;
        // this is a good place to yield the thread in case there are more
        // software threads than hardware processors and you have more
        // than 1 producer thread
        // have a look at sched_yield (POSIX.1b)
        std::this_thread::yield();
    }

    // The value was successfully inserted into the queue
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    m_count.fetch_add(1);
#endif
    return true;
}

// Attempts to pop one element; returns false (without blocking) when no
// committed element is available. Safe for multiple concurrent consumers:
// the element is copied out first, and the copy only "counts" when the
// subsequent CAS on m_readIndex succeeds; otherwise another consumer won
// the slot and we retry with the updated read index.
template<class _Ty, class _Sty>
bool ring_queue_lockfree<_Ty, _Sty>::try_pop(value_type & value) noexcept(std::is_nothrow_move_constructible_v<value_type>)
{
    auto currentReadIndex = m_readIndex.load(std::memory_order_acquire);

    for(;;)
    {
        auto idx = countToIndex(currentReadIndex);

        // to ensure thread-safety when there is more than 1 producer
        // thread a second index is defined (m_maximumReadIndex)
        if (idx == countToIndex(m_maximumReadIndex.load(std::memory_order_acquire)))
        {
            // the queue is empty or
            // a producer thread has allocate space in the queue but is
            // waiting to commit the data into it
            return false;
        }

        // retrieve the data from the queue
        // NOTE(review): this must be a copy, not a move - the CAS below may
        // fail, in which case the slot still belongs to another consumer.
        // As a consequence this queue cannot hold move-only element types.
        value = m_bufferPtr[idx];

        // try to perfrom now the CAS operation on the read index. If we succeed
        // a_data already contains what m_readIndex pointed to before we
        // increased it
        if (m_readIndex.compare_exchange_strong(currentReadIndex, nextIndex(currentReadIndex), std::memory_order_acq_rel))
        {
            // got here. The value was retrieved from the queue. Note that the
            // data inside the m_queue array is not deleted nor reseted
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
            m_count.fetch_sub(1);
#endif
            return true;
        }

        // it failed retrieving the element off the queue. Someone else must
        // have read the element stored at countToIndex(currentReadIndex)
        // before we could perform the CAS operation
    } // keep looping to try again!
}

+ 166
- 0
librf/src/ring_queue_spinlock.h View File

@@ -0,0 +1,166 @@
#pragma once

#include <atomic>
#include <cassert>
#include <cstdint>
#include <limits>
#include <memory>
#include <optional>
#include <type_traits>
#include <utility>
#include <vector>
#include "src/spinlock.h"

// Thread-safe ring queue protected by a spin lock.
// Multiple threads may push and pop concurrently.
// _Option : set to true when the stored type is move-only, or when elements
//           must be destroyed as soon as they are popped (slots then hold
//           std::optional<_Ty> so a popped slot can be reset).
// _Sty    : integer type used for sizes/indices; lets callers control the
//           footprint of the queue object.
template<class _Ty, bool _Option = false, class _Sty = uint32_t>
struct ring_queue_spinlock
{
    using value_type = _Ty;
    using size_type = _Sty;

    static constexpr bool use_option = _Option;
    // With _Option the slots are optional<_Ty> (resettable); otherwise raw _Ty.
    using optional_type = std::conditional_t<use_option, std::optional<value_type>, value_type>;
public:
    ring_queue_spinlock(size_t sz);

    ring_queue_spinlock(const ring_queue_spinlock&) = delete;
    // NOTE(review): defaulted moves require every member, including
    // resumef::spinlock, to be movable - confirm, otherwise these are
    // implicitly deleted.
    ring_queue_spinlock(ring_queue_spinlock&&) = default;
    ring_queue_spinlock& operator =(const ring_queue_spinlock&) = delete;
    ring_queue_spinlock& operator =(ring_queue_spinlock&&) = default;

    auto size() const noexcept->size_type;
    auto capacity() const noexcept->size_type;
    bool empty() const noexcept;
    bool full() const noexcept;
    template<class U>
    bool try_push(U&& value) noexcept(std::is_nothrow_move_assignable_v<U>);
    bool try_pop(value_type& value) noexcept(std::is_nothrow_move_assignable_v<value_type>);
private:
    // bool falls back to a plain heap array because std::vector<bool> is a
    // packed proxy container; every other type uses std::vector.
    using container_type = std::conditional_t<std::is_same_v<value_type, bool>, std::unique_ptr<optional_type[]>, std::vector<optional_type>>;
    container_type m_bufferPtr;         // storage: capacity + 1 slots (one kept free)
    size_type m_bufferSize;             // allocated slot count == capacity + 1

    size_type m_writeIndex;             // next slot to write
    size_type m_readIndex;              // next slot to read
    mutable resumef::spinlock m_lock;   // guards all index/buffer access
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    std::atomic<size_type> m_count;     // exact element count, kept in step with push/pop
#endif

    auto nextIndex(size_type a_count) const noexcept->size_type;
};

// Constructs a queue able to hold 'sz' elements; one extra slot is
// allocated so that "full" and "empty" can be told apart by the indices.
template<class _Ty, bool _Option, class _Sty>
ring_queue_spinlock<_Ty, _Option, _Sty>::ring_queue_spinlock(size_t sz)
    : m_bufferSize(static_cast<size_type>(sz + 1))
    , m_writeIndex(0)
    , m_readIndex(0)
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    , m_count(0)
#endif
{
    // bool storage is a raw heap array (std::vector<bool> is a bit-packed
    // proxy); all other element types live in a std::vector.
    if constexpr (std::is_same_v<value_type, bool>)
        m_bufferPtr = container_type{ new optional_type[sz + 1] };
    else
        m_bufferPtr.resize(sz + 1);
    // NOTE(review): the narrowing cast in the init-list runs before this
    // check; sizes near the size_type limit would already have wrapped.
    assert(sz < (std::numeric_limits<size_type>::max)());
}

// Returns the index that follows 'a_count', wrapping around at the end of
// the (capacity + 1)-slot buffer.
template<class _Ty, bool _Option, class _Sty>
auto ring_queue_spinlock<_Ty, _Option, _Sty>::nextIndex(size_type a_count) const noexcept->size_type
{
    return static_cast<size_type>((a_count + 1) % m_bufferSize);
}

// Number of elements currently in the queue.
// Taken under the spin lock, so the value is exact at the moment of the
// call (it may of course change right after the lock is released).
template<class _Ty, bool _Option, class _Sty>
auto ring_queue_spinlock<_Ty, _Option, _Sty>::size() const noexcept->size_type
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    return m_count.load(std::memory_order_acquire);
#else
    std::scoped_lock __guard(this->m_lock);

    // Single modular expression covers both the wrapped and the unwrapped
    // case: adding m_bufferSize before the subtraction keeps the left-hand
    // side non-negative, and the final % folds it back into range.
    return static_cast<size_type>((m_writeIndex + m_bufferSize - m_readIndex) % m_bufferSize);
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}

// Maximum number of elements the queue can hold. One slot of the internal
// buffer is reserved to distinguish "full" from "empty", hence the -1.
template<class _Ty, bool _Option, class _Sty>
auto ring_queue_spinlock<_Ty, _Option, _Sty>::capacity() const noexcept->size_type
{
    return m_bufferSize - 1;
}

// True when the queue holds no elements (read index has caught up with the
// write index). Exact at the moment of the call thanks to the lock.
template<class _Ty, bool _Option, class _Sty>
bool ring_queue_spinlock<_Ty, _Option, _Sty>::empty() const noexcept
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    return m_count.load(std::memory_order_acquire) == 0;
#else
    std::scoped_lock __guard(this->m_lock);

    return m_writeIndex == m_readIndex;
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}

// True when pushing one more element would fail, i.e. advancing the write
// index would collide with the read index (the one reserved-free slot).
template<class _Ty, bool _Option, class _Sty>
bool ring_queue_spinlock<_Ty, _Option, _Sty>::full() const noexcept
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    return (m_count.load(std::memory_order_acquire) == (m_bufferSize - 1));
#else
    std::scoped_lock __guard(this->m_lock);

    return nextIndex(m_writeIndex) == m_readIndex;
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}

// Attempts to push one element; returns false immediately when the queue is
// full. The whole operation runs under the spin lock, so it is safe for any
// number of concurrent producers and consumers.
template<class _Ty, bool _Option, class _Sty>
template<class U>
bool ring_queue_spinlock<_Ty, _Option, _Sty>::try_push(U&& value) noexcept(std::is_nothrow_move_assignable_v<U>)
{
    std::scoped_lock __guard(this->m_lock);

    auto nextWriteIndex = nextIndex(m_writeIndex);
    if (nextWriteIndex == m_readIndex)
        return false;       // full: advancing the write index would hit the read index

    assert(m_writeIndex < m_bufferSize);

    // FIX: 'value' is a forwarding reference, so std::move() here would also
    // move from a caller's lvalue and silently gut it. std::forward keeps
    // the caller's value category: lvalues are copied, rvalues are moved.
    m_bufferPtr[m_writeIndex] = std::forward<U>(value);
    m_writeIndex = nextWriteIndex;

#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    m_count.fetch_add(1, std::memory_order_acq_rel);
#endif
    return true;
}

// Attempts to pop one element into 'value'; returns false immediately when
// the queue is empty. Runs entirely under the spin lock.
template<class _Ty, bool _Option, class _Sty>
bool ring_queue_spinlock<_Ty, _Option, _Sty>::try_pop(value_type& value) noexcept(std::is_nothrow_move_assignable_v<value_type>)
{
    std::scoped_lock __guard(this->m_lock);

    if (m_readIndex == m_writeIndex)
        return false;

    optional_type& ov = m_bufferPtr[m_readIndex];
    if constexpr (use_option)
    {
        // _Option == true: slot is std::optional; move the payload out and
        // reset the slot so the element is destroyed right away.
        value = std::move(ov.value());
        ov = std::nullopt;
    }
    else
    {
        // _Option == false: plain slot; the moved-from element stays in the
        // buffer until it is overwritten by a later push.
        value = std::move(ov);
    }

    m_readIndex = nextIndex(m_readIndex);

#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    m_count.fetch_sub(1, std::memory_order_acq_rel);
#endif
    return true;
}


+ 2
- 0
librf/src/scheduler.cpp View File

@@ -109,6 +109,8 @@ RESUMEF_NS
// Registers a generator's state object with this scheduler so the run loop
// will drive it. The running-state list is shared with the scheduler's own
// thread, hence the lock around the append.
void scheduler_t::add_generator(state_base_t* sptr)
{
    assert(sptr != nullptr);
    scoped_lock<spinlock> __guard(_lock_running);
    _runing_states.emplace_back(sptr);
}

+ 5
- 0
librf/src/state.cpp View File

@@ -5,6 +5,11 @@ RESUMEF_NS
// Out-of-line definition of the virtual destructor (declared in state.h);
// keeping it in one translation unit gives the class a single home.
state_base_t::~state_base_t()
{
}
// Default destruction policy for a state object: plain delete. The method
// is virtual (see state.h), so derived states that manage their own storage
// can override it.
void state_base_t::destroy_deallocate()
{
    delete this;
}
void state_future_t::destroy_deallocate()
{

+ 1
- 1
librf/src/state.h View File

@@ -29,7 +29,7 @@ RESUMEF_NS
virtual ~state_base_t();
private:
virtual void destroy_deallocate() = 0;
virtual void destroy_deallocate();
public:
virtual void resume() = 0;
virtual bool has_handler() const noexcept = 0;

+ 1
- 1
librf/src/state.inl View File

@@ -242,7 +242,7 @@ RESUMEF_NS
case result_type::Exception:
_exception.~exception_ptr();
default:
this->_value = &val;
this->_value = std::addressof(val);
this->_has_value.store(result_type::Value, std::memory_order_release);
break;
}

+ 14
- 17
tutorial/test_async_channel.cpp View File

@@ -24,14 +24,10 @@ future_t<> test_channel_read(const channel_t<std::string> & c)
try
#endif
{
auto val = co_await c.read();
//auto val = co_await c; //第二种从channel读出数据的方法。利用重载operator co_await(),而不是c是一个awaitable_t。
//auto val = co_await c.read();
auto val = co_await c; //第二种从channel读出数据的方法。利用重载operator co_await(),而不是c是一个awaitable_t。
std::cout << val << ":";
#if _DEBUG
for (auto v2 : c.debug_queue())
std::cout << v2 << ",";
#endif
std::cout << std::endl;
}
#ifndef __clang__
@@ -52,19 +48,14 @@ future_t<> test_channel_write(const channel_t<std::string> & c)
for (size_t i = 0; i < 10; ++i)
{
co_await c.write(std::to_string(i));
//co_await (c << std::to_string(i)); //第二种写入数据到channel的方法。因为优先级关系,需要将'c << i'括起来
//co_await c.write(std::to_string(i));
co_await (c << std::to_string(i)); //第二种写入数据到channel的方法。因为优先级关系,需要将'c << i'括起来
std::cout << "<" << i << ">:";
#if _DEBUG
for (auto val : c.debug_queue())
std::cout << val << ",";
#endif
std::cout << std::endl;
}
}
void test_channel_read_first()
{
channel_t<std::string> c(MAX_CHANNEL_QUEUE);
@@ -87,9 +78,11 @@ void test_channel_write_first()
static const int N = 1000000;
void test_channel_performance()
void test_channel_performance(size_t buff_size)
{
channel_t<int> c{1};
//1的话,效率跟golang比,有点惨不忍睹。
//1000的话,由于几乎不需要调度器接入,效率就很高了,随便过千万数量级。
channel_t<int> c{ buff_size };
go[&]() -> future_t<>
{
@@ -109,7 +102,7 @@ void test_channel_performance()
} while (i > 0);
auto dt = duration_cast<duration<double>>(high_resolution_clock::now() - tstart).count();
std::cout << "channel w/r " << N << " times, cost time " << dt << "s" << std::endl;
std::cout << "channel buff=" << c.capacity() << ", w/r " << N << " times, cost time " << dt << "s" << std::endl;
};
this_scheduler()->run_until_notask();
@@ -123,5 +116,9 @@ void resumable_main_channel()
test_channel_write_first();
std::cout << std::endl;
test_channel_performance();
test_channel_performance(1);
test_channel_performance(10);
test_channel_performance(100);
test_channel_performance(1000);
test_channel_performance(10000);
}

+ 5
- 5
tutorial/test_async_event_v2.cpp View File

@@ -10,7 +10,7 @@
using namespace resumef;

//非协程的逻辑线程,或异步代码,可以通过event_t通知到协程,并且不会阻塞协程所在的线程。
std::thread async_set_event_all(const v2::event_t & e, std::chrono::milliseconds dt)
std::thread async_set_event_all(const event_v2::event_t & e, std::chrono::milliseconds dt)
{
return std::thread([=]
{
@@ -19,7 +19,7 @@ std::thread async_set_event_all(const v2::event_t & e, std::chrono::milliseconds
});
}

std::thread async_set_event_one(const v2::event_t& e, std::chrono::milliseconds dt)
std::thread async_set_event_one(const event_v2::event_t& e, std::chrono::milliseconds dt)
{
return std::thread([=]
{
@@ -29,7 +29,7 @@ std::thread async_set_event_one(const v2::event_t& e, std::chrono::milliseconds
}


future_t<> resumable_wait_event(const v2::event_t & e, int idx)
future_t<> resumable_wait_event(const event_v2::event_t & e, int idx)
{
co_await e;
std::cout << "[" << idx << "]event signal!" << std::endl;
@@ -40,7 +40,7 @@ void test_notify_all()
using namespace std::chrono;

{
v2::event_t evt;
event_v2::event_t evt;
go resumable_wait_event(evt, 0);
go resumable_wait_event(evt, 1);
go resumable_wait_event(evt, 2);
@@ -58,7 +58,7 @@ void test_notify_one()
using namespace std::chrono;

{
v2::event_t evt;
event_v2::event_t evt;
go resumable_wait_event(evt, 10);
go resumable_wait_event(evt, 11);
go resumable_wait_event(evt, 12);

+ 148
- 0
tutorial/test_ring_queue.h View File

@@ -0,0 +1,148 @@
#pragma once

#include <cstdint>
#include <thread>
#include <cstdlib>
#include <iostream>
#include <random>

// Stress-tests a ring_queue with 2 producer and 2 consumer threads.
// Each producer pushes N random digits (deterministic for a given 'seed')
// and adds them to total_push; each consumer pops N values into total_pop.
// Returns true when both totals match, i.e. no value was lost or duplicated.
// 'printResult' additionally echoes per-thread progress markers and totals.
template<class ring_queue>
bool test_ring_queue_thread(uint32_t seed, bool printResult)
{
    ring_queue q{ 12 };     // small capacity on purpose: forces frequent full/empty contention

    const size_t N = 1000000;
    std::atomic<int64_t> total_push = 0, total_pop = 0;

    std::thread th_push[2];
    for (auto& th : th_push)
    {
        th = std::thread([=, &q, &total_push]
        {
            // NOTE(review): both producers use the same seed and therefore
            // push identical sequences - confirm this is intended (it keeps
            // a run reproducible for a given seed).
            std::mt19937 rd_generator{ seed };
            std::uniform_int_distribution<uint32_t> distribution{ 0, 9 };

            for (size_t i = 0; i < N; ++i)
            {
                int value = (int)distribution(rd_generator);
                total_push += value;

                // spin until the queue has room
                while (!q.try_push(value))
                    std::this_thread::yield();
            }

            if (printResult)
                std::cout << "+";       // one '+' per finished producer
        });
    }

    std::thread th_pop[2];
    for (auto& th : th_pop)
    {
        th = std::thread([=, &q, &total_pop]
        {
            for (size_t i = 0; i < N; ++i)
            {
                int value;
                // spin until a value is available
                while (!q.try_pop(value))
                    std::this_thread::yield();

                total_pop += value;
            }

            if (printResult)
                std::cout << "-";       // one '-' per finished consumer
        });
    }

    for (auto& th : th_push)
        th.join();
    for (auto& th : th_pop)
        th.join();

    //assert(total_push.load() == total_pop.load());

    if (printResult)
    {
        std::cout << std::endl;
        std::cout << "push = " << total_push.load() << ", pop = " << total_pop.load() << std::endl;
    }

    return total_push.load() == total_pop.load();
}

// Single-threaded smoke test of a ring_queue's basic contract:
// size/empty/full bookkeeping, rejection when full or empty, FIFO order,
// and wrap-around after a full drain.
template<class ring_queue>
void test_ring_queue_simple()
{
    // A zero-capacity queue is simultaneously empty and full.
    ring_queue degenerate{ 0 };

    assert(degenerate.empty());
    assert(degenerate.full());
    assert(degenerate.size() == 0);

    ring_queue q{ 2 };

    // Checks size/empty/full as one snapshot of a capacity-2 queue.
    auto expect_state = [&q](size_t expected_size)
    {
        assert(q.size() == expected_size);
        assert(q.empty() == (expected_size == 0));
        assert(q.full() == (expected_size == 2));
    };

    expect_state(0);

    assert(q.try_push(10));
    expect_state(1);

    assert(q.try_push(11));
    expect_state(2);

    assert(!q.try_push(12));    // rejected: queue is at capacity

    int out;
    (void)out;
    assert(q.try_pop(out) && out == 10);
    expect_state(1);

    assert(q.try_pop(out) && out == 11);
    expect_state(0);

    assert(!q.try_pop(out));    // rejected: queue drained

    // The ring wraps around cleanly after being filled and drained.
    assert(q.try_push(13));
    expect_state(1);

    assert(q.try_pop(out) && out == 13);
    expect_state(0);
}

// Full test driver for a ring_queue type: runs the single-threaded smoke
// test, then 20 randomized multi-threaded rounds, printing '.' for each
// balanced round and 'E' for a push/pop mismatch, plus total elapsed time.
template<class ring_queue>
void test_ring_queue()
{
    using namespace std::chrono;

    std::random_device entropy{};

    test_ring_queue_simple<ring_queue>();

    const auto started = system_clock::now();

    for (int round = 0; round < 20; ++round)
        std::cout << (test_ring_queue_thread<ring_queue>(entropy(), false) ? "." : "E");

    const duration<double> elapsed = system_clock::now() - started;
    std::cout << std::endl;
    std::cout << "cost time: " << elapsed.count() << "s." << std::endl;
}

+ 16
- 2
vs_proj/librf.cpp View File

@@ -2,6 +2,12 @@
#include "librf.h"
#include <optional>
//#define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE 1
#include "src/ring_queue.h"
#include "src/ring_queue_spinlock.h"
#include "src/ring_queue_lockfree.h"
#include "../tutorial/test_ring_queue.h"
extern void resumable_main_yield_return();
extern void resumable_main_timer();
extern void resumable_main_suspend_always();
@@ -32,7 +38,15 @@ int main(int argc, const char* argv[])
{
(void)argc;
(void)argv;
resumable_main_event_v2();
//test_ring_queue_simple<ring_queue_single_thread<int>>();
//test_ring_queue<ring_queue_spinlock<int, false, uint32_t>>();
//test_ring_queue<ring_queue_lockfree<int, uint64_t>>();
resumable_main_channel();
resumable_main_channel_mult_thread();
resumable_main_switch_scheduler();
benchmark_main_channel_passing_next();
return 0;
//if (argc > 1)
@@ -54,7 +68,7 @@ int main(int argc, const char* argv[])
resumable_main_benchmark_mem(false);
resumable_main_mutex();
resumable_main_event();
resumable_main_event_v2();
//resumable_main_event_v2();
resumable_main_event_timeout();
resumable_main_channel();
resumable_main_channel_mult_thread();

+ 18
- 10
vs_proj/librf.vcxproj View File

@@ -34,19 +34,19 @@
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'" Label="Configuration">
<ConfigurationType>Application</ConfigurationType>
<UseDebugLibraries>false</UseDebugLibraries>
<PlatformToolset>ClangCL</PlatformToolset>
<PlatformToolset>v142</PlatformToolset>
<WholeProgramOptimization>true</WholeProgramOptimization>
<CharacterSet>NotSet</CharacterSet>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'" Label="Configuration">
<ConfigurationType>Application</ConfigurationType>
<PlatformToolset>v142</PlatformToolset>
<PlatformToolset>ClangCL</PlatformToolset>
<UseDebugLibraries>true</UseDebugLibraries>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'" Label="Configuration">
<ConfigurationType>Application</ConfigurationType>
<UseDebugLibraries>false</UseDebugLibraries>
<PlatformToolset>v142</PlatformToolset>
<PlatformToolset>ClangCL</PlatformToolset>
<WholeProgramOptimization>true</WholeProgramOptimization>
<CharacterSet>NotSet</CharacterSet>
</PropertyGroup>
@@ -83,13 +83,15 @@
<ClCompile>
<WarningLevel>Level4</WarningLevel>
<Optimization>Disabled</Optimization>
<PreprocessorDefinitions>WIN32;_DEBUG;_CONSOLE;ASIO_STANDALONE;_SILENCE_CXX17_ALLOCATOR_VOID_DEPRECATION_WARNING=1;ASIO_DISABLE_CONCEPTS=1;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<PreprocessorDefinitions>_CONSOLE;ASIO_STANDALONE;_SILENCE_CXX17_ALLOCATOR_VOID_DEPRECATION_WARNING=1;ASIO_DISABLE_CONCEPTS=1;RESUMEF_ENABLE_MULT_SCHEDULER=1;_DEBUG;WIN32;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<AdditionalIncludeDirectories>..\librf;..\..\asio\asio\include;..\..\asio-1.10.6\include;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
<AdditionalOptions>/await</AdditionalOptions>
<MultiProcessorCompilation>true</MultiProcessorCompilation>
<LanguageStandard>stdcpplatest</LanguageStandard>
<OmitFramePointers />
<DisableSpecificWarnings>4834</DisableSpecificWarnings>
<SDLCheck>true</SDLCheck>
<StringPooling>true</StringPooling>
</ClCompile>
<Link>
<SubSystem>Console</SubSystem>
@@ -100,7 +102,7 @@
<ClCompile>
<WarningLevel>Level4</WarningLevel>
<Optimization>Disabled</Optimization>
<PreprocessorDefinitions>_DEBUG;_CONSOLE;ASIO_STANDALONE;RESUMEF_DEBUG_COUNTER=0;RESUMEF_ENABLE_MULT_SCHEDULER=1;RESUMEF_USE_BOOST_ANY=0;_SILENCE_CXX17_ALLOCATOR_VOID_DEPRECATION_WARNING=1;ASIO_DISABLE_CONCEPTS=1;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<PreprocessorDefinitions>_CONSOLE;ASIO_STANDALONE;_SILENCE_CXX17_ALLOCATOR_VOID_DEPRECATION_WARNING=1;ASIO_DISABLE_CONCEPTS=1;RESUMEF_ENABLE_MULT_SCHEDULER=1;RESUMEF_DEBUG_COUNTER=0;_DEBUG;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<AdditionalIncludeDirectories>..\librf;..\..\asio\asio\include;..\..\asio-1.10.6\include;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
<AdditionalOptions>/await</AdditionalOptions>
<MultiProcessorCompilation>true</MultiProcessorCompilation>
@@ -108,6 +110,7 @@
<CLanguageStandard>c11</CLanguageStandard>
<CppLanguageStandard>c++1y</CppLanguageStandard>
<DisableSpecificWarnings>4834</DisableSpecificWarnings>
<EnableEnhancedInstructionSet>AdvancedVectorExtensions2</EnableEnhancedInstructionSet>
</ClCompile>
<Link>
<SubSystem>Console</SubSystem>
@@ -119,7 +122,7 @@
<WarningLevel>Level4</WarningLevel>
<Optimization>Full</Optimization>
<IntrinsicFunctions>true</IntrinsicFunctions>
<PreprocessorDefinitions>WIN32;NDEBUG;_CONSOLE;ASIO_STANDALONE;_SILENCE_CXX17_ALLOCATOR_VOID_DEPRECATION_WARNING=1;ASIO_DISABLE_CONCEPTS=1;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<PreprocessorDefinitions>_CONSOLE;ASIO_STANDALONE;_SILENCE_CXX17_ALLOCATOR_VOID_DEPRECATION_WARNING=1;ASIO_DISABLE_CONCEPTS=1;RESUMEF_ENABLE_MULT_SCHEDULER=1;NDEBUG;WIN32;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<SDLCheck>
</SDLCheck>
<AdditionalIncludeDirectories>..\librf;..\..\asio\asio\include;..\..\asio-1.10.6\include;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
@@ -146,7 +149,7 @@
<WarningLevel>Level4</WarningLevel>
<Optimization>Full</Optimization>
<IntrinsicFunctions>true</IntrinsicFunctions>
<PreprocessorDefinitions>NDEBUG;_CONSOLE;ASIO_STANDALONE;RESUMEF_ENABLE_MULT_SCHEDULER=1;_SILENCE_CXX17_ALLOCATOR_VOID_DEPRECATION_WARNING=1;ASIO_DISABLE_CONCEPTS=1;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<PreprocessorDefinitions>_CONSOLE;ASIO_STANDALONE;_SILENCE_CXX17_ALLOCATOR_VOID_DEPRECATION_WARNING=1;ASIO_DISABLE_CONCEPTS=1;RESUMEF_ENABLE_MULT_SCHEDULER=1;NDEBUG;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<AdditionalIncludeDirectories>..\librf;..\..\asio\asio\include;..\..\asio-1.10.6\include;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
<AdditionalOptions>/await</AdditionalOptions>
<FavorSizeOrSpeed>Size</FavorSizeOrSpeed>
@@ -177,9 +180,7 @@
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="..\benchmark\benchmark_async_mem.cpp" />
<ClCompile Include="..\benchmark\benchmark_channel_passing_next.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="..\benchmark\benchmark_channel_passing_next.cpp" />
<ClCompile Include="..\librf\src\event_v1.cpp" />
<ClCompile Include="..\librf\src\event_v2.cpp" />
<ClCompile Include="..\librf\src\mutex.cpp" />
@@ -221,6 +222,8 @@
<ClInclude Include="..\librf\src\asio_task.h" />
<ClInclude Include="..\librf\src\awaitable.h" />
<ClInclude Include="..\librf\src\channel.h" />
<ClInclude Include="..\librf\src\channel_v1.h" />
<ClInclude Include="..\librf\src\channel_v2.h" />
<ClInclude Include="..\librf\src\counted_ptr.h" />
<ClInclude Include="..\librf\src\def.h" />
<ClInclude Include="..\librf\src\event.h" />
@@ -231,6 +234,9 @@
<ClInclude Include="..\librf\src\promise.h" />
<ClInclude Include="..\librf\src\mutex.h" />
<ClInclude Include="..\librf\src\rf_task.h" />
<ClInclude Include="..\librf\src\ring_queue_lockfree.h" />
<ClInclude Include="..\librf\src\ring_queue.h" />
<ClInclude Include="..\librf\src\ring_queue_spinlock.h" />
<ClInclude Include="..\librf\src\scheduler.h" />
<ClInclude Include="..\librf\src\sleep.h" />
<ClInclude Include="..\librf\src\spinlock.h" />
@@ -242,6 +248,8 @@
<ClInclude Include="..\librf\src\utils.h" />
<ClInclude Include="..\librf\src\when.h" />
<ClInclude Include="..\librf\src\_awaker.h" />
<ClInclude Include="..\tutorial\test_ring_queue.h" />
<ClInclude Include="dcas.h" />
</ItemGroup>
<ItemGroup>
<None Include="..\librf\src\asio_task_1.10.0.inl" />

+ 21
- 0
vs_proj/librf.vcxproj.filters View File

@@ -201,6 +201,27 @@
<ClInclude Include="..\librf\src\event_v2.h">
<Filter>librf\src</Filter>
</ClInclude>
<ClInclude Include="dcas.h">
<Filter>Source Files</Filter>
</ClInclude>
<ClInclude Include="..\librf\src\ring_queue_lockfree.h">
<Filter>librf\src</Filter>
</ClInclude>
<ClInclude Include="..\librf\src\ring_queue_spinlock.h">
<Filter>librf\src</Filter>
</ClInclude>
<ClInclude Include="..\tutorial\test_ring_queue.h">
<Filter>tutorial</Filter>
</ClInclude>
<ClInclude Include="..\librf\src\channel_v1.h">
<Filter>librf\src</Filter>
</ClInclude>
<ClInclude Include="..\librf\src\channel_v2.h">
<Filter>librf\src</Filter>
</ClInclude>
<ClInclude Include="..\librf\src\ring_queue.h">
<Filter>librf\src</Filter>
</ClInclude>
</ItemGroup>
<ItemGroup>
<None Include="..\librf\src\asio_task_1.12.0.inl">

Loading…
Cancel
Save