Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@ option(CGRAPH_BUILD_PERFORMANCE_TESTS "Enable build performance tests" OFF)
# 如果开启此宏定义,则CGraph执行过程中,不会在控制台打印任何信息
# add_definitions(-D_CGRAPH_SILENCE_)

# 此宏可以在纯并发的微小任务下,用于提升整体性能。主要用于在性能测试的情况下使用,一般情况不推荐打开
# add_definitions(-D_CGRAPH_PARALLEL_MICRO_BATCH_ENABLE_)

# 此宏用于在读写GParam加锁的时候,加锁方式修改为读写锁的情况
# 仅在cpp17或以上版本生效
# 打开后会在多读少写的情况下有性能优化
Expand Down
1 change: 1 addition & 0 deletions python/PyCGraph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ PYBIND11_MODULE(pycgraph, cg) {
.def_readwrite("max_local_batch_size", &UThreadPoolConfig::max_local_batch_size_)
.def_readwrite("max_pool_batch_size", &UThreadPoolConfig::max_pool_batch_size_)
.def_readwrite("max_steal_batch_size", &UThreadPoolConfig::max_steal_batch_size_)
.def_readwrite("pipeline_wait_busy_epoch", &UThreadPoolConfig::pipeline_wait_busy_epoch_)
.def_readwrite("primary_thread_busy_epoch", &UThreadPoolConfig::primary_thread_busy_epoch_)
.def_readwrite("primary_thread_empty_interval", &UThreadPoolConfig::primary_thread_empty_interval_)
.def_readwrite("secondary_thread_ttl", &UThreadPoolConfig::secondary_thread_ttl_)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,16 @@ CVoid GDynamicEngine::afterElementRun(GElementPtr element) {


CVoid GDynamicEngine::fatWait() {
const auto epoch = thread_pool_->getConfig().pipeline_wait_busy_epoch_;
for (CInt i = 0; i < epoch; i++) {
// 对于 pipeline 运行耗时很短的情况,在 进入 cv.wait() 之前,轮询等到一会
// 如果遇到 isErr() 的情况,还是走进入 mutex 的逻辑
if (finished_end_size_.load(std::memory_order_acquire) >= total_end_size_) {
return;
}
CGRAPH_YIELD();
}

CGRAPH_UNIQUE_LOCK lock(locker_.mtx_);
locker_.cv_.wait(lock, [this] {
/**
Expand All @@ -216,27 +226,6 @@ CVoid GDynamicEngine::fatWait() {
}


#ifdef _CGRAPH_PARALLEL_MICRO_BATCH_ENABLE_
CVoid GDynamicEngine::parallelRunAll() {
// 微任务模式,主要用于性能测试的场景下
std::vector<std::future<CStatus>> futures;
for (auto& elements : parallel_element_matrix_) {
auto curFut = thread_pool_->commit([elements] {
CGRAPH_FUNCTION_BEGIN
for (auto* element : elements) {
status += element->fatProcessor(CFunctionType::RUN);
CGRAPH_FUNCTION_CHECK_STATUS
}
CGRAPH_FUNCTION_END;
});
futures.emplace_back(std::move(curFut));
}

for (auto& fut : futures) {
cur_status_ += fut.get();
}
}
#else
CVoid GDynamicEngine::parallelRunAll() {
parallel_run_num_ = 0;
const CIndex& totalSize = static_cast<CIndex>(parallel_element_matrix_.size());
Expand All @@ -261,14 +250,21 @@ CVoid GDynamicEngine::parallelRunAll() {
(void)thread_pool_->wakeupAllThread();
}

const auto epoch = thread_pool_->getConfig().pipeline_wait_busy_epoch_;
for (CInt i = 0; i < epoch; i++) {
if (parallel_run_num_ >= total_end_size_) {
return;
}
CGRAPH_YIELD();
}

{
CGRAPH_UNIQUE_LOCK lock(locker_.mtx_);
locker_.cv_.wait(lock, [this] {
return (parallel_run_num_ >= total_end_size_ || cur_status_.isErr());
});
}
}
#endif


CVoid GDynamicEngine::parallelRunOne(GElementPtr element) {
Expand Down
1 change: 1 addition & 0 deletions src/UtilsCtrl/ThreadPool/UThreadPoolConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ struct UThreadPoolConfig : public CStruct {
CInt max_local_batch_size_ = CGRAPH_MAX_LOCAL_BATCH_SIZE;
CInt max_pool_batch_size_ = CGRAPH_MAX_POOL_BATCH_SIZE;
CInt max_steal_batch_size_ = CGRAPH_MAX_STEAL_BATCH_SIZE;
CInt pipeline_wait_busy_epoch_ = CGRAPH_PIPIELINE_WAIT_BUSY_EPOCH;
CInt primary_thread_busy_epoch_ = CGRAPH_PRIMARY_THREAD_BUSY_EPOCH;
CMSec primary_thread_empty_interval_ = CGRAPH_PRIMARY_THREAD_EMPTY_INTERVAL;
CSec secondary_thread_ttl_ = CGRAPH_SECONDARY_THREAD_TTL;
Expand Down
1 change: 1 addition & 0 deletions src/UtilsCtrl/ThreadPool/UThreadPoolDefine.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ static const CBool CGRAPH_BATCH_TASK_ENABLE = false;
static const CInt CGRAPH_MAX_LOCAL_BATCH_SIZE = 2; // 批量执行本地任务最大值
static const CInt CGRAPH_MAX_POOL_BATCH_SIZE = 2; // 批量执行通用任务最大值
static const CInt CGRAPH_MAX_STEAL_BATCH_SIZE = 2; // 批量盗取任务最大值
static const CInt CGRAPH_PIPIELINE_WAIT_BUSY_EPOCH = 0; // pipeline 开始wait结束时,等待轮数。在多个小任务的情况下,极大提升性能
static const CInt CGRAPH_PRIMARY_THREAD_BUSY_EPOCH = 3; // 主线程进入wait状态的轮数,数值越大,理论性能越高,但空转可能性也越大
static const CMSec CGRAPH_PRIMARY_THREAD_EMPTY_INTERVAL = 1000; // 主线程进入休眠状态的默认时间
static const CSec CGRAPH_SECONDARY_THREAD_TTL = 10; // 辅助线程ttl,单位为s
Expand Down
3 changes: 2 additions & 1 deletion test/Performance/test-performance-01.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ void test_performance_01() {
config.max_task_steal_range_ = 7;
config.max_thread_size_ = 8;
config.primary_thread_empty_interval_ = 0;
config.primary_thread_busy_epoch_ = 500;
config.primary_thread_busy_epoch_ = 10;
config.pipeline_wait_busy_epoch_ = 100;
pipeline->setUniqueThreadPoolConfig(config);
for (auto& i : arr) {
pipeline->registerGElement<TestAdd1GNode>(&i);
Expand Down
3 changes: 2 additions & 1 deletion test/Performance/test-performance-03.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ void test_performance_03() {
config.secondary_thread_size_ = 0;
config.max_task_steal_range_ = 1;
config.max_thread_size_ = 2;
config.pipeline_wait_busy_epoch_ = 100;
config.primary_thread_empty_interval_ = 0;
config.primary_thread_busy_epoch_ = 500;
config.primary_thread_busy_epoch_ = 100;
config.monitor_enable_ = false; // 关闭扩缩容机制
pipeline->setUniqueThreadPoolConfig(config);
pipeline->registerGElement<TestAdd1GNode>(&a);
Expand Down
Loading