Built motion from commit 6a09e18b.|2.6.11
[motion2.git] / legacy-libs / grpc-cloned / deps / grpc / src / core / ext / filters / client_channel / channel_connectivity.cc
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/surface/channel.h"
22
23 #include <inttypes.h>
24
25 #include <grpc/support/alloc.h>
26 #include <grpc/support/log.h>
27
28 #include "src/core/ext/filters/client_channel/client_channel.h"
29 #include "src/core/lib/iomgr/timer.h"
30 #include "src/core/lib/surface/api_trace.h"
31 #include "src/core/lib/surface/completion_queue.h"
32
33 grpc_connectivity_state grpc_channel_check_connectivity_state(
34     grpc_channel* channel, int try_to_connect) {
35   /* forward through to the underlying client channel */
36   grpc_channel_element* client_channel_elem =
37       grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
38   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
39   grpc_core::ExecCtx exec_ctx;
40   grpc_connectivity_state state;
41   GRPC_API_TRACE(
42       "grpc_channel_check_connectivity_state(channel=%p, try_to_connect=%d)", 2,
43       (channel, try_to_connect));
44   if (GPR_LIKELY(client_channel_elem->filter == &grpc_client_channel_filter)) {
45     state = grpc_client_channel_check_connectivity_state(client_channel_elem,
46                                                          try_to_connect);
47
48     return state;
49   }
50   gpr_log(GPR_ERROR,
51           "grpc_channel_check_connectivity_state called on something that is "
52           "not a client channel, but '%s'",
53           client_channel_elem->filter->name);
54
55   return GRPC_CHANNEL_SHUTDOWN;
56 }
57
/* Progress of a state_watcher through the two calls to partly_done() that it
 * receives (one from watch_complete, one from timeout_complete). */
