Built motion from commit 6a09e18b.|2.6.11
[motion2.git] / legacy-libs / grpc-cloned / deps / grpc / src / core / ext / filters / client_channel / lb_policy / xds / xds_load_balancer_api.cc
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <grpc/support/port_platform.h>
20
21 #include <algorithm>
22
23 #include <grpc/impl/codegen/log.h>
24 #include <grpc/support/alloc.h>
25 #include <grpc/support/string_util.h>
26
27 #include "src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.h"
28 #include "src/core/lib/iomgr/error.h"
29 #include "src/core/lib/iomgr/sockaddr_utils.h"
30
31 #include "envoy/api/v2/core/address.upb.h"
32 #include "envoy/api/v2/core/base.upb.h"
33 #include "envoy/api/v2/discovery.upb.h"
34 #include "envoy/api/v2/eds.upb.h"
35 #include "envoy/api/v2/endpoint/endpoint.upb.h"
36 #include "envoy/api/v2/endpoint/load_report.upb.h"
37 #include "envoy/service/load_stats/v2/lrs.upb.h"
38 #include "envoy/type/percent.upb.h"
39 #include "google/protobuf/any.upb.h"
40 #include "google/protobuf/duration.upb.h"
41 #include "google/protobuf/struct.upb.h"
42 #include "google/protobuf/timestamp.upb.h"
43 #include "google/protobuf/wrappers.upb.h"
44 #include "upb/upb.h"
45
46 namespace grpc_core {
47
48 namespace {
49
50 constexpr char kEdsTypeUrl[] =
51     "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment";
52 constexpr char kEndpointRequired[] = "endpointRequired";
53
54 }  // namespace
55
56 bool XdsDropConfig::ShouldDrop(const UniquePtr<char>** category_name) const {
57   for (size_t i = 0; i < drop_category_list_.size(); ++i) {
58     const auto& drop_category = drop_category_list_[i];
59     // Generate a random number in [0, 1000000).
60     const int random = rand() % 1000000;
61     if (random < drop_category.parts_per_million) {
62       *category_name = &drop_category.name;
63       return true;
64     }
65   }
66   return false;
67 }
68
69 grpc_slice XdsEdsRequestCreateAndEncode(const char* service_name) {
70   upb::Arena arena;
71   // Create a request.
72   envoy_api_v2_DiscoveryRequest* request =
73       envoy_api_v2_DiscoveryRequest_new(arena.ptr());
74   envoy_api_v2_core_Node* node =
75       envoy_api_v2_DiscoveryRequest_mutable_node(request, arena.ptr());
76   google_protobuf_Struct* metadata =
77       envoy_api_v2_core_Node_mutable_metadata(node, arena.ptr());
78   google_protobuf_Struct_FieldsEntry* field =
79       google_protobuf_Struct_add_fields(metadata, arena.ptr());
80   google_protobuf_Struct_FieldsEntry_set_key(
81       field, upb_strview_makez(kEndpointRequired));
82   google_protobuf_Value* value =
83       google_protobuf_Struct_FieldsEntry_mutable_value(field, arena.ptr());
84   google_protobuf_Value_set_bool_value(value, true);
85   envoy_api_v2_DiscoveryRequest_add_resource_names(
86       request, upb_strview_makez(service_name), arena.ptr());
87   envoy_api_v2_DiscoveryRequest_set_type_url(request,
88                                              upb_strview_makez(kEdsTypeUrl));
89   // Encode the request.
90   size_t output_length;
91   char* output = envoy_api_v2_DiscoveryRequest_serialize(request, arena.ptr(),
92                                                          &output_length);
93   return grpc_slice_from_copied_buffer(output, output_length);
94 }
95
96 namespace {
97
98 grpc_error* ServerAddressParseAndAppend(
99     const envoy_api_v2_endpoint_LbEndpoint* lb_endpoint,
100     ServerAddressList* list) {
101   // Find the ip:port.
102   const envoy_api_v2_endpoint_Endpoint* endpoint =
103       envoy_api_v2_endpoint_LbEndpoint_endpoint(lb_endpoint);
104   const envoy_api_v2_core_Address* address =
105       envoy_api_v2_endpoint_Endpoint_address(endpoint);
106   const envoy_api_v2_core_SocketAddress* socket_address =
107       envoy_api_v2_core_Address_socket_address(address);
108   upb_strview address_strview =
109       envoy_api_v2_core_SocketAddress_address(socket_address);
110   uint32_t port = envoy_api_v2_core_SocketAddress_port_value(socket_address);
111   if (GPR_UNLIKELY(port >> 16) != 0) {
112     return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Invalid port.");
113   }
114   // Populate grpc_resolved_address.
115   grpc_resolved_address addr;
116   char* address_str = static_cast<char*>(gpr_malloc(address_strview.size + 1));
117   memcpy(address_str, address_strview.data, address_strview.size);
118   address_str[address_strview.size] = '\0';
119   grpc_string_to_sockaddr(&addr, address_str, port);
120   gpr_free(address_str);
121   // Append the address to the list.
122   list->emplace_back(addr, nullptr);
123   return GRPC_ERROR_NONE;
124 }
125
126 namespace {
127
128 UniquePtr<char> StringCopy(const upb_strview& strview) {
129   char* str = static_cast<char*>(gpr_malloc(strview.size + 1));
130   memcpy(str, strview.data, strview.size);
131   str[strview.size] = '\0';
132   return UniquePtr<char>(str);
133 }
134
135 }  // namespace
136
137 grpc_error* LocalityParse(
138     const envoy_api_v2_endpoint_LocalityLbEndpoints* locality_lb_endpoints,
139     XdsLocalityInfo* locality_info) {
140   // Parse LB weight.
141   const google_protobuf_UInt32Value* lb_weight =
142       envoy_api_v2_endpoint_LocalityLbEndpoints_load_balancing_weight(
143           locality_lb_endpoints);
144   // If LB weight is not specified, it means this locality is assigned no load.
145   // TODO(juanlishen): When we support CDS to configure the inter-locality
146   // policy, we should change the LB weight handling.
147   locality_info->lb_weight =
148       lb_weight != nullptr ? google_protobuf_UInt32Value_value(lb_weight) : 0;
149   if (locality_info->lb_weight == 0) return GRPC_ERROR_NONE;
150   // Parse locality name.
151   const envoy_api_v2_core_Locality* locality =
152       envoy_api_v2_endpoint_LocalityLbEndpoints_locality(locality_lb_endpoints);
153   locality_info->locality_name = MakeRefCounted<XdsLocalityName>(
154       StringCopy(envoy_api_v2_core_Locality_region(locality)),
155       StringCopy(envoy_api_v2_core_Locality_zone(locality)),
156       StringCopy(envoy_api_v2_core_Locality_sub_zone(locality)));
157   // Parse the addresses.
158   size_t size;
159   const envoy_api_v2_endpoint_LbEndpoint* const* lb_endpoints =
160       envoy_api_v2_endpoint_LocalityLbEndpoints_lb_endpoints(
161           locality_lb_endpoints, &size);
162   for (size_t i = 0; i < size; ++i) {
163     grpc_error* error = ServerAddressParseAndAppend(lb_endpoints[i],
164                                                     &locality_info->serverlist);
165     if (error != GRPC_ERROR_NONE) return error;
166   }
167   // Parse the priority.
168   locality_info->priority =
169       envoy_api_v2_endpoint_LocalityLbEndpoints_priority(locality_lb_endpoints);
170   return GRPC_ERROR_NONE;
171 }
172
173 grpc_error* DropParseAndAppend(
174     const envoy_api_v2_ClusterLoadAssignment_Policy_DropOverload* drop_overload,
175     XdsDropConfig* drop_config, bool* drop_all) {
176   // Get the category.
177   upb_strview category =
178       envoy_api_v2_ClusterLoadAssignment_Policy_DropOverload_category(
179           drop_overload);
180   if (category.size == 0) {
181     return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty drop category name");
182   }
183   // Get the drop rate (per million).
184   const envoy_type_FractionalPercent* drop_percentage =
185       envoy_api_v2_ClusterLoadAssignment_Policy_DropOverload_drop_percentage(
186           drop_overload);
187   uint32_t numerator = envoy_type_FractionalPercent_numerator(drop_percentage);
188   const auto denominator =
189       static_cast<envoy_type_FractionalPercent_DenominatorType>(
190           envoy_type_FractionalPercent_denominator(drop_percentage));
191   // Normalize to million.
192   switch (denominator) {
193     case envoy_type_FractionalPercent_HUNDRED:
194       numerator *= 10000;
195       break;
196     case envoy_type_FractionalPercent_TEN_THOUSAND:
197       numerator *= 100;
198       break;
199     case envoy_type_FractionalPercent_MILLION:
200       break;
201     default:
202       return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Unknown denominator type");
203   }
204   // Cap numerator to 1000000.
205   numerator = GPR_MIN(numerator, 1000000);
206   if (numerator == 1000000) *drop_all = true;
207   drop_config->AddCategory(StringCopy(category), numerator);
208   return GRPC_ERROR_NONE;
209 }
210
211 }  // namespace
212
213 grpc_error* XdsEdsResponseDecodeAndParse(const grpc_slice& encoded_response,
214                                          XdsUpdate* update) {
215   upb::Arena arena;
216   // Decode the response.
217   const envoy_api_v2_DiscoveryResponse* response =
218       envoy_api_v2_DiscoveryResponse_parse(
219           reinterpret_cast<const char*>(GRPC_SLICE_START_PTR(encoded_response)),
220           GRPC_SLICE_LENGTH(encoded_response), arena.ptr());
221   // Parse the response.
222   if (response == nullptr) {
223     return GRPC_ERROR_CREATE_FROM_STATIC_STRING("No response found.");
224   }
225   // Check the type_url of the response.
226   upb_strview type_url = envoy_api_v2_DiscoveryResponse_type_url(response);
227   upb_strview expected_type_url = upb_strview_makez(kEdsTypeUrl);
228   if (!upb_strview_eql(type_url, expected_type_url)) {
229     return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource is not EDS.");
230   }
231   // Get the resources from the response.
232   size_t size;
233   const google_protobuf_Any* const* resources =
234       envoy_api_v2_DiscoveryResponse_resources(response, &size);
235   if (size < 1) {
236     return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
237         "EDS response contains 0 resource.");
238   }
239   // Check the type_url of the resource.
240   type_url = google_protobuf_Any_type_url(resources[0]);
241   if (!upb_strview_eql(type_url, expected_type_url)) {
242     return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource is not EDS.");
243   }
244   // Get the cluster_load_assignment.
245   upb_strview encoded_cluster_load_assignment =
246       google_protobuf_Any_value(resources[0]);
247   envoy_api_v2_ClusterLoadAssignment* cluster_load_assignment =
248       envoy_api_v2_ClusterLoadAssignment_parse(
249           encoded_cluster_load_assignment.data,
250           encoded_cluster_load_assignment.size, arena.ptr());
251   // Get the endpoints.
252   const envoy_api_v2_endpoint_LocalityLbEndpoints* const* endpoints =
253       envoy_api_v2_ClusterLoadAssignment_endpoints(cluster_load_assignment,
254                                                    &size);
255   for (size_t i = 0; i < size; ++i) {
256     XdsLocalityInfo locality_info;
257     grpc_error* error = LocalityParse(endpoints[i], &locality_info);
258     if (error != GRPC_ERROR_NONE) return error;
259     // Filter out locality with weight 0.
260     if (locality_info.lb_weight == 0) continue;
261     update->locality_list.push_back(std::move(locality_info));
262   }
263   // The locality list is sorted here into deterministic order so that it's
264   // easier to check if two locality lists contain the same set of localities.
265   std::sort(update->locality_list.data(),
266             update->locality_list.data() + update->locality_list.size(),
267             XdsLocalityInfo::Less());
268   // Get the drop config.
269   update->drop_config = MakeRefCounted<XdsDropConfig>();
270   const envoy_api_v2_ClusterLoadAssignment_Policy* policy =
271       envoy_api_v2_ClusterLoadAssignment_policy(cluster_load_assignment);
272   if (policy != nullptr) {
273     const envoy_api_v2_ClusterLoadAssignment_Policy_DropOverload* const*
274         drop_overload =
275             envoy_api_v2_ClusterLoadAssignment_Policy_drop_overloads(policy,
276                                                                      &size);
277     for (size_t i = 0; i < size; ++i) {
278       grpc_error* error = DropParseAndAppend(
279           drop_overload[i], update->drop_config.get(), &update->drop_all);
280       if (error != GRPC_ERROR_NONE) return error;
281     }
282   }
283   return GRPC_ERROR_NONE;
284 }
285
286 namespace {
287
288 grpc_slice LrsRequestEncode(
289     const envoy_service_load_stats_v2_LoadStatsRequest* request,
290     upb_arena* arena) {
291   size_t output_length;
292   char* output = envoy_service_load_stats_v2_LoadStatsRequest_serialize(
293       request, arena, &output_length);
294   return grpc_slice_from_copied_buffer(output, output_length);
295 }
296
297 }  // namespace
298
299 grpc_slice XdsLrsRequestCreateAndEncode(const char* server_name) {
300   upb::Arena arena;
301   // Create a request.
302   envoy_service_load_stats_v2_LoadStatsRequest* request =
303       envoy_service_load_stats_v2_LoadStatsRequest_new(arena.ptr());
304   // Add cluster stats. There is only one because we only use one server name in
305   // one channel.
306   envoy_api_v2_endpoint_ClusterStats* cluster_stats =
307       envoy_service_load_stats_v2_LoadStatsRequest_add_cluster_stats(
308           request, arena.ptr());
309   // Set the cluster name.
310   envoy_api_v2_endpoint_ClusterStats_set_cluster_name(
311       cluster_stats, upb_strview_makez(server_name));
312   return LrsRequestEncode(request, arena.ptr());
313 }
314
315 namespace {
316
317 void LocalityStatsPopulate(envoy_api_v2_endpoint_UpstreamLocalityStats* output,
318 #if GRPC_USE_CPP_STD_LIB
319                            // TODO(veblush): Clean up this
320                            // This is to address the difference between
321                            // std::map and Map. #else block will be gone
322                            // once using stdlib is enabled by default.
323                            Pair<const RefCountedPtr<XdsLocalityName>,
324 #else
325                            Pair<RefCountedPtr<XdsLocalityName>,
326 #endif
327                                 XdsClientStats::LocalityStats::Snapshot>& input,
328                            upb_arena* arena) {
329   // Set sub_zone.
330   envoy_api_v2_core_Locality* locality =
331       envoy_api_v2_endpoint_UpstreamLocalityStats_mutable_locality(output,
332                                                                    arena);
333   envoy_api_v2_core_Locality_set_sub_zone(
334       locality, upb_strview_makez(input.first->sub_zone()));
335   // Set total counts.
336   XdsClientStats::LocalityStats::Snapshot& snapshot = input.second;
337   envoy_api_v2_endpoint_UpstreamLocalityStats_set_total_successful_requests(
338       output, snapshot.total_successful_requests);
339   envoy_api_v2_endpoint_UpstreamLocalityStats_set_total_requests_in_progress(
340       output, snapshot.total_requests_in_progress);
341   envoy_api_v2_endpoint_UpstreamLocalityStats_set_total_error_requests(
342       output, snapshot.total_error_requests);
343   envoy_api_v2_endpoint_UpstreamLocalityStats_set_total_issued_requests(
344       output, snapshot.total_issued_requests);
345   // Add load metric stats.
346   for (auto& p : snapshot.load_metric_stats) {
347     const char* metric_name = p.first.get();
348     const XdsClientStats::LocalityStats::LoadMetric::Snapshot& metric_value =
349         p.second;
350     envoy_api_v2_endpoint_EndpointLoadMetricStats* load_metric =
351         envoy_api_v2_endpoint_UpstreamLocalityStats_add_load_metric_stats(
352             output, arena);
353     envoy_api_v2_endpoint_EndpointLoadMetricStats_set_metric_name(
354         load_metric, upb_strview_makez(metric_name));
355     envoy_api_v2_endpoint_EndpointLoadMetricStats_set_num_requests_finished_with_metric(
356         load_metric, metric_value.num_requests_finished_with_metric);
357     envoy_api_v2_endpoint_EndpointLoadMetricStats_set_total_metric_value(
358         load_metric, metric_value.total_metric_value);
359   }
360 }
361
362 }  // namespace
363
364 grpc_slice XdsLrsRequestCreateAndEncode(const char* server_name,
365                                         XdsClientStats* client_stats) {
366   upb::Arena arena;
367   XdsClientStats::Snapshot snapshot = client_stats->GetSnapshotAndReset();
368   // Prune unused locality stats.
369   client_stats->PruneLocalityStats();
370   // When all the counts are zero, return empty slice.
371   if (snapshot.IsAllZero()) return grpc_empty_slice();
372   // Create a request.
373   envoy_service_load_stats_v2_LoadStatsRequest* request =
374       envoy_service_load_stats_v2_LoadStatsRequest_new(arena.ptr());
375   // Add cluster stats. There is only one because we only use one server name in
376   // one channel.
377   envoy_api_v2_endpoint_ClusterStats* cluster_stats =
378       envoy_service_load_stats_v2_LoadStatsRequest_add_cluster_stats(
379           request, arena.ptr());
380   // Set the cluster name.
381   envoy_api_v2_endpoint_ClusterStats_set_cluster_name(
382       cluster_stats, upb_strview_makez(server_name));
383   // Add locality stats.
384   for (auto& p : snapshot.upstream_locality_stats) {
385     envoy_api_v2_endpoint_UpstreamLocalityStats* locality_stats =
386         envoy_api_v2_endpoint_ClusterStats_add_upstream_locality_stats(
387             cluster_stats, arena.ptr());
388     LocalityStatsPopulate(locality_stats, p, arena.ptr());
389   }
390   // Add dropped requests.
391   for (auto& p : snapshot.dropped_requests) {
392     const char* category = p.first.get();
393     const uint64_t count = p.second;
394     envoy_api_v2_endpoint_ClusterStats_DroppedRequests* dropped_requests =
395         envoy_api_v2_endpoint_ClusterStats_add_dropped_requests(cluster_stats,
396                                                                 arena.ptr());
397     envoy_api_v2_endpoint_ClusterStats_DroppedRequests_set_category(
398         dropped_requests, upb_strview_makez(category));
399     envoy_api_v2_endpoint_ClusterStats_DroppedRequests_set_dropped_count(
400         dropped_requests, count);
401   }
402   // Set total dropped requests.
403   envoy_api_v2_endpoint_ClusterStats_set_total_dropped_requests(
404       cluster_stats, snapshot.total_dropped_requests);
405   // Set real load report interval.
406   gpr_timespec timespec =
407       grpc_millis_to_timespec(snapshot.load_report_interval, GPR_TIMESPAN);
408   google_protobuf_Duration* load_report_interval =
409       envoy_api_v2_endpoint_ClusterStats_mutable_load_report_interval(
410           cluster_stats, arena.ptr());
411   google_protobuf_Duration_set_seconds(load_report_interval, timespec.tv_sec);
412   google_protobuf_Duration_set_nanos(load_report_interval, timespec.tv_nsec);
413   return LrsRequestEncode(request, arena.ptr());
414 }
415
416 grpc_error* XdsLrsResponseDecodeAndParse(const grpc_slice& encoded_response,
417                                          grpc_millis* load_reporting_interval,
418                                          const char* expected_server_name) {
419   upb::Arena arena;
420   // Decode the response.
421   const envoy_service_load_stats_v2_LoadStatsResponse* decoded_response =
422       envoy_service_load_stats_v2_LoadStatsResponse_parse(
423           reinterpret_cast<const char*>(GRPC_SLICE_START_PTR(encoded_response)),
424           GRPC_SLICE_LENGTH(encoded_response), arena.ptr());
425   // Parse the response.
426   if (decoded_response == nullptr) {
427     return GRPC_ERROR_CREATE_FROM_STATIC_STRING("No response found.");
428   }
429   // Check the cluster size in the response.
430   size_t size;
431   const upb_strview* clusters =
432       envoy_service_load_stats_v2_LoadStatsResponse_clusters(decoded_response,
433                                                              &size);
434   if (size != 1) {
435     return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
436         "The number of clusters (server names) is not 1.");
437   }
438   // Check the cluster name in the response
439   if (strncmp(expected_server_name, clusters[0].data, clusters[0].size) != 0) {
440     return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
441         "Unexpected cluster (server name).");
442   }
443   // Get the load report interval.
444   const google_protobuf_Duration* load_reporting_interval_duration =
445       envoy_service_load_stats_v2_LoadStatsResponse_load_reporting_interval(
446           decoded_response);
447   gpr_timespec timespec{
448       google_protobuf_Duration_seconds(load_reporting_interval_duration),
449       google_protobuf_Duration_nanos(load_reporting_interval_duration),
450       GPR_TIMESPAN};
451   *load_reporting_interval = gpr_time_to_millis(timespec);
452   return GRPC_ERROR_NONE;
453 }
454
455 }  // namespace grpc_core