Built motion from commit 6a09e18b.|2.6.11
[motion2.git] / legacy-libs / grpc / deps / grpc / src / core / ext / filters / load_reporting / server_load_reporting_filter.cc
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <grpc/support/port_platform.h>
20
21 #include <string.h>
22
23 #include <grpc/grpc_security.h>
24 #include <grpc/slice.h>
25 #include <grpc/support/alloc.h>
26 #include <grpc/support/log.h>
27 #include <grpc/support/string_util.h>
28
29 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h"
30 #include "src/core/ext/filters/client_channel/parse_address.h"
31 #include "src/core/ext/filters/load_reporting/registered_opencensus_objects.h"
32 #include "src/core/ext/filters/load_reporting/server_load_reporting_filter.h"
33 #include "src/core/lib/channel/channel_args.h"
34 #include "src/core/lib/channel/context.h"
35 #include "src/core/lib/iomgr/resolve_address.h"
36 #include "src/core/lib/iomgr/sockaddr.h"
37 #include "src/core/lib/iomgr/socket_utils.h"
38 #include "src/core/lib/security/context/security_context.h"
39 #include "src/core/lib/slice/slice_internal.h"
40 #include "src/core/lib/surface/call.h"
41 #include "src/core/lib/uri/uri_parser.h"
42
43 namespace grpc {
44
45 constexpr char kEncodedIpv4AddressLengthString[] = "08";
46 constexpr char kEncodedIpv6AddressLengthString[] = "32";
47 constexpr char kEmptyAddressLengthString[] = "00";
48 constexpr size_t kLengthPrefixSize = 2;
49
50 grpc_error* ServerLoadReportingChannelData::Init(
51     grpc_channel_element* /* elem */, grpc_channel_element_args* args) {
52   GPR_ASSERT(!args->is_last);
53   // Find and record the peer_identity.
54   const grpc_auth_context* auth_context =
55       grpc_find_auth_context_in_args(args->channel_args);
56   if (auth_context != nullptr &&
57       grpc_auth_context_peer_is_authenticated(auth_context)) {
58     grpc_auth_property_iterator auth_it =
59         grpc_auth_context_peer_identity(auth_context);
60     const grpc_auth_property* auth_property =
61         grpc_auth_property_iterator_next(&auth_it);
62     if (auth_property != nullptr) {
63       peer_identity_ = auth_property->value;
64       peer_identity_len_ = auth_property->value_length;
65     }
66   }
67   return GRPC_ERROR_NONE;
68 }
69
70 void ServerLoadReportingCallData::Destroy(
71     grpc_call_element* elem, const grpc_call_final_info* final_info,
72     grpc_closure* then_call_closure) {
73   ServerLoadReportingChannelData* chand =
74       reinterpret_cast<ServerLoadReportingChannelData*>(elem->channel_data);
75   // Only record an end if we've recorded its corresponding start, which is
76   // indicated by a non-null client_ip_and_lr_token_. Note that it's possible
77   // that we attempt to record the call end before we have recorded the call
78   // start, because the data needed for recording the start comes from the
79   // initial metadata, which may not be ready before the call finishes.
80   if (client_ip_and_lr_token_ != nullptr) {
81     opencensus::stats::Record(
82         {{::grpc::load_reporter::MeasureEndCount(), 1},
83          {::grpc::load_reporter::MeasureEndBytesSent(),
84           final_info->stats.transport_stream_stats.outgoing.data_bytes},
85          {::grpc::load_reporter::MeasureEndBytesReceived(),
86           final_info->stats.transport_stream_stats.incoming.data_bytes},
87          {::grpc::load_reporter::MeasureEndLatencyMs(),
88           gpr_time_to_millis(final_info->stats.latency)}},
89         {{::grpc::load_reporter::TagKeyToken(),
90           {client_ip_and_lr_token_, client_ip_and_lr_token_len_}},
91          {::grpc::load_reporter::TagKeyHost(),
92           {target_host_, target_host_len_}},
93          {::grpc::load_reporter::TagKeyUserId(),
94           {chand->peer_identity(), chand->peer_identity_len()}},
95          {::grpc::load_reporter::TagKeyStatus(),
96           GetStatusTagForStatus(final_info->final_status)}});
97     gpr_free(client_ip_and_lr_token_);
98   }
99   gpr_free(target_host_);
100   grpc_slice_unref_internal(service_method_);
101 }
102
103 void ServerLoadReportingCallData::StartTransportStreamOpBatch(
104     grpc_call_element* elem, TransportStreamOpBatch* op) {
105   GPR_TIMER_SCOPE("lr_start_transport_stream_op", 0);
106   if (op->recv_initial_metadata() != nullptr) {
107     // Save some fields to use when initial metadata is ready.
108     peer_string_ = op->get_peer_string();
109     recv_initial_metadata_ =
110         op->op()->payload->recv_initial_metadata.recv_initial_metadata;
111     original_recv_initial_metadata_ready_ = op->recv_initial_metadata_ready();
112     // Substitute the original closure for the wrapper closure.
113     op->set_recv_initial_metadata_ready(&recv_initial_metadata_ready_);
114   } else if (op->send_trailing_metadata() != nullptr) {
115     GRPC_LOG_IF_ERROR(
116         "server_load_reporting_filter",
117         grpc_metadata_batch_filter(op->send_trailing_metadata()->batch(),
118                                    SendTrailingMetadataFilter, elem,
119                                    "send_trailing_metadata filtering error"));
120   }
121   grpc_call_next_op(elem, op->op());
122 }
123
124 void ServerLoadReportingCallData::GetCensusSafeClientIpString(
125     char** client_ip_string, size_t* size) {
126   // Find the client URI string.
127   const char* client_uri_str =
128       reinterpret_cast<const char*>(gpr_atm_acq_load(peer_string_));
129   if (client_uri_str == nullptr) {
130     gpr_log(GPR_ERROR,
131             "Unable to extract client URI string (peer string) from gRPC "
132             "metadata.");
133     *client_ip_string = nullptr;
134     *size = 0;
135     return;
136   }
137   // Parse the client URI string into grpc_uri.
138   grpc_uri* client_uri = grpc_uri_parse(client_uri_str, true);
139   if (client_uri == nullptr) {
140     gpr_log(GPR_ERROR,
141             "Unable to parse the client URI string (peer string) to a client "
142             "URI.");
143     *client_ip_string = nullptr;
144     *size = 0;
145     return;
146   }
147   // Parse the client URI into grpc_resolved_address.
148   grpc_resolved_address resolved_address;
149   bool success = grpc_parse_uri(client_uri, &resolved_address);
150   grpc_uri_destroy(client_uri);
151   if (!success) {
152     gpr_log(GPR_ERROR,
153             "Unable to parse client URI into a grpc_resolved_address.");
154     *client_ip_string = nullptr;
155     *size = 0;
156     return;
157   }
158   // Convert the socket address in the grpc_resolved_address into a hex string
159   // according to the address family.
160   grpc_sockaddr* addr = reinterpret_cast<grpc_sockaddr*>(resolved_address.addr);
161   if (addr->sa_family == GRPC_AF_INET) {
162     grpc_sockaddr_in* addr4 = reinterpret_cast<grpc_sockaddr_in*>(addr);
163     gpr_asprintf(client_ip_string, "%08x", grpc_ntohl(addr4->sin_addr.s_addr));
164     *size = 8;
165   } else if (addr->sa_family == GRPC_AF_INET6) {
166     grpc_sockaddr_in6* addr6 = reinterpret_cast<grpc_sockaddr_in6*>(addr);
167     *client_ip_string = static_cast<char*>(gpr_malloc(32 + 1));
168     uint32_t* addr6_next_long = reinterpret_cast<uint32_t*>(&addr6->sin6_addr);
169     for (size_t i = 0; i < 4; ++i) {
170       snprintf(*client_ip_string + 8 * i, 8 + 1, "%08x",
171                grpc_ntohl(*addr6_next_long++));
172     }
173     *size = 32;
174   } else {
175     GPR_UNREACHABLE_CODE();
176   }
177 }
178
179 void ServerLoadReportingCallData::StoreClientIpAndLrToken(const char* lr_token,
180                                                           size_t lr_token_len) {
181   char* client_ip;
182   size_t client_ip_len;
183   GetCensusSafeClientIpString(&client_ip, &client_ip_len);
184   client_ip_and_lr_token_len_ =
185       kLengthPrefixSize + client_ip_len + lr_token_len;
186   client_ip_and_lr_token_ = static_cast<char*>(
187       gpr_zalloc(client_ip_and_lr_token_len_ * sizeof(char)));
188   char* cur_pos = client_ip_and_lr_token_;
189   // Store the IP length prefix.
190   if (client_ip_len == 0) {
191     strncpy(cur_pos, kEmptyAddressLengthString, kLengthPrefixSize);
192   } else if (client_ip_len == 8) {
193     strncpy(cur_pos, kEncodedIpv4AddressLengthString, kLengthPrefixSize);
194   } else if (client_ip_len == 32) {
195     strncpy(cur_pos, kEncodedIpv6AddressLengthString, kLengthPrefixSize);
196   } else {
197     GPR_UNREACHABLE_CODE();
198   }
199   cur_pos += kLengthPrefixSize;
200   // Store the IP.
201   if (client_ip_len != 0) {
202     strncpy(cur_pos, client_ip, client_ip_len);
203   }
204   gpr_free(client_ip);
205   cur_pos += client_ip_len;
206   // Store the LR token.
207   if (lr_token_len != 0) {
208     strncpy(cur_pos, lr_token, lr_token_len);
209   }
210   GPR_ASSERT(cur_pos + lr_token_len - client_ip_and_lr_token_ ==
211              client_ip_and_lr_token_len_);
212 }
213
214 grpc_filtered_mdelem ServerLoadReportingCallData::RecvInitialMetadataFilter(
215     void* user_data, grpc_mdelem md) {
216   grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data);
217   ServerLoadReportingCallData* calld =
218       reinterpret_cast<ServerLoadReportingCallData*>(elem->call_data);
219   if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_PATH)) {
220     calld->service_method_ = grpc_slice_ref_internal(GRPC_MDVALUE(md));
221   } else if (calld->target_host_ == nullptr &&
222              grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_AUTHORITY)) {
223     grpc_slice target_host_slice = GRPC_MDVALUE(md);
224     calld->target_host_len_ = GRPC_SLICE_LENGTH(target_host_slice);
225     calld->target_host_ =
226         reinterpret_cast<char*>(gpr_zalloc(calld->target_host_len_));
227     for (size_t i = 0; i < calld->target_host_len_; ++i) {
228       calld->target_host_[i] = static_cast<char>(
229           tolower(GRPC_SLICE_START_PTR(target_host_slice)[i]));
230     }
231   } else if (grpc_slice_str_cmp(GRPC_MDKEY(md),
232                                 grpc_core::kGrpcLbLbTokenMetadataKey) == 0) {
233     if (calld->client_ip_and_lr_token_ == nullptr) {
234       calld->StoreClientIpAndLrToken(
235           reinterpret_cast<const char*> GRPC_SLICE_START_PTR(GRPC_MDVALUE(md)),
236           GRPC_SLICE_LENGTH(GRPC_MDVALUE(md)));
237     }
238     return GRPC_FILTERED_REMOVE();
239   }
240   return GRPC_FILTERED_MDELEM(md);
241 }
242
243 void ServerLoadReportingCallData::RecvInitialMetadataReady(void* arg,
244                                                            grpc_error* err) {
245   grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(arg);
246   ServerLoadReportingCallData* calld =
247       reinterpret_cast<ServerLoadReportingCallData*>(elem->call_data);
248   ServerLoadReportingChannelData* chand =
249       reinterpret_cast<ServerLoadReportingChannelData*>(elem->channel_data);
250   if (err == GRPC_ERROR_NONE) {
251     GRPC_LOG_IF_ERROR(
252         "server_load_reporting_filter",
253         grpc_metadata_batch_filter(calld->recv_initial_metadata_,
254                                    RecvInitialMetadataFilter, elem,
255                                    "recv_initial_metadata filtering error"));
256     // If the LB token was not found in the recv_initial_metadata, only the
257     // client IP part will be recorded (with an empty LB token).
258     if (calld->client_ip_and_lr_token_ == nullptr) {
259       calld->StoreClientIpAndLrToken(nullptr, 0);
260     }
261     opencensus::stats::Record(
262         {{::grpc::load_reporter::MeasureStartCount(), 1}},
263         {{::grpc::load_reporter::TagKeyToken(),
264           {calld->client_ip_and_lr_token_, calld->client_ip_and_lr_token_len_}},
265          {::grpc::load_reporter::TagKeyHost(),
266           {calld->target_host_, calld->target_host_len_}},
267          {::grpc::load_reporter::TagKeyUserId(),
268           {chand->peer_identity(), chand->peer_identity_len()}}});
269   }
270   GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready_,
271                    GRPC_ERROR_REF(err));
272 }
273
274 grpc_error* ServerLoadReportingCallData::Init(
275     grpc_call_element* elem, const grpc_call_element_args* args) {
276   service_method_ = grpc_empty_slice();
277   GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReady,
278                     elem, grpc_schedule_on_exec_ctx);
279   return GRPC_ERROR_NONE;
280 }
281
282 grpc_filtered_mdelem ServerLoadReportingCallData::SendTrailingMetadataFilter(
283     void* user_data, grpc_mdelem md) {
284   grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data);
285   ServerLoadReportingCallData* calld =
286       reinterpret_cast<ServerLoadReportingCallData*>(elem->call_data);
287   ServerLoadReportingChannelData* chand =
288       reinterpret_cast<ServerLoadReportingChannelData*>(elem->channel_data);
289   if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_LB_COST_BIN)) {
290     const grpc_slice value = GRPC_MDVALUE(md);
291     const size_t cost_entry_size = GRPC_SLICE_LENGTH(value);
292     if (cost_entry_size < sizeof(double)) {
293       gpr_log(GPR_ERROR,
294               "Cost metadata value too small (%zu bytes) to hold valid data. "
295               "Ignoring.",
296               cost_entry_size);
297       return GRPC_FILTERED_REMOVE();
298     }
299     const double* cost_entry_ptr =
300         reinterpret_cast<const double*>(GRPC_SLICE_START_PTR(value));
301     double cost_value = *cost_entry_ptr++;
302     const char* cost_name = reinterpret_cast<const char*>(cost_entry_ptr);
303     const size_t cost_name_len = cost_entry_size - sizeof(double);
304     opencensus::stats::Record(
305         {{::grpc::load_reporter::MeasureOtherCallMetric(), cost_value}},
306         {{::grpc::load_reporter::TagKeyToken(),
307           {calld->client_ip_and_lr_token_, calld->client_ip_and_lr_token_len_}},
308          {::grpc::load_reporter::TagKeyHost(),
309           {calld->target_host_, calld->target_host_len_}},
310          {::grpc::load_reporter::TagKeyUserId(),
311           {chand->peer_identity(), chand->peer_identity_len()}},
312          {::grpc::load_reporter::TagKeyMetricName(),
313           {cost_name, cost_name_len}}});
314     return GRPC_FILTERED_REMOVE();
315   }
316   return GRPC_FILTERED_MDELEM(md);
317 }
318
319 const char* ServerLoadReportingCallData::GetStatusTagForStatus(
320     grpc_status_code status) {
321   switch (status) {
322     case GRPC_STATUS_OK:
323       return ::grpc::load_reporter::kCallStatusOk;
324     case GRPC_STATUS_UNKNOWN:
325     case GRPC_STATUS_DEADLINE_EXCEEDED:
326     case GRPC_STATUS_UNIMPLEMENTED:
327     case GRPC_STATUS_INTERNAL:
328     case GRPC_STATUS_UNAVAILABLE:
329     case GRPC_STATUS_DATA_LOSS:
330       return ::grpc::load_reporter::kCallStatusServerError;
331     default:
332       return ::grpc::load_reporter::kCallStatusClientError;
333   }
334 }
335
336 namespace {
337 bool MaybeAddServerLoadReportingFilter(const grpc_channel_args& args) {
338   return grpc_channel_arg_get_bool(
339       grpc_channel_args_find(&args, GRPC_ARG_ENABLE_LOAD_REPORTING), false);
340 }
341 }  // namespace
342
343 // TODO(juanlishen): We should register the filter during grpc initialization
344 // time once OpenCensus is compatible with our build system. For now, we force
345 // registration of the server load reporting filter at static initialization
346 // time if we build with the filter target.
347 struct ServerLoadReportingFilterStaticRegistrar {
348   ServerLoadReportingFilterStaticRegistrar() {
349     static grpc_core::Atomic<bool> registered{false};
350     if (registered.Load(grpc_core::MemoryOrder::ACQUIRE)) return;
351     RegisterChannelFilter<ServerLoadReportingChannelData,
352                           ServerLoadReportingCallData>(
353         "server_load_reporting", GRPC_SERVER_CHANNEL, INT_MAX,
354         MaybeAddServerLoadReportingFilter);
355     // Access measures to ensure they are initialized. Otherwise, we can't
356     // create any valid view before the first RPC.
357     ::grpc::load_reporter::MeasureStartCount();
358     ::grpc::load_reporter::MeasureEndCount();
359     ::grpc::load_reporter::MeasureEndBytesSent();
360     ::grpc::load_reporter::MeasureEndBytesReceived();
361     ::grpc::load_reporter::MeasureEndLatencyMs();
362     ::grpc::load_reporter::MeasureOtherCallMetric();
363     registered.Store(true, grpc_core::MemoryOrder::RELEASE);
364   }
365 } server_load_reporting_filter_static_registrar;
366
367 }  // namespace grpc