Built motion from commit 6a09e18b.|2.6.11
[motion2.git] / legacy-libs / grpc-cloned / deps / grpc / src / core / ext / filters / client_idle / client_idle_filter.cc
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <grpc/support/port_platform.h>
20
21 #include <limits.h>
22
23 #include "src/core/lib/channel/channel_args.h"
24 #include "src/core/lib/channel/channel_stack_builder.h"
25 #include "src/core/lib/gprpp/atomic.h"
26 #include "src/core/lib/iomgr/timer.h"
27 #include "src/core/lib/surface/channel_init.h"
28 #include "src/core/lib/transport/http2_errors.h"
29
// The idle filter is enabled on client channels by default.
// Set GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS to a value in [1000, INT_MAX) in the
// channel args to configure the idle timeout. A resulting timeout of INT_MAX
// keeps the filter out of the channel stack (see MaybeAddClientIdleFilter()).
//
// Default idle timeout in milliseconds; passed as the default value when the
// channel arg is read in GetClientIdleTimeout().
#define DEFAULT_IDLE_TIMEOUT_MS (30 /*minutes*/ * 60 * 1000)
// Lower bound in milliseconds: a user-supplied idle timeout smaller than this
// is raised to this value in GetClientIdleTimeout().
#define MIN_IDLE_TIMEOUT_MS (1 /*second*/ * 1000)
36
37 namespace grpc_core {
38
// Trace flag that gates all logging done by this filter. It is registered
// under the name "client_idle_filter" and constructed with `false`
// (presumably "disabled by default" -- confirm against TraceFlag).
TraceFlag grpc_trace_client_idle_filter(false, "client_idle_filter");

// Logs a printf-style message at GPR_INFO, prefixed with
// "(client idle filter) ", but only when grpc_trace_client_idle_filter is
// enabled. `format` must be a string literal because it is concatenated with
// the prefix at compile time. The do { } while (0) wrapper makes the macro
// expand to a single statement.
#define GRPC_IDLE_FILTER_LOG(format, ...)                               \
  do {                                                                  \
    if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_client_idle_filter)) {       \
      gpr_log(GPR_INFO, "(client idle filter) " format, ##__VA_ARGS__); \
    }                                                                   \
  } while (0)
