diff --git a/CMakeLists.txt b/CMakeLists.txt index bf4831f9..af0419d1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -25,9 +25,6 @@ option(CGRAPH_BUILD_PERFORMANCE_TESTS "Enable build performance tests" OFF) # 如果开启此宏定义,则CGraph执行过程中,不会在控制台打印任何信息 # add_definitions(-D_CGRAPH_SILENCE_) -# 此宏可以在纯并发的微小任务下,用于提升整体性能。主要用于在性能测试的情况下使用,一般情况不推荐打开 -# add_definitions(-D_CGRAPH_PARALLEL_MICRO_BATCH_ENABLE_) - # 此宏用于在读写GParam加锁的时候,加锁方式修改为读写锁的情况 # 仅在cpp17或以上版本生效 # 打开后会在多读少写的情况下有性能优化 diff --git a/python/PyCGraph.cpp b/python/PyCGraph.cpp index 98c24e8c..36bbe7ec 100644 --- a/python/PyCGraph.cpp +++ b/python/PyCGraph.cpp @@ -28,6 +28,7 @@ PYBIND11_MODULE(pycgraph, cg) { .def_readwrite("max_local_batch_size", &UThreadPoolConfig::max_local_batch_size_) .def_readwrite("max_pool_batch_size", &UThreadPoolConfig::max_pool_batch_size_) .def_readwrite("max_steal_batch_size", &UThreadPoolConfig::max_steal_batch_size_) + .def_readwrite("pipeline_wait_busy_epoch", &UThreadPoolConfig::pipeline_wait_busy_epoch_) .def_readwrite("primary_thread_busy_epoch", &UThreadPoolConfig::primary_thread_busy_epoch_) .def_readwrite("primary_thread_empty_interval", &UThreadPoolConfig::primary_thread_empty_interval_) .def_readwrite("secondary_thread_ttl", &UThreadPoolConfig::secondary_thread_ttl_) diff --git a/src/GraphCtrl/GraphElement/_GEngine/GDynamicEngine/GDynamicEngine.cpp b/src/GraphCtrl/GraphElement/_GEngine/GDynamicEngine/GDynamicEngine.cpp index a195500f..5bf8a5b6 100644 --- a/src/GraphCtrl/GraphElement/_GEngine/GDynamicEngine/GDynamicEngine.cpp +++ b/src/GraphCtrl/GraphElement/_GEngine/GDynamicEngine/GDynamicEngine.cpp @@ -204,6 +204,16 @@ CVoid GDynamicEngine::afterElementRun(GElementPtr element) { CVoid GDynamicEngine::fatWait() { + const auto epoch = thread_pool_->getConfig().pipeline_wait_busy_epoch_; + for (CInt i = 0; i < epoch; i++) { + // 对于 pipeline 运行耗时很短的情况,在 进入 cv.wait() 之前,轮询等到一会 + // 如果遇到 isErr() 的情况,还是走进入 mutex 的逻辑 + if (finished_end_size_.load(std::memory_order_acquire) >= total_end_size_) { + return; + } + CGRAPH_YIELD(); + } + CGRAPH_UNIQUE_LOCK lock(locker_.mtx_); locker_.cv_.wait(lock, [this] { /** @@ -216,27 +226,6 @@ CVoid GDynamicEngine::fatWait() { } -#ifdef _CGRAPH_PARALLEL_MICRO_BATCH_ENABLE_ -CVoid GDynamicEngine::parallelRunAll() { - // 微任务模式,主要用于性能测试的场景下 - std::vector> futures; - for (auto& elements : parallel_element_matrix_) { - auto curFut = thread_pool_->commit([elements] { - CGRAPH_FUNCTION_BEGIN - for (auto* element : elements) { - status += element->fatProcessor(CFunctionType::RUN); - CGRAPH_FUNCTION_CHECK_STATUS - } - CGRAPH_FUNCTION_END; - }); - futures.emplace_back(std::move(curFut)); - } - - for (auto& fut : futures) { - cur_status_ += fut.get(); - } -} -#else CVoid GDynamicEngine::parallelRunAll() { parallel_run_num_ = 0; const CIndex& totalSize = static_cast(parallel_element_matrix_.size()); @@ -261,6 +250,14 @@ CVoid GDynamicEngine::parallelRunAll() { (void)thread_pool_->wakeupAllThread(); } + const auto epoch = thread_pool_->getConfig().pipeline_wait_busy_epoch_; + for (CInt i = 0; i < epoch; i++) { + if (parallel_run_num_ >= total_end_size_) { + return; + } + CGRAPH_YIELD(); + } + { CGRAPH_UNIQUE_LOCK lock(locker_.mtx_); locker_.cv_.wait(lock, [this] { @@ -268,7 +265,6 @@ CVoid GDynamicEngine::parallelRunAll() { }); } } -#endif CVoid GDynamicEngine::parallelRunOne(GElementPtr element) { diff --git a/src/UtilsCtrl/ThreadPool/UThreadPoolConfig.h b/src/UtilsCtrl/ThreadPool/UThreadPoolConfig.h index 76a19ae8..14147ef9 100644 --- a/src/UtilsCtrl/ThreadPool/UThreadPoolConfig.h +++ b/src/UtilsCtrl/ThreadPool/UThreadPoolConfig.h @@ -25,6 +25,7 @@ struct UThreadPoolConfig : public CStruct { CInt max_local_batch_size_ = CGRAPH_MAX_LOCAL_BATCH_SIZE; CInt max_pool_batch_size_ = CGRAPH_MAX_POOL_BATCH_SIZE; CInt max_steal_batch_size_ = CGRAPH_MAX_STEAL_BATCH_SIZE; + CInt pipeline_wait_busy_epoch_ = CGRAPH_PIPIELINE_WAIT_BUSY_EPOCH; CInt primary_thread_busy_epoch_ = CGRAPH_PRIMARY_THREAD_BUSY_EPOCH; CMSec primary_thread_empty_interval_ = CGRAPH_PRIMARY_THREAD_EMPTY_INTERVAL; CSec secondary_thread_ttl_ = CGRAPH_SECONDARY_THREAD_TTL; diff --git a/src/UtilsCtrl/ThreadPool/UThreadPoolDefine.h b/src/UtilsCtrl/ThreadPool/UThreadPoolDefine.h index d090ba1a..5b5e6352 100644 --- a/src/UtilsCtrl/ThreadPool/UThreadPoolDefine.h +++ b/src/UtilsCtrl/ThreadPool/UThreadPoolDefine.h @@ -54,6 +54,7 @@ static const CBool CGRAPH_BATCH_TASK_ENABLE = false; static const CInt CGRAPH_MAX_LOCAL_BATCH_SIZE = 2; // 批量执行本地任务最大值 static const CInt CGRAPH_MAX_POOL_BATCH_SIZE = 2; // 批量执行通用任务最大值 static const CInt CGRAPH_MAX_STEAL_BATCH_SIZE = 2; // 批量盗取任务最大值 +static const CInt CGRAPH_PIPIELINE_WAIT_BUSY_EPOCH = 0; // pipeline 开始wait结束时,等待轮数。在多个小任务的情况下,极大提升性能 static const CInt CGRAPH_PRIMARY_THREAD_BUSY_EPOCH = 3; // 主线程进入wait状态的轮数,数值越大,理论性能越高,但空转可能性也越大 static const CMSec CGRAPH_PRIMARY_THREAD_EMPTY_INTERVAL = 1000; // 主线程进入休眠状态的默认时间 static const CSec CGRAPH_SECONDARY_THREAD_TTL = 10; // 辅助线程ttl,单位为s diff --git a/test/Performance/test-performance-01.cpp b/test/Performance/test-performance-01.cpp index bcf599bb..9df436a0 100644 --- a/test/Performance/test-performance-01.cpp +++ b/test/Performance/test-performance-01.cpp @@ -24,7 +24,8 @@ void test_performance_01() { config.max_task_steal_range_ = 7; config.max_thread_size_ = 8; config.primary_thread_empty_interval_ = 0; - config.primary_thread_busy_epoch_ = 500; + config.primary_thread_busy_epoch_ = 10; + config.pipeline_wait_busy_epoch_ = 100; pipeline->setUniqueThreadPoolConfig(config); for (auto& i : arr) { pipeline->registerGElement(&i); diff --git a/test/Performance/test-performance-03.cpp b/test/Performance/test-performance-03.cpp index 39908df0..5d0c3988 100644 --- a/test/Performance/test-performance-03.cpp +++ b/test/Performance/test-performance-03.cpp @@ -22,8 +22,9 @@ void test_performance_03() { config.secondary_thread_size_ = 0; config.max_task_steal_range_ = 1; config.max_thread_size_ = 2; + config.pipeline_wait_busy_epoch_ = 100; config.primary_thread_empty_interval_ = 0; - config.primary_thread_busy_epoch_ = 500; + config.primary_thread_busy_epoch_ = 100; config.monitor_enable_ = false; // 关闭扩缩容机制 pipeline->setUniqueThreadPoolConfig(config); pipeline->registerGElement(&a);