3 * Copyright 2017 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #ifndef GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H
20 #define GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H
22 #include <grpc/support/port_platform.h>
26 #include <grpc/support/atm.h>
28 #include "src/core/lib/gpr/mpscq.h"
29 #include "src/core/lib/gprpp/inlined_vector.h"
30 #include "src/core/lib/gprpp/ref_counted.h"
31 #include "src/core/lib/gprpp/ref_counted_ptr.h"
32 #include "src/core/lib/iomgr/closure.h"
33 #include "src/core/lib/iomgr/dynamic_annotations.h"
35 // A simple, lock-free mechanism for serializing activity related to a
36 // single call. This is similar to a combiner but is more lightweight.
38 // It requires the callback (or, in the common case where the callback
39 // actually kicks off a chain of callbacks, the last callback in that
40 // chain) to explicitly indicate (by calling GRPC_CALL_COMBINER_STOP())
41 // when it is done with the action that was kicked off by the original
// callback.
46 extern DebugOnlyTraceFlag grpc_call_combiner_trace;
54 #define GRPC_CALL_COMBINER_START(call_combiner, closure, error, reason) \
55 (call_combiner)->Start((closure), (error), __FILE__, __LINE__, (reason))
56 #define GRPC_CALL_COMBINER_STOP(call_combiner, reason) \
57 (call_combiner)->Stop(__FILE__, __LINE__, (reason))
58 /// Starts processing \a closure.
59 void Start(grpc_closure* closure, grpc_error* error, const char* file,
60 int line, const char* reason);
61 /// Yields the call combiner to the next closure in the queue, if any.
62 void Stop(const char* file, int line, const char* reason);
64 #define GRPC_CALL_COMBINER_START(call_combiner, closure, error, reason) \
65 (call_combiner)->Start((closure), (error), (reason))
66 #define GRPC_CALL_COMBINER_STOP(call_combiner, reason) \
67 (call_combiner)->Stop((reason))
68 /// Starts processing \a closure.
69 void Start(grpc_closure* closure, grpc_error* error, const char* reason);
70 /// Yields the call combiner to the next closure in the queue, if any.
71 void Stop(const char* reason);
74 /// Registers \a closure to be invoked when Cancel() is called.
76 /// Once a closure is registered, it will always be scheduled exactly
77 /// once; this allows the closure to hold references that will be freed
78 /// regardless of whether or not the call was cancelled. If a cancellation
79 /// does occur, the closure will be scheduled with the cancellation error;
80 /// otherwise, it will be scheduled with GRPC_ERROR_NONE.
82 /// The closure will be scheduled in the following cases:
83 /// - If Cancel() was called prior to registering the closure, it will be
84 /// scheduled immediately with the cancelation error.
85 /// - If Cancel() is called after registering the closure, the closure will
86 /// be scheduled with the cancellation error.
87 /// - If SetNotifyOnCancel() is called again to register a new cancellation
88 /// closure, the previous cancellation closure will be scheduled with
91 /// If \a closure is NULL, then no closure will be invoked on
92 /// cancellation; this effectively unregisters the previously set closure.
93 /// However, most filters will not need to explicitly unregister their
94 /// callbacks, as this is done automatically when the call is destroyed.
95 /// Filters that schedule the cancellation closure on ExecCtx do not need
96 /// to take a ref on the call stack to guarantee closure liveness. This is
97 /// done by explicitly flushing ExecCtx after the unregistration during
99 void SetNotifyOnCancel(grpc_closure* closure);
101 /// Indicates that the call has been cancelled.
102 void Cancel(grpc_error* error);
105 void ScheduleClosure(grpc_closure* closure, grpc_error* error);
106 #ifdef GRPC_TSAN_ENABLED
107 static void TsanClosure(void* arg, grpc_error* error);
110 gpr_atm size_ = 0; // size_t, num closures in queue or currently executing
112 // Either 0 (if not cancelled and no cancellation closure set),
113 // a grpc_closure* (if the lowest bit is 0),
114 // or a grpc_error* (if the lowest bit is 1).
115 gpr_atm cancel_state_ = 0;
116 #ifdef GRPC_TSAN_ENABLED
117 // A fake ref-counted lock that is kept alive after the destruction of
118 // grpc_call_combiner, when we are running the original closure.
120 // Ideally we want to lock and unlock the call combiner as a pointer, when the
121 // callback is called. However, original_closure is free to trigger
122 // anything on the call combiner (including destruction of grpc_call).
123 // Thus, we need a ref-counted structure that can outlive the call combiner.
124 struct TsanLock : public RefCounted<TsanLock, NonPolymorphicRefCount> {
125 TsanLock() { TSAN_ANNOTATE_RWLOCK_CREATE(&taken); }
126 ~TsanLock() { TSAN_ANNOTATE_RWLOCK_DESTROY(&taken); }
127 // To avoid double-locking by the same thread, we should acquire/release
128 // the lock only when taken is false. On each acquire taken must be set to
130 std::atomic<bool> taken{false};
132 RefCountedPtr<TsanLock> tsan_lock_ = MakeRefCounted<TsanLock>();
133 grpc_closure tsan_closure_;
134 grpc_closure* original_closure_;
138 // Helper for running a list of closures in a call combiner.
140 // Each callback running in the call combiner will eventually be
141 // returned to the surface, at which point the surface will yield the
142 // call combiner. So when we are running in the call combiner and have
143 // more than one callback to return to the surface, we need to re-enter
144 // the call combiner for all but one of those callbacks.
145 class CallCombinerClosureList {
147 CallCombinerClosureList() {}
149 // Adds a closure to the list. The closure must eventually result in
150 // the call combiner being yielded.
151 void Add(grpc_closure* closure, grpc_error* error, const char* reason) {
152 closures_.emplace_back(closure, error, reason);
155 // Runs all closures in the call combiner and yields the call combiner.
157 // All but one of the closures in the list will be scheduled via
158 // GRPC_CALL_COMBINER_START(), and the remaining closure will be
159 // scheduled via GRPC_CLOSURE_SCHED(), which will eventually result in
160 // yielding the call combiner. If the list is empty, then the call
161 // combiner will be yielded immediately.
162 void RunClosures(CallCombiner* call_combiner) {
163 if (closures_.empty()) {
164 GRPC_CALL_COMBINER_STOP(call_combiner, "no closures to schedule");
167 for (size_t i = 1; i < closures_.size(); ++i) {
168 auto& closure = closures_[i];
169 GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error,
172 if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
174 "CallCombinerClosureList executing closure while already "
175 "holding call_combiner %p: closure=%p error=%s reason=%s",
176 call_combiner, closures_[0].closure,
177 grpc_error_string(closures_[0].error), closures_[0].reason);
179 // This will release the call combiner.
180 GRPC_CLOSURE_SCHED(closures_[0].closure, closures_[0].error);
184 // Runs all closures in the call combiner, but does NOT yield the call
185 // combiner. All closures will be scheduled via GRPC_CALL_COMBINER_START().
186 void RunClosuresWithoutYielding(CallCombiner* call_combiner) {
187 for (size_t i = 0; i < closures_.size(); ++i) {
188 auto& closure = closures_[i];
189 GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error,
195 size_t size() const { return closures_.size(); }
198 struct CallCombinerClosure {
199 grpc_closure* closure;
203 CallCombinerClosure(grpc_closure* closure, grpc_error* error,
205 : closure(closure), error(error), reason(reason) {}
208 // There are generally a maximum of 6 closures to run in the call
209 // combiner, one for each pending op.
210 InlinedVector<CallCombinerClosure, 6> closures_;
213 } // namespace grpc_core
215 #endif /* GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H */