3 * Copyright 2018 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/port_platform.h>
23 #include <grpc/impl/codegen/log.h>
24 #include <grpc/support/alloc.h>
25 #include <grpc/support/string_util.h>
27 #include "src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.h"
28 #include "src/core/lib/iomgr/error.h"
29 #include "src/core/lib/iomgr/sockaddr_utils.h"
31 #include "envoy/api/v2/core/address.upb.h"
32 #include "envoy/api/v2/core/base.upb.h"
33 #include "envoy/api/v2/discovery.upb.h"
34 #include "envoy/api/v2/eds.upb.h"
35 #include "envoy/api/v2/endpoint/endpoint.upb.h"
36 #include "envoy/api/v2/endpoint/load_report.upb.h"
37 #include "envoy/service/load_stats/v2/lrs.upb.h"
38 #include "envoy/type/percent.upb.h"
39 #include "google/protobuf/any.upb.h"
40 #include "google/protobuf/duration.upb.h"
41 #include "google/protobuf/struct.upb.h"
42 #include "google/protobuf/timestamp.upb.h"
43 #include "google/protobuf/wrappers.upb.h"
// Type URL identifying an EDS (ClusterLoadAssignment) resource. It is written
// into outgoing discovery requests and compared against the type URLs found
// in incoming discovery responses.
50 constexpr char kEdsTypeUrl[] =
51 "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment";
// Key of the boolean node-metadata field that is attached (with value true) to
// every EDS request built in this file.
52 constexpr char kEndpointRequired[] = "endpointRequired";
// Decides whether a call should be dropped.
//   category_name: out-parameter; when a category is selected, it is pointed
//     at that category's name, which is owned by drop_category_list_.
// Walks drop_category_list_ in order and draws a fresh pseudo-random number
// for each category; a category is selected when the draw is below its
// parts_per_million value.
// NOTE(review): the statements following the assignment to *category_name
// (presumably the return paths) are not present in this listing -- confirm
// against the full file.
56 bool XdsDropConfig::ShouldDrop(const UniquePtr<char>** category_name) const {
57 for (size_t i = 0; i < drop_category_list_.size(); ++i) {
58 const auto& drop_category = drop_category_list_[i];
// NOTE(review): rand() yields values in [0, RAND_MAX], and the C standard only
// guarantees RAND_MAX >= 32767. On a platform with such a small RAND_MAX the
// draw below never reaches most of [0, 1000000), which would make any category
// above ~3.3% drop every call -- confirm the target platforms.
59 // Generate a random number in [0, 1000000).
60 const int random = rand() % 1000000;
61 if (random < drop_category.parts_per_million) {
62 *category_name = &drop_category.name;
// Builds an EDS DiscoveryRequest for service_name and returns its serialized
// bytes in a grpc_slice created from a copy of the serializer's output buffer.
//   service_name: NUL-terminated name added as the single resource name.
// The request also carries one node-metadata field, kEndpointRequired = true,
// and has its type URL set to kEdsTypeUrl.
// NOTE(review): the declarations of `arena` and `output_length` are not
// visible in this listing.
69 grpc_slice XdsEdsRequestCreateAndEncode(const char* service_name) {
72 envoy_api_v2_DiscoveryRequest* request =
73 envoy_api_v2_DiscoveryRequest_new(arena.ptr());
// Attach node metadata consisting of a single boolean field.
74 envoy_api_v2_core_Node* node =
75 envoy_api_v2_DiscoveryRequest_mutable_node(request, arena.ptr());
76 google_protobuf_Struct* metadata =
77 envoy_api_v2_core_Node_mutable_metadata(node, arena.ptr());
78 google_protobuf_Struct_FieldsEntry* field =
79 google_protobuf_Struct_add_fields(metadata, arena.ptr());
80 google_protobuf_Struct_FieldsEntry_set_key(
81 field, upb_strview_makez(kEndpointRequired));
82 google_protobuf_Value* value =
83 google_protobuf_Struct_FieldsEntry_mutable_value(field, arena.ptr());
84 google_protobuf_Value_set_bool_value(value, true);
// Request exactly one resource (the service name) of the EDS type.
85 envoy_api_v2_DiscoveryRequest_add_resource_names(
86 request, upb_strview_makez(service_name), arena.ptr());
87 envoy_api_v2_DiscoveryRequest_set_type_url(request,
88 upb_strview_makez(kEdsTypeUrl));
89 // Encode the request.
91 char* output = envoy_api_v2_DiscoveryRequest_serialize(request, arena.ptr(),
93 return grpc_slice_from_copied_buffer(output, output_length);
98 grpc_error* ServerAddressParseAndAppend(
99 const envoy_api_v2_endpoint_LbEndpoint* lb_endpoint,
100 ServerAddressList* list) {
102 const envoy_api_v2_endpoint_Endpoint* endpoint =
103 envoy_api_v2_endpoint_LbEndpoint_endpoint(lb_endpoint);
104 const envoy_api_v2_core_Address* address =
105 envoy_api_v2_endpoint_Endpoint_address(endpoint);
106 const envoy_api_v2_core_SocketAddress* socket_address =
107 envoy_api_v2_core_Address_socket_address(address);
108 upb_strview address_strview =
109 envoy_api_v2_core_SocketAddress_address(socket_address);
110 uint32_t port = envoy_api_v2_core_SocketAddress_port_value(socket_address);
111 if (GPR_UNLIKELY(port >> 16) != 0) {
112 return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Invalid port.");
114 // Populate grpc_resolved_address.
115 grpc_resolved_address addr;
116 char* address_str = static_cast<char*>(gpr_malloc(address_strview.size + 1));
117 memcpy(address_str, address_strview.data, address_strview.size);
118 address_str[address_strview.size] = '\0';
119 grpc_string_to_sockaddr(&addr, address_str, port);
120 gpr_free(address_str);
121 // Append the address to the list.
122 list->emplace_back(addr, nullptr);
123 return GRPC_ERROR_NONE;
128 UniquePtr<char> StringCopy(const upb_strview& strview) {
129 char* str = static_cast<char*>(gpr_malloc(strview.size + 1));
130 memcpy(str, strview.data, strview.size);
131 str[strview.size] = '\0';
132 return UniquePtr<char>(str);
137 grpc_error* LocalityParse(
138 const envoy_api_v2_endpoint_LocalityLbEndpoints* locality_lb_endpoints,
139 XdsLocalityInfo* locality_info) {
141 const google_protobuf_UInt32Value* lb_weight =
142 envoy_api_v2_endpoint_LocalityLbEndpoints_load_balancing_weight(
143 locality_lb_endpoints);
144 // If LB weight is not specified, it means this locality is assigned no load.
145 // TODO(juanlishen): When we support CDS to configure the inter-locality
146 // policy, we should change the LB weight handling.
147 locality_info->lb_weight =
148 lb_weight != nullptr ? google_protobuf_UInt32Value_value(lb_weight) : 0;
149 if (locality_info->lb_weight == 0) return GRPC_ERROR_NONE;
150 // Parse locality name.
151 const envoy_api_v2_core_Locality* locality =
152 envoy_api_v2_endpoint_LocalityLbEndpoints_locality(locality_lb_endpoints);
153 locality_info->locality_name = MakeRefCounted<XdsLocalityName>(
154 StringCopy(envoy_api_v2_core_Locality_region(locality)),
155 StringCopy(envoy_api_v2_core_Locality_zone(locality)),
156 StringCopy(envoy_api_v2_core_Locality_sub_zone(locality)));
157 // Parse the addresses.
159 const envoy_api_v2_endpoint_LbEndpoint* const* lb_endpoints =
160 envoy_api_v2_endpoint_LocalityLbEndpoints_lb_endpoints(
161 locality_lb_endpoints, &size);
162 for (size_t i = 0; i < size; ++i) {
163 grpc_error* error = ServerAddressParseAndAppend(lb_endpoints[i],
164 &locality_info->serverlist);
165 if (error != GRPC_ERROR_NONE) return error;
167 // Parse the priority.
168 locality_info->priority =
169 envoy_api_v2_endpoint_LocalityLbEndpoints_priority(locality_lb_endpoints);
170 return GRPC_ERROR_NONE;
// Parses one DropOverload entry and appends it to drop_config.
//   drop_overload: the entry to parse.
//   drop_config: receives the (category name, parts-per-million) pair through
//     AddCategory().
//   drop_all: set to true when the capped rate equals 1000000; this function
//     never writes false to it.
// Returns GRPC_ERROR_NONE on success. The visible error paths are an empty
// category name and an unrecognized denominator type.
// NOTE(review): several lines of this function (call arguments and the bodies
// of the switch cases) are absent from this listing; the comments here cover
// only what is visible.
173 grpc_error* DropParseAndAppend(
174 const envoy_api_v2_ClusterLoadAssignment_Policy_DropOverload* drop_overload,
175 XdsDropConfig* drop_config, bool* drop_all) {
177 upb_strview category =
178 envoy_api_v2_ClusterLoadAssignment_Policy_DropOverload_category(
180 if (category.size == 0) {
181 return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty drop category name");
183 // Get the drop rate (per million).
184 const envoy_type_FractionalPercent* drop_percentage =
185 envoy_api_v2_ClusterLoadAssignment_Policy_DropOverload_drop_percentage(
// NOTE(review): drop_percentage is read below without a null check. If the
// field can be unset in a received DropOverload, a guard is needed -- confirm.
187 uint32_t numerator = envoy_type_FractionalPercent_numerator(drop_percentage);
188 const auto denominator =
189 static_cast<envoy_type_FractionalPercent_DenominatorType>(
190 envoy_type_FractionalPercent_denominator(drop_percentage));
191 // Normalize to million.
192 switch (denominator) {
193 case envoy_type_FractionalPercent_HUNDRED:
196 case envoy_type_FractionalPercent_TEN_THOUSAND:
199 case envoy_type_FractionalPercent_MILLION:
202 return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Unknown denominator type");
204 // Cap numerator to 1000000.
205 numerator = GPR_MIN(numerator, 1000000);
// A rate of 1000000 parts per million means every call is dropped.
206 if (numerator == 1000000) *drop_all = true;
207 drop_config->AddCategory(StringCopy(category), numerator);
208 return GRPC_ERROR_NONE;
213 grpc_error* XdsEdsResponseDecodeAndParse(const grpc_slice& encoded_response,
216 // Decode the response.
217 const envoy_api_v2_DiscoveryResponse* response =
218 envoy_api_v2_DiscoveryResponse_parse(
219 reinterpret_cast<const char*>(GRPC_SLICE_START_PTR(encoded_response)),
220 GRPC_SLICE_LENGTH(encoded_response), arena.ptr());
221 // Parse the response.
222 if (response == nullptr) {
223 return GRPC_ERROR_CREATE_FROM_STATIC_STRING("No response found.");
225 // Check the type_url of the response.
226 upb_strview type_url = envoy_api_v2_DiscoveryResponse_type_url(response);
227 upb_strview expected_type_url = upb_strview_makez(kEdsTypeUrl);
228 if (!upb_strview_eql(type_url, expected_type_url)) {
229 return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource is not EDS.");
231 // Get the resources from the response.
233 const google_protobuf_Any* const* resources =
234 envoy_api_v2_DiscoveryResponse_resources(response, &size);
236 return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
237 "EDS response contains 0 resource.");
239 // Check the type_url of the resource.
240 type_url = google_protobuf_Any_type_url(resources[0]);
241 if (!upb_strview_eql(type_url, expected_type_url)) {
242 return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource is not EDS.");
244 // Get the cluster_load_assignment.
245 upb_strview encoded_cluster_load_assignment =
246 google_protobuf_Any_value(resources[0]);
247 envoy_api_v2_ClusterLoadAssignment* cluster_load_assignment =
248 envoy_api_v2_ClusterLoadAssignment_parse(
249 encoded_cluster_load_assignment.data,
250 encoded_cluster_load_assignment.size, arena.ptr());
251 // Get the endpoints.
252 const envoy_api_v2_endpoint_LocalityLbEndpoints* const* endpoints =
253 envoy_api_v2_ClusterLoadAssignment_endpoints(cluster_load_assignment,
255 for (size_t i = 0; i < size; ++i) {
256 XdsLocalityInfo locality_info;
257 grpc_error* error = LocalityParse(endpoints[i], &locality_info);
258 if (error != GRPC_ERROR_NONE) return error;
259 // Filter out locality with weight 0.
260 if (locality_info.lb_weight == 0) continue;
261 update->locality_list.push_back(std::move(locality_info));
263 // The locality list is sorted here into deterministic order so that it's
264 // easier to check if two locality lists contain the same set of localities.
265 std::sort(update->locality_list.data(),
266 update->locality_list.data() + update->locality_list.size(),
267 XdsLocalityInfo::Less());
268 // Get the drop config.
269 update->drop_config = MakeRefCounted<XdsDropConfig>();
270 const envoy_api_v2_ClusterLoadAssignment_Policy* policy =
271 envoy_api_v2_ClusterLoadAssignment_policy(cluster_load_assignment);
272 if (policy != nullptr) {
273 const envoy_api_v2_ClusterLoadAssignment_Policy_DropOverload* const*
275 envoy_api_v2_ClusterLoadAssignment_Policy_drop_overloads(policy,
277 for (size_t i = 0; i < size; ++i) {
278 grpc_error* error = DropParseAndAppend(
279 drop_overload[i], update->drop_config.get(), &update->drop_all);
280 if (error != GRPC_ERROR_NONE) return error;
283 return GRPC_ERROR_NONE;
// Serializes a LoadStatsRequest and returns the bytes in a grpc_slice created
// from a copy of the serializer's output buffer.
//   request: the message to serialize.
//   arena: passed through to the upb serializer.
// NOTE(review): the declaration of the `arena` parameter is not visible in
// this listing.
288 grpc_slice LrsRequestEncode(
289 const envoy_service_load_stats_v2_LoadStatsRequest* request,
291 size_t output_length;
292 char* output = envoy_service_load_stats_v2_LoadStatsRequest_serialize(
293 request, arena, &output_length);
294 return grpc_slice_from_copied_buffer(output, output_length);
// Builds a LoadStatsRequest that contains a single ClusterStats entry named
// after server_name, with no further fields set in the visible lines, and
// returns the serialized request.
//   server_name: NUL-terminated name written as the cluster name.
// NOTE(review): the declaration of `arena` is not visible in this listing.
299 grpc_slice XdsLrsRequestCreateAndEncode(const char* server_name) {
302 envoy_service_load_stats_v2_LoadStatsRequest* request =
303 envoy_service_load_stats_v2_LoadStatsRequest_new(arena.ptr());
304 // Add cluster stats. There is only one because only one server name is used.
306 envoy_api_v2_endpoint_ClusterStats* cluster_stats =
307 envoy_service_load_stats_v2_LoadStatsRequest_add_cluster_stats(
308 request, arena.ptr());
309 // Set the cluster name.
310 envoy_api_v2_endpoint_ClusterStats_set_cluster_name(
311 cluster_stats, upb_strview_makez(server_name));
312 return LrsRequestEncode(request, arena.ptr());
// Fills an UpstreamLocalityStats message from one (locality name, snapshot)
// entry.
//   output: the message to populate.
//   input: pair whose first element names the locality and whose second
//     element is the stats snapshot to report.
// NOTE(review): parts of the parameter list, the preprocessor branches and a
// few call arguments are absent from this listing.
317 void LocalityStatsPopulate(envoy_api_v2_endpoint_UpstreamLocalityStats* output,
318 #if GRPC_USE_CPP_STD_LIB
319 // TODO(veblush): Clean up this
320 // This is to address the difference between
321 // std::map and Map. #else block will be gone
322 // once using stdlib is enabled by default.
323 Pair<const RefCountedPtr<XdsLocalityName>,
325 Pair<RefCountedPtr<XdsLocalityName>,
327 XdsClientStats::LocalityStats::Snapshot>& input,
// NOTE(review): in the visible lines only sub_zone is copied into the locality
// message; region and zone are not set -- confirm this is intended.
330 envoy_api_v2_core_Locality* locality =
331 envoy_api_v2_endpoint_UpstreamLocalityStats_mutable_locality(output,
333 envoy_api_v2_core_Locality_set_sub_zone(
334 locality, upb_strview_makez(input.first->sub_zone()));
// Copy the four request counters from the snapshot.
336 XdsClientStats::LocalityStats::Snapshot& snapshot = input.second;
337 envoy_api_v2_endpoint_UpstreamLocalityStats_set_total_successful_requests(
338 output, snapshot.total_successful_requests);
339 envoy_api_v2_endpoint_UpstreamLocalityStats_set_total_requests_in_progress(
340 output, snapshot.total_requests_in_progress);
341 envoy_api_v2_endpoint_UpstreamLocalityStats_set_total_error_requests(
342 output, snapshot.total_error_requests);
343 envoy_api_v2_endpoint_UpstreamLocalityStats_set_total_issued_requests(
344 output, snapshot.total_issued_requests);
345 // Add load metric stats.
346 for (auto& p : snapshot.load_metric_stats) {
347 const char* metric_name = p.first.get();
348 const XdsClientStats::LocalityStats::LoadMetric::Snapshot& metric_value =
350 envoy_api_v2_endpoint_EndpointLoadMetricStats* load_metric =
351 envoy_api_v2_endpoint_UpstreamLocalityStats_add_load_metric_stats(
353 envoy_api_v2_endpoint_EndpointLoadMetricStats_set_metric_name(
354 load_metric, upb_strview_makez(metric_name));
355 envoy_api_v2_endpoint_EndpointLoadMetricStats_set_num_requests_finished_with_metric(
356 load_metric, metric_value.num_requests_finished_with_metric);
357 envoy_api_v2_endpoint_EndpointLoadMetricStats_set_total_metric_value(
358 load_metric, metric_value.total_metric_value);
// Builds and serializes a LoadStatsRequest reporting the stats held by
// client_stats.
//   server_name: NUL-terminated name written as the cluster name.
//   client_stats: source of the stats. As a side effect it is snapshotted and
//     reset, and its locality stats are pruned, even when nothing is sent.
// Returns an empty slice when the snapshot reports all-zero counts; otherwise
// the encoded request, containing per-locality stats, per-category dropped
// request counts, the total dropped count and the load report interval.
// NOTE(review): the declaration of `arena` is not visible in this listing.
364 grpc_slice XdsLrsRequestCreateAndEncode(const char* server_name,
365 XdsClientStats* client_stats) {
367 XdsClientStats::Snapshot snapshot = client_stats->GetSnapshotAndReset();
368 // Prune unused locality stats.
369 client_stats->PruneLocalityStats();
370 // When all the counts are zero, return empty slice.
371 if (snapshot.IsAllZero()) return grpc_empty_slice();
373 envoy_service_load_stats_v2_LoadStatsRequest* request =
374 envoy_service_load_stats_v2_LoadStatsRequest_new(arena.ptr());
375 // Add cluster stats. There is only one because only one server name is used.
377 envoy_api_v2_endpoint_ClusterStats* cluster_stats =
378 envoy_service_load_stats_v2_LoadStatsRequest_add_cluster_stats(
379 request, arena.ptr());
380 // Set the cluster name.
381 envoy_api_v2_endpoint_ClusterStats_set_cluster_name(
382 cluster_stats, upb_strview_makez(server_name));
383 // Add locality stats.
384 for (auto& p : snapshot.upstream_locality_stats) {
385 envoy_api_v2_endpoint_UpstreamLocalityStats* locality_stats =
386 envoy_api_v2_endpoint_ClusterStats_add_upstream_locality_stats(
387 cluster_stats, arena.ptr());
388 LocalityStatsPopulate(locality_stats, p, arena.ptr());
390 // Add dropped requests.
391 for (auto& p : snapshot.dropped_requests) {
392 const char* category = p.first.get();
393 const uint64_t count = p.second;
394 envoy_api_v2_endpoint_ClusterStats_DroppedRequests* dropped_requests =
395 envoy_api_v2_endpoint_ClusterStats_add_dropped_requests(cluster_stats,
397 envoy_api_v2_endpoint_ClusterStats_DroppedRequests_set_category(
398 dropped_requests, upb_strview_makez(category));
399 envoy_api_v2_endpoint_ClusterStats_DroppedRequests_set_dropped_count(
400 dropped_requests, count);
402 // Set total dropped requests.
403 envoy_api_v2_endpoint_ClusterStats_set_total_dropped_requests(
404 cluster_stats, snapshot.total_dropped_requests);
405 // Set real load report interval.
// The snapshot's interval is converted to a time span and written as the
// seconds/nanos pair of a protobuf Duration.
406 gpr_timespec timespec =
407 grpc_millis_to_timespec(snapshot.load_report_interval, GPR_TIMESPAN);
408 google_protobuf_Duration* load_report_interval =
409 envoy_api_v2_endpoint_ClusterStats_mutable_load_report_interval(
410 cluster_stats, arena.ptr());
411 google_protobuf_Duration_set_seconds(load_report_interval, timespec.tv_sec);
412 google_protobuf_Duration_set_nanos(load_report_interval, timespec.tv_nsec);
413 return LrsRequestEncode(request, arena.ptr());
416 grpc_error* XdsLrsResponseDecodeAndParse(const grpc_slice& encoded_response,
417 grpc_millis* load_reporting_interval,
418 const char* expected_server_name) {
420 // Decode the response.
421 const envoy_service_load_stats_v2_LoadStatsResponse* decoded_response =
422 envoy_service_load_stats_v2_LoadStatsResponse_parse(
423 reinterpret_cast<const char*>(GRPC_SLICE_START_PTR(encoded_response)),
424 GRPC_SLICE_LENGTH(encoded_response), arena.ptr());
425 // Parse the response.
426 if (decoded_response == nullptr) {
427 return GRPC_ERROR_CREATE_FROM_STATIC_STRING("No response found.");
429 // Check the cluster size in the response.
431 const upb_strview* clusters =
432 envoy_service_load_stats_v2_LoadStatsResponse_clusters(decoded_response,
435 return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
436 "The number of clusters (server names) is not 1.");
438 // Check the cluster name in the response
439 if (strncmp(expected_server_name, clusters[0].data, clusters[0].size) != 0) {
440 return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
441 "Unexpected cluster (server name).");
443 // Get the load report interval.
444 const google_protobuf_Duration* load_reporting_interval_duration =
445 envoy_service_load_stats_v2_LoadStatsResponse_load_reporting_interval(
447 gpr_timespec timespec{
448 google_protobuf_Duration_seconds(load_reporting_interval_duration),
449 google_protobuf_Duration_nanos(load_reporting_interval_duration),
451 *load_reporting_interval = gpr_time_to_millis(timespec);
452 return GRPC_ERROR_NONE;
455 } // namespace grpc_core