Browse Source

删除不再维护的代码

3.0.0
tearshark 3 years ago
parent
commit
cfdffca29d

+ 0
- 243
librf/src/channel_v1.h View File

@@ -1,243 +0,0 @@
#pragma once

namespace resumef
{
namespace detail
{
template<class _Ty>
struct channel_impl : public std::enable_shared_from_this<channel_impl<_Ty>>
{
typedef _awaker<channel_impl<_Ty>, _Ty*, error_code> channel_read_awaker;
typedef std::shared_ptr<channel_read_awaker> channel_read_awaker_ptr;

typedef _awaker<channel_impl<_Ty>> channel_write_awaker;
typedef std::shared_ptr<channel_write_awaker> channel_write_awaker_ptr;
typedef std::pair<channel_write_awaker_ptr, _Ty> write_tuple_type;
private:
//typedef spinlock lock_type;
typedef std::recursive_mutex lock_type;

lock_type _lock; //保证访问本对象是线程安全的
const size_t _max_counter; //数据队列的容量上限
std::deque<_Ty> _values; //数据队列
std::list<channel_read_awaker_ptr> _read_awakes; //读队列
std::list<write_tuple_type> _write_awakes; //写队列
public:
channel_impl(size_t max_counter_)
:_max_counter(max_counter_)
{
}

#if _DEBUG
const std::deque<_Ty>& debug_queue() const
{
return _values;
}
#endif

template<class callee_t, class = std::enable_if<!std::is_same<std::remove_cv_t<callee_t>, channel_read_awaker_ptr>::value>>
decltype(auto) read(callee_t&& awaker)
{
return read_(std::make_shared<channel_read_awaker>(std::forward<callee_t>(awaker)));
}
template<class callee_t, class _Ty2, class = std::enable_if<!std::is_same<std::remove_cv_t<callee_t>, channel_write_awaker_ptr>::value>>
decltype(auto) write(callee_t&& awaker, _Ty2&& val)
{
return write_(std::make_shared<channel_write_awaker>(std::forward<callee_t>(awaker)), std::forward<_Ty2>(val));
}

//如果已经触发了awaker,则返回true
//设计目标是线程安全的,实际情况待考察
bool read_(channel_read_awaker_ptr&& r_awaker)
{
assert(r_awaker);

scoped_lock<lock_type> lock_(this->_lock);

bool ret_value;
if (_values.size() > 0)
{
//如果数据队列有数据,则可以直接读数据
auto val = std::move(_values.front());
_values.pop_front();

r_awaker->awake(this, 1, &val, error_code::none);
ret_value = true;
}
else
{
//否则,将“读等待”放入“读队列”
_read_awakes.push_back(r_awaker);
ret_value = false;
}

//如果已有写队列,则唤醒一个“写等待”
awake_one_writer_();

return ret_value;
}

//设计目标是线程安全的,实际情况待考察
template<class _Ty2>
void write_(channel_write_awaker_ptr&& w_awaker, _Ty2&& val)
{
assert(w_awaker);
scoped_lock<lock_type> lock_(this->_lock);

//如果满了,则不添加到数据队列,而是将“写等待”和值,放入“写队列”
bool is_full = _values.size() >= _max_counter;
if (is_full)
_write_awakes.push_back(std::make_pair(std::forward<channel_write_awaker_ptr>(w_awaker), std::forward<_Ty2>(val)));
else
_values.push_back(std::forward<_Ty2>(val));

//如果已有读队列,则唤醒一个“读等待”
awake_one_reader_();

//触发 没有放入“写队列”的“写等待”
if (!is_full) w_awaker->awake(this, 1);
}

private:
//只能被write_函数调用,内部不再需要加锁
void awake_one_reader_()
{
//assert(!(_read_awakes.size() >= 0 && _values.size() == 0));

for (auto iter = _read_awakes.begin(); iter != _read_awakes.end(); )
{
auto r_awaker = *iter;
iter = _read_awakes.erase(iter);

if (r_awaker->awake(this, 1, _values.size() ? &_values.front() : nullptr, error_code::read_before_write))
{
if (_values.size()) _values.pop_front();

//唤醒一个“读等待”后,尝试唤醒一个“写等待”,以处理“数据队列”满后的“写等待”
awake_one_writer_();
break;
}
}
}

//只能被read_函数调用,内部不再需要加锁
void awake_one_writer_()
{
for (auto iter = _write_awakes.begin(); iter != _write_awakes.end(); )
{
auto w_awaker = std::move(*iter);
iter = _write_awakes.erase(iter);

if (w_awaker.first->awake(this, 1))
{
//一个“写等待”唤醒后,将“写等待”绑定的值,放入“数据队列”
_values.push_back(std::move(w_awaker.second));
break;
}
}
}

size_t capacity() const noexcept
{
return _max_counter;
}

channel_impl(const channel_impl&) = delete;
channel_impl(channel_impl&&) = delete;
channel_impl& operator = (const channel_impl&) = delete;
channel_impl& operator = (channel_impl&&) = delete;
};
} //namespace detail

namespace channel_v1
{

template<class _Ty>
struct channel_t
{
typedef detail::channel_impl<_Ty> channel_impl_type;
typedef typename channel_impl_type::channel_read_awaker channel_read_awaker;
typedef typename channel_impl_type::channel_write_awaker channel_write_awaker;

typedef std::shared_ptr<channel_impl_type> channel_impl_ptr;
typedef std::weak_ptr<channel_impl_type> channel_impl_wptr;
typedef std::chrono::system_clock clock_type;
private:
channel_impl_ptr _chan;
public:
channel_t(size_t max_counter = 0)
:_chan(std::make_shared<channel_impl_type>(max_counter))
{

}

template<class _Ty2>
future_t<bool> write(_Ty2&& val) const
{
awaitable_t<bool> awaitable;

auto awaker = std::make_shared<channel_write_awaker>(
[st = awaitable._state](channel_impl_type* chan) -> bool
{
st->set_value(chan ? true : false);
return true;
});
_chan->write_(std::move(awaker), std::forward<_Ty2>(val));

return awaitable.get_future();
}

future_t<_Ty> read() const
{
awaitable_t<_Ty> awaitable;

auto awaker = std::make_shared<channel_read_awaker>(
[st = awaitable._state](channel_impl_type*, _Ty* val, error_code fe) -> bool
{
if (val)
st->set_value(std::move(*val));
else
st->throw_exception(channel_exception{ fe });

return true;
});
_chan->read_(std::move(awaker));

return awaitable.get_future();
}

template<class _Ty2>
future_t<bool> operator << (_Ty2&& val) const
{
return std::move(write(std::forward<_Ty2>(val)));
}

future_t<_Ty> operator co_await () const
{
return read();
}

#if _DEBUG
//非线程安全,返回的队列也不是线程安全的
const auto& debug_queue() const
{
return _chan->debug_queue();
}
#endif

size_t capacity() const noexcept
{
return _chan->capacity();
}

channel_t(const channel_t&) = default;
channel_t(channel_t&&) = default;
channel_t& operator = (const channel_t&) = default;
channel_t& operator = (channel_t&&) = default;
};


using semaphore_t = channel_t<bool>;

} //namespace v1
} //namespace resumef

