Browse Source

when_any的返回值修改为索引+std::any

tags/v2.9.7
tearshark 6 years ago
parent
commit
2165e473d1
2 changed files with 175 additions and 41 deletions
  1. 152
    36
      librf/src/when.h
  2. 23
    5
      tutorial/test_async_when_all.cpp

+ 152
- 36
librf/src/when.h View File

@@ -50,14 +50,12 @@ namespace resumef
{
using type = std::remove_reference_t<_Ty>;
using value_type = type;
using optional_type = std::optional<type>;
};
template<>
struct remove_future<void>
{
using type = void;
using value_type = ignore_type;
using optional_type = std::optional<ignore_type>;
};
template<class _Ty>
struct remove_future<future_t<_Ty> > : public remove_future<_Ty>
@@ -72,14 +70,10 @@ namespace resumef
{
};
template<class _Ty>
using remove_future_t = typename remove_future<_Ty>::type;
template<class _Ty>
using remove_future_vt = typename remove_future<_Ty>::value_type;
template<class _Ty>
using remove_future_ot = typename remove_future<_Ty>::optional_type;
template<class _Fty, class _Ty>
struct when_one_functor
struct when_all_functor
{
using value_type = _Ty;
using future_type = _Fty;
@@ -88,12 +82,14 @@ namespace resumef
mutable future_type _f;
mutable std::reference_wrapper<value_type> _val;
when_one_functor(const detail::when_impl_ptr & e, future_type f, value_type & v)
when_all_functor(const detail::when_impl_ptr & e, future_type f, value_type & v)
: _e(e)
, _f(std::move(f))
, _val(v)
{}
when_one_functor(when_one_functor &&) = default;
when_all_functor(when_all_functor &&) = default;
when_all_functor & operator = (const when_all_functor &) = default;
when_all_functor & operator = (when_all_functor &&) = default;
inline future_vt operator ()() const
{
@@ -111,33 +107,33 @@ namespace resumef
}
};
template<class _Tup, size_t _N>
inline void when_one__(scheduler & s, const detail::when_impl_ptr & e, _Tup & t)
template<class _Tup, size_t _Idx>
inline void when_all_one__(scheduler & s, const detail::when_impl_ptr & e, _Tup & t)
{
}
template<class _Tup, size_t _N, class _Fty, class... _Rest>
inline void when_one__(scheduler & s, const detail::when_impl_ptr & e, _Tup & t, _Fty f, _Rest&&... rest)
template<class _Tup, size_t _Idx, class _Fty, class... _Rest>
inline void when_all_one__(scheduler & s, const detail::when_impl_ptr & e, _Tup & t, _Fty f, _Rest&&... rest)
{
s + when_one_functor<_Fty, std::tuple_element_t<_N, _Tup> >{e, std::move(f), std::get<_N>(t)};
s + when_all_functor<_Fty, std::tuple_element_t<_Idx, _Tup> >{e, std::move(f), std::get<_Idx>(t)};
when_one__<_Tup, _N + 1, _Rest...>(s, e, t, std::forward<_Rest>(rest)...);
when_all_one__<_Tup, _Idx + 1, _Rest...>(s, e, t, std::forward<_Rest>(rest)...);
}
template<class _Val, class _Iter, typename _Fty = decltype(*std::declval<_Iter>())>
inline void when_range__(scheduler & s, const detail::when_impl_ptr & e, std::vector<_Val> & t, _Iter begin, _Iter end)
inline void when_all_range__(scheduler & s, const detail::when_impl_ptr & e, std::vector<_Val> & t, _Iter begin, _Iter end)
{
using future_type = std::remove_reference_t<_Fty>;
const auto _First = begin;
for(; begin != end; ++begin)
s + when_one_functor<future_type, _Val>{e, *begin, t[begin - _First]};
s + when_all_functor<future_type, _Val>{e, *begin, t[begin - _First]};
}
template<class _Tup, class... _Fty>
future_t<_Tup> when_count(size_t count, const std::shared_ptr<_Tup> & vals, scheduler & s, _Fty&&... f)
future_t<_Tup> when_all_count(size_t count, const std::shared_ptr<_Tup> & vals, scheduler & s, _Fty&&... f)
{
promise_t<_Tup> awaitable;
@@ -153,13 +149,13 @@ namespace resumef
});
_event->wait_(awaker);
when_one__<_Tup, 0u, _Fty...>(s, _event, *vals, std::forward<_Fty>(f)...);
when_all_one__<_Tup, 0u, _Fty...>(s, _event, *vals, std::forward<_Fty>(f)...);
return awaitable.get_future();
}
template<class _Tup, class _Iter>
future_t<_Tup> when_range(size_t count, const std::shared_ptr<_Tup> & vals, scheduler & s, _Iter begin, _Iter end)
future_t<_Tup> when_all_range(size_t count, const std::shared_ptr<_Tup> & vals, scheduler & s, _Iter begin, _Iter end)
{
promise_t<_Tup> awaitable;
@@ -175,7 +171,132 @@ namespace resumef
});
_event->wait_(awaker);
when_range__(s, _event, *vals, begin, end);
when_all_range__(s, _event, *vals, begin, end);
return awaitable.get_future();
}
using when_any_pair = std::pair<intptr_t, std::any>;
using when_any_result_ptr = std::shared_ptr<when_any_pair>;
template<class _Fty>
struct when_any_functor
{
using value_type = when_any_pair;
using future_type = _Fty;
when_impl_ptr _e;
mutable future_type _f;
mutable when_any_result_ptr _val;
intptr_t _Idx;
when_any_functor(const detail::when_impl_ptr & e, future_type f, const when_any_result_ptr & v, intptr_t idx)
: _e(e)
, _f(std::move(f))
, _val(v)
, _Idx(idx)
{
assert(idx >= 0);
}
when_any_functor(when_any_functor &&) = default;
when_any_functor & operator = (const when_any_functor &) = default;
when_any_functor & operator = (when_any_functor &&) = default;
inline future_vt operator ()() const
{
if (_val->first < 0)
{
if constexpr(std::is_same_v<future_type, future_vt>)
{
co_await _f;
if (_val->first < 0)
{
_val->first = _Idx;
_e->signal();
}
}
else
{
auto tval = co_await _f;
if (_val->first < 0)
{
_val->first = _Idx;
_val->second = std::move(tval);
_e->signal();
}
}
}
else
{
co_await _f;
}
}
};
template<intptr_t _Idx>
inline void when_any_one__(scheduler & s, const detail::when_impl_ptr & e, const when_any_result_ptr & t)
{
}
template<intptr_t _Idx, class _Fty, class... _Rest>
inline void when_any_one__(scheduler & s, const detail::when_impl_ptr & e, const when_any_result_ptr & t, _Fty f, _Rest&&... rest)
{
s + when_any_functor<_Fty>{e, std::move(f), t, _Idx};
when_any_one__<_Idx + 1, _Rest...>(s, e, t, std::forward<_Rest>(rest)...);
}
template<class... _Fty>
future_t<when_any_pair> when_any_count(size_t count, const when_any_result_ptr & val_ptr, scheduler & s, _Fty&&... f)
{
promise_t<when_any_pair> awaitable;
when_impl_ptr _event = std::make_shared<when_impl>(count);
auto awaker = std::make_shared<when_awaker>(
[st = awaitable._state, val_ptr](when_impl * e) -> bool
{
if (e)
st->set_value(*val_ptr);
else
st->throw_exception(channel_exception{ error_code::not_ready });
return true;
});
_event->wait_(awaker);
when_any_one__<0u, _Fty...>(s, _event, val_ptr, std::forward<_Fty>(f)...);
return awaitable.get_future();
}
template<class _Iter, typename _Fty = decltype(*std::declval<_Iter>())>
inline void when_any_range__(scheduler & s, const detail::when_impl_ptr & e, const when_any_result_ptr & t, _Iter begin, _Iter end)
{
using future_type = std::remove_reference_t<_Fty>;
const auto _First = begin;
for (; begin != end; ++begin)
s + when_any_functor<future_type>{e, *begin, t, begin - _First};
}
template<class _Iter>
future_t<when_any_pair> when_any_range(size_t count, const when_any_result_ptr & val_ptr, scheduler & s, _Iter begin, _Iter end)
{
promise_t<when_any_pair> awaitable;
when_impl_ptr _event = std::make_shared<when_impl>(count);
auto awaker = std::make_shared<when_awaker>(
[st = awaitable._state, val_ptr](when_impl * e) -> bool
{
if (e)
st->set_value(*val_ptr);
else
st->throw_exception(channel_exception{ error_code::not_ready });
return true;
});
_event->wait_(awaker);
when_any_range__(s, _event, val_ptr, begin, end);
return awaitable.get_future();
}
@@ -187,7 +308,7 @@ namespace resumef
using tuple_type = std::tuple<detail::remove_future_vt<_Fty>...>;
auto vals = std::make_shared<tuple_type>();
return detail::when_count(sizeof...(_Fty), vals, s, std::forward<_Fty>(f)...);
return detail::when_all_count(sizeof...(_Fty), vals, s, std::forward<_Fty>(f)...);
}
template<class... _Fty>
auto when_all(_Fty&&... f) -> future_t<std::tuple<detail::remove_future_vt<_Fty>...> >
@@ -201,7 +322,7 @@ namespace resumef
using vector_type = std::vector<value_type>;
auto vals = std::make_shared<vector_type>(std::distance(begin, end));
return detail::when_range(vals->size(), vals, s, begin, end);
return detail::when_all_range(vals->size(), vals, s, begin, end);
}
template<class _Iter, typename _Fty = decltype(*std::declval<_Iter>())>
auto when_all(_Iter begin, _Iter end) -> future_t<std::vector<detail::remove_future_vt<_Fty> > >
@@ -213,31 +334,26 @@ namespace resumef
template<class... _Fty>
auto when_any(scheduler & s, _Fty&&... f) -> future_t<std::tuple<detail::remove_future_ot<_Fty>...> >
auto when_any(scheduler & s, _Fty&&... f) -> future_t<detail::when_any_pair>
{
static_assert(sizeof...(_Fty) > 0);
auto vals = std::make_shared<detail::when_any_pair>(-1, std::any{});
using tuple_type = std::tuple<detail::remove_future_ot<_Fty>...>;
auto vals = std::make_shared<tuple_type>();
return detail::when_count(sizeof...(_Fty) ? 1 : 0, vals, s, std::forward<_Fty>(f)...);
return detail::when_any_count(sizeof...(_Fty) ? 1 : 0, vals, s, std::forward<_Fty>(f)...);
}
template<class... _Fty>
auto when_any(_Fty&&... f) -> future_t<std::tuple<detail::remove_future_ot<_Fty>...> >
auto when_any(_Fty&&... f) -> future_t<detail::when_any_pair>
{
return when_any(*this_scheduler(), std::forward<_Fty>(f)...);
}
template<class _Iter, typename _Fty = decltype(*std::declval<_Iter>())>
auto when_any(scheduler & s, _Iter begin, _Iter end) -> future_t<std::vector<detail::remove_future_ot<_Fty> > >
auto when_any(scheduler & s, _Iter begin, _Iter end) -> future_t<detail::when_any_pair>
{
using value_type = detail::remove_future_ot<_Fty>;
using vector_type = std::vector<value_type>;
auto vals = std::make_shared<vector_type>(std::distance(begin, end));
auto vals = std::make_shared<detail::when_any_pair>(-1, std::any{});
return detail::when_range(vals->size() ? 1 : 0, vals, s, begin, end);
return detail::when_any_range((begin != end) ? 1 : 0, vals, s, begin, end);
}
template<class _Iter, typename _Fty = decltype(*std::declval<_Iter>())>
auto when_any(_Iter begin, _Iter end) -> future_t<std::vector<detail::remove_future_ot<_Fty> > >
auto when_any(_Iter begin, _Iter end) -> future_t<detail::when_any_pair>
{
return when_any(*this_scheduler(), begin, end);
}

