Built motion from commit 6a09e18b.|2.6.11
[motion2.git] / legacy-libs / grpc / deps / grpc / src / core / lib / iomgr / executor / threadpool.cc
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/iomgr/executor/threadpool.h"
22
23 namespace grpc_core {
24
25 void ThreadPoolWorker::Run() {
26   while (true) {
27     void* elem;
28
29     if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace)) {
30       // Updates stats and print
31       gpr_timespec wait_time = gpr_time_0(GPR_TIMESPAN);
32       elem = queue_->Get(&wait_time);
33       stats_.sleep_time = gpr_time_add(stats_.sleep_time, wait_time);
34       gpr_log(GPR_INFO,
35               "ThreadPool Worker [%s %d] Stats:  sleep_time          %f",
36               thd_name_, index_, gpr_timespec_to_micros(stats_.sleep_time));
37     } else {
38       elem = queue_->Get(nullptr);
39     }
40     if (elem == nullptr) {
41       break;
42     }
43     // Runs closure
44     auto* closure =
45         static_cast<grpc_experimental_completion_queue_functor*>(elem);
46     closure->functor_run(closure, closure->internal_success);
47   }
48 }
49
50 void ThreadPool::SharedThreadPoolConstructor() {
51   // All worker threads in thread pool must be joinable.
52   thread_options_.set_joinable(true);
53
54   // Create at least 1 worker thread.
55   if (num_threads_ <= 0) num_threads_ = 1;
56
57   queue_ = New<InfLenFIFOQueue>();
58   threads_ = static_cast<ThreadPoolWorker**>(
59       gpr_zalloc(num_threads_ * sizeof(ThreadPoolWorker*)));
60   for (int i = 0; i < num_threads_; ++i) {
61     threads_[i] =
62         New<ThreadPoolWorker>(thd_name_, this, queue_, thread_options_, i);
63     threads_[i]->Start();
64   }
65 }
66
67 size_t ThreadPool::DefaultStackSize() {
68 #if defined(__ANDROID__) || defined(__APPLE__)
69   return 1952 * 1024;
70 #else
71   return 64 * 1024;
72 #endif
73 }
74
75 void ThreadPool::AssertHasNotBeenShutDown() {
76   // For debug checking purpose, using RELAXED order is sufficient.
77   GPR_DEBUG_ASSERT(!shut_down_.Load(MemoryOrder::RELAXED));
78 }
79
80 ThreadPool::ThreadPool(int num_threads) : num_threads_(num_threads) {
81   thd_name_ = "ThreadPoolWorker";
82   thread_options_ = Thread::Options();
83   thread_options_.set_stack_size(DefaultStackSize());
84   SharedThreadPoolConstructor();
85 }
86
87 ThreadPool::ThreadPool(int num_threads, const char* thd_name)
88     : num_threads_(num_threads), thd_name_(thd_name) {
89   thread_options_ = Thread::Options();
90   thread_options_.set_stack_size(DefaultStackSize());
91   SharedThreadPoolConstructor();
92 }
93
94 ThreadPool::ThreadPool(int num_threads, const char* thd_name,
95                        const Thread::Options& thread_options)
96     : num_threads_(num_threads),
97       thd_name_(thd_name),
98       thread_options_(thread_options) {
99   if (thread_options_.stack_size() == 0) {
100     thread_options_.set_stack_size(DefaultStackSize());
101   }
102   SharedThreadPoolConstructor();
103 }
104
105 ThreadPool::~ThreadPool() {
106   // For debug checking purpose, using RELAXED order is sufficient.
107   shut_down_.Store(true, MemoryOrder::RELAXED);
108
109   for (int i = 0; i < num_threads_; ++i) {
110     queue_->Put(nullptr);
111   }
112
113   for (int i = 0; i < num_threads_; ++i) {
114     threads_[i]->Join();
115   }
116
117   for (int i = 0; i < num_threads_; ++i) {
118     Delete(threads_[i]);
119   }
120   gpr_free(threads_);
121   Delete(queue_);
122 }
123
124 void ThreadPool::Add(grpc_experimental_completion_queue_functor* closure) {
125   AssertHasNotBeenShutDown();
126   queue_->Put(static_cast<void*>(closure));
127 }
128
129 int ThreadPool::num_pending_closures() const { return queue_->count(); }
130
131 int ThreadPool::pool_capacity() const { return num_threads_; }
132
133 const Thread::Options& ThreadPool::thread_options() const {
134   return thread_options_;
135 }
136
137 const char* ThreadPool::thread_name() const { return thd_name_; }
138 }  // namespace grpc_core