Browse Source

支持N:N模式的协程

tags/v2.9.7
tearshark 6 years ago
parent
commit
db03e525be

+ 1
- 1
benchmark/benchmark_async_mem.cpp View File

@@ -22,5 +22,5 @@ void resumable_main_benchmark_mem()
};
}
resumef::g_scheduler.run_until_notask();
resumef::this_scheduler()->run_until_notask();
}

+ 91
- 0
librf/src/counted_ptr.h View File

@@ -0,0 +1,91 @@

#pragma once
namespace resumef
{
template <typename T>
struct counted_ptr
{
counted_ptr() = default;
counted_ptr(const counted_ptr& cp) : _p(cp._p)
{
_lock();
}
counted_ptr(T* p) : _p(p)
{
_lock();
}
counted_ptr(counted_ptr&& cp)
{
std::swap(_p, cp._p);
}
counted_ptr& operator=(const counted_ptr& cp)
{
if (&cp != this)
{
_unlock();
_lock(cp._p);
}
return *this;
}
counted_ptr& operator=(counted_ptr&& cp)
{
if (&cp != this)
std::swap(_p, cp._p);
return *this;
}
~counted_ptr()
{
_unlock();
}
T* operator->() const
{
return _p;
}
T* get() const
{
return _p;
}
void reset()
{
_unlock();
}
private:
void _unlock()
{
if (_p != nullptr)
{
auto t = _p;
_p = nullptr;
t->unlock();
}
}
void _lock(T* p)
{
if (p != nullptr)
p->lock();
_p = p;
}
void _lock()
{
if (_p != nullptr)
_p->lock();
}
T* _p = nullptr;
};
template <typename T>
counted_ptr<T> make_counted()
{
return new T{};
}
}

+ 1
- 0
librf/src/def.h View File

@@ -30,6 +30,7 @@
#endif
#if RESUMEF_DEBUG_COUNTER
extern std::mutex g_resumef_cout_mutex;
extern std::atomic<intptr_t> g_resumef_state_count;
extern std::atomic<intptr_t> g_resumef_task_count;
extern std::atomic<intptr_t> g_resumef_evtctx_count;

+ 4
- 4
librf/src/event.cpp View File

