3 * Copyright 2015 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/port_platform.h>
21 #include "src/core/lib/surface/channel.h"
25 #include <grpc/support/alloc.h>
26 #include <grpc/support/log.h>
28 #include "src/core/ext/filters/client_channel/client_channel.h"
29 #include "src/core/lib/iomgr/timer.h"
30 #include "src/core/lib/surface/api_trace.h"
31 #include "src/core/lib/surface/completion_queue.h"
33 grpc_connectivity_state grpc_channel_check_connectivity_state(
34 grpc_channel* channel, int try_to_connect) {
35 /* forward through to the underlying client channel */
36 grpc_channel_element* client_channel_elem =
37 grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
38 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
39 grpc_core::ExecCtx exec_ctx;
40 grpc_connectivity_state state;
42 "grpc_channel_check_connectivity_state(channel=%p, try_to_connect=%d)", 2,
43 (channel, try_to_connect));
44 if (GPR_LIKELY(client_channel_elem->filter == &grpc_client_channel_filter)) {
45 state = grpc_client_channel_check_connectivity_state(client_channel_elem,
51 "grpc_channel_check_connectivity_state called on something that is "
52 "not a client channel, but '%s'",
53 client_channel_elem->filter->name);
55 return GRPC_CHANNEL_SHUTDOWN;
61 CALLING_BACK_AND_FINISHED,
65 struct state_watcher {
68 grpc_closure on_complete;
69 grpc_closure on_timeout;
70 grpc_closure watcher_timer_init;
72 grpc_connectivity_state state;
73 grpc_completion_queue* cq;
74 grpc_cq_completion completion_storage;
75 grpc_channel* channel;
81 static void delete_state_watcher(state_watcher* w) {
82 grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element(
83 grpc_channel_get_channel_stack(w->channel));
84 if (client_channel_elem->filter == &grpc_client_channel_filter) {
85 GRPC_CHANNEL_INTERNAL_UNREF(w->channel, "watch_channel_connectivity");
89 gpr_mu_destroy(&w->mu);
93 static void finished_completion(void* pw, grpc_cq_completion* ignored) {
94 bool should_delete = false;
95 state_watcher* w = static_cast<state_watcher*>(pw);
99 case READY_TO_CALL_BACK:
100 GPR_UNREACHABLE_CODE(return );
101 case CALLING_BACK_AND_FINISHED:
102 should_delete = true;
105 gpr_mu_unlock(&w->mu);
108 delete_state_watcher(w);
112 static void partly_done(state_watcher* w, bool due_to_completion,
115 void* end_op_tag = nullptr;
116 grpc_error* end_op_error = nullptr;
117 grpc_completion_queue* end_op_cq = nullptr;
118 grpc_cq_completion* end_op_completion_storage = nullptr;
120 if (due_to_completion) {
121 grpc_timer_cancel(&w->alarm);
123 grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element(
124 grpc_channel_get_channel_stack(w->channel));
125 grpc_client_channel_watch_connectivity_state(
127 grpc_polling_entity_create_from_pollset(grpc_cq_pollset(w->cq)),
128 nullptr, &w->on_complete, nullptr);
133 if (due_to_completion) {
134 if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures)) {
135 GRPC_LOG_IF_ERROR("watch_completion_error", GRPC_ERROR_REF(error));
137 GRPC_ERROR_UNREF(error);
138 error = GRPC_ERROR_NONE;
140 if (error == GRPC_ERROR_NONE) {
141 error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
142 "Timed out waiting for connection state change");
143 } else if (error == GRPC_ERROR_CANCELLED) {
144 error = GRPC_ERROR_NONE;
149 GRPC_ERROR_REF(error);
151 w->phase = READY_TO_CALL_BACK;
153 case READY_TO_CALL_BACK:
154 if (error != GRPC_ERROR_NONE) {
155 GPR_ASSERT(!due_to_completion);
156 GRPC_ERROR_UNREF(w->error);
157 GRPC_ERROR_REF(error);
160 w->phase = CALLING_BACK_AND_FINISHED;
164 end_op_error = w->error;
165 end_op_completion_storage = &w->completion_storage;
167 case CALLING_BACK_AND_FINISHED:
168 GPR_UNREACHABLE_CODE(return );
171 gpr_mu_unlock(&w->mu);
174 grpc_cq_end_op(end_op_cq, end_op_tag, end_op_error, finished_completion, w,
175 end_op_completion_storage);
178 GRPC_ERROR_UNREF(error);
181 static void watch_complete(void* pw, grpc_error* error) {
182 partly_done(static_cast<state_watcher*>(pw), true, GRPC_ERROR_REF(error));
185 static void timeout_complete(void* pw, grpc_error* error) {
186 partly_done(static_cast<state_watcher*>(pw), false, GRPC_ERROR_REF(error));
189 int grpc_channel_num_external_connectivity_watchers(grpc_channel* channel) {
190 grpc_channel_element* client_channel_elem =
191 grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
192 return grpc_client_channel_num_external_connectivity_watchers(
193 client_channel_elem);
196 typedef struct watcher_timer_init_arg {
198 gpr_timespec deadline;
199 } watcher_timer_init_arg;
201 static void watcher_timer_init(void* arg, grpc_error* error_ignored) {
202 watcher_timer_init_arg* wa = static_cast<watcher_timer_init_arg*>(arg);
204 grpc_timer_init(&wa->w->alarm, grpc_timespec_to_millis_round_up(wa->deadline),
209 int grpc_channel_support_connectivity_watcher(grpc_channel* channel) {
210 grpc_channel_element* client_channel_elem =
211 grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
212 return client_channel_elem->filter != &grpc_client_channel_filter ? 0 : 1;
215 void grpc_channel_watch_connectivity_state(
216 grpc_channel* channel, grpc_connectivity_state last_observed_state,
217 gpr_timespec deadline, grpc_completion_queue* cq, void* tag) {
218 grpc_channel_element* client_channel_elem =
219 grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
220 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
221 grpc_core::ExecCtx exec_ctx;
222 state_watcher* w = static_cast<state_watcher*>(gpr_malloc(sizeof(*w)));
225 "grpc_channel_watch_connectivity_state("
226 "channel=%p, last_observed_state=%d, "
227 "deadline=gpr_timespec { tv_sec: %" PRId64
228 ", tv_nsec: %d, clock_type: %d }, "
231 (channel, (int)last_observed_state, deadline.tv_sec, deadline.tv_nsec,
232 (int)deadline.clock_type, cq, tag));
234 GPR_ASSERT(grpc_cq_begin_op(cq, tag));
237 GRPC_CLOSURE_INIT(&w->on_complete, watch_complete, w,
238 grpc_schedule_on_exec_ctx);
239 GRPC_CLOSURE_INIT(&w->on_timeout, timeout_complete, w,
240 grpc_schedule_on_exec_ctx);
242 w->state = last_observed_state;
245 w->channel = channel;
248 watcher_timer_init_arg* wa = static_cast<watcher_timer_init_arg*>(
249 gpr_malloc(sizeof(watcher_timer_init_arg)));
251 wa->deadline = deadline;
252 GRPC_CLOSURE_INIT(&w->watcher_timer_init, watcher_timer_init, wa,
253 grpc_schedule_on_exec_ctx);
255 if (client_channel_elem->filter == &grpc_client_channel_filter) {
256 GRPC_CHANNEL_INTERNAL_REF(channel, "watch_channel_connectivity");
257 grpc_client_channel_watch_connectivity_state(
259 grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq)), &w->state,
260 &w->on_complete, &w->watcher_timer_init);