3 * Copyright 2017 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/port_platform.h>
21 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h"
25 #include <grpc/support/atm.h>
26 #include <grpc/support/log.h>
28 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h"
29 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h"
30 #include "src/core/lib/iomgr/error.h"
31 #include "src/core/lib/profiling/timers.h"
33 static grpc_error* init_channel_elem(grpc_channel_element* elem,
34 grpc_channel_element_args* args) {
35 return GRPC_ERROR_NONE;
38 static void destroy_channel_elem(grpc_channel_element* elem) {}
43 // Stats object to update.
44 grpc_core::RefCountedPtr<grpc_core::GrpcLbClientStats> client_stats;
45 // State for intercepting send_initial_metadata.
46 grpc_closure on_complete_for_send;
47 grpc_closure* original_on_complete_for_send;
48 bool send_initial_metadata_succeeded = false;
49 // State for intercepting recv_initial_metadata.
50 grpc_closure recv_initial_metadata_ready;
51 grpc_closure* original_recv_initial_metadata_ready;
52 bool recv_initial_metadata_succeeded = false;
57 static void on_complete_for_send(void* arg, grpc_error* error) {
58 call_data* calld = static_cast<call_data*>(arg);
59 if (error == GRPC_ERROR_NONE) {
60 calld->send_initial_metadata_succeeded = true;
62 GRPC_CLOSURE_RUN(calld->original_on_complete_for_send, GRPC_ERROR_REF(error));
65 static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
66 call_data* calld = static_cast<call_data*>(arg);
67 if (error == GRPC_ERROR_NONE) {
68 calld->recv_initial_metadata_succeeded = true;
70 GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready,
71 GRPC_ERROR_REF(error));
74 static grpc_error* init_call_elem(grpc_call_element* elem,
75 const grpc_call_element_args* args) {
76 GPR_ASSERT(args->context != nullptr);
77 new (elem->call_data) call_data();
78 return GRPC_ERROR_NONE;
81 static void destroy_call_elem(grpc_call_element* elem,
82 const grpc_call_final_info* final_info,
83 grpc_closure* ignored) {
84 call_data* calld = static_cast<call_data*>(elem->call_data);
85 if (calld->client_stats != nullptr) {
86 // Record call finished, optionally setting client_failed_to_send and
88 calld->client_stats->AddCallFinished(
89 !calld->send_initial_metadata_succeeded /* client_failed_to_send */,
90 calld->recv_initial_metadata_succeeded /* known_received */);
95 static void start_transport_stream_op_batch(
96 grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
97 call_data* calld = static_cast<call_data*>(elem->call_data);
98 GPR_TIMER_SCOPE("clr_start_transport_stream_op_batch", 0);
99 // Handle send_initial_metadata.
100 if (batch->send_initial_metadata) {
101 // Grab client stats object from metadata.
102 grpc_linked_mdelem* client_stats_md =
103 batch->payload->send_initial_metadata.send_initial_metadata->list.head;
104 for (; client_stats_md != nullptr;
105 client_stats_md = client_stats_md->next) {
106 if (GRPC_SLICE_START_PTR(GRPC_MDKEY(client_stats_md->md)) ==
107 static_cast<const void*>(grpc_core::kGrpcLbClientStatsMetadataKey)) {
111 if (client_stats_md != nullptr) {
112 grpc_core::GrpcLbClientStats* client_stats =
113 const_cast<grpc_core::GrpcLbClientStats*>(
114 reinterpret_cast<const grpc_core::GrpcLbClientStats*>(
115 GRPC_SLICE_START_PTR(GRPC_MDVALUE(client_stats_md->md))));
116 if (client_stats != nullptr) {
117 calld->client_stats.reset(client_stats);
118 // Intercept completion.
119 calld->original_on_complete_for_send = batch->on_complete;
120 GRPC_CLOSURE_INIT(&calld->on_complete_for_send, on_complete_for_send,
121 calld, grpc_schedule_on_exec_ctx);
122 batch->on_complete = &calld->on_complete_for_send;
124 // Remove metadata so it doesn't go out on the wire.
125 grpc_metadata_batch_remove(
126 batch->payload->send_initial_metadata.send_initial_metadata,
130 // Intercept completion of recv_initial_metadata.
131 if (batch->recv_initial_metadata) {
132 calld->original_recv_initial_metadata_ready =
133 batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
134 GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
135 recv_initial_metadata_ready, calld,
136 grpc_schedule_on_exec_ctx);
137 batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
138 &calld->recv_initial_metadata_ready;
140 // Chain to next filter.
141 grpc_call_next_op(elem, batch);
144 const grpc_channel_filter grpc_client_load_reporting_filter = {
145 start_transport_stream_op_batch,
146 grpc_channel_next_op,
149 grpc_call_stack_ignore_set_pollset_or_pollset_set,
151 0, // sizeof(channel_data)
153 destroy_channel_elem,
154 grpc_channel_next_get_info,
155 "client_load_reporting"};