Browse Source

when_all返回所有等待的future的值

tags/v2.9.7
tearshark 6 years ago
parent
commit
a3f73045c9
4 changed files with 202 additions and 48 deletions
  1. 1
    0
      librf/src/future.h
  2. 0
    8
      librf/src/when.cpp
  3. 175
    28
      librf/src/when.h
  4. 26
    12
      tutorial/test_async_when_all.cpp

+ 1
- 0
librf/src/future.h View File

@@ -8,6 +8,7 @@ namespace resumef
template <typename T>
struct future_impl_t
{
typedef T value_type;
typedef promise_t<T> promise_type;
typedef state_t<T> state_type;
counted_ptr<state_type> _state;

+ 0
- 8
librf/src/when.cpp View File

@@ -20,14 +20,6 @@ namespace resumef
}
}
void when_impl::reset(intptr_t initial_counter_)
{
scoped_lock<lock_type> lock_(this->_lock);
this->_awakes = nullptr;
this->_counter = initial_counter_;
}
bool when_impl::wait_(const when_awaker_ptr & awaker)
{
assert(awaker);

+ 175
- 28
librf/src/when.h View File

@@ -23,7 +23,6 @@ namespace resumef
RF_API when_impl(intptr_t initial_counter_);
RF_API void signal();
RF_API void reset(intptr_t initial_counter_);
//如果已经触发了awaker,则返回true
RF_API bool wait_(const when_awaker_ptr & awaker);
@@ -41,19 +40,76 @@ namespace resumef
};
typedef std::shared_ptr<when_impl> when_impl_ptr;
using ignore_type = decltype(std::ignore);
template<class _Ty>
struct remove_future
{
using type = _Ty;
using value_type = _Ty;
};
template<>
struct remove_future<void>
{
using type = void;
using value_type = ignore_type;
};
template<class _Ty>
struct remove_future<future_t<_Ty> > : public remove_future<_Ty>
{
};
template<class _Ty>
struct remove_future<future_t<_Ty>&> : public remove_future<_Ty>
{
};
template<class _Ty>
struct remove_future<future_t<_Ty>&&> : public remove_future<_Ty>
{
};
template<class _Ty>
using remove_future_t = typename remove_future<_Ty>::type;
template<class _Ty>
using remove_future_vt = typename remove_future<_Ty>::value_type;
template<class _Fty, class _Ty>
struct when_one_functor_impl
{
using value_type = _Ty;
using future_type = future_t<value_type>;
when_impl_ptr _e;
mutable future_type _f;
mutable std::reference_wrapper<value_type> _val;
when_one_functor_impl(const detail::when_impl_ptr & e, future_type f, value_type & v)
: _e(e)
, _f(std::move(f))
, _val(v)
{}
when_one_functor_impl(when_one_functor_impl &&) = default;
inline future_vt operator ()() const
{
_val.get() = co_await _f;
_e->signal();
}
};
template<class _Fty>
struct when_one_functor
struct when_one_functor_impl<_Fty, void>
{
typedef future_t<std::remove_reference_t<_Fty> > future_type;
using value_type = ignore_type;
using future_type = future_t<void>;
when_impl_ptr _e;
mutable future_type _f;
when_one_functor(const detail::when_impl_ptr & e, future_type f)
when_one_functor_impl(const detail::when_impl_ptr & e, future_type f, value_type & v)
: _e(e)
, _f(std::move(f))
{}
when_one_functor(when_one_functor &&) = default;
when_one_functor_impl(when_one_functor_impl &&) = default;
inline future_vt operator ()() const
{
@@ -61,40 +117,121 @@ namespace resumef
_e->signal();
}
};
template<class _Fty>
struct when_one_functor<future_t<_Fty> > : public when_one_functor<_Fty>
struct when_one_functor : public when_one_functor_impl<_Fty, remove_future_t<_Fty> >
{
using future_type = typename when_one_functor<_Fty>::future_type;
using base_type = when_one_functor_impl<_Fty, remove_future_t<_Fty> >;
using value_type = typename base_type::value_type;
using future_type = typename base_type::future_type;
when_one_functor(const detail::when_impl_ptr & e, future_type f)
: when_one_functor<_Fty>(e, std::move(f))
when_one_functor(const detail::when_impl_ptr & e, future_type f, value_type & v)
: when_one_functor_impl<_Fty, remove_future_t<_Fty> >(e, std::move(f), v)
{}
when_one_functor(when_one_functor &&) = default;
};
inline void when_one__(scheduler & s, const detail::when_impl_ptr & e)
template<class... _Ty>
size_t sizeof_tuple(const std::tuple<_Ty...> & val)
{
return sizeof...(_Ty);
}
template<class _Cont>
size_t sizeof_tuple(const _Cont & val)
{
return val.typename size();
}
template<class _Tup, size_t _N>
inline void when_all_one__(scheduler & s, const detail::when_impl_ptr & e, _Tup & t)
{
}
template<class _Tup, size_t _N, class _Fty, class... _Rest>
inline void when_all_one__(scheduler & s, const detail::when_impl_ptr & e, _Tup & t, _Fty f, _Rest&&... rest)
{
s + when_one_functor<_Fty>{e, std::move(f), std::get<_N>(t)};
when_all_one__<_Tup, _N + 1, _Rest...>(s, e, t, std::forward<_Rest>(rest)...);
}
template<class _Val, class _Iter, typename _Fty = decltype(*std::declval<_Iter>())>
inline void when_all_range__(scheduler & s, const detail::when_impl_ptr & e, std::vector<_Val> & t, _Iter begin, _Iter end)
{
using future_type = std::remove_reference_t<_Fty>;
const auto _First = begin;
for(; begin != end; ++begin)
s + when_one_functor<future_type>{e, *begin, t[begin - _First]};
}
inline void when_any_one__(scheduler & s, const detail::when_impl_ptr & e)
{
}
template<class _Fty, class... _Rest>
inline void when_one__(scheduler & s, const detail::when_impl_ptr & e, future_t<_Fty> f, _Rest&&... rest)
inline void when_any_one__(scheduler & s, const detail::when_impl_ptr & e, future_t<_Fty> f, _Rest&&... rest)
{
s + when_one_functor<_Fty>{e, std::move(f)};
when_one__(s, e, std::forward<_Rest>(rest)...);
when_any_one__(s, e, std::forward<_Rest>(rest)...);
}
template<class _Iter, typename _Fty = decltype(*std::declval<_Iter>())>
inline void when_one__(scheduler & s, const detail::when_impl_ptr & e, _Iter begin, _Iter end)
inline void when_any_one__(scheduler & s, const detail::when_impl_ptr & e, _Iter begin, _Iter end)
{
using future_type = std::remove_reference_t<_Fty>;
for(; begin != end; ++begin)
for (; begin != end; ++begin)
s + when_one_functor<future_type>{e, *begin};
}
}
template<class _Tup, class... _Fty>
future_t<_Tup> when_all_count(const std::shared_ptr<_Tup> & vals, scheduler & s, _Fty&&... f)
{
promise_t<_Tup> awaitable;
detail::when_impl_ptr _event = std::make_shared<detail::when_impl>(detail::sizeof_tuple(*vals));
auto awaker = std::make_shared<detail::when_awaker>(
[st = awaitable._state, vals](detail::when_impl * e) -> bool
{
if (e)
st->set_value(*vals);
else
st->throw_exception(channel_exception{ error_code::not_ready });
return true;
});
_event->wait_(awaker);
detail::when_all_one__<_Tup, 0u, _Fty...>(s, _event, *vals, std::forward<_Fty>(f)...);
return awaitable.get_future();
}
template<class _Tup, class _Iter>
future_t<_Tup> when_all_range(const std::shared_ptr<_Tup> & vals, scheduler & s, _Iter begin, _Iter end)
{
promise_t<_Tup> awaitable;
detail::when_impl_ptr _event = std::make_shared<detail::when_impl>(detail::sizeof_tuple(*vals));
auto awaker = std::make_shared<detail::when_awaker>(
[st = awaitable._state, vals](detail::when_impl * e) -> bool
{
if (e)
st->set_value(*vals);
else
st->throw_exception(channel_exception{ error_code::not_ready });
return true;
});
_event->wait_(awaker);
detail::when_all_range__(s, _event, *vals, begin, end);
return awaitable.get_future();
}
template<class... _Fty>
future_t<bool> when_count(size_t counter, scheduler & s, _Fty&&... f)
future_t<bool> when_any_count(size_t counter, scheduler & s, _Fty&&... f)
{
promise_t<bool> awaitable;
@@ -107,26 +244,36 @@ namespace resumef
});
_event->wait_(awaker);
detail::when_one__(s, _event, std::forward<_Fty>(f)...);
detail::when_any_one__(s, _event, std::forward<_Fty>(f)...);
return awaitable.get_future();
}
template<class... _Fty>
future_t<bool> when_all(scheduler & s, _Fty&&... f)
auto when_all(scheduler & s, _Fty&&... f) -> future_t<std::tuple<detail::remove_future_vt<_Fty>...> >
{
return when_count(sizeof...(_Fty), s, std::forward<_Fty>(f)...);
using tuple_type = std::tuple<detail::remove_future_vt<_Fty>...>;
auto vals = std::make_shared<tuple_type>();
return when_all_count(vals, s, std::forward<_Fty>(f)...);
}
template<class... _Fty>
future_t<bool> when_all(_Fty&&... f)
auto when_all(_Fty&&... f) -> future_t<std::tuple<detail::remove_future_vt<_Fty>...> >
{
return when_count(sizeof...(_Fty), *this_scheduler(), std::forward<_Fty>(f)...);
using tuple_type = std::tuple<detail::remove_future_vt<_Fty>...>;
auto vals = std::make_shared<tuple_type>();
return when_all_count(vals, *this_scheduler(), std::forward<_Fty>(f)...);
}
template<class _Iter, typename = decltype(*std::declval<_Iter>())>
future_t<bool> when_all(_Iter begin, _Iter end)
template<class _Iter, typename _Fty = decltype(*std::declval<_Iter>())>
auto when_all(_Iter begin, _Iter end) -> future_t<std::vector<detail::remove_future_vt<decltype(*std::declval<_Iter>())> > >
{
return when_count(std::distance(begin, end), *this_scheduler(), begin, end);
using value_type = detail::remove_future_vt<decltype(*std::declval<_Iter>())>;
using vector_type = std::vector<value_type>;
auto vals = std::make_shared<vector_type>(std::distance(begin, end));
return when_all_range(vals, *this_scheduler(), begin, end);
}
@@ -134,22 +281,22 @@ namespace resumef
template<class... _Fty>
future_t<bool> when_any(scheduler & s, _Fty&&... f)
future_t<bool> when_any(scheduler & s, future_t<_Fty>&&... f)
{
static_assert(sizeof...(_Fty) > 0);
return when_count(sizeof...(_Fty) ? 1 : 0, s, std::forward<_Fty>(f)...);
return when_any_count(sizeof...(_Fty) ? 1 : 0, s, std::forward<future_t<_Fty>>(f)...);
}
template<class... _Fty>
future_t<bool> when_any(_Fty&&... f)
future_t<bool> when_any(future_t<_Fty>&&... f)
{
static_assert(sizeof...(_Fty) > 0);
return when_count(sizeof...(_Fty) ? 1 : 0, *this_scheduler(), std::forward<_Fty>(f)...);
return when_any_count(sizeof...(_Fty) ? 1 : 0, *this_scheduler(), std::forward<future_t<_Fty>>(f)...);
}
template<class _Iter, typename = decltype(*std::declval<_Iter>())>
future_t<bool> when_any(_Iter begin, _Iter end)
{
assert(std::distance(begin, end) > 0); //???
return when_count(std::distance(begin, end) ? 1 : 0, *this_scheduler(), begin, end);
return when_any_count(std::distance(begin, end) ? 1 : 0, *this_scheduler(), begin, end);
}
}

