3 * Copyright 2019 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/port_platform.h>
21 #include "src/core/lib/iomgr/executor/threadpool.h"
25 void ThreadPoolWorker::Run() {
29 if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace)) {
30 // Updates stats and print
31 gpr_timespec wait_time = gpr_time_0(GPR_TIMESPAN);
32 elem = queue_->Get(&wait_time);
33 stats_.sleep_time = gpr_time_add(stats_.sleep_time, wait_time);
35 "ThreadPool Worker [%s %d] Stats: sleep_time %f",
36 thd_name_, index_, gpr_timespec_to_micros(stats_.sleep_time));
38 elem = queue_->Get(nullptr);
40 if (elem == nullptr) {
45 static_cast<grpc_experimental_completion_queue_functor*>(elem);
46 closure->functor_run(closure, closure->internal_success);
50 void ThreadPool::SharedThreadPoolConstructor() {
51 // All worker threads in thread pool must be joinable.
52 thread_options_.set_joinable(true);
54 // Create at least 1 worker thread.
55 if (num_threads_ <= 0) num_threads_ = 1;
57 queue_ = New<InfLenFIFOQueue>();
58 threads_ = static_cast<ThreadPoolWorker**>(
59 gpr_zalloc(num_threads_ * sizeof(ThreadPoolWorker*)));
60 for (int i = 0; i < num_threads_; ++i) {
62 New<ThreadPoolWorker>(thd_name_, this, queue_, thread_options_, i);
67 size_t ThreadPool::DefaultStackSize() {
68 #if defined(__ANDROID__) || defined(__APPLE__)
75 void ThreadPool::AssertHasNotBeenShutDown() {
76 // For debug checking purpose, using RELAXED order is sufficient.
77 GPR_DEBUG_ASSERT(!shut_down_.Load(MemoryOrder::RELAXED));
80 ThreadPool::ThreadPool(int num_threads) : num_threads_(num_threads) {
81 thd_name_ = "ThreadPoolWorker";
82 thread_options_ = Thread::Options();
83 thread_options_.set_stack_size(DefaultStackSize());
84 SharedThreadPoolConstructor();
87 ThreadPool::ThreadPool(int num_threads, const char* thd_name)
88 : num_threads_(num_threads), thd_name_(thd_name) {
89 thread_options_ = Thread::Options();
90 thread_options_.set_stack_size(DefaultStackSize());
91 SharedThreadPoolConstructor();
94 ThreadPool::ThreadPool(int num_threads, const char* thd_name,
95 const Thread::Options& thread_options)
96 : num_threads_(num_threads),
98 thread_options_(thread_options) {
99 if (thread_options_.stack_size() == 0) {
100 thread_options_.set_stack_size(DefaultStackSize());
102 SharedThreadPoolConstructor();
105 ThreadPool::~ThreadPool() {
106 // For debug checking purpose, using RELAXED order is sufficient.
107 shut_down_.Store(true, MemoryOrder::RELAXED);
109 for (int i = 0; i < num_threads_; ++i) {
110 queue_->Put(nullptr);
113 for (int i = 0; i < num_threads_; ++i) {
117 for (int i = 0; i < num_threads_; ++i) {
124 void ThreadPool::Add(grpc_experimental_completion_queue_functor* closure) {
125 AssertHasNotBeenShutDown();
126 queue_->Put(static_cast<void*>(closure));
129 int ThreadPool::num_pending_closures() const { return queue_->count(); }
131 int ThreadPool::pool_capacity() const { return num_threads_; }
133 const Thread::Options& ThreadPool::thread_options() const {
134 return thread_options_;
137 const char* ThreadPool::thread_name() const { return thd_name_; }
138 } // namespace grpc_core