3 * Copyright 2017 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/port_platform.h>
21 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h"
23 #include <grpc/support/atm.h>
24 #include <grpc/support/log.h>
26 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h"
27 #include "src/core/lib/iomgr/error.h"
28 #include "src/core/lib/profiling/timers.h"
// Channel-element initializer for this filter. The visible body uses neither
// `elem` nor `args` and unconditionally returns GRPC_ERROR_NONE, i.e. no
// per-channel state is set up here.
30 static grpc_error* init_channel_elem(grpc_channel_element* elem,
31 grpc_channel_element_args* args) {
32 return GRPC_ERROR_NONE;
// Channel-element teardown for this filter. The body is empty: nothing is
// released here, mirroring init_channel_elem which sets nothing up.
35 static void destroy_channel_elem(grpc_channel_element* elem) {}
// Per-call state for the load reporting filter. The functions in this file
// reach these fields through a `call_data*` obtained from elem->call_data or
// from the closure argument.
// NOTE(review): the enclosing type declaration is not visible in this view --
// confirm these are the members of `call_data`.
40 // Stats object to update.
// Assigned in start_transport_stream_op_batch when the LB token metadata
// carries a stats object; destroy_call_elem only reports when it is non-null.
41 grpc_core::RefCountedPtr<grpc_core::GrpcLbClientStats> client_stats;
42 // State for intercepting send_initial_metadata.
// Closure installed as the batch's on_complete in place of the caller's.
43 grpc_closure on_complete_for_send;
// The caller's on_complete, saved so it can be run after ours.
44 grpc_closure* original_on_complete_for_send;
// Set to true by on_complete_for_send when it is invoked with GRPC_ERROR_NONE.
45 bool send_initial_metadata_succeeded = false;
46 // State for intercepting recv_initial_metadata.
// Closure installed as recv_initial_metadata_ready in place of the caller's.
47 grpc_closure recv_initial_metadata_ready;
// The caller's recv_initial_metadata_ready, saved so it can be run after ours.
48 grpc_closure* original_recv_initial_metadata_ready;
// Set to true by recv_initial_metadata_ready when invoked with GRPC_ERROR_NONE.
49 bool recv_initial_metadata_succeeded = false;
// Completion callback substituted for the batch's on_complete when a
// send_initial_metadata batch is intercepted.
//
// `arg` is the call_data pointer registered with GRPC_CLOSURE_INIT in
// start_transport_stream_op_batch. `error` is the completion status.
54 static void on_complete_for_send(void* arg, grpc_error* error) {
55 call_data* calld = static_cast<call_data*>(arg);
// Only a clean completion counts as "sent"; on error the flag keeps its
// default of false, which destroy_call_elem reports as client_failed_to_send.
56 if (error == GRPC_ERROR_NONE) {
57 calld->send_initial_metadata_succeeded = true;
// Hand control back to the caller's original closure, forwarding the same
// error wrapped in GRPC_ERROR_REF.
59 GRPC_CLOSURE_RUN(calld->original_on_complete_for_send, GRPC_ERROR_REF(error));
// Callback substituted for the batch's recv_initial_metadata_ready closure.
//
// `arg` is the call_data pointer registered with GRPC_CLOSURE_INIT in
// start_transport_stream_op_batch. `error` is the receive status.
62 static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
63 call_data* calld = static_cast<call_data*>(arg);
// Remember that initial metadata arrived without error; destroy_call_elem
// passes this flag to AddCallFinished as known_received.
64 if (error == GRPC_ERROR_NONE) {
65 calld->recv_initial_metadata_succeeded = true;
// Chain to the caller's original closure, forwarding the same error wrapped
// in GRPC_ERROR_REF.
67 GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready,
68 GRPC_ERROR_REF(error));
// Call-element initializer. Asserts that args->context is non-null, then
// constructs a call_data in the storage at elem->call_data.
// Always returns GRPC_ERROR_NONE.
71 static grpc_error* init_call_elem(grpc_call_element* elem,
72 const grpc_call_element_args* args) {
73 GPR_ASSERT(args->context != nullptr);
// Placement new: the object is built in storage the element already points
// to, so its constructor (and the default member initializers) run here.
74 new (elem->call_data) call_data();
75 return GRPC_ERROR_NONE;
// Call-element teardown. If a stats object was captured for this call, reports
// the call as finished to it. `final_info` and `ignored` are not used by the
// visible body.
78 static void destroy_call_elem(grpc_call_element* elem,
79 const grpc_call_final_info* final_info,
80 grpc_closure* ignored) {
81 call_data* calld = static_cast<call_data*>(elem->call_data);
// client_stats is only assigned when the LB token carried stats, so calls
// without it are not reported.
82 if (calld->client_stats != nullptr) {
83 // Record call finished, optionally setting client_failed_to_send and
// known_received (see the argument annotations below).
85 calld->client_stats->AddCallFinished(
86 !calld->send_initial_metadata_succeeded /* client_failed_to_send */,
87 calld->recv_initial_metadata_succeeded /* known_received */);
// NOTE(review): call_data is constructed with placement new in
// init_call_elem, but no explicit destructor call (calld->~call_data()) is
// visible in this view. Confirm one runs here so the client_stats ref is
// released.
// Batch handler for this filter. Inspects the outgoing batch and, where
// relevant, swaps in this filter's closures so the outcome of
// send_initial_metadata / recv_initial_metadata can be recorded in call_data;
// then forwards the batch to the next filter via grpc_call_next_op.
//
// NOTE(review): brace nesting is not visible in this view, so the exact scope
// of each `if` below should be confirmed against the full file.
92 static void start_transport_stream_op_batch(
93 grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
94 call_data* calld = static_cast<call_data*>(elem->call_data);
95 GPR_TIMER_SCOPE("clr_start_transport_stream_op_batch", 0);
96 // Handle send_initial_metadata.
97 if (batch->send_initial_metadata) {
98 // Grab client stats object from user_data for LB token metadata.
// NOTE(review): the end of this expression (the member selected from
// `idx.named`) is not visible in this view.
99 grpc_linked_mdelem* lb_token =
100 batch->payload->send_initial_metadata.send_initial_metadata->idx.named
// No LB token in the metadata means there is nothing to report for this call.
102 if (lb_token != nullptr) {
// Presumably the second argument identifies which user data to fetch from
// the mdelem -- confirm against grpc_mdelem_get_user_data.
103 grpc_core::GrpcLbClientStats* client_stats =
104 static_cast<grpc_core::GrpcLbClientStats*>(grpc_mdelem_get_user_data(
105 lb_token->md, grpc_core::GrpcLbClientStats::Destroy));
106 if (client_stats != nullptr) {
// Store a ref from Ref() in call_data; destroy_call_elem reads it later.
107 calld->client_stats = client_stats->Ref();
108 // Intercept completion.
// Save the caller's on_complete first so on_complete_for_send can run it.
109 calld->original_on_complete_for_send = batch->on_complete;
110 GRPC_CLOSURE_INIT(&calld->on_complete_for_send, on_complete_for_send,
111 calld, grpc_schedule_on_exec_ctx);
112 batch->on_complete = &calld->on_complete_for_send;
116 // Intercept completion of recv_initial_metadata.
117 if (batch->recv_initial_metadata) {
// Same save-then-replace pattern as above, applied to the
// recv_initial_metadata_ready closure stored in the batch payload.
118 calld->original_recv_initial_metadata_ready =
119 batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
120 GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
121 recv_initial_metadata_ready, calld,
122 grpc_schedule_on_exec_ctx);
123 batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
124 &calld->recv_initial_metadata_ready;
126 // Chain to next filter.
127 grpc_call_next_op(elem, batch);
// Filter definition exported for the client load reporting filter. The
// entries are positional initializers for grpc_channel_filter, so their order
// must match that struct's field order.
//
// NOTE(review): init_call_elem, destroy_call_elem and init_channel_elem are
// defined above but do not appear among the entries visible in this view, and
// no call-data size entry is visible either. Confirm the full initializer
// list against grpc_channel_filter.
130 const grpc_channel_filter grpc_client_load_reporting_filter = {
// This file's batch handler, defined above.
131 start_transport_stream_op_batch,
132 grpc_channel_next_op,
135 grpc_call_stack_ignore_set_pollset_or_pollset_set,
137 0, // sizeof(channel_data)
139 destroy_channel_elem,
140 grpc_channel_next_get_info,
// String entry for the filter; presumably its name -- confirm against
// grpc_channel_filter.
141 "client_load_reporting"};