Built motion from commit 6a09e18b.|2.6.11
[motion2.git] / legacy-libs / grpc / deps / grpc / src / core / lib / iomgr / executor / mpmcqueue.h
diff --git a/legacy-libs/grpc/deps/grpc/src/core/lib/iomgr/executor/mpmcqueue.h b/legacy-libs/grpc/deps/grpc/src/core/lib/iomgr/executor/mpmcqueue.h
new file mode 100644 (file)
index 0000000..ab5c484
--- /dev/null
@@ -0,0 +1,178 @@
+/*
+ *
+ * Copyright 2019 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_EXECUTOR_MPMCQUEUE_H
+#define GRPC_CORE_LIB_IOMGR_EXECUTOR_MPMCQUEUE_H
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/debug/stats.h"
+#include "src/core/lib/gprpp/abstract.h"
+#include "src/core/lib/gprpp/atomic.h"
+#include "src/core/lib/gprpp/sync.h"
+
+namespace grpc_core {
+
+extern DebugOnlyTraceFlag grpc_thread_pool_trace;
+
+// Abstract base class of a Multiple-Producer-Multiple-Consumer(MPMC) queue
+// interface
+class MPMCQueueInterface {
+ public:
+  virtual ~MPMCQueueInterface() {}
+
+  // Puts elem into queue immediately at the end of queue.
+  // This might cause to block on full queue depending on implementation.
+  virtual void Put(void* elem) GRPC_ABSTRACT;
+
+  // Removes the oldest element from the queue and return it.
+  // This might cause to block on empty queue depending on implementation.
+  // Optional argument for collecting stats purpose.
+  virtual void* Get(gpr_timespec* wait_time = nullptr) GRPC_ABSTRACT;
+
+  // Returns number of elements in the queue currently
+  virtual int count() const GRPC_ABSTRACT;
+
+  GRPC_ABSTRACT_BASE_CLASS
+};
+
+class InfLenFIFOQueue : public MPMCQueueInterface {
+ public:
+  // Creates a new MPMC Queue. The queue created will have infinite length.
+  InfLenFIFOQueue();
+
+  // Releases all resources held by the queue. The queue must be empty, and no
+  // one waits on conditional variables.
+  ~InfLenFIFOQueue();
+
+  // Puts elem into queue immediately at the end of queue. Since the queue has
+  // infinite length, this routine will never block and should never fail.
+  void Put(void* elem);
+
+  // Removes the oldest element from the queue and returns it.
+  // This routine will cause the thread to block if queue is currently empty.
+  // Argument wait_time should be passed in when trace flag turning on (for
+  // collecting stats info purpose.)
+  void* Get(gpr_timespec* wait_time = nullptr);
+
+  // Returns number of elements in queue currently.
+  // There might be concurrently add/remove on queue, so count might change
+  // quickly.
+  int count() const { return count_.Load(MemoryOrder::RELAXED); }
+
+  struct Node {
+    Node* next;  // Linking
+    Node* prev;
+    void* content;             // Points to actual element
+    gpr_timespec insert_time;  // Time for stats
+
+    Node() {
+      next = prev = nullptr;
+      content = nullptr;
+    }
+  };
+
+  // For test purpose only. Returns number of nodes allocated in queue.
+  // Any allocated node will be alive until the destruction of the queue.
+  int num_nodes() const { return num_nodes_; }
+
+  // For test purpose only. Returns the initial number of nodes in queue.
+  int init_num_nodes() const { return kQueueInitNumNodes; }
+
+ private:
+  // For Internal Use Only.
+  // Removes the oldest element from the queue and returns it. This routine
+  // will NOT check whether queue is empty, and it will NOT acquire mutex.
+  // Caller MUST check that queue is not empty and must acquire mutex before
+  // callling.
+  void* PopFront();
+
+  // Stats of queue. This will only be collect when debug trace mode is on.
+  // All printed stats info will have time measurement in microsecond.
+  struct Stats {
+    uint64_t num_started;    // Number of elements have been added to queue
+    uint64_t num_completed;  // Number of elements have been removed from
+                             // the queue
+    gpr_timespec total_queue_time;  // Total waiting time that all the
+                                    // removed elements have spent in queue
+    gpr_timespec max_queue_time;    // Max waiting time among all removed
+                                    // elements
+    gpr_timespec busy_queue_time;   // Accumulated amount of time that queue
+                                    // was not empty
+
+    Stats() {
+      num_started = 0;
+      num_completed = 0;
+      total_queue_time = gpr_time_0(GPR_TIMESPAN);
+      max_queue_time = gpr_time_0(GPR_TIMESPAN);
+      busy_queue_time = gpr_time_0(GPR_TIMESPAN);
+    }
+  };
+
+  // Node for waiting thread queue. Stands for one waiting thread, should have
+  // exact one thread waiting on its CondVar.
+  // Using a doubly linked list for waiting thread queue to wake up waiting
+  // threads in LIFO order to reduce cache misses.
+  struct Waiter {
+    CondVar cv;
+    Waiter* next;
+    Waiter* prev;
+  };
+
+  // Pushs waiter to the front of queue, require caller held mutex
+  void PushWaiter(Waiter* waiter);
+
+  // Removes waiter from queue, require caller held mutex
+  void RemoveWaiter(Waiter* waiter);
+
+  // Returns pointer to the waiter that should be waken up next, should be the
+  // last added waiter.
+  Waiter* TopWaiter();
+
+  Mutex mu_;        // Protecting lock
+  Waiter waiters_;  // Head of waiting thread queue
+
+  // Initial size for delete list
+  static const int kDeleteListInitSize = 1024;
+  // Initial number of nodes allocated
+  static const int kQueueInitNumNodes = 1024;
+
+  Node** delete_list_ = nullptr;  // Keeps track of all allocated array entries
+                                  // for deleting on destruction
+  size_t delete_list_count_ = 0;  // Number of entries in list
+  size_t delete_list_size_ = 0;   // Size of the list. List will be expanded to
+                                  // double size on full
+
+  Node* queue_head_ = nullptr;  // Head of the queue, remove position
+  Node* queue_tail_ = nullptr;  // End of queue, insert position
+  Atomic<int> count_{0};        // Number of elements in queue
+  int num_nodes_ = 0;           // Number of nodes allocated
+
+  Stats stats_;            // Stats info
+  gpr_timespec busy_time;  // Start time of busy queue
+
+  // Internal Helper.
+  // Allocates an array of nodes of size "num", links all nodes together except
+  // the first node's prev and last node's next. They should be set by caller
+  // manually afterward.
+  Node* AllocateNodes(int num);
+};
+
+}  // namespace grpc_core
+
+#endif /* GRPC_CORE_LIB_IOMGR_EXECUTOR_MPMCQUEUE_H */