enum callback_phase {
  /* Initial phase set by grpc_channel_watch_connectivity_state(). */
  WAITING = 0,
  /* partly_done() has run once; the next call posts the completion. */
  READY_TO_CALL_BACK = 1,
  /* partly_done() has run twice and handed the result to grpc_cq_end_op();
   * finished_completion() deletes the watcher in this phase. */
  CALLING_BACK_AND_FINISHED = 2,
};
63
64 namespace {
65 struct state_watcher {
66   gpr_mu mu;
67   callback_phase phase;
68   grpc_closure on_complete;
69   grpc_closure on_timeout;
70   grpc_closure watcher_timer_init;
71   grpc_timer alarm;
72   grpc_connectivity_state state;
73   grpc_completion_queue* cq;
74   grpc_cq_completion completion_storage;
75   grpc_channel* channel;
76   grpc_error* error;
77   void* tag;
78 };
79 }  // namespace
80
81 static void delete_state_watcher(state_watcher* w) {
82   grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element(
83       grpc_channel_get_channel_stack(w->channel));
84   if (client_channel_elem->filter == &grpc_client_channel_filter) {
85     GRPC_CHANNEL_INTERNAL_UNREF(w->channel, "watch_channel_connectivity");
86   } else {
87     abort();
88   }
89   gpr_mu_destroy(&w->mu);
90   gpr_free(w);
91 }
92
93 static void finished_completion(void* pw, grpc_cq_completion* ignored) {
94   bool should_delete = false;
95   state_watcher* w = static_cast<state_watcher*>(pw);
96   gpr_mu_lock(&w->mu);
97   switch (w->phase) {
98     case WAITING:
99     case READY_TO_CALL_BACK:
100       GPR_UNREACHABLE_CODE(return );
101     case CALLING_BACK_AND_FINISHED:
102       should_delete = true;
103       break;
104   }
105   gpr_mu_unlock(&w->mu);
106
107   if (should_delete) {
108     delete_state_watcher(w);
109   }
110 }
111
112 static void partly_done(state_watcher* w, bool due_to_completion,
113                         grpc_error* error) {
114   bool end_op = false;
115   void* end_op_tag = nullptr;
116   grpc_error* end_op_error = nullptr;
117   grpc_completion_queue* end_op_cq = nullptr;
118   grpc_cq_completion* end_op_completion_storage = nullptr;
119
120   if (due_to_completion) {
121     grpc_timer_cancel(&w->alarm);
122   } else {
123     grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element(
124         grpc_channel_get_channel_stack(w->channel));
125     grpc_client_channel_watch_connectivity_state(
126         client_channel_elem,
127         grpc_polling_entity_create_from_pollset(grpc_cq_pollset(w->cq)),
128         nullptr, &w->on_complete, nullptr);
129   }
130
131   gpr_mu_lock(&w->mu);
132
133   if (due_to_completion) {
134     if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures)) {
135       GRPC_LOG_IF_ERROR("watch_completion_error", GRPC_ERROR_REF(error));
136     }
137     GRPC_ERROR_UNREF(error);
138     error = GRPC_ERROR_NONE;
139   } else {
140     if (error == GRPC_ERROR_NONE) {
141       error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
142           "Timed out waiting for connection state change");
143     } else if (error == GRPC_ERROR_CANCELLED) {
144       error = GRPC_ERROR_NONE;
145     }
146   }
147   switch (w->phase) {
148     case WAITING:
149       GRPC_ERROR_REF(error);
150       w->error = error;
151       w->phase = READY_TO_CALL_BACK;
152       break;
153     case READY_TO_CALL_BACK:
154       if (error != GRPC_ERROR_NONE) {
155         GPR_ASSERT(!due_to_completion);
156         GRPC_ERROR_UNREF(w->error);
157         GRPC_ERROR_REF(error);
158         w->error = error;
159       }
160       w->phase = CALLING_BACK_AND_FINISHED;
161       end_op = true;
162       end_op_cq = w->cq;
163       end_op_tag = w->tag;
164       end_op_error = w->error;
165       end_op_completion_storage = &w->completion_storage;
166       break;
167     case CALLING_BACK_AND_FINISHED:
168       GPR_UNREACHABLE_CODE(return );
169       break;
170   }
171   gpr_mu_unlock(&w->mu);
172
173   if (end_op) {
174     grpc_cq_end_op(end_op_cq, end_op_tag, end_op_error, finished_completion, w,
175                    end_op_completion_storage);
176   }
177
178   GRPC_ERROR_UNREF(error);
179 }
180
181 static void watch_complete(void* pw, grpc_error* error) {
182   partly_done(static_cast<state_watcher*>(pw), true, GRPC_ERROR_REF(error));
183 }
184
185 static void timeout_complete(void* pw, grpc_error* error) {
186   partly_done(static_cast<state_watcher*>(pw), false, GRPC_ERROR_REF(error));
187 }
188
189 int grpc_channel_num_external_connectivity_watchers(grpc_channel* channel) {
190   grpc_channel_element* client_channel_elem =
191       grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
192   return grpc_client_channel_num_external_connectivity_watchers(
193       client_channel_elem);
194 }
195
196 typedef struct watcher_timer_init_arg {
197   state_watcher* w;
198   gpr_timespec deadline;
199 } watcher_timer_init_arg;
200
201 static void watcher_timer_init(void* arg, grpc_error* error_ignored) {
202   watcher_timer_init_arg* wa = static_cast<watcher_timer_init_arg*>(arg);
203
204   grpc_timer_init(&wa->w->alarm, grpc_timespec_to_millis_round_up(wa->deadline),
205                   &wa->w->on_timeout);
206   gpr_free(wa);
207 }
208
209 int grpc_channel_support_connectivity_watcher(grpc_channel* channel) {
210   grpc_channel_element* client_channel_elem =
211       grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
212   return client_channel_elem->filter != &grpc_client_channel_filter ? 0 : 1;
213 }
214
215 void grpc_channel_watch_connectivity_state(
216     grpc_channel* channel, grpc_connectivity_state last_observed_state,
217     gpr_timespec deadline, grpc_completion_queue* cq, void* tag) {
218   grpc_channel_element* client_channel_elem =
219       grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
220   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
221   grpc_core::ExecCtx exec_ctx;
222   state_watcher* w = static_cast<state_watcher*>(gpr_malloc(sizeof(*w)));
223
224   GRPC_API_TRACE(
225       "grpc_channel_watch_connectivity_state("
226       "channel=%p, last_observed_state=%d, "
227       "deadline=gpr_timespec { tv_sec: %" PRId64
228       ", tv_nsec: %d, clock_type: %d }, "
229       "cq=%p, tag=%p)",
230       7,
231       (channel, (int)last_observed_state, deadline.tv_sec, deadline.tv_nsec,
232        (int)deadline.clock_type, cq, tag));
233
234   GPR_ASSERT(grpc_cq_begin_op(cq, tag));
235
236   gpr_mu_init(&w->mu);
237   GRPC_CLOSURE_INIT(&w->on_complete, watch_complete, w,
238                     grpc_schedule_on_exec_ctx);
239   GRPC_CLOSURE_INIT(&w->on_timeout, timeout_complete, w,
240                     grpc_schedule_on_exec_ctx);
241   w->phase = WAITING;
242   w->state = last_observed_state;
243   w->cq = cq;
244   w->tag = tag;
245   w->channel = channel;
246   w->error = nullptr;
247
248   watcher_timer_init_arg* wa = static_cast<watcher_timer_init_arg*>(
249       gpr_malloc(sizeof(watcher_timer_init_arg)));
250   wa->w = w;
251   wa->deadline = deadline;
252   GRPC_CLOSURE_INIT(&w->watcher_timer_init, watcher_timer_init, wa,
253                     grpc_schedule_on_exec_ctx);
254
255   if (client_channel_elem->filter == &grpc_client_channel_filter) {
256     GRPC_CHANNEL_INTERNAL_REF(channel, "watch_channel_connectivity");
257     grpc_client_channel_watch_connectivity_state(
258         client_channel_elem,
259         grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq)), &w->state,
260         &w->on_complete, &w->watcher_timer_init);
261   } else {
262     abort();
263   }
264 }