+ 0
- 360
librf/src/event_v1.cpp View File

@@ -1,360 +0,0 @@
#include "../librf.h"

namespace resumef
{
namespace detail
{
event_impl::event_impl(intptr_t initial_counter_)
: _counter(initial_counter_)
{
}

void event_impl::signal()
{
scoped_lock<lock_type> lock_(this->_lock);

++this->_counter;

for (auto iter = this->_awakes.begin(); iter != this->_awakes.end(); )
{
auto awaker = *iter;
iter = this->_awakes.erase(iter);

if (awaker->awake(this, 1))
{
if (--this->_counter == 0)
break;
}
}
}

void event_impl::reset()
{
scoped_lock<lock_type> lock_(this->_lock);

this->_awakes.clear();
this->_counter = 0;
}

bool event_impl::wait_(const event_awaker_ptr& awaker)
{
assert(awaker);

scoped_lock<lock_type> lock_(this->_lock);

if (this->_counter > 0)
{
if (awaker->awake(this, 1))
{
--this->_counter;
return true;
}
}
else
{
this->_awakes.push_back(awaker);
}
return false;
}

}

namespace event_v1
{
event_t::event_t(intptr_t initial_counter_)
: _event(std::make_shared<detail::event_impl>(initial_counter_))
{
}

future_t<bool> event_t::wait() const
{
awaitable_t<bool> awaitable;

auto awaker = std::make_shared<detail::event_awaker>(
[st = awaitable._state](detail::event_impl* e) -> bool
{
st->set_value(e ? true : false);
return true;
});
_event->wait_(awaker);

return awaitable.get_future();
}

future_t<bool> event_t::wait_until_(const clock_type::time_point& tp) const
{
awaitable_t<bool> awaitable;

auto awaker = std::make_shared<detail::event_awaker>(
[st = awaitable._state](detail::event_impl* e) -> bool
{
st->set_value(e ? true : false);
return true;
});
_event->wait_(awaker);

(void)this_scheduler()->timer()->add(tp,
[awaker](bool)
{
awaker->awake(nullptr, 1);
});

return awaitable.get_future();
}

struct wait_any_awaker
{
typedef state_t<intptr_t> state_type;
counted_ptr<state_type> st;
std::vector<detail::event_impl*> evts;

wait_any_awaker(const counted_ptr<state_type>& st_, std::vector<detail::event_impl*>&& evts_)
: st(st_)
, evts(std::forward<std::vector<detail::event_impl*>>(evts_))
{}
wait_any_awaker(const wait_any_awaker&) = delete;
wait_any_awaker(wait_any_awaker&&) = default;

bool operator()(detail::event_impl* e) const
{
if (e)
{
for (auto i = evts.begin(); i != evts.end(); ++i)
{
if (e == (*i))
{
st->set_value((intptr_t)(i - evts.begin()));
return true;
}
}
}
else
{
st->set_value(-1);
}

return false;
}
};

future_t<intptr_t> event_t::wait_any_(std::vector<event_impl_ptr>&& evts)
{
awaitable_t<intptr_t> awaitable;

if (evts.size() <= 0)
{
awaitable._state->set_value(-1);
return awaitable.get_future();
}

auto awaker = std::make_shared<detail::event_awaker>(
[st = awaitable._state, evts](detail::event_impl* e) -> bool
{
if (e)
{
for (auto i = evts.begin(); i != evts.end(); ++i)
{
if (e == (*i).get())
{
st->set_value((intptr_t)(i - evts.begin()));
return true;
}
}
}
else
{
st->set_value(-1);
}

return false;
});

for (auto e : evts)
{
e->wait_(awaker);
}

return awaitable.get_future();
}

future_t<intptr_t> event_t::wait_any_until_(const clock_type::time_point& tp, std::vector<event_impl_ptr>&& evts)
{
awaitable_t<intptr_t> awaitable;

auto awaker = std::make_shared<detail::event_awaker>(
[st = awaitable._state, evts](detail::event_impl* e) -> bool
{
if (e)
{
for (auto i = evts.begin(); i != evts.end(); ++i)
{
if (e == (*i).get())
{
st->set_value((intptr_t)(i - evts.begin()));
return true;
}
}
}
else
{
st->set_value(-1);
}

return false;
});

for (auto e : evts)
{
e->wait_(awaker);
}

(void)this_scheduler()->timer()->add(tp,
[awaker](bool)
{
awaker->awake(nullptr, 1);
});

return awaitable.get_future();
}

future_t<bool> event_t::wait_all_(std::vector<event_impl_ptr>&& evts)
{
awaitable_t<bool> awaitable;
if (evts.size() <= 0)
{
awaitable._state->set_value(false);
return awaitable.get_future();
}

auto awaker = std::make_shared<detail::event_awaker>(
[st = awaitable._state](detail::event_impl* e) -> bool
{
st->set_value(e ? true : false);
return true;
},
evts.size());

for (auto e : evts)
{
e->wait_(awaker);
}

return awaitable.get_future();
}


struct event_t::wait_all_ctx
{
//typedef spinlock lock_type;
typedef std::recursive_mutex lock_type;

counted_ptr<state_t<bool>> st;
std::vector<event_impl_ptr> evts;
std::vector<event_impl_ptr> evts_waited;
timer_handler th;
lock_type _lock;

wait_all_ctx()
{
#if RESUMEF_DEBUG_COUNTER
++g_resumef_evtctx_count;
#endif
}
~wait_all_ctx()
{
th.stop();
#if RESUMEF_DEBUG_COUNTER
--g_resumef_evtctx_count;
#endif
}

bool awake(detail::event_impl* eptr)
{
scoped_lock<lock_type> lock_(this->_lock);

//如果st为nullptr,则说明之前已经返回过值了。本环境无效了。
if (!st.get())
return false;

if (eptr)
{
//记录已经等到的事件
evts_waited.emplace_back(eptr->shared_from_this());

//已经等到的事件达到预期
if (evts_waited.size() == evts.size())
{
evts_waited.clear();

//返回true表示等待成功
st->set_value(true);
//丢弃st,以便于还有其它持有的ctx返回false
st.reset();
//取消定时器
th.stop();
}
}
else
{
//超时后,恢复已经等待的事件计数
for (auto sptr : evts_waited)
{
sptr->signal();
}
evts_waited.clear();

//返回true表示等待失败
st->set_value(false);
//丢弃st,以便于还有其它持有的ctx返回false
st.reset();
//定时器句柄已经无意义了
th.reset();
}

return true;
}
};

//等待所有的事件
//超时后的行为应该表现为:
//要么所有的事件计数减一,要么所有事件计数不动
//则需要超时后,恢复已经等待的事件计数
future_t<bool> event_t::wait_all_until_(const clock_type::time_point& tp, std::vector<event_impl_ptr>&& evts)
{
awaitable_t<bool> awaitable;
if (evts.size() <= 0)
{
(void)this_scheduler()->timer()->add_handler(tp,
[st = awaitable._state](bool)
{
st->set_value(false);
});
return awaitable.get_future();
}

auto ctx = std::make_shared<wait_all_ctx>();
ctx->st = awaitable._state;
ctx->evts_waited.reserve(evts.size());
ctx->evts = std::move(evts);
ctx->th = this_scheduler()->timer()->add_handler(tp,
[ctx](bool)
{
ctx->awake(nullptr);
});

for (auto e : ctx->evts)
{
auto awaker = std::make_shared<detail::event_awaker>(
[ctx](detail::event_impl* eptr) -> bool
{
return ctx->awake(eptr);
});
e->wait_(awaker);
}


return awaitable.get_future();
}

}
}

