/* * * Copyright 2018 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ #include #include #include #include #include #include "src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.h" #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "envoy/api/v2/core/address.upb.h" #include "envoy/api/v2/core/base.upb.h" #include "envoy/api/v2/discovery.upb.h" #include "envoy/api/v2/eds.upb.h" #include "envoy/api/v2/endpoint/endpoint.upb.h" #include "envoy/api/v2/endpoint/load_report.upb.h" #include "envoy/service/load_stats/v2/lrs.upb.h" #include "envoy/type/percent.upb.h" #include "google/protobuf/any.upb.h" #include "google/protobuf/duration.upb.h" #include "google/protobuf/struct.upb.h" #include "google/protobuf/timestamp.upb.h" #include "google/protobuf/wrappers.upb.h" #include "upb/upb.h" namespace grpc_core { namespace { constexpr char kEdsTypeUrl[] = "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment"; constexpr char kEndpointRequired[] = "endpointRequired"; } // namespace bool XdsDropConfig::ShouldDrop(const UniquePtr** category_name) const { for (size_t i = 0; i < drop_category_list_.size(); ++i) { const auto& drop_category = drop_category_list_[i]; // Generate a random number in [0, 1000000). const int random = rand() % 1000000; if (random < drop_category.parts_per_million) { *category_name = &drop_category.name; return true; } } return false; } grpc_slice XdsEdsRequestCreateAndEncode(const char* service_name) { upb::Arena arena; // Create a request. envoy_api_v2_DiscoveryRequest* request = envoy_api_v2_DiscoveryRequest_new(arena.ptr()); envoy_api_v2_core_Node* node = envoy_api_v2_DiscoveryRequest_mutable_node(request, arena.ptr()); google_protobuf_Struct* metadata = envoy_api_v2_core_Node_mutable_metadata(node, arena.ptr()); google_protobuf_Struct_FieldsEntry* field = google_protobuf_Struct_add_fields(metadata, arena.ptr()); google_protobuf_Struct_FieldsEntry_set_key( field, upb_strview_makez(kEndpointRequired)); google_protobuf_Value* value = google_protobuf_Struct_FieldsEntry_mutable_value(field, arena.ptr()); google_protobuf_Value_set_bool_value(value, true); envoy_api_v2_DiscoveryRequest_add_resource_names( request, upb_strview_makez(service_name), arena.ptr()); envoy_api_v2_DiscoveryRequest_set_type_url(request, upb_strview_makez(kEdsTypeUrl)); // Encode the request. size_t output_length; char* output = envoy_api_v2_DiscoveryRequest_serialize(request, arena.ptr(), &output_length); return grpc_slice_from_copied_buffer(output, output_length); } namespace { grpc_error* ServerAddressParseAndAppend( const envoy_api_v2_endpoint_LbEndpoint* lb_endpoint, ServerAddressList* list) { // Find the ip:port. const envoy_api_v2_endpoint_Endpoint* endpoint = envoy_api_v2_endpoint_LbEndpoint_endpoint(lb_endpoint); const envoy_api_v2_core_Address* address = envoy_api_v2_endpoint_Endpoint_address(endpoint); const envoy_api_v2_core_SocketAddress* socket_address = envoy_api_v2_core_Address_socket_address(address); upb_strview address_strview = envoy_api_v2_core_SocketAddress_address(socket_address); uint32_t port = envoy_api_v2_core_SocketAddress_port_value(socket_address); if (GPR_UNLIKELY(port >> 16) != 0) { return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Invalid port."); } // Populate grpc_resolved_address. grpc_resolved_address addr; char* address_str = static_cast(gpr_malloc(address_strview.size + 1)); memcpy(address_str, address_strview.data, address_strview.size); address_str[address_strview.size] = '\0'; grpc_string_to_sockaddr(&addr, address_str, port); gpr_free(address_str); // Append the address to the list. list->emplace_back(addr, nullptr); return GRPC_ERROR_NONE; } namespace { UniquePtr StringCopy(const upb_strview& strview) { char* str = static_cast(gpr_malloc(strview.size + 1)); memcpy(str, strview.data, strview.size); str[strview.size] = '\0'; return UniquePtr(str); } } // namespace grpc_error* LocalityParse( const envoy_api_v2_endpoint_LocalityLbEndpoints* locality_lb_endpoints, XdsLocalityInfo* locality_info) { // Parse LB weight. const google_protobuf_UInt32Value* lb_weight = envoy_api_v2_endpoint_LocalityLbEndpoints_load_balancing_weight( locality_lb_endpoints); // If LB weight is not specified, it means this locality is assigned no load. // TODO(juanlishen): When we support CDS to configure the inter-locality // policy, we should change the LB weight handling. locality_info->lb_weight = lb_weight != nullptr ? google_protobuf_UInt32Value_value(lb_weight) : 0; if (locality_info->lb_weight == 0) return GRPC_ERROR_NONE; // Parse locality name. const envoy_api_v2_core_Locality* locality = envoy_api_v2_endpoint_LocalityLbEndpoints_locality(locality_lb_endpoints); locality_info->locality_name = MakeRefCounted( StringCopy(envoy_api_v2_core_Locality_region(locality)), StringCopy(envoy_api_v2_core_Locality_zone(locality)), StringCopy(envoy_api_v2_core_Locality_sub_zone(locality))); // Parse the addresses. size_t size; const envoy_api_v2_endpoint_LbEndpoint* const* lb_endpoints = envoy_api_v2_endpoint_LocalityLbEndpoints_lb_endpoints( locality_lb_endpoints, &size); for (size_t i = 0; i < size; ++i) { grpc_error* error = ServerAddressParseAndAppend(lb_endpoints[i], &locality_info->serverlist); if (error != GRPC_ERROR_NONE) return error; } // Parse the priority. locality_info->priority = envoy_api_v2_endpoint_LocalityLbEndpoints_priority(locality_lb_endpoints); return GRPC_ERROR_NONE; } grpc_error* DropParseAndAppend( const envoy_api_v2_ClusterLoadAssignment_Policy_DropOverload* drop_overload, XdsDropConfig* drop_config, bool* drop_all) { // Get the category. upb_strview category = envoy_api_v2_ClusterLoadAssignment_Policy_DropOverload_category( drop_overload); if (category.size == 0) { return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty drop category name"); } // Get the drop rate (per million). const envoy_type_FractionalPercent* drop_percentage = envoy_api_v2_ClusterLoadAssignment_Policy_DropOverload_drop_percentage( drop_overload); uint32_t numerator = envoy_type_FractionalPercent_numerator(drop_percentage); const auto denominator = static_cast( envoy_type_FractionalPercent_denominator(drop_percentage)); // Normalize to million. switch (denominator) { case envoy_type_FractionalPercent_HUNDRED: numerator *= 10000; break; case envoy_type_FractionalPercent_TEN_THOUSAND: numerator *= 100; break; case envoy_type_FractionalPercent_MILLION: break; default: return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Unknown denominator type"); } // Cap numerator to 1000000. numerator = GPR_MIN(numerator, 1000000); if (numerator == 1000000) *drop_all = true; drop_config->AddCategory(StringCopy(category), numerator); return GRPC_ERROR_NONE; } } // namespace grpc_error* XdsEdsResponseDecodeAndParse(const grpc_slice& encoded_response, XdsUpdate* update) { upb::Arena arena; // Decode the response. const envoy_api_v2_DiscoveryResponse* response = envoy_api_v2_DiscoveryResponse_parse( reinterpret_cast(GRPC_SLICE_START_PTR(encoded_response)), GRPC_SLICE_LENGTH(encoded_response), arena.ptr()); // Parse the response. if (response == nullptr) { return GRPC_ERROR_CREATE_FROM_STATIC_STRING("No response found."); } // Check the type_url of the response. upb_strview type_url = envoy_api_v2_DiscoveryResponse_type_url(response); upb_strview expected_type_url = upb_strview_makez(kEdsTypeUrl); if (!upb_strview_eql(type_url, expected_type_url)) { return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource is not EDS."); } // Get the resources from the response. size_t size; const google_protobuf_Any* const* resources = envoy_api_v2_DiscoveryResponse_resources(response, &size); if (size < 1) { return GRPC_ERROR_CREATE_FROM_STATIC_STRING( "EDS response contains 0 resource."); } // Check the type_url of the resource. type_url = google_protobuf_Any_type_url(resources[0]); if (!upb_strview_eql(type_url, expected_type_url)) { return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource is not EDS."); } // Get the cluster_load_assignment. upb_strview encoded_cluster_load_assignment = google_protobuf_Any_value(resources[0]); envoy_api_v2_ClusterLoadAssignment* cluster_load_assignment = envoy_api_v2_ClusterLoadAssignment_parse( encoded_cluster_load_assignment.data, encoded_cluster_load_assignment.size, arena.ptr()); // Get the endpoints. const envoy_api_v2_endpoint_LocalityLbEndpoints* const* endpoints = envoy_api_v2_ClusterLoadAssignment_endpoints(cluster_load_assignment, &size); for (size_t i = 0; i < size; ++i) { XdsLocalityInfo locality_info; grpc_error* error = LocalityParse(endpoints[i], &locality_info); if (error != GRPC_ERROR_NONE) return error; // Filter out locality with weight 0. if (locality_info.lb_weight == 0) continue; update->locality_list.push_back(std::move(locality_info)); } // The locality list is sorted here into deterministic order so that it's // easier to check if two locality lists contain the same set of localities. std::sort(update->locality_list.data(), update->locality_list.data() + update->locality_list.size(), XdsLocalityInfo::Less()); // Get the drop config. update->drop_config = MakeRefCounted(); const envoy_api_v2_ClusterLoadAssignment_Policy* policy = envoy_api_v2_ClusterLoadAssignment_policy(cluster_load_assignment); if (policy != nullptr) { const envoy_api_v2_ClusterLoadAssignment_Policy_DropOverload* const* drop_overload = envoy_api_v2_ClusterLoadAssignment_Policy_drop_overloads(policy, &size); for (size_t i = 0; i < size; ++i) { grpc_error* error = DropParseAndAppend( drop_overload[i], update->drop_config.get(), &update->drop_all); if (error != GRPC_ERROR_NONE) return error; } } return GRPC_ERROR_NONE; } namespace { grpc_slice LrsRequestEncode( const envoy_service_load_stats_v2_LoadStatsRequest* request, upb_arena* arena) { size_t output_length; char* output = envoy_service_load_stats_v2_LoadStatsRequest_serialize( request, arena, &output_length); return grpc_slice_from_copied_buffer(output, output_length); } } // namespace grpc_slice XdsLrsRequestCreateAndEncode(const char* server_name) { upb::Arena arena; // Create a request. envoy_service_load_stats_v2_LoadStatsRequest* request = envoy_service_load_stats_v2_LoadStatsRequest_new(arena.ptr()); // Add cluster stats. There is only one because we only use one server name in // one channel. envoy_api_v2_endpoint_ClusterStats* cluster_stats = envoy_service_load_stats_v2_LoadStatsRequest_add_cluster_stats( request, arena.ptr()); // Set the cluster name. envoy_api_v2_endpoint_ClusterStats_set_cluster_name( cluster_stats, upb_strview_makez(server_name)); return LrsRequestEncode(request, arena.ptr()); } namespace { void LocalityStatsPopulate(envoy_api_v2_endpoint_UpstreamLocalityStats* output, #if GRPC_USE_CPP_STD_LIB // TODO(veblush): Clean up this // This is to address the difference between // std::map and Map. #else block will be gone // once using stdlib is enabled by default. Pair, #else Pair, #endif XdsClientStats::LocalityStats::Snapshot>& input, upb_arena* arena) { // Set sub_zone. envoy_api_v2_core_Locality* locality = envoy_api_v2_endpoint_UpstreamLocalityStats_mutable_locality(output, arena); envoy_api_v2_core_Locality_set_sub_zone( locality, upb_strview_makez(input.first->sub_zone())); // Set total counts. XdsClientStats::LocalityStats::Snapshot& snapshot = input.second; envoy_api_v2_endpoint_UpstreamLocalityStats_set_total_successful_requests( output, snapshot.total_successful_requests); envoy_api_v2_endpoint_UpstreamLocalityStats_set_total_requests_in_progress( output, snapshot.total_requests_in_progress); envoy_api_v2_endpoint_UpstreamLocalityStats_set_total_error_requests( output, snapshot.total_error_requests); envoy_api_v2_endpoint_UpstreamLocalityStats_set_total_issued_requests( output, snapshot.total_issued_requests); // Add load metric stats. for (auto& p : snapshot.load_metric_stats) { const char* metric_name = p.first.get(); const XdsClientStats::LocalityStats::LoadMetric::Snapshot& metric_value = p.second; envoy_api_v2_endpoint_EndpointLoadMetricStats* load_metric = envoy_api_v2_endpoint_UpstreamLocalityStats_add_load_metric_stats( output, arena); envoy_api_v2_endpoint_EndpointLoadMetricStats_set_metric_name( load_metric, upb_strview_makez(metric_name)); envoy_api_v2_endpoint_EndpointLoadMetricStats_set_num_requests_finished_with_metric( load_metric, metric_value.num_requests_finished_with_metric); envoy_api_v2_endpoint_EndpointLoadMetricStats_set_total_metric_value( load_metric, metric_value.total_metric_value); } } } // namespace grpc_slice XdsLrsRequestCreateAndEncode(const char* server_name, XdsClientStats* client_stats) { upb::Arena arena; XdsClientStats::Snapshot snapshot = client_stats->GetSnapshotAndReset(); // Prune unused locality stats. client_stats->PruneLocalityStats(); // When all the counts are zero, return empty slice. if (snapshot.IsAllZero()) return grpc_empty_slice(); // Create a request. envoy_service_load_stats_v2_LoadStatsRequest* request = envoy_service_load_stats_v2_LoadStatsRequest_new(arena.ptr()); // Add cluster stats. There is only one because we only use one server name in // one channel. envoy_api_v2_endpoint_ClusterStats* cluster_stats = envoy_service_load_stats_v2_LoadStatsRequest_add_cluster_stats( request, arena.ptr()); // Set the cluster name. envoy_api_v2_endpoint_ClusterStats_set_cluster_name( cluster_stats, upb_strview_makez(server_name)); // Add locality stats. for (auto& p : snapshot.upstream_locality_stats) { envoy_api_v2_endpoint_UpstreamLocalityStats* locality_stats = envoy_api_v2_endpoint_ClusterStats_add_upstream_locality_stats( cluster_stats, arena.ptr()); LocalityStatsPopulate(locality_stats, p, arena.ptr()); } // Add dropped requests. for (auto& p : snapshot.dropped_requests) { const char* category = p.first.get(); const uint64_t count = p.second; envoy_api_v2_endpoint_ClusterStats_DroppedRequests* dropped_requests = envoy_api_v2_endpoint_ClusterStats_add_dropped_requests(cluster_stats, arena.ptr()); envoy_api_v2_endpoint_ClusterStats_DroppedRequests_set_category( dropped_requests, upb_strview_makez(category)); envoy_api_v2_endpoint_ClusterStats_DroppedRequests_set_dropped_count( dropped_requests, count); } // Set total dropped requests. envoy_api_v2_endpoint_ClusterStats_set_total_dropped_requests( cluster_stats, snapshot.total_dropped_requests); // Set real load report interval. gpr_timespec timespec = grpc_millis_to_timespec(snapshot.load_report_interval, GPR_TIMESPAN); google_protobuf_Duration* load_report_interval = envoy_api_v2_endpoint_ClusterStats_mutable_load_report_interval( cluster_stats, arena.ptr()); google_protobuf_Duration_set_seconds(load_report_interval, timespec.tv_sec); google_protobuf_Duration_set_nanos(load_report_interval, timespec.tv_nsec); return LrsRequestEncode(request, arena.ptr()); } grpc_error* XdsLrsResponseDecodeAndParse(const grpc_slice& encoded_response, grpc_millis* load_reporting_interval, const char* expected_server_name) { upb::Arena arena; // Decode the response. const envoy_service_load_stats_v2_LoadStatsResponse* decoded_response = envoy_service_load_stats_v2_LoadStatsResponse_parse( reinterpret_cast(GRPC_SLICE_START_PTR(encoded_response)), GRPC_SLICE_LENGTH(encoded_response), arena.ptr()); // Parse the response. if (decoded_response == nullptr) { return GRPC_ERROR_CREATE_FROM_STATIC_STRING("No response found."); } // Check the cluster size in the response. size_t size; const upb_strview* clusters = envoy_service_load_stats_v2_LoadStatsResponse_clusters(decoded_response, &size); if (size != 1) { return GRPC_ERROR_CREATE_FROM_STATIC_STRING( "The number of clusters (server names) is not 1."); } // Check the cluster name in the response if (strncmp(expected_server_name, clusters[0].data, clusters[0].size) != 0) { return GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Unexpected cluster (server name)."); } // Get the load report interval. const google_protobuf_Duration* load_reporting_interval_duration = envoy_service_load_stats_v2_LoadStatsResponse_load_reporting_interval( decoded_response); gpr_timespec timespec{ google_protobuf_Duration_seconds(load_reporting_interval_duration), google_protobuf_Duration_nanos(load_reporting_interval_duration), GPR_TIMESPAN}; *load_reporting_interval = gpr_time_to_millis(timespec); return GRPC_ERROR_NONE; } } // namespace grpc_core