@@ -94,7 +94,7 @@ namespace resumef
});
_event->wait_(awaker);
g_scheduler.timer()->add(tp,
this_scheduler()->timer()->add(tp,
[awaker](bool bValue)
{
awaker->awake(nullptr, 1);
@@ -209,7 +209,7 @@ namespace resumef
e->wait_(awaker);
}
g_scheduler.timer()->add(tp,
this_scheduler()->timer()->add(tp,
[awaker](bool bValue)
{
awaker->awake(nullptr, 1);
@@ -322,7 +322,7 @@ namespace resumef
awaitable_t<bool> awaitable;
if (evts.size() <= 0)
{
g_scheduler.timer()->add_handler(tp,
this_scheduler()->timer()->add_handler(tp,
[st = awaitable._state](bool bValue)
{
st->set_value(false);
@@ -334,7 +334,7 @@ namespace resumef
ctx->st = awaitable._state;
ctx->evts_waited.reserve(evts.size());
ctx->evts = std::move(evts);
ctx->th = std::move(g_scheduler.timer()->add_handler(tp,
ctx->th = std::move(this_scheduler()->timer()->add_handler(tp,
[ctx](bool bValue)
{
ctx->awake(nullptr);

+ 2
- 38
librf/src/future.h View File

@@ -14,7 +14,6 @@ namespace resumef
future_impl_t(const counted_ptr<state_type>& state) : _state(state)
{
_state->_future_acquired = true;
}
// movable, but not copyable
@@ -144,15 +143,6 @@ namespace resumef
using future_vt = future_t<void>;
template<class state_type>
struct awaitor_initial_suspend
{
counted_ptr<state_type> _state;
bool await_ready() noexcept;
void await_suspend(coroutine_handle<> resume_cb) noexcept;
void await_resume() noexcept;
};
template <typename T>
struct promise_impl_t
{
@@ -188,8 +178,6 @@ namespace resumef
//callback里,不应该调用 get_return_object()
future_type get_future()
{
if (_state->_future_acquired)
throw future_exception{ future_error::already_acquired };
return future_type(_state);
}
@@ -197,9 +185,6 @@ namespace resumef
// cause multiple callbacks.
future_type next_future()
{
// reset and return another future
if (_state->_future_acquired)
_state->reset();
return future_type(_state);
}
@@ -218,7 +203,6 @@ namespace resumef
auto initial_suspend() noexcept
{
return std::experimental::suspend_never{};
//return awaitor_initial_suspend<state_type>{ _state };
}
//这在一个协程被销毁之时调用。
@@ -348,6 +332,7 @@ namespace resumef
return nullptr;
}
/*
inline scheduler * state_base::parent_scheduler() const
{
auto promise_ = parent_promise();
@@ -355,28 +340,7 @@ namespace resumef
return promise_->_state->current_scheduler();
return nullptr;
}
*/
template<class state_type>
bool awaitor_initial_suspend<state_type>::await_ready() noexcept
{
return false;
}
template<class state_type>
void awaitor_initial_suspend<state_type>::await_suspend(coroutine_handle<> resume_cb) noexcept
{
_state->await_suspend(resume_cb);
scheduler * sch_ = _state->parent_scheduler();
if (sch_ != nullptr)
{
_state->current_scheduler(sch_);
_state->resume();
}
}
template<class state_type>
void awaitor_initial_suspend<state_type>::await_resume() noexcept
{
}
}

+ 1
- 1
librf/src/mutex.cpp View File

@@ -115,7 +115,7 @@ namespace resumef
});
_locker->lock_(awaker);
g_scheduler.timer()->add(tp,
this_scheduler()->timer()->add(tp,
[awaker](bool bValue)
{
awaker->awake(nullptr, 1);

+ 0
- 59
librf/src/rf_task.cpp View File

@@ -19,63 +19,4 @@ namespace resumef
#endif
}
template<class state_tt>
struct awaitable_task_t : public task_base
{
counted_ptr<state_tt> _state;
awaitable_task_t() {}
awaitable_task_t(state_tt * state)
: _state(state)
{
}
virtual bool is_suspend() override
{
return !_state->ready();
}
virtual bool go_next(scheduler * schdler) override
{
_state->current_scheduler(schdler);
_state->resume();
return false;
}
virtual void cancel() override
{
_state->cancel();
}
virtual void * get_id() override
{
return _state.get();
}
};
state_base::~state_base()
{
#if RESUMEF_DEBUG_COUNTER
--g_resumef_state_count;
#endif
}
void state_base::set_value_none_lock()
{
// Set all members first as calling coroutine may reset stuff here.
_ready = true;

auto sch_ = this->parent_scheduler();
sch_->push_task_internal(new awaitable_task_t<state_base>(this));
}
void state_base::set_exception(std::exception_ptr && e_)
{
scoped_lock<std::mutex> __guard(_mtx);
_exception = std::move(e_);
// Set all members first as calling coroutine may reset stuff here.
_ready = true;

auto sch_ = this->parent_scheduler();
sch_->push_task_internal(new awaitable_task_t<state_base>(this));
}
}

+ 10
- 1
librf/src/rf_task.h View File

@@ -19,6 +19,7 @@ namespace resumef
virtual bool go_next(scheduler *) = 0;
virtual void cancel() = 0;
virtual void * get_id() = 0;
virtual void bind(scheduler *) = 0;
};
//----------------------------------------------------------------------------------------------
@@ -75,6 +76,10 @@ namespace resumef
{
return nullptr;
}
virtual void bind(scheduler *) override
{
}
};
template<class _Ty>
@@ -112,7 +117,6 @@ namespace resumef
virtual bool go_next(scheduler * schdler) override
{
auto * _state = _future._state.get();
_state->current_scheduler(schdler);
_state->resume();
return !_state->ready() && !_state->_done;
}
@@ -124,6 +128,11 @@ namespace resumef
{
return _future._state.get();
}
virtual void bind(scheduler * schdler) override
{
auto * _state = _future._state.get();
_state->current_scheduler(schdler);
}
};
//----------------------------------------------------------------------------------------------

+ 26
- 25
librf/src/scheduler.cpp View File

@@ -2,6 +2,7 @@
#include <assert.h>
#if RESUMEF_DEBUG_COUNTER
std::mutex g_resumef_cout_mutex;
std::atomic<intptr_t> g_resumef_state_count = 0;
std::atomic<intptr_t> g_resumef_task_count = 0;
std::atomic<intptr_t> g_resumef_evtctx_count = 0;
@@ -31,10 +32,12 @@ namespace resumef
return future_error_string[(size_t)(fe)];
}
thread_local scheduler * th_scheduler_ptr = nullptr;
//获得当前线程下的调度器
scheduler * this_scheduler()
{
return &g_scheduler;
return th_scheduler_ptr ? th_scheduler_ptr : &scheduler::g_scheduler;
}
//获得当前线程下,正在由调度器调度的协程
@@ -50,6 +53,21 @@ namespace resumef
}
}
*/
local_scheduler::local_scheduler()
{
if (th_scheduler_ptr == nullptr)
{
_scheduler_ptr = new scheduler;
th_scheduler_ptr = _scheduler_ptr;
}
}
local_scheduler::~local_scheduler()
{
if (th_scheduler_ptr == _scheduler_ptr)
th_scheduler_ptr = nullptr;
delete _scheduler_ptr;
}
scheduler::scheduler()
: _task()
@@ -61,17 +79,9 @@ namespace resumef
scheduler::~scheduler()
{
cancel_all_task_();
}
scheduler::scheduler(scheduler && right_)
{
this->swap(right_);
}
scheduler & scheduler::operator = (scheduler && right_)
{
this->swap(right_);
return *this;
if (th_scheduler_ptr == this)
th_scheduler_ptr = nullptr;
}
void scheduler::new_task(task_base * task)
@@ -79,6 +89,7 @@ namespace resumef
if (task)
{
scoped_lock<std::recursive_mutex> __guard(_mtx_ready);
task->bind(this);
this->_ready_task.push_back(task);
}
}
@@ -115,6 +126,9 @@ namespace resumef
void scheduler::run_one_batch()
{
if (th_scheduler_ptr == nullptr)
th_scheduler_ptr = this;
{
scoped_lock<std::recursive_mutex> __guard(_mtx_task);
@@ -159,18 +173,5 @@ namespace resumef
this->run_one_batch();
}
void scheduler::swap(scheduler & right_)
{
if (this != &right_)
{
scoped_lock<std::recursive_mutex, std::recursive_mutex, std::recursive_mutex, std::recursive_mutex>
__guard(this->_mtx_ready, this->_mtx_task, right_._mtx_ready, right_._mtx_task);
std::swap(this->_ready_task, right_._ready_task);
std::swap(this->_task, right_._task);
std::swap(this->_timer, right_._timer);
}
}
scheduler g_scheduler;
scheduler scheduler::g_scheduler;
}

+ 20
- 14
librf/src/scheduler.h View File

@@ -11,6 +11,8 @@
namespace resumef
{
struct local_scheduler;
struct scheduler : public std::enable_shared_from_this<scheduler>
{
private:
@@ -53,7 +55,6 @@ namespace resumef
}
RF_API void break_all();
RF_API void swap(scheduler & right_);
inline timer_manager * timer() const
{
@@ -61,21 +62,35 @@ namespace resumef
}
friend struct task_base;
friend struct local_scheduler;
protected:
RF_API scheduler();
public:
RF_API ~scheduler();
RF_API scheduler(scheduler && right_);
RF_API scheduler & operator = (scheduler && right_);
scheduler(scheduler && right_) = delete;
scheduler & operator = (scheduler && right_) = delete;
scheduler(const scheduler &) = delete;
scheduler & operator = (const scheduler &) = delete;
static scheduler g_scheduler;
};
struct local_scheduler
{
RF_API local_scheduler();
RF_API ~local_scheduler();
local_scheduler(local_scheduler && right_) = delete;
local_scheduler & operator = (local_scheduler && right_) = delete;
local_scheduler(const local_scheduler &) = delete;
local_scheduler & operator = (const local_scheduler &) = delete;
private:
scheduler * _scheduler_ptr;
};
//--------------------------------------------------------------------------------------------------
extern scheduler g_scheduler;
#if !defined(_DISABLE_RESUMEF_GO_MACRO)
#define go (*::resumef::this_scheduler()) +
#define GO (*::resumef::this_scheduler()) + [=]()->resumef::future_vt
@@ -83,12 +98,3 @@ namespace resumef
//--------------------------------------------------------------------------------------------------
}
namespace std
{
inline void swap(resumef::scheduler & _Left, resumef::scheduler & right_)
{
_Left.swap(right_);
}
}

+ 2
- 2
librf/src/sleep.h View File

@@ -19,7 +19,7 @@ namespace resumef
template<class _Rep, class _Period>
awaitable_t<bool> sleep_for(const std::chrono::duration<_Rep, _Period>& dt_)
{
return std::move(sleep_for_(std::chrono::duration_cast<std::chrono::system_clock::duration>(dt_), g_scheduler));
return std::move(sleep_for_(std::chrono::duration_cast<std::chrono::system_clock::duration>(dt_), *this_scheduler()));
}
template<class _Clock, class _Duration = typename _Clock::duration>
@@ -30,6 +30,6 @@ namespace resumef
template<class _Clock, class _Duration>
awaitable_t<bool> sleep_until(const std::chrono::time_point<_Clock, _Duration>& tp_)
{
return std::move(sleep_until_(std::chrono::time_point_cast<std::chrono::system_clock::duration>(tp_), g_scheduler));
return std::move(sleep_until_(std::chrono::time_point_cast<std::chrono::system_clock::duration>(tp_), *this_scheduler()));
}
}

+ 115
- 0
librf/src/state.cpp View File

@@ -0,0 +1,115 @@

#include "rf_task.h"
#include "scheduler.h"
#include <assert.h>
namespace resumef
{
template<class state_tt>
struct awaitable_task_t : public task_base
{
counted_ptr<state_tt> _state;
awaitable_task_t() {}
awaitable_task_t(state_tt * state)
: _state(state)
{
}
virtual bool is_suspend() override
{
return !_state->ready();
}
virtual bool go_next(scheduler * schdler) override
{
_state->resume();
return false;
}
virtual void cancel() override
{
_state->cancel();
}
virtual void * get_id() override
{
return _state.get();
}
virtual void bind(scheduler * ) override
{
}
};
state_base::~state_base()
{
#if RESUMEF_DEBUG_COUNTER
--g_resumef_state_count;
#endif
}
void state_base::set_value_none_lock()
{
// Set all members first as calling coroutine may reset stuff here.
_ready = true;

if (_coro)
{
auto sch_ = this->current_scheduler();
/*
if (sch_ == nullptr)
sch_ = this_scheduler();
*/
sch_->push_task_internal(new awaitable_task_t<state_base>(this));
}
}
void state_base::set_exception(std::exception_ptr && e_)
{
scoped_lock<std::mutex> __guard(_mtx);
_exception = std::move(e_);
// Set all members first as calling coroutine may reset stuff here.
_ready = true;

if (_coro)
{
auto sch_ = this->current_scheduler();
/*
if (sch_ == nullptr)
sch_ = this_scheduler();
*/
sch_->push_task_internal(new awaitable_task_t<state_base>(this));
}
}
void state_base::await_suspend(coroutine_handle<> resume_cb)
{
_coro = resume_cb;
if (_current_scheduler == nullptr)
{
auto * promise_ = this->parent_promise();
if (promise_)
{
scheduler * sch_ = promise_->_state->current_scheduler();
if (sch_)
this->current_scheduler(sch_);
else
promise_->_state->_depend_states.push_back(counted_ptr<state_base>(this));
}
}
else
{
for (auto & stptr : _depend_states)
stptr->current_scheduler(_current_scheduler);
_depend_states.clear();
}
}
void state_base::current_scheduler(scheduler * sch_)
{
_current_scheduler = sch_;
for (auto & stptr : _depend_states)
stptr->current_scheduler(sch_);
_depend_states.clear();
}
}

+ 17
- 115
librf/src/state.h View File

@@ -2,6 +2,7 @@
#pragma once
#include "def.h"
#include "counted_ptr.h"
#include <iostream>
namespace resumef
@@ -17,13 +18,13 @@ namespace resumef
private:
void * _this_promise = nullptr;
scheduler * _current_scheduler = nullptr;
std::vector<counted_ptr<state_base>> _depend_states;
public:
coroutine_handle<> _coro;
std::atomic<intptr_t> _count = 0; // tracks reference count of state object
std::exception_ptr _exception;
bool _ready = false;
bool _future_acquired = false;
bool _cancellation = false;
bool _done = false;
@@ -62,7 +63,6 @@ namespace resumef
{
_coro = nullptr;
_ready = false;
_future_acquired = false;
}
void cancel()
@@ -81,11 +81,18 @@ namespace resumef
{
if (_coro)
{
std::cout << "scheduler=" << current_scheduler()
<< ",coro=" << _coro.address()
<< ",this_promise=" << this_promise()
<< ",parent_promise=" << parent_promise() << std::endl;
#if RESUMEF_DEBUG_COUNTER
{
scoped_lock<std::mutex> __lock(g_resumef_cout_mutex);
std::cout << "scheduler=" << current_scheduler()
<< ",coro=" << _coro.address()
<< ",this_promise=" << this_promise()
<< ",parent_promise=" << parent_promise()
<< ",thread=" << std::this_thread::get_id()
<< std::endl;
}
#endif
auto coro = _coro;
_coro = nullptr;
coro();
@@ -105,7 +112,7 @@ namespace resumef
}
promise_t<void> * parent_promise() const;
scheduler * parent_scheduler() const;
//scheduler * parent_scheduler() const;
void * this_promise() const
{
@@ -120,34 +127,15 @@ namespace resumef
{
return _current_scheduler;
}
void current_scheduler(scheduler * sch_)
{
_current_scheduler = sch_;
}
void current_scheduler(scheduler * sch_);
//------------------------------------------------------------------------------------------
//以下是通过future_t/promise_t, 与编译器生成的resumable function交互的接口
bool await_ready()
{
return _ready;
}
void await_suspend(coroutine_handle<> resume_cb)
{
_coro = resume_cb;
}
void await_resume()
{
}
void await_suspend(coroutine_handle<> resume_cb);
void final_suspend()
{
_done = true;
}
bool cancellation_requested() const
{
return _cancellation;
}
//以上是通过future_t/promise_t, 与编译器生成的resumable function交互的接口
//------------------------------------------------------------------------------------------
};
@@ -235,91 +223,5 @@ namespace resumef
}
};
// counted_ptr is similar to shared_ptr but allows explicit control
//
template <typename T>
struct counted_ptr
{
counted_ptr() = default;
counted_ptr(const counted_ptr& cp) : _p(cp._p)
{
_lock();
}
counted_ptr(T* p) : _p(p)
{
_lock();
}
counted_ptr(counted_ptr&& cp)
{
std::swap(_p, cp._p);
}
counted_ptr& operator=(const counted_ptr& cp)
{
if (&cp != this)
{
_unlock();
_lock(cp._p);
}
return *this;
}
counted_ptr& operator=(counted_ptr&& cp)
{
if (&cp != this)
std::swap(_p, cp._p);
return *this;
}
~counted_ptr()
{
_unlock();
}
T* operator->() const
{
return _p;
}
T* get() const
{
return _p;
}
void reset()
{
_unlock();
}
protected:
void _unlock()
{
if (_p != nullptr)
{
auto t = _p;
_p = nullptr;
t->unlock();
}
}
void _lock(T* p)
{
if (p != nullptr)
p->lock();
_p = p;
}
void _lock()
{
if (_p != nullptr)
_p->lock();
}
T* _p = nullptr;
};
template <typename T>
counted_ptr<T> make_counted()
{
return new T{};
}
}

+ 2
- 2
tutorial/test_async_cb.cpp View File

@@ -54,8 +54,8 @@ void resumable_main_cb()
auto val = co_await loop_get_long(2);
std::cout << val << std::endl;
};
//resumef::g_scheduler.run_until_notask();
//resumef::this_scheduler()->run_until_notask();
go resumable_get_long(3);
resumef::g_scheduler.run_until_notask();
resumef::this_scheduler()->run_until_notask();
}