+ 0
- 215
librf/src/event_v1.h View File

@@ -1,215 +0,0 @@
#pragma once

namespace resumef
{
namespace detail
{
struct event_impl;
typedef _awaker<event_impl> event_awaker;
typedef std::shared_ptr<event_awaker> event_awaker_ptr;

struct event_impl : public std::enable_shared_from_this<event_impl>
{
private:
//typedef spinlock lock_type;
typedef std::recursive_mutex lock_type;

std::list<event_awaker_ptr> _awakes;
intptr_t _counter;
lock_type _lock;
public:
event_impl(intptr_t initial_counter_);

void signal();
void reset();

//如果已经触发了awaker,则返回true
bool wait_(const event_awaker_ptr& awaker);

template<class callee_t, class dummy_t = std::enable_if<!std::is_same<std::remove_cv_t<callee_t>, event_awaker_ptr>::value>>
decltype(auto) wait(callee_t&& awaker, dummy_t* dummy_ = nullptr)
{
(void)dummy_;
return wait_(std::make_shared<event_awaker>(std::forward<callee_t>(awaker)));
}

event_impl(const event_impl&) = delete;
event_impl(event_impl&&) = delete;
event_impl& operator = (const event_impl&) = delete;
event_impl& operator = (event_impl&&) = delete;
};
}

namespace event_v1
{

//提供一种在协程和非协程之间同步的手段。
//典型用法是在非协程的线程,或者异步代码里,调用signal()方法触发信号,
//协程代码里,调用co_await wait()等系列方法等待同步。
struct event_t
{
typedef std::shared_ptr<detail::event_impl> event_impl_ptr;
typedef std::weak_ptr<detail::event_impl> event_impl_wptr;
typedef std::chrono::system_clock clock_type;
private:
event_impl_ptr _event;
struct wait_all_ctx;
public:
event_t(intptr_t initial_counter_ = 0);

void signal() const
{
_event->signal();
}
void reset() const
{
_event->reset();
}



future_t<bool>
wait() const;
template<class _Rep, class _Period>
future_t<bool>
wait_for(const std::chrono::duration<_Rep, _Period>& dt) const
{
return wait_for_(std::chrono::duration_cast<clock_type::duration>(dt));
}
template<class _Clock, class _Duration>
future_t<bool>
wait_until(const std::chrono::time_point<_Clock, _Duration>& tp) const
{
return wait_until_(std::chrono::time_point_cast<clock_type::duration>(tp));
}





template<class _Iter>
static future_t<intptr_t>
wait_any(_Iter begin_, _Iter end_)
{
return wait_any_(make_event_vector(begin_, end_));
}
template<class _Cont>
static future_t<intptr_t>
wait_any(const _Cont& cnt_)
{
return wait_any_(make_event_vector(std::begin(cnt_), std::end(cnt_)));
}

template<class _Rep, class _Period, class _Iter>
static future_t<intptr_t>
wait_any_for(const std::chrono::duration<_Rep, _Period>& dt, _Iter begin_, _Iter end_)
{
return wait_any_for_(std::chrono::duration_cast<clock_type::duration>(dt), make_event_vector(begin_, end_));
}
template<class _Rep, class _Period, class _Cont>
static future_t<intptr_t>
wait_any_for(const std::chrono::duration<_Rep, _Period>& dt, const _Cont& cnt_)
{
return wait_any_for_(std::chrono::duration_cast<clock_type::duration>(dt), make_event_vector(std::begin(cnt_), std::end(cnt_)));
}

template<class _Clock, class _Duration, class _Iter>
static future_t<intptr_t>
wait_any_until(const std::chrono::time_point<_Clock, _Duration>& tp, _Iter begin_, _Iter end_)
{
return wait_any_until_(std::chrono::time_point_cast<clock_type::duration>(tp), make_event_vector(begin_, end_));
}
template<class _Clock, class _Duration, class _Cont>
static future_t<intptr_t>
wait_any_until(const std::chrono::time_point<_Clock, _Duration>& tp, const _Cont& cnt_)
{
return wait_any_until_(std::chrono::time_point_cast<clock_type::duration>(tp), make_event_vector(std::begin(cnt_), std::end(cnt_)));
}





template<class _Iter>
static future_t<bool>
wait_all(_Iter begin_, _Iter end_)
{
return wait_all_(make_event_vector(begin_, end_));
}
template<class _Cont>
static future_t<bool>
wait_all(const _Cont& cnt_)
{
return wait_all(std::begin(cnt_), std::end(cnt_));
}

template<class _Rep, class _Period, class _Iter>
static future_t<bool>
wait_all_for(const std::chrono::duration<_Rep, _Period>& dt, _Iter begin_, _Iter end_)
{
return wait_all_for_(std::chrono::duration_cast<clock_type::duration>(dt), make_event_vector(begin_, end_));
}
template<class _Rep, class _Period, class _Cont>
static future_t<bool>
wait_all_for(const std::chrono::duration<_Rep, _Period>& dt, const _Cont& cnt_)
{
return wait_all_for_(std::chrono::duration_cast<clock_type::duration>(dt), make_event_vector(std::begin(cnt_), std::end(cnt_)));
}

template<class _Clock, class _Duration, class _Iter>
static future_t<bool>
wait_all_until(const std::chrono::time_point<_Clock, _Duration>& tp, _Iter begin_, _Iter end_)
{
return wait_all_until_(std::chrono::time_point_cast<clock_type::duration>(tp), make_event_vector(begin_, end_));
}
template<class _Clock, class _Duration, class _Cont>
static future_t<bool>
wait_all_until(const std::chrono::time_point<_Clock, _Duration>& tp, const _Cont& cnt_)
{
return wait_all_until_(std::chrono::time_point_cast<clock_type::duration>(tp), make_event_vector(std::begin(cnt_), std::end(cnt_)));
}



event_t(const event_t&) = default;
event_t(event_t&&) = default;
event_t& operator = (const event_t&) = default;
event_t& operator = (event_t&&) = default;

private:
template<class _Iter>
static std::vector<event_impl_ptr> make_event_vector(_Iter begin_, _Iter end_)
{
std::vector<event_impl_ptr> evts;
evts.reserve(std::distance(begin_, end_));
for (auto i = begin_; i != end_; ++i)
evts.push_back((*i)._event);

return evts;
}

inline future_t<bool> wait_for_(const clock_type::duration& dt) const
{
return wait_until_(clock_type::now() + dt);
}
future_t<bool> wait_until_(const clock_type::time_point& tp) const;


static future_t<intptr_t> wait_any_(std::vector<event_impl_ptr>&& evts);
inline static future_t<intptr_t> wait_any_for_(const clock_type::duration& dt, std::vector<event_impl_ptr>&& evts)
{
return wait_any_until_(clock_type::now() + dt, std::forward<std::vector<event_impl_ptr>>(evts));
}
static future_t<intptr_t> wait_any_until_(const clock_type::time_point& tp, std::vector<event_impl_ptr>&& evts);


static future_t<bool> wait_all_(std::vector<event_impl_ptr>&& evts);
inline static future_t<bool> wait_all_for_(const clock_type::duration& dt, std::vector<event_impl_ptr>&& evts)
{
return wait_all_until_(clock_type::now() + dt, std::forward<std::vector<event_impl_ptr>>(evts));
}
static future_t<bool> wait_all_until_(const clock_type::time_point& tp, std::vector<event_impl_ptr>&& evts);
};

}
}

