--- /dev/null
+/*
+ *
+ * Copyright 2019 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_EXECUTOR_MPMCQUEUE_H
+#define GRPC_CORE_LIB_IOMGR_EXECUTOR_MPMCQUEUE_H
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/debug/stats.h"
+#include "src/core/lib/gprpp/abstract.h"
+#include "src/core/lib/gprpp/atomic.h"
+#include "src/core/lib/gprpp/sync.h"
+
+namespace grpc_core {
+
+extern DebugOnlyTraceFlag grpc_thread_pool_trace;
+
+// Abstract base class of a Multiple-Producer-Multiple-Consumer(MPMC) queue
+// interface
+class MPMCQueueInterface {
+ public:
+ virtual ~MPMCQueueInterface() {}
+
+ // Puts elem into queue immediately at the end of queue.
+ // This might cause to block on full queue depending on implementation.
+ virtual void Put(void* elem) GRPC_ABSTRACT;
+
+ // Removes the oldest element from the queue and return it.
+ // This might cause to block on empty queue depending on implementation.
+ // Optional argument for collecting stats purpose.
+ virtual void* Get(gpr_timespec* wait_time = nullptr) GRPC_ABSTRACT;
+
+ // Returns number of elements in the queue currently
+ virtual int count() const GRPC_ABSTRACT;
+
+ GRPC_ABSTRACT_BASE_CLASS
+};
+
+class InfLenFIFOQueue : public MPMCQueueInterface {
+ public:
+ // Creates a new MPMC Queue. The queue created will have infinite length.
+ InfLenFIFOQueue();
+
+ // Releases all resources held by the queue. The queue must be empty, and no
+ // one waits on conditional variables.
+ ~InfLenFIFOQueue();
+
+ // Puts elem into queue immediately at the end of queue. Since the queue has
+ // infinite length, this routine will never block and should never fail.
+ void Put(void* elem);
+
+ // Removes the oldest element from the queue and returns it.
+ // This routine will cause the thread to block if queue is currently empty.
+ // Argument wait_time should be passed in when trace flag turning on (for
+ // collecting stats info purpose.)
+ void* Get(gpr_timespec* wait_time = nullptr);
+
+ // Returns number of elements in queue currently.
+ // There might be concurrently add/remove on queue, so count might change
+ // quickly.
+ int count() const { return count_.Load(MemoryOrder::RELAXED); }
+
+ struct Node {
+ Node* next; // Linking
+ Node* prev;
+ void* content; // Points to actual element
+ gpr_timespec insert_time; // Time for stats
+
+ Node() {
+ next = prev = nullptr;
+ content = nullptr;
+ }
+ };
+
+ // For test purpose only. Returns number of nodes allocated in queue.
+ // Any allocated node will be alive until the destruction of the queue.
+ int num_nodes() const { return num_nodes_; }
+
+ // For test purpose only. Returns the initial number of nodes in queue.
+ int init_num_nodes() const { return kQueueInitNumNodes; }
+
+ private:
+ // For Internal Use Only.
+ // Removes the oldest element from the queue and returns it. This routine
+ // will NOT check whether queue is empty, and it will NOT acquire mutex.
+ // Caller MUST check that queue is not empty and must acquire mutex before
+ // callling.
+ void* PopFront();
+
+ // Stats of queue. This will only be collect when debug trace mode is on.
+ // All printed stats info will have time measurement in microsecond.
+ struct Stats {
+ uint64_t num_started; // Number of elements have been added to queue
+ uint64_t num_completed; // Number of elements have been removed from
+ // the queue
+ gpr_timespec total_queue_time; // Total waiting time that all the
+ // removed elements have spent in queue
+ gpr_timespec max_queue_time; // Max waiting time among all removed
+ // elements
+ gpr_timespec busy_queue_time; // Accumulated amount of time that queue
+ // was not empty
+
+ Stats() {
+ num_started = 0;
+ num_completed = 0;
+ total_queue_time = gpr_time_0(GPR_TIMESPAN);
+ max_queue_time = gpr_time_0(GPR_TIMESPAN);
+ busy_queue_time = gpr_time_0(GPR_TIMESPAN);
+ }
+ };
+
+ // Node for waiting thread queue. Stands for one waiting thread, should have
+ // exact one thread waiting on its CondVar.
+ // Using a doubly linked list for waiting thread queue to wake up waiting
+ // threads in LIFO order to reduce cache misses.
+ struct Waiter {
+ CondVar cv;
+ Waiter* next;
+ Waiter* prev;
+ };
+
+ // Pushs waiter to the front of queue, require caller held mutex
+ void PushWaiter(Waiter* waiter);
+
+ // Removes waiter from queue, require caller held mutex
+ void RemoveWaiter(Waiter* waiter);
+
+ // Returns pointer to the waiter that should be waken up next, should be the
+ // last added waiter.
+ Waiter* TopWaiter();
+
+ Mutex mu_; // Protecting lock
+ Waiter waiters_; // Head of waiting thread queue
+
+ // Initial size for delete list
+ static const int kDeleteListInitSize = 1024;
+ // Initial number of nodes allocated
+ static const int kQueueInitNumNodes = 1024;
+
+ Node** delete_list_ = nullptr; // Keeps track of all allocated array entries
+ // for deleting on destruction
+ size_t delete_list_count_ = 0; // Number of entries in list
+ size_t delete_list_size_ = 0; // Size of the list. List will be expanded to
+ // double size on full
+
+ Node* queue_head_ = nullptr; // Head of the queue, remove position
+ Node* queue_tail_ = nullptr; // End of queue, insert position
+ Atomic<int> count_{0}; // Number of elements in queue
+ int num_nodes_ = 0; // Number of nodes allocated
+
+ Stats stats_; // Stats info
+ gpr_timespec busy_time; // Start time of busy queue
+
+ // Internal Helper.
+ // Allocates an array of nodes of size "num", links all nodes together except
+ // the first node's prev and last node's next. They should be set by caller
+ // manually afterward.
+ Node* AllocateNodes(int num);
+};
+
+} // namespace grpc_core
+
+#endif /* GRPC_CORE_LIB_IOMGR_EXECUTOR_MPMCQUEUE_H */