+ 2
- 2
tutorial/test_async_channel.cpp View File

@@ -67,7 +67,7 @@ void test_channel_read_first()
go test_channel_read(c);
go test_channel_write(c);
g_scheduler.run_until_notask();
this_scheduler()->run_until_notask();
}
void test_channel_write_first()
@@ -77,7 +77,7 @@ void test_channel_write_first()
go test_channel_write(c);
go test_channel_read(c);
g_scheduler.run_until_notask();
this_scheduler()->run_until_notask();
}
void resumable_main_channel()

+ 1
- 1
tutorial/test_async_dynamic_go.cpp View File

@@ -41,7 +41,7 @@ void test_dynamic_go()
go co_star(1);
go co_star(2);
resumef::g_scheduler.run_until_notask();
resumef::this_scheduler()->run_until_notask();
std::cout << "dynamic_go_count = " << dynamic_go_count << std::endl;
for (auto & j : dynamic_cells)

+ 5
- 5
tutorial/test_async_event.cpp View File

@@ -37,7 +37,7 @@ void test_wait_one()
event_t evt;
go resumable_wait_event(evt);
auto tt = async_set_event(evt, 1000ms);
g_scheduler.run_until_notask();
this_scheduler()->run_until_notask();
tt.join();
}
@@ -56,7 +56,7 @@ void test_wait_one()
std::cout << std::this_thread::get_id() << std::endl;
auto tt = async_set_event(evt2, 1000ms);
g_scheduler.run_until_notask();
this_scheduler()->run_until_notask();
tt.join();
}
@@ -85,7 +85,7 @@ void test_wait_any()
vtt.emplace_back(async_set_event(e, 1ms * (500 + rand() % 1000)));
}
g_scheduler.run_until_notask();
this_scheduler()->run_until_notask();
for (auto & tt : vtt)
tt.join();
@@ -113,7 +113,7 @@ void test_wait_all()
vtt.emplace_back(async_set_event(e, 1ms * (500 + rand() % 1000)));
}
g_scheduler.run_until_notask();
this_scheduler()->run_until_notask();
for (auto & tt : vtt)
tt.join();
@@ -141,7 +141,7 @@ void test_wait_all_timeout()
vtt.emplace_back(async_set_event(e, 1ms * (500 + rand() % 1000)));
}
g_scheduler.run_until_notask();
this_scheduler()->run_until_notask();
for (auto & tt : vtt)
tt.join();