+ 0
- 128
librf/src/mutex_v1.cpp View File

@@ -1,128 +0,0 @@
#include "../librf.h"

namespace resumef
{
namespace detail
{
mutex_impl::mutex_impl()
{
}

void mutex_impl::unlock()
{
scoped_lock<lock_type> lock_(this->_lock);

if (_owner != nullptr)
{
for (auto iter = _awakes.begin(); iter != _awakes.end(); )
{
auto awaker = *iter;
iter = _awakes.erase(iter);

if (awaker->awake(this, 1))
{
_owner = awaker;
break;
}

}
if (_awakes.size() == 0)
{
_owner = nullptr;
}
}
}

bool mutex_impl::lock_(const mutex_awaker_ptr& awaker)
{
assert(awaker);

scoped_lock<lock_type> lock_(this->_lock);

if (_owner == nullptr)
{
_owner = awaker;
awaker->awake(this, 1);
return true;
}
else
{
_awakes.push_back(awaker);
return false;
}
}

bool mutex_impl::try_lock_(const mutex_awaker_ptr& awaker)
{
assert(awaker);

scoped_lock<lock_type> lock_(this->_lock);

if (_owner == nullptr)
{
_owner = awaker;
return true;
}
else
{
return false;
}
}

}

namespace mutex_v1
{

mutex_t::mutex_t()
: _locker(std::make_shared<detail::mutex_impl>())
{
}

future_t<bool> mutex_t::lock() const
{
awaitable_t<bool> awaitable;

auto awaker = std::make_shared<detail::mutex_awaker>(
[st = awaitable._state](detail::mutex_impl* e) -> bool
{
st->set_value(e ? true : false);
return true;
});
_locker->lock_(awaker);

return awaitable.get_future();
}

bool mutex_t::try_lock() const
{
auto dummy_awaker = std::make_shared<detail::mutex_awaker>(
[](detail::mutex_impl*) -> bool
{
return true;
});
return _locker->try_lock_(dummy_awaker);
}

future_t<bool> mutex_t::try_lock_until_(const clock_type::time_point& tp) const
{
awaitable_t<bool> awaitable;

auto awaker = std::make_shared<detail::mutex_awaker>(
[st = awaitable._state](detail::mutex_impl* e) -> bool
{
st->set_value(e ? true : false);
return true;
});
_locker->lock_(awaker);

(void)this_scheduler()->timer()->add(tp,
[awaker](bool)
{
awaker->awake(nullptr, 1);
});

return awaitable.get_future();
}
}
}

