Browse Source

优化调度器所花费的时间

tags/v2.9.7
tearshark 5 years ago
parent
commit
3e81636db7

+ 13
- 3
benchmark/benchmark_async_mem.cpp View File

const size_t N = 1000000; const size_t N = 1000000;
volatile size_t globalValue = 0;
void resumable_main_benchmark_mem() void resumable_main_benchmark_mem()
{ {
using namespace std::chrono; using namespace std::chrono;
resumef::state_t<void> st;
std::cout << sizeof(st) << " " << sizeof(resumef::promise_vt) << std::endl;
for (size_t i = 0; i < N; ++i) for (size_t i = 0; i < N; ++i)
{ {
GO
go[=]()->resumef::future_t<size_t>
{ {
for(size_t k = 0; k<100; ++k)
co_await resumef::sleep_for(10s);
for (size_t k = 0; k < 10; ++k)
{
globalValue += i * k;
co_yield k;
}
return 0;
}; };
} }
resumef::this_scheduler()->run_until_notask(); resumef::this_scheduler()->run_until_notask();
_getch();
} }

+ 2
- 0
librf/src/rf_task.cpp View File

namespace resumef namespace resumef
{ {
task_base::task_base() task_base::task_base()
: _next_node(nullptr)
, _prev_node(nullptr)
{ {
#if RESUMEF_DEBUG_COUNTER #if RESUMEF_DEBUG_COUNTER
++g_resumef_task_count; ++g_resumef_task_count;

+ 2
- 0
librf/src/rf_task.h View File

#if RESUMEF_ENABLE_MULT_SCHEDULER #if RESUMEF_ENABLE_MULT_SCHEDULER
virtual void bind(scheduler *) = 0; virtual void bind(scheduler *) = 0;
#endif #endif
task_base * _next_node;
task_base * _prev_node;
}; };
//---------------------------------------------------------------------------------------------- //----------------------------------------------------------------------------------------------

+ 13
- 26
librf/src/scheduler.cpp View File

{ {
if (task) if (task)
{ {
scoped_lock<std::recursive_mutex> __guard(_mtx_ready);
scoped_lock<spinlock> __guard(_mtx_ready);
#if RESUMEF_ENABLE_MULT_SCHEDULER #if RESUMEF_ENABLE_MULT_SCHEDULER
task->bind(this); task->bind(this);
#endif #endif
void scheduler::cancel_all_task_() void scheduler::cancel_all_task_()
{ {
{ {
scoped_lock<std::recursive_mutex> __guard(_mtx_task);
for (auto task : this->_task)
{
task->cancel();
delete task;
}
this->_task.clear();
scoped_lock<lock_type> __guard(_mtx_task);
this->_task.clear(true);
} }
{ {
scoped_lock<std::recursive_mutex> __guard(_mtx_ready);
for (auto task : this->_ready_task)
{
task->cancel();
delete task;
}
this->_ready_task.clear();
scoped_lock<spinlock> __guard(_mtx_ready);
this->_ready_task.clear(true);
} }
} }
{ {
cancel_all_task_(); cancel_all_task_();
scoped_lock<std::recursive_mutex> __guard(_mtx_task);
scoped_lock<lock_type> __guard(_mtx_task);
this->_timer->clear(); this->_timer->clear();
} }
th_scheduler_ptr = this; th_scheduler_ptr = this;
#endif #endif
{ {
scoped_lock<std::recursive_mutex> __guard(_mtx_task);
scoped_lock<lock_type> __guard(_mtx_task);
this->_timer->update(); this->_timer->update();
using namespace std::chrono; using namespace std::chrono;
for (auto iter = this->_task.begin(); iter != this->_task.end(); )
for (auto task = this->_task.begin(); task != nullptr; )
{ {
#if _DEBUG #if _DEBUG
#define MAX_TIME_COST 10000us #define MAX_TIME_COST 10000us
#endif #endif
// time_cost_evaluation<microseconds> eva(MAX_TIME_COST); // time_cost_evaluation<microseconds> eva(MAX_TIME_COST);
auto task = *iter;
if (task->is_suspend() || task->go_next(this)) if (task->is_suspend() || task->go_next(this))
{ {
// eva.add("coscheduler"); // eva.add("coscheduler");
++iter;
task = task->_next_node;
continue; continue;
} }
iter = this->_task.erase(iter);
delete task;
task = this->_task.erase(task, false);
} }
{ {
scoped_lock<std::recursive_mutex> __guard(_mtx_ready);
if (this->_ready_task.size() > 0)
scoped_lock<spinlock> __guard(_mtx_ready);
if (!this->_ready_task.empty())
{ {
this->_task.insert(this->_task.end(), this->_ready_task.begin(), this->_ready_task.end());
this->_ready_task.clear();
this->_task.merge_back(this->_ready_task);
} }
} }
} }

+ 10
- 8
librf/src/scheduler.h View File

//#include <yvals.h> //#include <yvals.h>
#include "rf_task.h" #include "rf_task.h"
#include "task_list.h"
#include "utils.h" #include "utils.h"
#include "timer.h" #include "timer.h"
struct scheduler : public std::enable_shared_from_this<scheduler> struct scheduler : public std::enable_shared_from_this<scheduler>
{ {
private: private:
mutable std::recursive_mutex _mtx_ready;
std::deque<task_base *> _ready_task;
//typedef spinlock lock_type;
typedef std::recursive_mutex lock_type;
mutable std::recursive_mutex _mtx_task;
std::list<task_base *> _task;
mutable spinlock _mtx_ready;
task_list _ready_task;
mutable lock_type _mtx_task;
task_list _task;
timer_mgr_ptr _timer; timer_mgr_ptr _timer;
RF_API void new_task(task_base * task); RF_API void new_task(task_base * task);
inline bool empty() const inline bool empty() const
{ {
scoped_lock<std::recursive_mutex, std::recursive_mutex> __guard(_mtx_ready, _mtx_task);
scoped_lock<spinlock, lock_type> __guard(_mtx_ready, _mtx_task);
return
(this->_task.size() + this->_ready_task.size()) == 0 &&
this->_timer->empty();
return this->_task.empty() && this->_ready_task.empty() && this->_timer->empty();
} }
RF_API void break_all(); RF_API void break_all();

+ 28
- 10
librf/src/spinlock.h View File

{ {
struct spinlock struct spinlock
{ {
volatile std::atomic_flag lck;
static const size_t MAX_ACTIVE_SPIN = 4000;
static const int FREE_VALUE = 0;
static const int LOCKED_VALUE = 1;
volatile std::atomic<int> lck;
spinlock() spinlock()
{ {
lck.clear();
lck = FREE_VALUE;
} }
void lock() void lock()
{ {
if (std::atomic_flag_test_and_set_explicit(&lck, std::memory_order_acquire))
using namespace std::chrono;
int val = FREE_VALUE;
if (!lck.compare_exchange_weak(val, LOCKED_VALUE, std::memory_order_acquire))
{ {
using namespace std::chrono;
size_t spinCount = 0;
auto dt = 1ms; auto dt = 1ms;
while (std::atomic_flag_test_and_set_explicit(&lck, std::memory_order_acquire))
do
{ {
std::this_thread::sleep_for(dt);
dt *= 2;
}
while (lck.load(std::memory_order_relaxed) != FREE_VALUE)
{
if (spinCount < MAX_ACTIVE_SPIN)
++spinCount;
else
{
std::this_thread::sleep_for(dt);
dt *= 2;
}
}
val = FREE_VALUE;
} while (!lck.compare_exchange_weak(val, LOCKED_VALUE, std::memory_order_acquire));
} }
} }
bool try_lock() bool try_lock()
{ {
bool ret = !std::atomic_flag_test_and_set_explicit(&lck, std::memory_order_acquire);
int val = FREE_VALUE;
bool ret = lck.compare_exchange_weak(val, LOCKED_VALUE, std::memory_order_acquire);
return ret; return ret;
} }
void unlock() void unlock()
{ {
std::atomic_flag_clear_explicit(&lck, std::memory_order_release);
lck.store(FREE_VALUE, std::memory_order_release);
} }
}; };
} }

+ 3
- 0
librf/src/state.h View File

#pragma once #pragma once
#include "def.h" #include "def.h"
#include "spinlock.h"
#include "counted_ptr.h" #include "counted_ptr.h"
#include <iostream> #include <iostream>
struct state_base struct state_base
{ {
protected: protected:
//typedef spinlock lock_type;
typedef std::recursive_mutex lock_type; typedef std::recursive_mutex lock_type;
lock_type _mtx; //for value, _exception lock_type _mtx; //for value, _exception
RF_API void set_value_none_lock(); RF_API void set_value_none_lock();
#if RESUMEF_ENABLE_MULT_SCHEDULER #if RESUMEF_ENABLE_MULT_SCHEDULER

+ 168
- 0
librf/src/task_list.h View File

#pragma once
namespace resumef
{
struct task_list
{
using value_type = task_base;
using size_type = std::size_t;
using difference_type = std::ptrdiff_t;
using pointer = value_type *;
using const_pointer = const value_type *;
using reference = value_type & ;
using const_reference = const value_type&;
//using iterator = typename _Mybase::iterator;
//using const_iterator = typename _Mybase::const_iterator;
//using reverse_iterator = std::reverse_iterator<iterator>;
//using const_reverse_iterator = std::reverse_iterator<const_iterator>;
private:
pointer _M_header;
pointer _M_last;
public:
task_list()
: _M_header(nullptr)
, _M_last(nullptr)
{
}
~task_list()
{
clear(true);
}
task_list(const task_list& _Right) = delete;
task_list& operator=(const task_list& _Right) = delete;
task_list(task_list&& _Right)
{
_M_header = _Right._M_header;
_Right._M_header = nullptr;
_M_last = _Right._M_last;
_Right._M_last = nullptr;
}
task_list& operator=(task_list&& _Right)
{ // assign by moving _Right
if (this != std::addressof(_Right))
{ // different, assign it
clear(true);
_Move_assign(_Right);
}
return (*this);
}
void clear(bool cancel_)
{
pointer header = _M_header;
_M_header = _M_last = nullptr;
for (; header; )
{
pointer temp = header;
header = header->_next_node;
if (cancel_)
temp->cancel();
delete temp;
}
}
void push_back(pointer node)
{
assert(node != nullptr);
assert(node->_next_node == nullptr);
assert(node->_prev_node == nullptr);
_Push_back(node);
}
pointer erase(pointer node, bool cancel_)
{
assert(node != nullptr);
pointer const next = node->_next_node;
pointer const prev = node->_prev_node;
if (next)
next->_prev_node = prev;
if (prev)
prev->_next_node = next;
if (_M_header == node)
_M_header = next;
if (_M_last == node)
_M_last = prev;
if (cancel_)
node->cancel();
delete node;
return next;
}
void merge_back(task_list & _Right)
{
if (this == std::addressof(_Right) || _Right._M_header == nullptr)
return;
if (_M_header == nullptr)
{
_Move_assign(_Right);
}
else
{
assert(_M_last != nullptr);
_M_last->_next_node = _Right._M_header;
_Right._M_header->_prev_node = _M_last;
_M_last = _Right._M_last;
_Right._M_header = _Right._M_last = nullptr;
}
}
bool empty() const
{
return _M_header == nullptr;
}
pointer begin() const
{
return _M_header;
}
pointer end() const
{
return nullptr;
}
private:
void _Push_back(pointer node)
{
if (_M_header == nullptr)
{
assert(_M_last == nullptr);
_M_header = _M_last = node;
}
else
{
assert(_M_last != nullptr);
_M_last->_next_node = node;
node->_prev_node = _M_last;
_M_last = node;
}
}
void _Move_assign(task_list& _Right)
{
assert(this != std::addressof(_Right));
_M_header = _Right._M_header;
_Right._M_header = nullptr;
_M_last = _Right._M_last;
_Right._M_last = nullptr;
}
};
}

+ 7
- 3
tutorial/test_async_resumable.cpp View File

#include "librf.h" #include "librf.h"
//static const int N = 100000000;
static const int N = 10;
static const int N = 10000000;
//static const int N = 10;
template <typename T> template <typename T>
void dump(std::string name, int n, T start, T end) void dump(std::string name, int n, T start, T end)
void resumable_main_resumable() void resumable_main_resumable()
{ {
resumable_switch(1); resumable_switch(1);
resumable_switch(10);
resumable_switch(100);
resumable_switch(1000); resumable_switch(1000);
resumable_switch(1000000);
resumable_switch(10000);
//resumable_switch(10000000);
} }

+ 2
- 3
vs_proj/librf.cpp View File

int main(int argc, const char * argv[]) int main(int argc, const char * argv[])
{ {
//resumable_main_resumable();
resumable_main_when_all(); resumable_main_when_all();
//resumable_main_multi_thread();
return 0;
resumable_main_multi_thread();
resumable_main_yield_return(); resumable_main_yield_return();
resumable_main_timer(); resumable_main_timer();
resumable_main_suspend_always(); resumable_main_suspend_always();

+ 1
- 0
vs_proj/librf.vcxproj View File

<ClInclude Include="..\librf\src\event.h" /> <ClInclude Include="..\librf\src\event.h" />
<ClInclude Include="..\librf\src\future.h" /> <ClInclude Include="..\librf\src\future.h" />
<ClInclude Include="..\librf\src\generator.h" /> <ClInclude Include="..\librf\src\generator.h" />
<ClInclude Include="..\librf\src\task_list.h" />
<ClInclude Include="..\librf\src\mutex.h" /> <ClInclude Include="..\librf\src\mutex.h" />
<ClInclude Include="..\librf\src\rf_task.h" /> <ClInclude Include="..\librf\src\rf_task.h" />
<ClInclude Include="..\librf\src\scheduler.h" /> <ClInclude Include="..\librf\src\scheduler.h" />

+ 3
- 0
vs_proj/librf.vcxproj.filters View File

<ClInclude Include="..\librf\src\when.h"> <ClInclude Include="..\librf\src\when.h">
<Filter>librf\src</Filter> <Filter>librf\src</Filter>
</ClInclude> </ClInclude>
<ClInclude Include="..\librf\src\task_list.h">
<Filter>librf\src</Filter>
</ClInclude>
</ItemGroup> </ItemGroup>
</Project> </Project>

Loading…
Cancel
Save