+ 23
- 5
tutorial/test_async_when_all.cpp View File

@@ -16,7 +16,9 @@ void test_when_any()
GO
{
auto vals = co_await when_any(
auto vals = co_await when_any();
vals = co_await when_any(
[]() ->future_t<int>
{
auto dt = rand() % 1000;
@@ -38,10 +40,26 @@ void test_when_any()
std::cout << dt << "@c" << std::endl;
}());
if (std::get<0>(vals).has_value())
std::cout << "first done!" << std::endl;
if (vals.first == 0)
std::cout << "first done! value is " << std::any_cast<int>(vals.second) << std::endl;
else
std::cout << "any done!" << std::endl;
std::cout << "any done! index is " << vals.first << std::endl;
co_await sleep_for(1010ms);
std::cout << std::endl;
auto my_sleep = [](const char * name) -> future_t<int>
{
auto dt = rand() % 1000;
co_await sleep_for(1ms * dt);
std::cout << dt << "@" << name << std::endl;
return dt;
};
std::vector<future_t<int> > v{ my_sleep("g"), my_sleep("h"), my_sleep("i") };
vals = co_await when_any(std::begin(v), std::end(v));
std::cout << "any range done! index is " << vals.first << ", valus is " << std::any_cast<int>(vals.second) << std::endl;
};
this_scheduler()->run_until_notask();
}
@@ -87,7 +105,7 @@ void test_when_all()
auto vals = co_await when_all(std::begin(v), std::end(v));
std::cout << vals[0] << "," << vals[1] << "," << vals[2] << "," << std::endl << std::endl;
std::cout << "all done!" << std::endl;
std::cout << "all range done!" << std::endl;
};
this_scheduler()->run_until_notask();
}

Loading…
Cancel
Save