Built motion from commit 6a09e18b.|2.6.11
[motion2.git] / legacy-libs / grpc / deps / grpc / src / core / lib / iomgr / executor / mpmcqueue.h
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #ifndef GRPC_CORE_LIB_IOMGR_EXECUTOR_MPMCQUEUE_H
20 #define GRPC_CORE_LIB_IOMGR_EXECUTOR_MPMCQUEUE_H
21
22 #include <grpc/support/port_platform.h>
23
24 #include "src/core/lib/debug/stats.h"
25 #include "src/core/lib/gprpp/abstract.h"
26 #include "src/core/lib/gprpp/atomic.h"
27 #include "src/core/lib/gprpp/sync.h"
28
29 namespace grpc_core {
30
31 extern DebugOnlyTraceFlag grpc_thread_pool_trace;
32
33 // Abstract base class of a Multiple-Producer-Multiple-Consumer(MPMC) queue
34 // interface
35 class MPMCQueueInterface {
36  public:
37   virtual ~MPMCQueueInterface() {}
38
39   // Puts elem into queue immediately at the end of queue.
40   // This might cause to block on full queue depending on implementation.
41   virtual void Put(void* elem) GRPC_ABSTRACT;
42
43   // Removes the oldest element from the queue and return it.
44   // This might cause to block on empty queue depending on implementation.
45   // Optional argument for collecting stats purpose.
46   virtual void* Get(gpr_timespec* wait_time = nullptr) GRPC_ABSTRACT;
47
48   // Returns number of elements in the queue currently
49   virtual int count() const GRPC_ABSTRACT;
50
51   GRPC_ABSTRACT_BASE_CLASS
52 };
53
54 class InfLenFIFOQueue : public MPMCQueueInterface {
55  public:
56   // Creates a new MPMC Queue. The queue created will have infinite length.
57   InfLenFIFOQueue();
58
59   // Releases all resources held by the queue. The queue must be empty, and no
60   // one waits on conditional variables.
61   ~InfLenFIFOQueue();
62
63   // Puts elem into queue immediately at the end of queue. Since the queue has
64   // infinite length, this routine will never block and should never fail.
65   void Put(void* elem);
66
67   // Removes the oldest element from the queue and returns it.
68   // This routine will cause the thread to block if queue is currently empty.
69   // Argument wait_time should be passed in when trace flag turning on (for
70   // collecting stats info purpose.)
71   void* Get(gpr_timespec* wait_time = nullptr);
72
73   // Returns number of elements in queue currently.
74   // There might be concurrently add/remove on queue, so count might change
75   // quickly.
76   int count() const { return count_.Load(MemoryOrder::RELAXED); }
77
78   struct Node {
79     Node* next;  // Linking
80     Node* prev;
81     void* content;             // Points to actual element
82     gpr_timespec insert_time;  // Time for stats
83
84     Node() {
85       next = prev = nullptr;
86       content = nullptr;
87     }
88   };
89
90   // For test purpose only. Returns number of nodes allocated in queue.
91   // Any allocated node will be alive until the destruction of the queue.
92   int num_nodes() const { return num_nodes_; }
93
94   // For test purpose only. Returns the initial number of nodes in queue.
95   int init_num_nodes() const { return kQueueInitNumNodes; }
96
97  private:
98   // For Internal Use Only.
99   // Removes the oldest element from the queue and returns it. This routine
100   // will NOT check whether queue is empty, and it will NOT acquire mutex.
101   // Caller MUST check that queue is not empty and must acquire mutex before
102   // callling.
103   void* PopFront();
104
105   // Stats of queue. This will only be collect when debug trace mode is on.
106   // All printed stats info will have time measurement in microsecond.
107   struct Stats {
108     uint64_t num_started;    // Number of elements have been added to queue
109     uint64_t num_completed;  // Number of elements have been removed from
110                              // the queue
111     gpr_timespec total_queue_time;  // Total waiting time that all the
112                                     // removed elements have spent in queue
113     gpr_timespec max_queue_time;    // Max waiting time among all removed
114                                     // elements
115     gpr_timespec busy_queue_time;   // Accumulated amount of time that queue
116                                     // was not empty
117
118     Stats() {
119       num_started = 0;
120       num_completed = 0;
121       total_queue_time = gpr_time_0(GPR_TIMESPAN);
122       max_queue_time = gpr_time_0(GPR_TIMESPAN);
123       busy_queue_time = gpr_time_0(GPR_TIMESPAN);
124     }
125   };
126
127   // Node for waiting thread queue. Stands for one waiting thread, should have
128   // exact one thread waiting on its CondVar.
129   // Using a doubly linked list for waiting thread queue to wake up waiting
130   // threads in LIFO order to reduce cache misses.
131   struct Waiter {
132     CondVar cv;
133     Waiter* next;
134     Waiter* prev;
135   };
136
137   // Pushs waiter to the front of queue, require caller held mutex
138   void PushWaiter(Waiter* waiter);
139
140   // Removes waiter from queue, require caller held mutex
141   void RemoveWaiter(Waiter* waiter);
142
143   // Returns pointer to the waiter that should be waken up next, should be the
144   // last added waiter.
145   Waiter* TopWaiter();
146
147   Mutex mu_;        // Protecting lock
148   Waiter waiters_;  // Head of waiting thread queue
149
150   // Initial size for delete list
151   static const int kDeleteListInitSize = 1024;
152   // Initial number of nodes allocated
153   static const int kQueueInitNumNodes = 1024;
154
155   Node** delete_list_ = nullptr;  // Keeps track of all allocated array entries
156                                   // for deleting on destruction
157   size_t delete_list_count_ = 0;  // Number of entries in list
158   size_t delete_list_size_ = 0;   // Size of the list. List will be expanded to
159                                   // double size on full
160
161   Node* queue_head_ = nullptr;  // Head of the queue, remove position
162   Node* queue_tail_ = nullptr;  // End of queue, insert position
163   Atomic<int> count_{0};        // Number of elements in queue
164   int num_nodes_ = 0;           // Number of nodes allocated
165
166   Stats stats_;            // Stats info
167   gpr_timespec busy_time;  // Start time of busy queue
168
169   // Internal Helper.
170   // Allocates an array of nodes of size "num", links all nodes together except
171   // the first node's prev and last node's next. They should be set by caller
172   // manually afterward.
173   Node* AllocateNodes(int num);
174 };
175
176 }  // namespace grpc_core
177
178 #endif /* GRPC_CORE_LIB_IOMGR_EXECUTOR_MPMCQUEUE_H */