Built motion from commit 6a09e18b.|2.6.11
[motion2.git] / legacy-libs / grpc-cloned / deps / grpc / src / core / ext / filters / client_channel / lb_policy / xds / xds_load_balancer_api.cc
diff --git a/legacy-libs/grpc-cloned/deps/grpc/src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.cc b/legacy-libs/grpc-cloned/deps/grpc/src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.cc
new file mode 100644 (file)
index 0000000..16d5f9a
--- /dev/null
@@ -0,0 +1,455 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include <algorithm>
+
+#include <grpc/impl/codegen/log.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/string_util.h>
+
+#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.h"
+#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/iomgr/sockaddr_utils.h"
+
+#include "envoy/api/v2/core/address.upb.h"
+#include "envoy/api/v2/core/base.upb.h"
+#include "envoy/api/v2/discovery.upb.h"
+#include "envoy/api/v2/eds.upb.h"
+#include "envoy/api/v2/endpoint/endpoint.upb.h"
+#include "envoy/api/v2/endpoint/load_report.upb.h"
+#include "envoy/service/load_stats/v2/lrs.upb.h"
+#include "envoy/type/percent.upb.h"
+#include "google/protobuf/any.upb.h"
+#include "google/protobuf/duration.upb.h"
+#include "google/protobuf/struct.upb.h"
+#include "google/protobuf/timestamp.upb.h"
+#include "google/protobuf/wrappers.upb.h"
+#include "upb/upb.h"
+
+namespace grpc_core {
+
+namespace {
+
+constexpr char kEdsTypeUrl[] =
+    "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment";
+constexpr char kEndpointRequired[] = "endpointRequired";
+
+}  // namespace
+
+bool XdsDropConfig::ShouldDrop(const UniquePtr<char>** category_name) const {
+  for (size_t i = 0; i < drop_category_list_.size(); ++i) {
+    const auto& drop_category = drop_category_list_[i];
+    // Generate a random number in [0, 1000000).
+    const int random = rand() % 1000000;
+    if (random < drop_category.parts_per_million) {
+      *category_name = &drop_category.name;
+      return true;
+    }
+  }
+  return false;
+}
+
+grpc_slice XdsEdsRequestCreateAndEncode(const char* service_name) {
+  upb::Arena arena;
+  // Create a request.
+  envoy_api_v2_DiscoveryRequest* request =
+      envoy_api_v2_DiscoveryRequest_new(arena.ptr());
+  envoy_api_v2_core_Node* node =
+      envoy_api_v2_DiscoveryRequest_mutable_node(request, arena.ptr());
+  google_protobuf_Struct* metadata =
+      envoy_api_v2_core_Node_mutable_metadata(node, arena.ptr());
+  google_protobuf_Struct_FieldsEntry* field =
+      google_protobuf_Struct_add_fields(metadata, arena.ptr());
+  google_protobuf_Struct_FieldsEntry_set_key(
+      field, upb_strview_makez(kEndpointRequired));
+  google_protobuf_Value* value =
+      google_protobuf_Struct_FieldsEntry_mutable_value(field, arena.ptr());
+  google_protobuf_Value_set_bool_value(value, true);
+  envoy_api_v2_DiscoveryRequest_add_resource_names(
+      request, upb_strview_makez(service_name), arena.ptr());
+  envoy_api_v2_DiscoveryRequest_set_type_url(request,
+                                             upb_strview_makez(kEdsTypeUrl));
+  // Encode the request.
+  size_t output_length;
+  char* output = envoy_api_v2_DiscoveryRequest_serialize(request, arena.ptr(),
+                                                         &output_length);
+  return grpc_slice_from_copied_buffer(output, output_length);
+}
+
+namespace {
+
+grpc_error* ServerAddressParseAndAppend(
+    const envoy_api_v2_endpoint_LbEndpoint* lb_endpoint,
+    ServerAddressList* list) {
+  // Find the ip:port.
+  const envoy_api_v2_endpoint_Endpoint* endpoint =
+      envoy_api_v2_endpoint_LbEndpoint_endpoint(lb_endpoint);
+  const envoy_api_v2_core_Address* address =
+      envoy_api_v2_endpoint_Endpoint_address(endpoint);
+  const envoy_api_v2_core_SocketAddress* socket_address =
+      envoy_api_v2_core_Address_socket_address(address);
+  upb_strview address_strview =
+      envoy_api_v2_core_SocketAddress_address(socket_address);
+  uint32_t port = envoy_api_v2_core_SocketAddress_port_value(socket_address);
+  if (GPR_UNLIKELY(port >> 16) != 0) {
+    return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Invalid port.");
+  }
+  // Populate grpc_resolved_address.
+  grpc_resolved_address addr;
+  char* address_str = static_cast<char*>(gpr_malloc(address_strview.size + 1));
+  memcpy(address_str, address_strview.data, address_strview.size);
+  address_str[address_strview.size] = '\0';
+  grpc_string_to_sockaddr(&addr, address_str, port);
+  gpr_free(address_str);
+  // Append the address to the list.
+  list->emplace_back(addr, nullptr);
+  return GRPC_ERROR_NONE;
+}
+
+namespace {
+
+UniquePtr<char> StringCopy(const upb_strview& strview) {
+  char* str = static_cast<char*>(gpr_malloc(strview.size + 1));
+  memcpy(str, strview.data, strview.size);
+  str[strview.size] = '\0';
+  return UniquePtr<char>(str);
+}
+
+}  // namespace
+
+grpc_error* LocalityParse(
+    const envoy_api_v2_endpoint_LocalityLbEndpoints* locality_lb_endpoints,
+    XdsLocalityInfo* locality_info) {
+  // Parse LB weight.
+  const google_protobuf_UInt32Value* lb_weight =
+      envoy_api_v2_endpoint_LocalityLbEndpoints_load_balancing_weight(
+          locality_lb_endpoints);
+  // If LB weight is not specified, it means this locality is assigned no load.
+  // TODO(juanlishen): When we support CDS to configure the inter-locality
+  // policy, we should change the LB weight handling.
+  locality_info->lb_weight =
+      lb_weight != nullptr ? google_protobuf_UInt32Value_value(lb_weight) : 0;
+  if (locality_info->lb_weight == 0) return GRPC_ERROR_NONE;
+  // Parse locality name.
+  const envoy_api_v2_core_Locality* locality =
+      envoy_api_v2_endpoint_LocalityLbEndpoints_locality(locality_lb_endpoints);
+  locality_info->locality_name = MakeRefCounted<XdsLocalityName>(
+      StringCopy(envoy_api_v2_core_Locality_region(locality)),
+      StringCopy(envoy_api_v2_core_Locality_zone(locality)),
+      StringCopy(envoy_api_v2_core_Locality_sub_zone(locality)));
+  // Parse the addresses.
+  size_t size;
+  const envoy_api_v2_endpoint_LbEndpoint* const* lb_endpoints =
+      envoy_api_v2_endpoint_LocalityLbEndpoints_lb_endpoints(
+          locality_lb_endpoints, &size);
+  for (size_t i = 0; i < size; ++i) {
+    grpc_error* error = ServerAddressParseAndAppend(lb_endpoints[i],
+                                                    &locality_info->serverlist);
+    if (error != GRPC_ERROR_NONE) return error;
+  }
+  // Parse the priority.
+  locality_info->priority =
+      envoy_api_v2_endpoint_LocalityLbEndpoints_priority(locality_lb_endpoints);
+  return GRPC_ERROR_NONE;
+}
+
+grpc_error* DropParseAndAppend(
+    const envoy_api_v2_ClusterLoadAssignment_Policy_DropOverload* drop_overload,
+    XdsDropConfig* drop_config, bool* drop_all) {
+  // Get the category.
+  upb_strview category =
+      envoy_api_v2_ClusterLoadAssignment_Policy_DropOverload_category(
+          drop_overload);
+  if (category.size == 0) {
+    return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty drop category name");
+  }
+  // Get the drop rate (per million).
+  const envoy_type_FractionalPercent* drop_percentage =
+      envoy_api_v2_ClusterLoadAssignment_Policy_DropOverload_drop_percentage(
+          drop_overload);
+  uint32_t numerator = envoy_type_FractionalPercent_numerator(drop_percentage);
+  const auto denominator =
+      static_cast<envoy_type_FractionalPercent_DenominatorType>(
+          envoy_type_FractionalPercent_denominator(drop_percentage));
+  // Normalize to million.
+  switch (denominator) {
+    case envoy_type_FractionalPercent_HUNDRED:
+      numerator *= 10000;
+      break;
+    case envoy_type_FractionalPercent_TEN_THOUSAND:
+      numerator *= 100;
+      break;
+    case envoy_type_FractionalPercent_MILLION:
+      break;
+    default:
+      return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Unknown denominator type");
+  }
+  // Cap numerator to 1000000.
+  numerator = GPR_MIN(numerator, 1000000);
+  if (numerator == 1000000) *drop_all = true;
+  drop_config->AddCategory(StringCopy(category), numerator);
+  return GRPC_ERROR_NONE;
+}
+
+}  // namespace
+
+grpc_error* XdsEdsResponseDecodeAndParse(const grpc_slice& encoded_response,
+                                         XdsUpdate* update) {
+  upb::Arena arena;
+  // Decode the response.
+  const envoy_api_v2_DiscoveryResponse* response =
+      envoy_api_v2_DiscoveryResponse_parse(
+          reinterpret_cast<const char*>(GRPC_SLICE_START_PTR(encoded_response)),
+          GRPC_SLICE_LENGTH(encoded_response), arena.ptr());
+  // Parse the response.
+  if (response == nullptr) {
+    return GRPC_ERROR_CREATE_FROM_STATIC_STRING("No response found.");
+  }
+  // Check the type_url of the response.
+  upb_strview type_url = envoy_api_v2_DiscoveryResponse_type_url(response);
+  upb_strview expected_type_url = upb_strview_makez(kEdsTypeUrl);
+  if (!upb_strview_eql(type_url, expected_type_url)) {
+    return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource is not EDS.");
+  }
+  // Get the resources from the response.
+  size_t size;
+  const google_protobuf_Any* const* resources =
+      envoy_api_v2_DiscoveryResponse_resources(response, &size);
+  if (size < 1) {
+    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+        "EDS response contains 0 resource.");
+  }
+  // Check the type_url of the resource.
+  type_url = google_protobuf_Any_type_url(resources[0]);
+  if (!upb_strview_eql(type_url, expected_type_url)) {
+    return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource is not EDS.");
+  }
+  // Get the cluster_load_assignment.
+  upb_strview encoded_cluster_load_assignment =
+      google_protobuf_Any_value(resources[0]);
+  envoy_api_v2_ClusterLoadAssignment* cluster_load_assignment =
+      envoy_api_v2_ClusterLoadAssignment_parse(
+          encoded_cluster_load_assignment.data,
+          encoded_cluster_load_assignment.size, arena.ptr());
+  // Get the endpoints.
+  const envoy_api_v2_endpoint_LocalityLbEndpoints* const* endpoints =
+      envoy_api_v2_ClusterLoadAssignment_endpoints(cluster_load_assignment,
+                                                   &size);
+  for (size_t i = 0; i < size; ++i) {
+    XdsLocalityInfo locality_info;
+    grpc_error* error = LocalityParse(endpoints[i], &locality_info);
+    if (error != GRPC_ERROR_NONE) return error;
+    // Filter out locality with weight 0.
+    if (locality_info.lb_weight == 0) continue;
+    update->locality_list.push_back(std::move(locality_info));
+  }
+  // The locality list is sorted here into deterministic order so that it's
+  // easier to check if two locality lists contain the same set of localities.
+  std::sort(update->locality_list.data(),
+            update->locality_list.data() + update->locality_list.size(),
+            XdsLocalityInfo::Less());
+  // Get the drop config.
+  update->drop_config = MakeRefCounted<XdsDropConfig>();
+  const envoy_api_v2_ClusterLoadAssignment_Policy* policy =
+      envoy_api_v2_ClusterLoadAssignment_policy(cluster_load_assignment);
+  if (policy != nullptr) {
+    const envoy_api_v2_ClusterLoadAssignment_Policy_DropOverload* const*
+        drop_overload =
+            envoy_api_v2_ClusterLoadAssignment_Policy_drop_overloads(policy,
+                                                                     &size);
+    for (size_t i = 0; i < size; ++i) {
+      grpc_error* error = DropParseAndAppend(
+          drop_overload[i], update->drop_config.get(), &update->drop_all);
+      if (error != GRPC_ERROR_NONE) return error;
+    }
+  }
+  return GRPC_ERROR_NONE;
+}
+
+namespace {
+
+grpc_slice LrsRequestEncode(
+    const envoy_service_load_stats_v2_LoadStatsRequest* request,
+    upb_arena* arena) {
+  size_t output_length;
+  char* output = envoy_service_load_stats_v2_LoadStatsRequest_serialize(
+      request, arena, &output_length);
+  return grpc_slice_from_copied_buffer(output, output_length);
+}
+
+}  // namespace
+
+grpc_slice XdsLrsRequestCreateAndEncode(const char* server_name) {
+  upb::Arena arena;
+  // Create a request.
+  envoy_service_load_stats_v2_LoadStatsRequest* request =
+      envoy_service_load_stats_v2_LoadStatsRequest_new(arena.ptr());
+  // Add cluster stats. There is only one because we only use one server name in
+  // one channel.
+  envoy_api_v2_endpoint_ClusterStats* cluster_stats =
+      envoy_service_load_stats_v2_LoadStatsRequest_add_cluster_stats(
+          request, arena.ptr());
+  // Set the cluster name.
+  envoy_api_v2_endpoint_ClusterStats_set_cluster_name(
+      cluster_stats, upb_strview_makez(server_name));
+  return LrsRequestEncode(request, arena.ptr());
+}
+
+namespace {
+
+void LocalityStatsPopulate(envoy_api_v2_endpoint_UpstreamLocalityStats* output,
+#if GRPC_USE_CPP_STD_LIB
+                           // TODO(veblush): Clean up this
+                           // This is to address the difference between
+                           // std::map and Map. #else block will be gone
+                           // once using stdlib is enabled by default.
+                           Pair<const RefCountedPtr<XdsLocalityName>,
+#else
+                           Pair<RefCountedPtr<XdsLocalityName>,
+#endif
+                                XdsClientStats::LocalityStats::Snapshot>& input,
+                           upb_arena* arena) {
+  // Set sub_zone.
+  envoy_api_v2_core_Locality* locality =
+      envoy_api_v2_endpoint_UpstreamLocalityStats_mutable_locality(output,
+                                                                   arena);
+  envoy_api_v2_core_Locality_set_sub_zone(
+      locality, upb_strview_makez(input.first->sub_zone()));
+  // Set total counts.
+  XdsClientStats::LocalityStats::Snapshot& snapshot = input.second;
+  envoy_api_v2_endpoint_UpstreamLocalityStats_set_total_successful_requests(
+      output, snapshot.total_successful_requests);
+  envoy_api_v2_endpoint_UpstreamLocalityStats_set_total_requests_in_progress(
+      output, snapshot.total_requests_in_progress);
+  envoy_api_v2_endpoint_UpstreamLocalityStats_set_total_error_requests(
+      output, snapshot.total_error_requests);
+  envoy_api_v2_endpoint_UpstreamLocalityStats_set_total_issued_requests(
+      output, snapshot.total_issued_requests);
+  // Add load metric stats.
+  for (auto& p : snapshot.load_metric_stats) {
+    const char* metric_name = p.first.get();
+    const XdsClientStats::LocalityStats::LoadMetric::Snapshot& metric_value =
+        p.second;
+    envoy_api_v2_endpoint_EndpointLoadMetricStats* load_metric =
+        envoy_api_v2_endpoint_UpstreamLocalityStats_add_load_metric_stats(
+            output, arena);
+    envoy_api_v2_endpoint_EndpointLoadMetricStats_set_metric_name(
+        load_metric, upb_strview_makez(metric_name));
+    envoy_api_v2_endpoint_EndpointLoadMetricStats_set_num_requests_finished_with_metric(
+        load_metric, metric_value.num_requests_finished_with_metric);
+    envoy_api_v2_endpoint_EndpointLoadMetricStats_set_total_metric_value(
+        load_metric, metric_value.total_metric_value);
+  }
+}
+
+}  // namespace
+
+grpc_slice XdsLrsRequestCreateAndEncode(const char* server_name,
+                                        XdsClientStats* client_stats) {
+  upb::Arena arena;
+  XdsClientStats::Snapshot snapshot = client_stats->GetSnapshotAndReset();
+  // Prune unused locality stats.
+  client_stats->PruneLocalityStats();
+  // When all the counts are zero, return empty slice.
+  if (snapshot.IsAllZero()) return grpc_empty_slice();
+  // Create a request.
+  envoy_service_load_stats_v2_LoadStatsRequest* request =
+      envoy_service_load_stats_v2_LoadStatsRequest_new(arena.ptr());
+  // Add cluster stats. There is only one because we only use one server name in
+  // one channel.
+  envoy_api_v2_endpoint_ClusterStats* cluster_stats =
+      envoy_service_load_stats_v2_LoadStatsRequest_add_cluster_stats(
+          request, arena.ptr());
+  // Set the cluster name.
+  envoy_api_v2_endpoint_ClusterStats_set_cluster_name(
+      cluster_stats, upb_strview_makez(server_name));
+  // Add locality stats.
+  for (auto& p : snapshot.upstream_locality_stats) {
+    envoy_api_v2_endpoint_UpstreamLocalityStats* locality_stats =
+        envoy_api_v2_endpoint_ClusterStats_add_upstream_locality_stats(
+            cluster_stats, arena.ptr());
+    LocalityStatsPopulate(locality_stats, p, arena.ptr());
+  }
+  // Add dropped requests.
+  for (auto& p : snapshot.dropped_requests) {
+    const char* category = p.first.get();
+    const uint64_t count = p.second;
+    envoy_api_v2_endpoint_ClusterStats_DroppedRequests* dropped_requests =
+        envoy_api_v2_endpoint_ClusterStats_add_dropped_requests(cluster_stats,
+                                                                arena.ptr());
+    envoy_api_v2_endpoint_ClusterStats_DroppedRequests_set_category(
+        dropped_requests, upb_strview_makez(category));
+    envoy_api_v2_endpoint_ClusterStats_DroppedRequests_set_dropped_count(
+        dropped_requests, count);
+  }
+  // Set total dropped requests.
+  envoy_api_v2_endpoint_ClusterStats_set_total_dropped_requests(
+      cluster_stats, snapshot.total_dropped_requests);
+  // Set real load report interval.
+  gpr_timespec timespec =
+      grpc_millis_to_timespec(snapshot.load_report_interval, GPR_TIMESPAN);
+  google_protobuf_Duration* load_report_interval =
+      envoy_api_v2_endpoint_ClusterStats_mutable_load_report_interval(
+          cluster_stats, arena.ptr());
+  google_protobuf_Duration_set_seconds(load_report_interval, timespec.tv_sec);
+  google_protobuf_Duration_set_nanos(load_report_interval, timespec.tv_nsec);
+  return LrsRequestEncode(request, arena.ptr());
+}
+
+grpc_error* XdsLrsResponseDecodeAndParse(const grpc_slice& encoded_response,
+                                         grpc_millis* load_reporting_interval,
+                                         const char* expected_server_name) {
+  upb::Arena arena;
+  // Decode the response.
+  const envoy_service_load_stats_v2_LoadStatsResponse* decoded_response =
+      envoy_service_load_stats_v2_LoadStatsResponse_parse(
+          reinterpret_cast<const char*>(GRPC_SLICE_START_PTR(encoded_response)),
+          GRPC_SLICE_LENGTH(encoded_response), arena.ptr());
+  // Parse the response.
+  if (decoded_response == nullptr) {
+    return GRPC_ERROR_CREATE_FROM_STATIC_STRING("No response found.");
+  }
+  // Check the cluster size in the response.
+  size_t size;
+  const upb_strview* clusters =
+      envoy_service_load_stats_v2_LoadStatsResponse_clusters(decoded_response,
+                                                             &size);
+  if (size != 1) {
+    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+        "The number of clusters (server names) is not 1.");
+  }
+  // Check the cluster name in the response
+  if (strncmp(expected_server_name, clusters[0].data, clusters[0].size) != 0) {
+    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+        "Unexpected cluster (server name).");
+  }
+  // Get the load report interval.
+  const google_protobuf_Duration* load_reporting_interval_duration =
+      envoy_service_load_stats_v2_LoadStatsResponse_load_reporting_interval(
+          decoded_response);
+  gpr_timespec timespec{
+      google_protobuf_Duration_seconds(load_reporting_interval_duration),
+      google_protobuf_Duration_nanos(load_reporting_interval_duration),
+      GPR_TIMESPAN};
+  *load_reporting_interval = gpr_time_to_millis(timespec);
+  return GRPC_ERROR_NONE;
+}
+
+}  // namespace grpc_core