+ 6
- 6
tutorial/test_async_event_timeout.cpp View File

@@ -48,7 +48,7 @@ void test_wait_timeout_one()
async_set_event(evt, 10s + 50ms);
//go resumalbe_set_event(evt, 10s + 50ms);
g_scheduler.run_until_notask();
this_scheduler()->run_until_notask();
}
void test_wait_timeout_any_invalid()
@@ -65,7 +65,7 @@ void test_wait_timeout_any_invalid()
assert(idx < 0);
std::cout << "invalid wait!" << std::endl;
};
g_scheduler.run_until_notask();
this_scheduler()->run_until_notask();
}
void test_wait_timeout_any()
@@ -93,7 +93,7 @@ void test_wait_timeout_any()
}
//取消剩下的定时器,以便于协程调度器退出来
g_scheduler.timer()->clear();
this_scheduler()->timer()->clear();
};
srand((int)time(nullptr));
@@ -103,7 +103,7 @@ void test_wait_timeout_any()
async_set_event(e, 1ms * (1000 + rand() % 5000));
}
g_scheduler.run_until_notask();
this_scheduler()->run_until_notask();
}
void test_wait_timeout_all_invalid()
@@ -120,7 +120,7 @@ void test_wait_timeout_all_invalid()
assert(!result);
std::cout << "invalid wait!" << std::endl;
};
g_scheduler.run_until_notask();
this_scheduler()->run_until_notask();
}
void test_wait_timeout_all()
@@ -154,7 +154,7 @@ void test_wait_timeout_all()
//async_set_event(e, 1ms * (1000 + rand() % 5000));
}
g_scheduler.run_until_notask();
this_scheduler()->run_until_notask();
}
void resumable_main_event_timeout()