+ 0
- 92
librf/src/mutex_v1.h View File

@@ -1,92 +0,0 @@
#pragma once

namespace resumef
{
namespace detail
{
struct mutex_impl;
typedef ::resumef::detail::_awaker<mutex_impl> mutex_awaker;
typedef std::shared_ptr<mutex_awaker> mutex_awaker_ptr;

struct mutex_impl : public std::enable_shared_from_this<mutex_impl>
{
private:
//typedef spinlock lock_type;
typedef std::recursive_mutex lock_type;

std::list<mutex_awaker_ptr> _awakes;
mutex_awaker_ptr _owner;
lock_type _lock;
public:
mutex_impl();

//如果已经触发了awaker,则返回true
bool lock_(const mutex_awaker_ptr& awaker);
bool try_lock_(const mutex_awaker_ptr& awaker);
void unlock();

template<class callee_t, class dummy_t = std::enable_if<!std::is_same<std::remove_cv_t<callee_t>, mutex_awaker_ptr>::value>>
decltype(auto) lock(callee_t&& awaker, dummy_t* dummy_ = nullptr)
{
(void)dummy_;
return lock_(std::make_shared<mutex_awaker>(std::forward<callee_t>(awaker)));
}

private:
mutex_impl(const mutex_impl&) = delete;
mutex_impl(mutex_impl&&) = delete;
mutex_impl& operator = (const mutex_impl&) = delete;
mutex_impl& operator = (mutex_impl&&) = delete;
};
}

namespace mutex_v1
{
struct mutex_t
{
typedef std::shared_ptr<detail::mutex_impl> lock_impl_ptr;
typedef std::weak_ptr<detail::mutex_impl> lock_impl_wptr;
typedef std::chrono::system_clock clock_type;
private:
lock_impl_ptr _locker;
public:
mutex_t();

void unlock() const
{
_locker->unlock();
}


future_t<bool> lock() const;
bool try_lock() const;

/*
template<class _Rep, class _Period>
awaitable_t<bool>
try_lock_for(const std::chrono::duration<_Rep, _Period> & dt) const
{
return try_lock_for_(std::chrono::duration_cast<clock_type::duration>(dt));
}
template<class _Clock, class _Duration>
awaitable_t<bool>
try_lock_until(const std::chrono::time_point<_Clock, _Duration> & tp) const
{
return try_lock_until_(std::chrono::time_point_cast<clock_type::duration>(tp));
}
*/


mutex_t(const mutex_t&) = default;
mutex_t(mutex_t&&) = default;
mutex_t& operator = (const mutex_t&) = default;
mutex_t& operator = (mutex_t&&) = default;
private:
inline future_t<bool> try_lock_for_(const clock_type::duration& dt) const
{
return try_lock_until_(clock_type::now() + dt);
}
future_t<bool> try_lock_until_(const clock_type::time_point& tp) const;
};
}
}