47
48 namespace {
49
50 /*
51   client_idle_filter maintains a state tracking if there are active calls in the
52   channel and its internal idle_timer_. The states are specified as following:
53
54   +--------------------------------------------+-------------+---------+
55   |               ChannelState                 | idle_timer_ | channel |
56   +--------------------------------------------+-------------+---------+
57   | IDLE                                       | unset       | idle    |
58   | CALLS_ACTIVE                               | unset       | busy    |
59   | TIMER_PENDING                              | set-valid   | idle    |
60   | TIMER_PENDING_CALLS_ACTIVE                 | set-invalid | busy    |
61   | TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START | set-invalid | idle    |
62   +--------------------------------------------+-------------+---------+
63
64   IDLE: The initial state of the client_idle_filter, indicating the channel is
65   in IDLE.
66
  CALLS_ACTIVE: The channel has one or more active calls and the timer is not
  set.
68
  TIMER_PENDING: The timer has been set and no calls have arrived since. The
  channel must have no active calls in this state. If the timer fires in this
  state, the channel goes into the IDLE state.
72
  TIMER_PENDING_CALLS_ACTIVE: The timer has been set and at least one call has
  arrived since. The channel must have one or more active calls in this state.
  If the timer fires in this state, it is not rescheduled.
77
  TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START: The timer has been set and at
  least one call has arrived since, BUT the channel currently has no active
  calls. If the timer fires in this state, it is rescheduled according to the
  finish time of the latest call.
82
83   PROCESSING: The state set to block other threads when the setting thread is
84   doing some work to keep state consistency.
85
  idle_timer_ is never cancelled unless the channel is shutting down.
  If the timer callback runs while idle_timer_ is valid (i.e. state_ is
  TIMER_PENDING), the channel enters IDLE; otherwise the channel is left
  unchanged.
90
91   State transitions:
92                                             IDLE
93                                             |  ^
94             ---------------------------------  *
95             |                                  *
96             v                                  *
97       CALLS_ACTIVE =================> TIMER_PENDING
98             ^                               |  ^
99             *  ------------------------------  *
100             *  |                               *
101             *  v                               *
102 TIMER_PENDING_CALLS_ACTIVE ===> TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START
103             ^                               |
104             |                               |
105             ---------------------------------
106
107   ---> Triggered by IncreaseCallCount()
108   ===> Triggered by DecreaseCallCount()
109   ***> Triggered by IdleTimerCallback()
110 */
// States of the idle-tracking state machine stored in ChannelData::state_.
// The table and transition diagram in the preceding block comment give the
// full semantics; the notes below are a short summary of it.
enum ChannelState {
  // Initial state: no active calls and no timer set.
  IDLE,
  // One or more active calls; timer not set.
  CALLS_ACTIVE,
  // Timer set and still valid; no calls have arrived since it was set.
  TIMER_PENDING,
  // Timer set but invalidated; one or more calls are currently active.
  TIMER_PENDING_CALLS_ACTIVE,
  // Timer set but invalidated; calls arrived after it was set, none active now.
  TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START,
  // Transient state set by IdleTimerCallback() while it runs EnterIdle() or
  // StartIdleTimer(); other threads spin until it is replaced.
  PROCESSING
};
119
120 grpc_millis GetClientIdleTimeout(const grpc_channel_args* args) {
121   return GPR_MAX(
122       grpc_channel_arg_get_integer(
123           grpc_channel_args_find(args, GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS),
124           {DEFAULT_IDLE_TIMEOUT_MS, 0, INT_MAX}),
125       MIN_IDLE_TIMEOUT_MS);
126 }
127
128 class ChannelData {
129  public:
130   static grpc_error* Init(grpc_channel_element* elem,
131                           grpc_channel_element_args* args);
132   static void Destroy(grpc_channel_element* elem);
133
134   static void StartTransportOp(grpc_channel_element* elem,
135                                grpc_transport_op* op);
136
137   void IncreaseCallCount();
138
139   void DecreaseCallCount();
140
141  private:
142   ChannelData(grpc_channel_element* elem, grpc_channel_element_args* args,
143               grpc_error** error);
144   ~ChannelData() = default;
145
146   static void IdleTimerCallback(void* arg, grpc_error* error);
147   static void IdleTransportOpCompleteCallback(void* arg, grpc_error* error);
148
149   void StartIdleTimer();
150
151   void EnterIdle();
152
153   grpc_channel_element* elem_;
154   // The channel stack to which we take refs for pending callbacks.
155   grpc_channel_stack* channel_stack_;
156   // Timeout after the last RPC finishes on the client channel at which the
157   // channel goes back into IDLE state.
158   const grpc_millis client_idle_timeout_;
159
160   // Member data used to track the state of channel.
161   grpc_millis last_idle_time_;
162   Atomic<intptr_t> call_count_{0};
163   Atomic<ChannelState> state_{IDLE};
164
165   // Idle timer and its callback closure.
166   grpc_timer idle_timer_;
167   grpc_closure idle_timer_callback_;
168
169   // The transport op telling the client channel to enter IDLE.
170   grpc_transport_op idle_transport_op_;
171   grpc_closure idle_transport_op_complete_callback_;
172 };
173
174 grpc_error* ChannelData::Init(grpc_channel_element* elem,
175                               grpc_channel_element_args* args) {
176   grpc_error* error = GRPC_ERROR_NONE;
177   new (elem->channel_data) ChannelData(elem, args, &error);
178   return error;
179 }
180
181 void ChannelData::Destroy(grpc_channel_element* elem) {
182   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
183   chand->~ChannelData();
184 }
185
186 void ChannelData::StartTransportOp(grpc_channel_element* elem,
187                                    grpc_transport_op* op) {
188   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
189   // Catch the disconnect_with_error transport op.
190   if (op->disconnect_with_error != nullptr) {
191     // IncreaseCallCount() introduces a dummy call and prevent the timer from
192     // being reset by other threads.
193     chand->IncreaseCallCount();
194     // If the timer has been set, cancel the timer.
195     // No synchronization issues here. grpc_timer_cancel() is valid as long as
196     // the timer has been init()ed before.
197     grpc_timer_cancel(&chand->idle_timer_);
198   }
199   // Pass the op to the next filter.
200   grpc_channel_next_op(elem, op);
201 }
202
203 void ChannelData::IncreaseCallCount() {
204   const intptr_t previous_value = call_count_.FetchAdd(1, MemoryOrder::RELAXED);
205   GRPC_IDLE_FILTER_LOG("call counter has increased to %" PRIuPTR,
206                        previous_value + 1);
207   if (previous_value == 0) {
208     // This call is the one that makes the channel busy.
209     // Loop here to make sure the previous decrease operation has finished.
210     ChannelState state = state_.Load(MemoryOrder::RELAXED);
211     while (true) {
212       switch (state) {
213         // Timer has not been set. Switch to CALLS_ACTIVE.
214         case IDLE:
215           // In this case, no other threads will modify the state, so we can
216           // just store the value.
217           state_.Store(CALLS_ACTIVE, MemoryOrder::RELAXED);
218           return;
219         // Timer has been set. Switch to TIMER_PENDING_CALLS_ACTIVE.
220         case TIMER_PENDING:
221         case TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START:
222           // At this point, the state may have been switched to IDLE by the
223           // idle timer callback. Therefore, use CAS operation to change the
224           // state atomically.
225           // Use MemoryOrder::ACQUIRE on success to ensure last_idle_time_ has
226           // been properly set in DecreaseCallCount().
227           if (state_.CompareExchangeWeak(&state, TIMER_PENDING_CALLS_ACTIVE,
228                                          MemoryOrder::ACQUIRE,
229                                          MemoryOrder::RELAXED)) {
230             return;
231           }
232           break;
233         default:
234           // The state has not been switched to desired value yet, try again.
235           state = state_.Load(MemoryOrder::RELAXED);
236           break;
237       }
238     }
239   }
240 }
241
242 void ChannelData::DecreaseCallCount() {
243   const intptr_t previous_value = call_count_.FetchSub(1, MemoryOrder::RELAXED);
244   GRPC_IDLE_FILTER_LOG("call counter has decreased to %" PRIuPTR,
245                        previous_value - 1);
246   if (previous_value == 1) {
247     // This call is the one that makes the channel idle.
248     // last_idle_time_ does not need to be Atomic<> because busy-loops in
249     // IncreaseCallCount(), DecreaseCallCount() and IdleTimerCallback() will
250     // prevent multiple threads from simultaneously accessing this variable.
251     last_idle_time_ = ExecCtx::Get()->Now();
252     ChannelState state = state_.Load(MemoryOrder::RELAXED);
253     while (true) {
254       switch (state) {
255         // Timer has not been set. Set the timer and switch to TIMER_PENDING
256         case CALLS_ACTIVE:
257           // Release store here to make other threads see the updated value of
258           // last_idle_time_.
259           StartIdleTimer();
260           state_.Store(TIMER_PENDING, MemoryOrder::RELEASE);
261           return;
262         // Timer has been set. Switch to
263         // TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START
264         case TIMER_PENDING_CALLS_ACTIVE:
265           // At this point, the state may have been switched to CALLS_ACTIVE by
266           // the idle timer callback. Therefore, use CAS operation to change the
267           // state atomically.
268           // Release store here to make the idle timer callback see the updated
269           // value of last_idle_time_ to properly reset the idle timer.
270           if (state_.CompareExchangeWeak(
271                   &state, TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START,
272                   MemoryOrder::RELEASE, MemoryOrder::RELAXED)) {
273             return;
274           }
275           break;
276         default:
277           // The state has not been switched to desired value yet, try again.
278           state = state_.Load(MemoryOrder::RELAXED);
279           break;
280       }
281     }
282   }
283 }
284
285 ChannelData::ChannelData(grpc_channel_element* elem,
286                          grpc_channel_element_args* args, grpc_error** error)
287     : elem_(elem),
288       channel_stack_(args->channel_stack),
289       client_idle_timeout_(GetClientIdleTimeout(args->channel_args)) {
290   // If the idle filter is explicitly disabled in channel args, this ctor should
291   // not get called.
292   GPR_ASSERT(client_idle_timeout_ != GRPC_MILLIS_INF_FUTURE);
293   GRPC_IDLE_FILTER_LOG("created with max_leisure_time = %" PRId64 " ms",
294                        client_idle_timeout_);
295   // Initialize the idle timer without setting it.
296   grpc_timer_init_unset(&idle_timer_);
297   // Initialize the idle timer callback closure.
298   GRPC_CLOSURE_INIT(&idle_timer_callback_, IdleTimerCallback, this,
299                     grpc_schedule_on_exec_ctx);
300   // Initialize the idle transport op complete callback.
301   GRPC_CLOSURE_INIT(&idle_transport_op_complete_callback_,
302                     IdleTransportOpCompleteCallback, this,
303                     grpc_schedule_on_exec_ctx);
304 }
305
306 void ChannelData::IdleTimerCallback(void* arg, grpc_error* error) {
307   GRPC_IDLE_FILTER_LOG("timer alarms");
308   ChannelData* chand = static_cast<ChannelData*>(arg);
309   if (error != GRPC_ERROR_NONE) {
310     GRPC_IDLE_FILTER_LOG("timer canceled");
311     GRPC_CHANNEL_STACK_UNREF(chand->channel_stack_, "max idle timer callback");
312     return;
313   }
314   bool finished = false;
315   ChannelState state = chand->state_.Load(MemoryOrder::RELAXED);
316   while (!finished) {
317     switch (state) {
318       case TIMER_PENDING:
319         // Change the state to PROCESSING to block IncreaseCallCout() until the
320         // EnterIdle() operation finishes, preventing mistakenly entering IDLE
321         // when active RPC exists.
322         finished = chand->state_.CompareExchangeWeak(
323             &state, PROCESSING, MemoryOrder::ACQUIRE, MemoryOrder::RELAXED);
324         if (finished) {
325           chand->EnterIdle();
326           chand->state_.Store(IDLE, MemoryOrder::RELAXED);
327         }
328         break;
329       case TIMER_PENDING_CALLS_ACTIVE:
330         finished = chand->state_.CompareExchangeWeak(
331             &state, CALLS_ACTIVE, MemoryOrder::RELAXED, MemoryOrder::RELAXED);
332         break;
333       case TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START:
334         // Change the state to PROCESSING to block IncreaseCallCount() until the
335         // StartIdleTimer() operation finishes, preventing mistakenly restarting
336         // the timer after grpc_timer_cancel() when shutdown.
337         finished = chand->state_.CompareExchangeWeak(
338             &state, PROCESSING, MemoryOrder::ACQUIRE, MemoryOrder::RELAXED);
339         if (finished) {
340           chand->StartIdleTimer();
341           chand->state_.Store(TIMER_PENDING, MemoryOrder::RELAXED);
342         }
343         break;
344       default:
345         // The state has not been switched to desired value yet, try again.
346         state = chand->state_.Load(MemoryOrder::RELAXED);
347         break;
348     }
349   }
350   GRPC_IDLE_FILTER_LOG("timer finishes");
351   GRPC_CHANNEL_STACK_UNREF(chand->channel_stack_, "max idle timer callback");
352 }
353
354 void ChannelData::IdleTransportOpCompleteCallback(void* arg,
355                                                   grpc_error* error) {
356   ChannelData* chand = static_cast<ChannelData*>(arg);
357   GRPC_CHANNEL_STACK_UNREF(chand->channel_stack_, "idle transport op");
358 }
359
360 void ChannelData::StartIdleTimer() {
361   GRPC_IDLE_FILTER_LOG("timer has started");
362   // Hold a ref to the channel stack for the timer callback.
363   GRPC_CHANNEL_STACK_REF(channel_stack_, "max idle timer callback");
364   grpc_timer_init(&idle_timer_, last_idle_time_ + client_idle_timeout_,
365                   &idle_timer_callback_);
366 }
367
368 void ChannelData::EnterIdle() {
369   GRPC_IDLE_FILTER_LOG("the channel will enter IDLE");
370   // Hold a ref to the channel stack for the transport op.
371   GRPC_CHANNEL_STACK_REF(channel_stack_, "idle transport op");
372   // Initialize the transport op.
373   idle_transport_op_ = {};
374   idle_transport_op_.disconnect_with_error = grpc_error_set_int(
375       GRPC_ERROR_CREATE_FROM_STATIC_STRING("enter idle"),
376       GRPC_ERROR_INT_CHANNEL_CONNECTIVITY_STATE, GRPC_CHANNEL_IDLE);
377   idle_transport_op_.on_consumed = &idle_transport_op_complete_callback_;
378   // Pass the transport op down to the channel stack.
379   grpc_channel_next_op(elem_, &idle_transport_op_);
380 }
381
382 class CallData {
383  public:
384   static grpc_error* Init(grpc_call_element* elem,
385                           const grpc_call_element_args* args);
386   static void Destroy(grpc_call_element* elem,
387                       const grpc_call_final_info* final_info,
388                       grpc_closure* then_schedule_closure);
389 };
390
391 grpc_error* CallData::Init(grpc_call_element* elem,
392                            const grpc_call_element_args* args) {
393   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
394   chand->IncreaseCallCount();
395   return GRPC_ERROR_NONE;
396 }
397
398 void CallData::Destroy(grpc_call_element* elem,
399                        const grpc_call_final_info* final_info,
400                        grpc_closure* ignored) {
401   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
402   chand->DecreaseCallCount();
403 }
404
// Filter table for the client idle filter, named "client_idle".
// The initializer is positional, so entry order must match the field order of
// grpc_channel_filter (declared elsewhere -- confirm before reordering).
// Only the transport-op hook, the call init/destroy hooks and the channel
// init/destroy hooks are implemented in this file; the remaining entries are
// generic gRPC helpers.
const grpc_channel_filter grpc_client_idle_filter = {
    grpc_call_next_op,
    ChannelData::StartTransportOp,
    sizeof(CallData),
    CallData::Init,
    grpc_call_stack_ignore_set_pollset_or_pollset_set,
    CallData::Destroy,
    sizeof(ChannelData),
    ChannelData::Init,
    ChannelData::Destroy,
    grpc_channel_next_get_info,
    "client_idle"};
417
418 static bool MaybeAddClientIdleFilter(grpc_channel_stack_builder* builder,
419                                      void* arg) {
420   const grpc_channel_args* channel_args =
421       grpc_channel_stack_builder_get_channel_arguments(builder);
422   if (!grpc_channel_args_want_minimal_stack(channel_args) &&
423       GetClientIdleTimeout(channel_args) != INT_MAX) {
424     return grpc_channel_stack_builder_prepend_filter(
425         builder, &grpc_client_idle_filter, nullptr, nullptr);
426   } else {
427     return true;
428   }
429 }
430
431 }  // namespace
432 }  // namespace grpc_core
433
434 void grpc_client_idle_filter_init(void) {
435   grpc_channel_init_register_stage(
436       GRPC_CLIENT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
437       grpc_core::MaybeAddClientIdleFilter, nullptr);
438 }
439
// Shutdown counterpart of grpc_client_idle_filter_init().
void grpc_client_idle_filter_shutdown() {
  // Intentionally empty: this file has nothing to release at shutdown.
}