+ 2
- 2
tutorial/test_async_exception.cpp View File

@@ -81,10 +81,10 @@ future_vt test_bomb_exception()
void resumable_main_exception()
{
go test_signal_exception();
g_scheduler.run_until_notask();
this_scheduler()->run_until_notask();
std::cout << std::endl;
go test_bomb_exception();
g_scheduler.run_until_notask();
this_scheduler()->run_until_notask();
}

+ 90
- 0
tutorial/test_async_multi_thread.cpp View File

@@ -0,0 +1,90 @@

#include <chrono>
#include <iostream>
#include <string>
#include <conio.h>
#include <thread>
#include "librf.h"
using namespace resumef;
std::mutex cout_mutex;
//这是一个重度计算任务,只能单开线程来避免主线程被阻塞
auto async_heavy_computing_tasks(int64_t val)
{
using namespace std::chrono;
awaitable_t<int64_t> awaitable;
std::thread([val, st = awaitable._state]
{
std::this_thread::sleep_for(500ms);
st->set_value(val * val);
}).detach();
return awaitable;
}
future_vt heavy_computing_sequential(int64_t val)
{
for(size_t i = 0; i < 3; ++i)
{
{
scoped_lock<std::mutex> __lock(cout_mutex);
std::cout << val << " @" << std::this_thread::get_id() << std::endl;
}
val = co_await async_heavy_computing_tasks(val);
}
{
scoped_lock<std::mutex> __lock(cout_mutex);
std::cout << val << " @" << std::this_thread::get_id() << std::endl;
}
}
void test_use_single_thread()
{
//使用local_scheduler来申明一个绑定到本线程的调度器 my_scheduler
//后续在本线程运行的协程,通过this_scheduler()获得my_scheduler的地址
//从而将这些协程的所有操作都绑定到my_scheduler里面去调度
//实现一个协程始终绑定到一个线程的目的
//在同一个线程里,申明多个local_scheduler会怎么样?
//----我也不知道
//如果不申明my_scheduler,则this_scheduler()获得默认主调度器的地址
local_scheduler my_scheduler;
{
scoped_lock<std::mutex> __lock(cout_mutex);
std::cout << "running in thread @" << std::this_thread::get_id() << std::endl;
}
go heavy_computing_sequential(2);
this_scheduler()->run_until_notask();
}
const size_t N = 2;
void test_use_multi_thread()
{
std::thread th_array[N];
for (size_t i = 0; i < N; ++i)
th_array[i] = std::thread(&test_use_single_thread);
test_use_single_thread();
for (auto & th : th_array)
th.join();
}
void resumable_main_multi_thread()
{
std::cout << "test_use_single_thread @" << std::this_thread::get_id() << std::endl << std::endl;
test_use_single_thread();
std::cout << std::endl;
std::cout << "test_use_multi_thread @" << std::this_thread::get_id() << std::endl << std::endl;
test_use_multi_thread();
//运行主调度器里面的协程
//但本范例不应该有协程存在,仅演示不要忽略了主调度器
scheduler::g_scheduler.run_until_notask();
}