+ 0
- 219
librf/src/ring_queue_lockfree.h View File

@@ -1,219 +0,0 @@
#pragma once

namespace resumef
{
//目前无法解决三个索引数值回绕导致的问题
//如果为了避免索引回绕的问题,索引采用uint64_t类型,
//则在与spinlock<T, false, uint32_t>版本的对比中速度反而慢了
//pop时无法使用move语义来获取数据。因为算法要求先获取值,且获取后有可能失败,从而重新获取其它值。
template<class _Ty, class _Sty = uint32_t>
struct ring_queue_lockfree
{
using value_type = _Ty;
using size_type = _Sty;
public:
ring_queue_lockfree(size_t sz);

ring_queue_lockfree(const ring_queue_lockfree&) = delete;
ring_queue_lockfree(ring_queue_lockfree&&) = default;
ring_queue_lockfree& operator =(const ring_queue_lockfree&) = delete;
ring_queue_lockfree& operator =(ring_queue_lockfree&&) = default;

auto size() const noexcept->size_type;
auto capacity() const noexcept->size_type;
bool empty() const noexcept;
bool full() const noexcept;
template<class U>
bool try_push(U&& value) noexcept(std::is_nothrow_move_constructible_v<U>);
bool try_pop(value_type& value) noexcept(std::is_nothrow_move_constructible_v<value_type>);
private:
std::unique_ptr<value_type[]> m_bufferPtr;
size_type m_bufferSize;

std::atomic<size_type> m_writeIndex; //Where a new element will be inserted to.
std::atomic<size_type> m_readIndex; //Where the next element where be extracted from.
std::atomic<size_type> m_maximumReadIndex; //It points to the place where the latest "commited" data has been inserted.
//If it's not the same as writeIndex it means there are writes pending to be "commited" to the queue,
//that means that the place for the data was reserved (the index in the array)
//but the data is still not in the queue,
//so the thread trying to read will have to wait for those other threads to
//save the data into the queue.
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
std::atomic<size_type> m_count;
#endif

auto countToIndex(size_type a_count) const noexcept->size_type;
auto nextIndex(size_type a_count) const noexcept->size_type;
};

template<class _Ty, class _Sty>
ring_queue_lockfree<_Ty, _Sty>::ring_queue_lockfree(size_t sz)
: m_bufferPtr(new value_type[sz + 1])
, m_bufferSize(static_cast<size_type>(sz + 1))
, m_writeIndex(0)
, m_readIndex(0)
, m_maximumReadIndex(0)
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
, m_count(0)
#endif
{
assert(sz < (std::numeric_limits<size_type>::max)());
}

template<class _Ty, class _Sty>
auto ring_queue_lockfree<_Ty, _Sty>::countToIndex(size_type a_count) const noexcept->size_type
{
//return (a_count % m_bufferSize);
return a_count;
}

template<class _Ty, class _Sty>
auto ring_queue_lockfree<_Ty, _Sty>::nextIndex(size_type a_count) const noexcept->size_type
{
//return static_cast<size_type>((a_count + 1));
return static_cast<size_type>((a_count + 1) % m_bufferSize);
}

template<class _Ty, class _Sty>
auto ring_queue_lockfree<_Ty, _Sty>::size() const noexcept->size_type
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
return m_count.load();
#else
auto currentWriteIndex = m_maximumReadIndex.load(std::memory_order_acquire);
currentWriteIndex = countToIndex(currentWriteIndex);

auto currentReadIndex = m_readIndex.load(std::memory_order_acquire);
currentReadIndex = countToIndex(currentReadIndex);

if (currentWriteIndex >= currentReadIndex)
return (currentWriteIndex - currentReadIndex);
else
return (m_bufferSize + currentWriteIndex - currentReadIndex);
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}

