3 * Copyright 2019 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #ifndef GRPC_CORE_LIB_IOMGR_EXECUTOR_MPMCQUEUE_H
20 #define GRPC_CORE_LIB_IOMGR_EXECUTOR_MPMCQUEUE_H
22 #include <grpc/support/port_platform.h>
24 #include "src/core/lib/debug/stats.h"
25 #include "src/core/lib/gprpp/abstract.h"
26 #include "src/core/lib/gprpp/atomic.h"
27 #include "src/core/lib/gprpp/sync.h"
// Trace flag declared here; its definition is not in this header.
// NOTE(review): presumably the debug trace flag that gates the queue stats
// collection mentioned below -- confirm against its definition.
31 extern DebugOnlyTraceFlag grpc_thread_pool_trace;
33 // Abstract base class of a Multiple-Producer-Multiple-Consumer (MPMC) queue.
35 class MPMCQueueInterface {
37 virtual ~MPMCQueueInterface() {}
39 // Puts elem into the queue, immediately at the end of the queue.
40 // This may block on a full queue, depending on the implementation.
41 virtual void Put(void* elem) GRPC_ABSTRACT;
43 // Removes the oldest element from the queue and returns it.
44 // This may block on an empty queue, depending on the implementation.
45 // wait_time is an optional argument used for collecting stats.
46 virtual void* Get(gpr_timespec* wait_time = nullptr) GRPC_ABSTRACT;
48 // Returns the number of elements currently in the queue.
49 virtual int count() const GRPC_ABSTRACT;
51 GRPC_ABSTRACT_BASE_CLASS
// MPMCQueueInterface implementation: a FIFO queue of infinite length.
54 class InfLenFIFOQueue : public MPMCQueueInterface {
56 // Creates a new MPMC Queue. The queue created will have infinite length.
59 // Releases all resources held by the queue. The queue must be empty, and no
60 // one may be waiting on condition variables.
63 // Puts elem into the queue, immediately at the end of the queue. Since the
64 // queue has infinite length, this routine never blocks and should never fail.
67 // Removes the oldest element from the queue and returns it.
68 // This routine will cause the thread to block if the queue is currently empty.
69 // Argument wait_time should be passed in when the trace flag is turned on
70 // (for the purpose of collecting stats info).
71 void* Get(gpr_timespec* wait_time = nullptr);
73 // Returns the number of elements currently in the queue.
74 // There might be concurrent adds/removes on the queue, so the count might change.
// The value is read with a relaxed atomic load.
76 int count() const { return count_.Load(MemoryOrder::RELAXED); }
79 Node* next; // Linking
81 void* content; // Points to actual element
82 gpr_timespec insert_time; // Time for stats
85 next = prev = nullptr;
90 // For test purposes only. Returns the number of nodes allocated in the queue.
91 // Any allocated node will be alive until the destruction of the queue.
92 int num_nodes() const { return num_nodes_; }
94 // For test purposes only. Returns the initial number of nodes in the queue.
95 int init_num_nodes() const { return kQueueInitNumNodes; }
98 // For Internal Use Only.
99 // Removes the oldest element from the queue and returns it. This routine
100 // will NOT check whether the queue is empty, and it will NOT acquire the mutex.
101 // Caller MUST check that the queue is not empty and must acquire the mutex before
105 // Stats of queue. These will only be collected when debug trace mode is on.
106 // All printed stats info will have time measurements in microseconds.
108 uint64_t num_started; // Number of elements that have been added to queue
109 uint64_t num_completed; // Number of elements that have been removed from
111 gpr_timespec total_queue_time; // Total waiting time that all the
112 // removed elements have spent in queue
113 gpr_timespec max_queue_time; // Max waiting time among all removed
115 gpr_timespec busy_queue_time; // Accumulated amount of time that queue
121 total_queue_time = gpr_time_0(GPR_TIMESPAN);
122 max_queue_time = gpr_time_0(GPR_TIMESPAN);
123 busy_queue_time = gpr_time_0(GPR_TIMESPAN);
127 // Node for the waiting thread queue. Stands for one waiting thread; should
128 // have exactly one thread waiting on its CondVar.
129 // A doubly linked list is used for the waiting thread queue so that waiting
130 // threads are woken up in LIFO order to reduce cache misses.
137 // Pushes waiter to the front of the queue; requires the caller to hold the mutex.
138 void PushWaiter(Waiter* waiter);
140 // Removes waiter from the queue; requires the caller to hold the mutex.
141 void RemoveWaiter(Waiter* waiter);
143 // Returns a pointer to the waiter that should be woken up next, which should
144 // be the last added waiter.
147 Mutex mu_; // Protecting lock
148 Waiter waiters_; // Head of waiting thread queue
150 // Initial size for delete list
151 static const int kDeleteListInitSize = 1024;
152 // Initial number of nodes allocated
153 static const int kQueueInitNumNodes = 1024;
155 Node** delete_list_ = nullptr; // Keeps track of all allocated array entries
156 // for deleting on destruction
157 size_t delete_list_count_ = 0; // Number of entries in list
158 size_t delete_list_size_ = 0; // Size of the list. List will be expanded to
159 // double size on full
161 Node* queue_head_ = nullptr; // Head of the queue, remove position
162 Node* queue_tail_ = nullptr; // End of queue, insert position
163 Atomic<int> count_{0}; // Number of elements in queue
164 int num_nodes_ = 0; // Number of nodes allocated
166 Stats stats_; // Stats info
// NOTE(review): unlike the other data members here, busy_time has no trailing
// underscore and no in-class initializer.
167 gpr_timespec busy_time; // Start time of busy queue
170 // Allocates an array of nodes of size "num" and links all nodes together,
171 // except for the first node's prev and the last node's next. Those should be
172 // set manually by the caller afterward.
173 Node* AllocateNodes(int num);
176 } // namespace grpc_core
178 #endif /* GRPC_CORE_LIB_IOMGR_EXECUTOR_MPMCQUEUE_H */