Built motion from commit 6a09e18b.|2.6.11
[motion2.git] / legacy-libs / grpc / deps / grpc / src / core / ext / filters / client_channel / lb_policy / grpclb / client_load_reporting_filter.cc
1 /*
2  *
3  * Copyright 2017 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h"
22
23 #include <string.h>
24
25 #include <grpc/support/atm.h>
26 #include <grpc/support/log.h>
27
28 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h"
29 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h"
30 #include "src/core/lib/iomgr/error.h"
31 #include "src/core/lib/profiling/timers.h"
32
33 static grpc_error* init_channel_elem(grpc_channel_element* elem,
34                                      grpc_channel_element_args* args) {
35   return GRPC_ERROR_NONE;
36 }
37
38 static void destroy_channel_elem(grpc_channel_element* elem) {}
39
40 namespace {
41
42 struct call_data {
43   // Stats object to update.
44   grpc_core::RefCountedPtr<grpc_core::GrpcLbClientStats> client_stats;
45   // State for intercepting send_initial_metadata.
46   grpc_closure on_complete_for_send;
47   grpc_closure* original_on_complete_for_send;
48   bool send_initial_metadata_succeeded = false;
49   // State for intercepting recv_initial_metadata.
50   grpc_closure recv_initial_metadata_ready;
51   grpc_closure* original_recv_initial_metadata_ready;
52   bool recv_initial_metadata_succeeded = false;
53 };
54
55 }  // namespace
56
57 static void on_complete_for_send(void* arg, grpc_error* error) {
58   call_data* calld = static_cast<call_data*>(arg);
59   if (error == GRPC_ERROR_NONE) {
60     calld->send_initial_metadata_succeeded = true;
61   }
62   GRPC_CLOSURE_RUN(calld->original_on_complete_for_send, GRPC_ERROR_REF(error));
63 }
64
65 static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
66   call_data* calld = static_cast<call_data*>(arg);
67   if (error == GRPC_ERROR_NONE) {
68     calld->recv_initial_metadata_succeeded = true;
69   }
70   GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready,
71                    GRPC_ERROR_REF(error));
72 }
73
74 static grpc_error* init_call_elem(grpc_call_element* elem,
75                                   const grpc_call_element_args* args) {
76   GPR_ASSERT(args->context != nullptr);
77   new (elem->call_data) call_data();
78   return GRPC_ERROR_NONE;
79 }
80
81 static void destroy_call_elem(grpc_call_element* elem,
82                               const grpc_call_final_info* final_info,
83                               grpc_closure* ignored) {
84   call_data* calld = static_cast<call_data*>(elem->call_data);
85   if (calld->client_stats != nullptr) {
86     // Record call finished, optionally setting client_failed_to_send and
87     // received.
88     calld->client_stats->AddCallFinished(
89         !calld->send_initial_metadata_succeeded /* client_failed_to_send */,
90         calld->recv_initial_metadata_succeeded /* known_received */);
91   }
92   calld->~call_data();
93 }
94
95 static void start_transport_stream_op_batch(
96     grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
97   call_data* calld = static_cast<call_data*>(elem->call_data);
98   GPR_TIMER_SCOPE("clr_start_transport_stream_op_batch", 0);
99   // Handle send_initial_metadata.
100   if (batch->send_initial_metadata) {
101     // Grab client stats object from metadata.
102     grpc_linked_mdelem* client_stats_md =
103         batch->payload->send_initial_metadata.send_initial_metadata->list.head;
104     for (; client_stats_md != nullptr;
105          client_stats_md = client_stats_md->next) {
106       if (GRPC_SLICE_START_PTR(GRPC_MDKEY(client_stats_md->md)) ==
107           static_cast<const void*>(grpc_core::kGrpcLbClientStatsMetadataKey)) {
108         break;
109       }
110     }
111     if (client_stats_md != nullptr) {
112       grpc_core::GrpcLbClientStats* client_stats =
113           const_cast<grpc_core::GrpcLbClientStats*>(
114               reinterpret_cast<const grpc_core::GrpcLbClientStats*>(
115                   GRPC_SLICE_START_PTR(GRPC_MDVALUE(client_stats_md->md))));
116       if (client_stats != nullptr) {
117         calld->client_stats.reset(client_stats);
118         // Intercept completion.
119         calld->original_on_complete_for_send = batch->on_complete;
120         GRPC_CLOSURE_INIT(&calld->on_complete_for_send, on_complete_for_send,
121                           calld, grpc_schedule_on_exec_ctx);
122         batch->on_complete = &calld->on_complete_for_send;
123       }
124       // Remove metadata so it doesn't go out on the wire.
125       grpc_metadata_batch_remove(
126           batch->payload->send_initial_metadata.send_initial_metadata,
127           client_stats_md);
128     }
129   }
130   // Intercept completion of recv_initial_metadata.
131   if (batch->recv_initial_metadata) {
132     calld->original_recv_initial_metadata_ready =
133         batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
134     GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
135                       recv_initial_metadata_ready, calld,
136                       grpc_schedule_on_exec_ctx);
137     batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
138         &calld->recv_initial_metadata_ready;
139   }
140   // Chain to next filter.
141   grpc_call_next_op(elem, batch);
142 }
143
144 const grpc_channel_filter grpc_client_load_reporting_filter = {
145     start_transport_stream_op_batch,
146     grpc_channel_next_op,
147     sizeof(call_data),
148     init_call_elem,
149     grpc_call_stack_ignore_set_pollset_or_pollset_set,
150     destroy_call_elem,
151     0,  // sizeof(channel_data)
152     init_channel_elem,
153     destroy_channel_elem,
154     grpc_channel_next_get_info,
155     "client_load_reporting"};