template<class _Ty, class _Sty>
auto ring_queue_lockfree<_Ty, _Sty>::capacity() const noexcept->size_type
{
return m_bufferSize - 1;
}

template<class _Ty, class _Sty>
bool ring_queue_lockfree<_Ty, _Sty>::empty() const noexcept
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
return m_count.load() == 0;
#else
auto currentWriteIndex = m_maximumReadIndex.load(std::memory_order_acquire);
auto currentReadIndex = m_readIndex.load(std::memory_order_acquire);
return countToIndex(currentWriteIndex) == countToIndex(currentReadIndex);
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}

template<class _Ty, class _Sty>
bool ring_queue_lockfree<_Ty, _Sty>::full() const noexcept
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
return (m_count.load() == (m_bufferSize - 1));
#else
auto currentWriteIndex = m_writeIndex.load(std::memory_order_acquire);
auto currentReadIndex = m_readIndex.load(std::memory_order_acquire);
return countToIndex(nextIndex(currentWriteIndex)) == countToIndex(currentReadIndex);
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}

template<class _Ty, class _Sty>
template<class U>
bool ring_queue_lockfree<_Ty, _Sty>::try_push(U&& value) noexcept(std::is_nothrow_move_constructible_v<U>)
{
auto currentWriteIndex = m_writeIndex.load(std::memory_order_acquire);

do
{
if (countToIndex(nextIndex(currentWriteIndex)) == countToIndex(m_readIndex.load(std::memory_order_acquire)))
{
// the queue is full
return false;
}

// There is more than one producer. Keep looping till this thread is able
// to allocate space for current piece of data
//
// using compare_exchange_strong because it isn't allowed to fail spuriously
// When the compare_exchange operation is in a loop the weak version
// will yield better performance on some platforms, but here we'd have to
// load m_writeIndex all over again
} while (!m_writeIndex.compare_exchange_strong(currentWriteIndex, nextIndex(currentWriteIndex), std::memory_order_acq_rel));

// Just made sure this index is reserved for this thread.
m_bufferPtr[countToIndex(currentWriteIndex)] = std::move(value);

// update the maximum read index after saving the piece of data. It can't
// fail if there is only one thread inserting in the queue. It might fail
// if there is more than 1 producer thread because this operation has to
// be done in the same order as the previous CAS
//
// using compare_exchange_weak because they are allowed to fail spuriously
// (act as if *this != expected, even if they are equal), but when the
// compare_exchange operation is in a loop the weak version will yield
// better performance on some platforms.
auto savedWriteIndex = currentWriteIndex;
while (!m_maximumReadIndex.compare_exchange_weak(currentWriteIndex, nextIndex(currentWriteIndex), std::memory_order_acq_rel))
{
currentWriteIndex = savedWriteIndex;
// this is a good place to yield the thread in case there are more
// software threads than hardware processors and you have more
// than 1 producer thread
// have a look at sched_yield (POSIX.1b)
std::this_thread::yield();
}

// The value was successfully inserted into the queue
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
m_count.fetch_add(1);
#endif
return true;
}

template<class _Ty, class _Sty>
bool ring_queue_lockfree<_Ty, _Sty>::try_pop(value_type& value) noexcept(std::is_nothrow_move_constructible_v<value_type>)
{
auto currentReadIndex = m_readIndex.load(std::memory_order_acquire);

for (;;)
{
auto idx = countToIndex(currentReadIndex);

// to ensure thread-safety when there is more than 1 producer
// thread a second index is defined (m_maximumReadIndex)
if (idx == countToIndex(m_maximumReadIndex.load(std::memory_order_acquire)))
{
// the queue is empty or
// a producer thread has allocate space in the queue but is
// waiting to commit the data into it
return false;
}

// retrieve the data from the queue
value = m_bufferPtr[idx]; //但是,这里的方法不适合。如果只支持移动怎么办?

// try to perfrom now the CAS operation on the read index. If we succeed
// a_data already contains what m_readIndex pointed to before we
// increased it
if (m_readIndex.compare_exchange_strong(currentReadIndex, nextIndex(currentReadIndex), std::memory_order_acq_rel))
{
// got here. The value was retrieved from the queue. Note that the
// data inside the m_queue array is not deleted nor reseted
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
m_count.fetch_sub(1);
#endif
return true;
}

// it failed retrieving the element off the queue. Someone else must
// have read the element stored at countToIndex(currentReadIndex)
// before we could perform the CAS operation
} // keep looping to try again!
}
}

+ 0
- 162
librf/src/ring_queue_spinlock.h View File

@@ -1,162 +0,0 @@
#pragma once

