Built motion from commit 44377920.|2.6.11
[motion2.git] / legacy-libs / grpc-cloned / deps / grpc / src / core / lib / iomgr / call_combiner.h
1 /*
2  *
3  * Copyright 2017 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #ifndef GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H
20 #define GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H
21
22 #include <grpc/support/port_platform.h>
23
24 #include <stddef.h>
25
26 #include <grpc/support/atm.h>
27
28 #include "src/core/lib/gpr/mpscq.h"
29 #include "src/core/lib/gprpp/inlined_vector.h"
30 #include "src/core/lib/gprpp/ref_counted.h"
31 #include "src/core/lib/gprpp/ref_counted_ptr.h"
32 #include "src/core/lib/iomgr/closure.h"
33 #include "src/core/lib/iomgr/dynamic_annotations.h"
34
35 // A simple, lock-free mechanism for serializing activity related to a
36 // single call.  This is similar to a combiner but is more lightweight.
37 //
38 // It requires the callback (or, in the common case where the callback
39 // actually kicks off a chain of callbacks, the last callback in that
40 // chain) to explicitly indicate (by calling GRPC_CALL_COMBINER_STOP())
41 // when it is done with the action that was kicked off by the original
42 // callback.
43
44 namespace grpc_core {
45
46 extern DebugOnlyTraceFlag grpc_call_combiner_trace;
47
48 class CallCombiner {
49  public:
50   CallCombiner();
51   ~CallCombiner();
52
53 #ifndef NDEBUG
54 #define GRPC_CALL_COMBINER_START(call_combiner, closure, error, reason) \
55   (call_combiner)->Start((closure), (error), __FILE__, __LINE__, (reason))
56 #define GRPC_CALL_COMBINER_STOP(call_combiner, reason) \
57   (call_combiner)->Stop(__FILE__, __LINE__, (reason))
58   /// Starts processing \a closure.
59   void Start(grpc_closure* closure, grpc_error* error, const char* file,
60              int line, const char* reason);
61   /// Yields the call combiner to the next closure in the queue, if any.
62   void Stop(const char* file, int line, const char* reason);
63 #else
64 #define GRPC_CALL_COMBINER_START(call_combiner, closure, error, reason) \
65   (call_combiner)->Start((closure), (error), (reason))
66 #define GRPC_CALL_COMBINER_STOP(call_combiner, reason) \
67   (call_combiner)->Stop((reason))
68   /// Starts processing \a closure.
69   void Start(grpc_closure* closure, grpc_error* error, const char* reason);
70   /// Yields the call combiner to the next closure in the queue, if any.
71   void Stop(const char* reason);
72 #endif
73
74   /// Registers \a closure to be invoked when Cancel() is called.
75   ///
76   /// Once a closure is registered, it will always be scheduled exactly
77   /// once; this allows the closure to hold references that will be freed
78   /// regardless of whether or not the call was cancelled.  If a cancellation
79   /// does occur, the closure will be scheduled with the cancellation error;
80   /// otherwise, it will be scheduled with GRPC_ERROR_NONE.
81   ///
82   /// The closure will be scheduled in the following cases:
83   /// - If Cancel() was called prior to registering the closure, it will be
84   ///   scheduled immediately with the cancelation error.
85   /// - If Cancel() is called after registering the closure, the closure will
86   ///   be scheduled with the cancellation error.
87   /// - If SetNotifyOnCancel() is called again to register a new cancellation
88   ///   closure, the previous cancellation closure will be scheduled with
89   ///   GRPC_ERROR_NONE.
90   ///
91   /// If \a closure is NULL, then no closure will be invoked on
92   /// cancellation; this effectively unregisters the previously set closure.
93   /// However, most filters will not need to explicitly unregister their
94   /// callbacks, as this is done automatically when the call is destroyed.
95   /// Filters that schedule the cancellation closure on ExecCtx do not need
96   /// to take a ref on the call stack to guarantee closure liveness. This is
97   /// done by explicitly flushing ExecCtx after the unregistration during
98   /// call destruction.
99   void SetNotifyOnCancel(grpc_closure* closure);
100
101   /// Indicates that the call has been cancelled.
102   void Cancel(grpc_error* error);
103
104  private:
105   void ScheduleClosure(grpc_closure* closure, grpc_error* error);
106 #ifdef GRPC_TSAN_ENABLED
107   static void TsanClosure(void* arg, grpc_error* error);
108 #endif
109
110   gpr_atm size_ = 0;  // size_t, num closures in queue or currently executing
111   gpr_mpscq queue_;
112   // Either 0 (if not cancelled and no cancellation closure set),
113   // a grpc_closure* (if the lowest bit is 0),
114   // or a grpc_error* (if the lowest bit is 1).
115   gpr_atm cancel_state_ = 0;
116 #ifdef GRPC_TSAN_ENABLED
117   // A fake ref-counted lock that is kept alive after the destruction of
118   // grpc_call_combiner, when we are running the original closure.
119   //
120   // Ideally we want to lock and unlock the call combiner as a pointer, when the
121   // callback is called. However, original_closure is free to trigger
122   // anything on the call combiner (including destruction of grpc_call).
123   // Thus, we need a ref-counted structure that can outlive the call combiner.
124   struct TsanLock : public RefCounted<TsanLock, NonPolymorphicRefCount> {
125     TsanLock() { TSAN_ANNOTATE_RWLOCK_CREATE(&taken); }
126     ~TsanLock() { TSAN_ANNOTATE_RWLOCK_DESTROY(&taken); }
127     // To avoid double-locking by the same thread, we should acquire/release
128     // the lock only when taken is false. On each acquire taken must be set to
129     // true.
130     std::atomic<bool> taken{false};
131   };
132   RefCountedPtr<TsanLock> tsan_lock_ = MakeRefCounted<TsanLock>();
133   grpc_closure tsan_closure_;
134   grpc_closure* original_closure_;
135 #endif
136 };
137
138 // Helper for running a list of closures in a call combiner.
139 //
140 // Each callback running in the call combiner will eventually be
141 // returned to the surface, at which point the surface will yield the
142 // call combiner.  So when we are running in the call combiner and have
143 // more than one callback to return to the surface, we need to re-enter
144 // the call combiner for all but one of those callbacks.
145 class CallCombinerClosureList {
146  public:
147   CallCombinerClosureList() {}
148
149   // Adds a closure to the list.  The closure must eventually result in
150   // the call combiner being yielded.
151   void Add(grpc_closure* closure, grpc_error* error, const char* reason) {
152     closures_.emplace_back(closure, error, reason);
153   }
154
155   // Runs all closures in the call combiner and yields the call combiner.
156   //
157   // All but one of the closures in the list will be scheduled via
158   // GRPC_CALL_COMBINER_START(), and the remaining closure will be
159   // scheduled via GRPC_CLOSURE_SCHED(), which will eventually result in
160   // yielding the call combiner.  If the list is empty, then the call
161   // combiner will be yielded immediately.
162   void RunClosures(CallCombiner* call_combiner) {
163     if (closures_.empty()) {
164       GRPC_CALL_COMBINER_STOP(call_combiner, "no closures to schedule");
165       return;
166     }
167     for (size_t i = 1; i < closures_.size(); ++i) {
168       auto& closure = closures_[i];
169       GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error,
170                                closure.reason);
171     }
172     if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
173       gpr_log(GPR_INFO,
174               "CallCombinerClosureList executing closure while already "
175               "holding call_combiner %p: closure=%p error=%s reason=%s",
176               call_combiner, closures_[0].closure,
177               grpc_error_string(closures_[0].error), closures_[0].reason);
178     }
179     // This will release the call combiner.
180     GRPC_CLOSURE_SCHED(closures_[0].closure, closures_[0].error);
181     closures_.clear();
182   }
183
184   // Runs all closures in the call combiner, but does NOT yield the call
185   // combiner.  All closures will be scheduled via GRPC_CALL_COMBINER_START().
186   void RunClosuresWithoutYielding(CallCombiner* call_combiner) {
187     for (size_t i = 0; i < closures_.size(); ++i) {
188       auto& closure = closures_[i];
189       GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error,
190                                closure.reason);
191     }
192     closures_.clear();
193   }
194
195   size_t size() const { return closures_.size(); }
196
197  private:
198   struct CallCombinerClosure {
199     grpc_closure* closure;
200     grpc_error* error;
201     const char* reason;
202
203     CallCombinerClosure(grpc_closure* closure, grpc_error* error,
204                         const char* reason)
205         : closure(closure), error(error), reason(reason) {}
206   };
207
208   // There are generally a maximum of 6 closures to run in the call
209   // combiner, one for each pending op.
210   InlinedVector<CallCombinerClosure, 6> closures_;
211 };
212
213 }  // namespace grpc_core
214
215 #endif /* GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H */