+ 1
- 1
tutorial/test_async_mutex.cpp View File

@@ -51,5 +51,5 @@ void resumable_main_mutex()
go test_mutex_push();
go test_mutex_pop(1);
g_scheduler.run_until_notask();
this_scheduler()->run_until_notask();
}

+ 1
- 1
tutorial/test_async_resumable.cpp View File

@@ -38,7 +38,7 @@ void resumable_switch(int coro)
return N / coro;
};
}
resumef::g_scheduler.run_until_notask();
resumef::this_scheduler()->run_until_notask();
auto end = std::chrono::steady_clock::now();
dump("BenchmarkSwitch_" + std::to_string(coro), N, start, end);

+ 1
- 1
tutorial/test_async_routine.cpp View File

@@ -38,5 +38,5 @@ void resumable_main_routine()
{
go test_routine_use_timer_2();
//go test_routine_use_timer();
g_scheduler.run_until_notask();
this_scheduler()->run_until_notask();
}

+ 3
- 3
tutorial/test_async_sleep.cpp View File

@@ -47,9 +47,9 @@ void test_wait_all_events_with_signal_by_sleep()
};
}
while (!g_scheduler.empty())
while (!this_scheduler()->empty())
{
g_scheduler.run_one_batch();
this_scheduler()->run_one_batch();
//std::cout << "press any key to continue." << std::endl;
//_getch();
}
@@ -58,7 +58,7 @@ void test_wait_all_events_with_signal_by_sleep()
void resumable_main_sleep()
{
go test_sleep_use_timer();
g_scheduler.run_until_notask();
this_scheduler()->run_until_notask();
std::cout << std::endl;
test_wait_all_events_with_signal_by_sleep();

+ 1
- 1
tutorial/test_async_suspend_always.cpp View File

@@ -54,7 +54,7 @@ void resumable_main_suspend_always()
{
go test_recursive_await();
go test_recursive_go();
g_scheduler.run_until_notask();
this_scheduler()->run_until_notask();
}
/*

+ 3
- 3
tutorial/test_async_timer.cpp View File

@@ -13,7 +13,7 @@ void resumable_main_timer()
{
using namespace std::chrono;
auto th = g_scheduler.timer()->add_handler(system_clock::now() + 5s,
auto th = this_scheduler()->timer()->add_handler(system_clock::now() + 5s,
[](bool bValue)
{
if (bValue)
@@ -22,14 +22,14 @@ void resumable_main_timer()
std::cout << "timer after 5s." << std::endl;
});
auto th2 = g_scheduler.timer()->add_handler(1s,
auto th2 = this_scheduler()->timer()->add_handler(1s,
[&th](bool)
{
std::cout << "timer after 1s." << std::endl;
th.stop();
});
g_scheduler.run_until_notask();
this_scheduler()->run_until_notask();
th2.stop(); //but th2 is invalid

+ 2
- 2
tutorial/test_async_yield_return.cpp View File

@@ -64,8 +64,8 @@ auto test_yield_void()
void resumable_main_yield_return()
{
go test_yield_int();
g_scheduler.run_until_notask();
this_scheduler()->run_until_notask();
go test_yield_void();
g_scheduler.run_until_notask();
this_scheduler()->run_until_notask();
}

+ 2
- 1
vs_proj/librf.cpp View File

@@ -14,12 +14,13 @@ extern void resumable_main_event_timeout();
extern void resumable_main_dynamic_go();
extern void resumable_main_channel();
extern void resumable_main_cb();
extern void resumable_main_multi_thread();
extern void resumable_main_benchmark_mem();
int main(int argc, const char * argv[])
{
resumable_main_routine();
resumable_main_multi_thread();
return 0;
resumable_main_yield_return();

+ 4
- 1
vs_proj/librf.vcxproj View File

@@ -100,7 +100,7 @@
<ClCompile>
<WarningLevel>Level3</WarningLevel>
<Optimization>Disabled</Optimization>
<PreprocessorDefinitions>_DEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<PreprocessorDefinitions>_DEBUG;_CONSOLE;RESUMEF_DEBUG_COUNTER=1;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<SDLCheck>true</SDLCheck>
<AdditionalIncludeDirectories>..\librf;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
<AdditionalOptions>/await /std:c++latest </AdditionalOptions>
@@ -153,6 +153,7 @@
<ClCompile Include="..\librf\src\rf_task.cpp" />
<ClCompile Include="..\librf\src\scheduler.cpp" />
<ClCompile Include="..\librf\src\sleep.cpp" />
<ClCompile Include="..\librf\src\state.cpp" />
<ClCompile Include="..\librf\src\timer.cpp" />
<ClCompile Include="..\tutorial\test_async_cb.cpp" />
<ClCompile Include="..\tutorial\test_async_channel.cpp" />
@@ -160,6 +161,7 @@
<ClCompile Include="..\tutorial\test_async_event.cpp" />
<ClCompile Include="..\tutorial\test_async_event_timeout.cpp" />
<ClCompile Include="..\tutorial\test_async_exception.cpp" />
<ClCompile Include="..\tutorial\test_async_multi_thread.cpp" />
<ClCompile Include="..\tutorial\test_async_mutex.cpp" />
<ClCompile Include="..\tutorial\test_async_resumable.cpp" />
<ClCompile Include="..\tutorial\test_async_routine.cpp" />
@@ -173,6 +175,7 @@
<ClInclude Include="..\librf\librf.h" />
<ClInclude Include="..\librf\src\asio_task.h" />
<ClInclude Include="..\librf\src\channel.h" />
<ClInclude Include="..\librf\src\counted_ptr.h" />
<ClInclude Include="..\librf\src\def.h" />
<ClInclude Include="..\librf\src\event.h" />
<ClInclude Include="..\librf\src\future.h" />

+ 9
- 0
vs_proj/librf.vcxproj.filters View File

@@ -85,6 +85,12 @@
<ClCompile Include="..\benchmark\benchmark_async_mem.cpp">
<Filter>benchmark</Filter>
</ClCompile>
<ClCompile Include="..\librf\src\state.cpp">
<Filter>librf\src</Filter>
</ClCompile>
<ClCompile Include="..\tutorial\test_async_multi_thread.cpp">
<Filter>tutorial</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\librf\librf.h">
@@ -138,5 +144,8 @@
<ClInclude Include="..\librf\src\unix\coroutine.h">
<Filter>librf\src\unix</Filter>
</ClInclude>
<ClInclude Include="..\librf\src\counted_ptr.h">
<Filter>librf\src</Filter>
</ClInclude>
</ItemGroup>
</Project>

Loading…
Cancel
Save