namespace resumef
{
//使用自旋锁完成的线程安全的环形队列。
//支持多个线程同时push和pop。
//_Option : 如果队列保存的数据不支持拷贝只支持移动,则需要设置为true;或者数据希望pop后销毁,都需要设置为true。
//_Sty : 内存保持数量和索引的整数类型。用于外部控制队列的结构体大小。
template<class _Ty, bool _Option = false, class _Sty = uint32_t>
struct ring_queue_spinlock
{
using value_type = _Ty;
using size_type = _Sty;

static constexpr bool use_option = _Option;
using optional_type = std::conditional_t<use_option, std::optional<value_type>, value_type>;
public:
ring_queue_spinlock(size_t sz);

ring_queue_spinlock(const ring_queue_spinlock&) = delete;
ring_queue_spinlock(ring_queue_spinlock&&) = default;
ring_queue_spinlock& operator =(const ring_queue_spinlock&) = delete;
ring_queue_spinlock& operator =(ring_queue_spinlock&&) = default;

auto size() const noexcept->size_type;
auto capacity() const noexcept->size_type;
bool empty() const noexcept;
bool full() const noexcept;
template<class U>
bool try_push(U&& value) noexcept(std::is_nothrow_move_assignable_v<U>);
bool try_pop(value_type& value) noexcept(std::is_nothrow_move_assignable_v<value_type>);
private:
using container_type = std::conditional_t<std::is_same_v<value_type, bool>, std::unique_ptr<optional_type[]>, std::vector<optional_type>>;
container_type m_bufferPtr;
size_type m_bufferSize;

size_type m_writeIndex;
size_type m_readIndex;
mutable resumef::spinlock m_lock;
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
std::atomic<size_type> m_count;
#endif

auto nextIndex(size_type a_count) const noexcept->size_type;
};

template<class _Ty, bool _Option, class _Sty>
ring_queue_spinlock<_Ty, _Option, _Sty>::ring_queue_spinlock(size_t sz)
: m_bufferSize(static_cast<size_type>(sz + 1))
, m_writeIndex(0)
, m_readIndex(0)
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
, m_count(0)
#endif
{
if constexpr (std::is_same_v<value_type, bool>)
m_bufferPtr = container_type{ new optional_type[sz + 1] };
else
m_bufferPtr.resize(sz + 1);
assert(sz < (std::numeric_limits<size_type>::max)());
}

template<class _Ty, bool _Option, class _Sty>
auto ring_queue_spinlock<_Ty, _Option, _Sty>::nextIndex(size_type a_count) const noexcept->size_type
{
return static_cast<size_type>((a_count + 1) % m_bufferSize);
}

template<class _Ty, bool _Option, class _Sty>
auto ring_queue_spinlock<_Ty, _Option, _Sty>::size() const noexcept->size_type
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
return m_count.load(std::memory_order_acquire);
#else
std::scoped_lock __guard(this->m_lock);

if (m_writeIndex >= m_readIndex)
return (m_writeIndex - m_readIndex);
else
return (m_bufferSize + m_writeIndex - m_readIndex);
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}

template<class _Ty, bool _Option, class _Sty>
auto ring_queue_spinlock<_Ty, _Option, _Sty>::capacity() const noexcept->size_type
{
return m_bufferSize - 1;
}

template<class _Ty, bool _Option, class _Sty>
bool ring_queue_spinlock<_Ty, _Option, _Sty>::empty() const noexcept
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
return m_count.load(std::memory_order_acquire) == 0;
#else
std::scoped_lock __guard(this->m_lock);

return m_writeIndex == m_readIndex;
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}

template<class _Ty, bool _Option, class _Sty>
bool ring_queue_spinlock<_Ty, _Option, _Sty>::full() const noexcept
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
return (m_count.load(std::memory_order_acquire) == (m_bufferSize - 1));
#else
std::scoped_lock __guard(this->m_lock);

return nextIndex(m_writeIndex) == m_readIndex;
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}

template<class _Ty, bool _Option, class _Sty>
template<class U>
bool ring_queue_spinlock<_Ty, _Option, _Sty>::try_push(U&& value) noexcept(std::is_nothrow_move_assignable_v<U>)
{
std::scoped_lock __guard(this->m_lock);

auto nextWriteIndex = nextIndex(m_writeIndex);
if (nextWriteIndex == m_readIndex)
return false;

assert(m_writeIndex < m_bufferSize);

m_bufferPtr[m_writeIndex] = std::move(value);
m_writeIndex = nextWriteIndex;

#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
m_count.fetch_add(1, std::memory_order_acq_rel);
#endif
return true;
}

template<class _Ty, bool _Option, class _Sty>
bool ring_queue_spinlock<_Ty, _Option, _Sty>::try_pop(value_type& value) noexcept(std::is_nothrow_move_assignable_v<value_type>)
{
std::scoped_lock __guard(this->m_lock);

if (m_readIndex == m_writeIndex)
return false;

optional_type& ov = m_bufferPtr[m_readIndex];
if constexpr (use_option)
{
value = std::move(ov.value());
ov = std::nullopt;
}
else
{
value = std::move(ov);
}

m_readIndex = nextIndex(m_readIndex);

#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
m_count.fetch_sub(1, std::memory_order_acq_rel);
#endif
return true;
}
}

Loading…
Cancel
Save