+ 26
- 12
tutorial/test_async_when_all.cpp View File

@@ -9,6 +9,7 @@
#include "librf.h"
using namespace resumef;
/*
void test_when_any()
{
@@ -39,35 +40,48 @@ void test_when_any()
};
this_scheduler()->run_until_notask();
}
*/
void test_when_all()
{
using namespace std::chrono;
auto my_sleep = [](const char * name) -> future_vt
auto my_sleep = [](const char * name) -> future_t<int>
{
auto dt = rand() % 1000;
co_await sleep_for(1ms * dt);
std::cout << dt << "@" << name << std::endl;
return dt;
};
auto my_sleep_v = [](const char * name) -> future_vt
{
auto dt = rand() % 1000;
co_await sleep_for(1ms * dt);
std::cout << dt << "@" << name << std::endl;
};
GO
{
co_await when_all();
when_all();
std::cout << "zero!" << std::endl << std::endl;
co_await when_all(my_sleep("a"), my_sleep("b"));
std::cout << std::endl;
auto [a, b] = co_await when_all(my_sleep("a"), my_sleep_v("b"));
b;
std::cout << a << std::endl << std::endl;
co_await my_sleep("c");
std::cout << std::endl;
auto c = co_await my_sleep("c");
std::cout << c << std::endl << std::endl;
co_await when_all(my_sleep("d"), my_sleep("e"), my_sleep("f"));
std::cout << std::endl;
auto [d, e, f] = co_await when_all(my_sleep("d"), my_sleep_v("e"), my_sleep("f"));
e;
std::cout << d << "," << f << std::endl << std::endl;
std::vector<future_vt> v{ my_sleep("g"), my_sleep("h"), my_sleep("i") };
co_await when_all(std::begin(v), std::end(v));
std::cout << std::endl;
std::vector<future_t<int> > v{ my_sleep("g"), my_sleep("h"), my_sleep("i") };
auto vals = co_await when_all(std::begin(v), std::end(v));
std::cout << vals[0] << "," << vals[1] << "," << vals[2] << "," << std::endl << std::endl;
std::cout << "all done!" << std::endl;
};
@@ -78,7 +92,7 @@ void resumable_main_when_all()
{
srand((uint32_t)time(nullptr));
test_when_any();
//test_when_any();
std::cout << std::endl;
test_when_all();
}

Loading…
Cancel
Save