Built motion from commit 6a09e18b.|2.6.11
[motion2.git] / legacy-libs / grpc / deps / grpc / src / core / ext / filters / load_reporting / server_load_reporting_filter.cc
diff --git a/legacy-libs/grpc/deps/grpc/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc b/legacy-libs/grpc/deps/grpc/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc
new file mode 100644 (file)
index 0000000..f48b0f4
--- /dev/null
@@ -0,0 +1,367 @@
+/*
+ *
+ * Copyright 2016 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include <string.h>
+
+#include <grpc/grpc_security.h>
+#include <grpc/slice.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+
+#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h"
+#include "src/core/ext/filters/client_channel/parse_address.h"
+#include "src/core/ext/filters/load_reporting/registered_opencensus_objects.h"
+#include "src/core/ext/filters/load_reporting/server_load_reporting_filter.h"
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/channel/context.h"
+#include "src/core/lib/iomgr/resolve_address.h"
+#include "src/core/lib/iomgr/sockaddr.h"
+#include "src/core/lib/iomgr/socket_utils.h"
+#include "src/core/lib/security/context/security_context.h"
+#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/lib/surface/call.h"
+#include "src/core/lib/uri/uri_parser.h"
+
+namespace grpc {
+
+constexpr char kEncodedIpv4AddressLengthString[] = "08";
+constexpr char kEncodedIpv6AddressLengthString[] = "32";
+constexpr char kEmptyAddressLengthString[] = "00";
+constexpr size_t kLengthPrefixSize = 2;
+
+grpc_error* ServerLoadReportingChannelData::Init(
+    grpc_channel_element* /* elem */, grpc_channel_element_args* args) {
+  GPR_ASSERT(!args->is_last);
+  // Find and record the peer_identity.
+  const grpc_auth_context* auth_context =
+      grpc_find_auth_context_in_args(args->channel_args);
+  if (auth_context != nullptr &&
+      grpc_auth_context_peer_is_authenticated(auth_context)) {
+    grpc_auth_property_iterator auth_it =
+        grpc_auth_context_peer_identity(auth_context);
+    const grpc_auth_property* auth_property =
+        grpc_auth_property_iterator_next(&auth_it);
+    if (auth_property != nullptr) {
+      peer_identity_ = auth_property->value;
+      peer_identity_len_ = auth_property->value_length;
+    }
+  }
+  return GRPC_ERROR_NONE;
+}
+
+void ServerLoadReportingCallData::Destroy(
+    grpc_call_element* elem, const grpc_call_final_info* final_info,
+    grpc_closure* then_call_closure) {
+  ServerLoadReportingChannelData* chand =
+      reinterpret_cast<ServerLoadReportingChannelData*>(elem->channel_data);
+  // Only record an end if we've recorded its corresponding start, which is
+  // indicated by a non-null client_ip_and_lr_token_. Note that it's possible
+  // that we attempt to record the call end before we have recorded the call
+  // start, because the data needed for recording the start comes from the
+  // initial metadata, which may not be ready before the call finishes.
+  if (client_ip_and_lr_token_ != nullptr) {
+    opencensus::stats::Record(
+        {{::grpc::load_reporter::MeasureEndCount(), 1},
+         {::grpc::load_reporter::MeasureEndBytesSent(),
+          final_info->stats.transport_stream_stats.outgoing.data_bytes},
+         {::grpc::load_reporter::MeasureEndBytesReceived(),
+          final_info->stats.transport_stream_stats.incoming.data_bytes},
+         {::grpc::load_reporter::MeasureEndLatencyMs(),
+          gpr_time_to_millis(final_info->stats.latency)}},
+        {{::grpc::load_reporter::TagKeyToken(),
+          {client_ip_and_lr_token_, client_ip_and_lr_token_len_}},
+         {::grpc::load_reporter::TagKeyHost(),
+          {target_host_, target_host_len_}},
+         {::grpc::load_reporter::TagKeyUserId(),
+          {chand->peer_identity(), chand->peer_identity_len()}},
+         {::grpc::load_reporter::TagKeyStatus(),
+          GetStatusTagForStatus(final_info->final_status)}});
+    gpr_free(client_ip_and_lr_token_);
+  }
+  gpr_free(target_host_);
+  grpc_slice_unref_internal(service_method_);
+}
+
+void ServerLoadReportingCallData::StartTransportStreamOpBatch(
+    grpc_call_element* elem, TransportStreamOpBatch* op) {
+  GPR_TIMER_SCOPE("lr_start_transport_stream_op", 0);
+  if (op->recv_initial_metadata() != nullptr) {
+    // Save some fields to use when initial metadata is ready.
+    peer_string_ = op->get_peer_string();
+    recv_initial_metadata_ =
+        op->op()->payload->recv_initial_metadata.recv_initial_metadata;
+    original_recv_initial_metadata_ready_ = op->recv_initial_metadata_ready();
+    // Substitute the original closure for the wrapper closure.
+    op->set_recv_initial_metadata_ready(&recv_initial_metadata_ready_);
+  } else if (op->send_trailing_metadata() != nullptr) {
+    GRPC_LOG_IF_ERROR(
+        "server_load_reporting_filter",
+        grpc_metadata_batch_filter(op->send_trailing_metadata()->batch(),
+                                   SendTrailingMetadataFilter, elem,
+                                   "send_trailing_metadata filtering error"));
+  }
+  grpc_call_next_op(elem, op->op());
+}
+
+void ServerLoadReportingCallData::GetCensusSafeClientIpString(
+    char** client_ip_string, size_t* size) {
+  // Find the client URI string.
+  const char* client_uri_str =
+      reinterpret_cast<const char*>(gpr_atm_acq_load(peer_string_));
+  if (client_uri_str == nullptr) {
+    gpr_log(GPR_ERROR,
+            "Unable to extract client URI string (peer string) from gRPC "
+            "metadata.");
+    *client_ip_string = nullptr;
+    *size = 0;
+    return;
+  }
+  // Parse the client URI string into grpc_uri.
+  grpc_uri* client_uri = grpc_uri_parse(client_uri_str, true);
+  if (client_uri == nullptr) {
+    gpr_log(GPR_ERROR,
+            "Unable to parse the client URI string (peer string) to a client "
+            "URI.");
+    *client_ip_string = nullptr;
+    *size = 0;
+    return;
+  }
+  // Parse the client URI into grpc_resolved_address.
+  grpc_resolved_address resolved_address;
+  bool success = grpc_parse_uri(client_uri, &resolved_address);
+  grpc_uri_destroy(client_uri);
+  if (!success) {
+    gpr_log(GPR_ERROR,
+            "Unable to parse client URI into a grpc_resolved_address.");
+    *client_ip_string = nullptr;
+    *size = 0;
+    return;
+  }
+  // Convert the socket address in the grpc_resolved_address into a hex string
+  // according to the address family.
+  grpc_sockaddr* addr = reinterpret_cast<grpc_sockaddr*>(resolved_address.addr);
+  if (addr->sa_family == GRPC_AF_INET) {
+    grpc_sockaddr_in* addr4 = reinterpret_cast<grpc_sockaddr_in*>(addr);
+    gpr_asprintf(client_ip_string, "%08x", grpc_ntohl(addr4->sin_addr.s_addr));
+    *size = 8;
+  } else if (addr->sa_family == GRPC_AF_INET6) {
+    grpc_sockaddr_in6* addr6 = reinterpret_cast<grpc_sockaddr_in6*>(addr);
+    *client_ip_string = static_cast<char*>(gpr_malloc(32 + 1));
+    uint32_t* addr6_next_long = reinterpret_cast<uint32_t*>(&addr6->sin6_addr);
+    for (size_t i = 0; i < 4; ++i) {
+      snprintf(*client_ip_string + 8 * i, 8 + 1, "%08x",
+               grpc_ntohl(*addr6_next_long++));
+    }
+    *size = 32;
+  } else {
+    GPR_UNREACHABLE_CODE();
+  }
+}
+
+void ServerLoadReportingCallData::StoreClientIpAndLrToken(const char* lr_token,
+                                                          size_t lr_token_len) {
+  char* client_ip;
+  size_t client_ip_len;
+  GetCensusSafeClientIpString(&client_ip, &client_ip_len);
+  client_ip_and_lr_token_len_ =
+      kLengthPrefixSize + client_ip_len + lr_token_len;
+  client_ip_and_lr_token_ = static_cast<char*>(
+      gpr_zalloc(client_ip_and_lr_token_len_ * sizeof(char)));
+  char* cur_pos = client_ip_and_lr_token_;
+  // Store the IP length prefix.
+  if (client_ip_len == 0) {
+    strncpy(cur_pos, kEmptyAddressLengthString, kLengthPrefixSize);
+  } else if (client_ip_len == 8) {
+    strncpy(cur_pos, kEncodedIpv4AddressLengthString, kLengthPrefixSize);
+  } else if (client_ip_len == 32) {
+    strncpy(cur_pos, kEncodedIpv6AddressLengthString, kLengthPrefixSize);
+  } else {
+    GPR_UNREACHABLE_CODE();
+  }
+  cur_pos += kLengthPrefixSize;
+  // Store the IP.
+  if (client_ip_len != 0) {
+    strncpy(cur_pos, client_ip, client_ip_len);
+  }
+  gpr_free(client_ip);
+  cur_pos += client_ip_len;
+  // Store the LR token.
+  if (lr_token_len != 0) {
+    strncpy(cur_pos, lr_token, lr_token_len);
+  }
+  GPR_ASSERT(cur_pos + lr_token_len - client_ip_and_lr_token_ ==
+             client_ip_and_lr_token_len_);
+}
+
+grpc_filtered_mdelem ServerLoadReportingCallData::RecvInitialMetadataFilter(
+    void* user_data, grpc_mdelem md) {
+  grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data);
+  ServerLoadReportingCallData* calld =
+      reinterpret_cast<ServerLoadReportingCallData*>(elem->call_data);
+  if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_PATH)) {
+    calld->service_method_ = grpc_slice_ref_internal(GRPC_MDVALUE(md));
+  } else if (calld->target_host_ == nullptr &&
+             grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_AUTHORITY)) {
+    grpc_slice target_host_slice = GRPC_MDVALUE(md);
+    calld->target_host_len_ = GRPC_SLICE_LENGTH(target_host_slice);
+    calld->target_host_ =
+        reinterpret_cast<char*>(gpr_zalloc(calld->target_host_len_));
+    for (size_t i = 0; i < calld->target_host_len_; ++i) {
+      calld->target_host_[i] = static_cast<char>(
+          tolower(GRPC_SLICE_START_PTR(target_host_slice)[i]));
+    }
+  } else if (grpc_slice_str_cmp(GRPC_MDKEY(md),
+                                grpc_core::kGrpcLbLbTokenMetadataKey) == 0) {
+    if (calld->client_ip_and_lr_token_ == nullptr) {
+      calld->StoreClientIpAndLrToken(
+          reinterpret_cast<const char*> GRPC_SLICE_START_PTR(GRPC_MDVALUE(md)),
+          GRPC_SLICE_LENGTH(GRPC_MDVALUE(md)));
+    }
+    return GRPC_FILTERED_REMOVE();
+  }
+  return GRPC_FILTERED_MDELEM(md);
+}
+
+void ServerLoadReportingCallData::RecvInitialMetadataReady(void* arg,
+                                                           grpc_error* err) {
+  grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(arg);
+  ServerLoadReportingCallData* calld =
+      reinterpret_cast<ServerLoadReportingCallData*>(elem->call_data);
+  ServerLoadReportingChannelData* chand =
+      reinterpret_cast<ServerLoadReportingChannelData*>(elem->channel_data);
+  if (err == GRPC_ERROR_NONE) {
+    GRPC_LOG_IF_ERROR(
+        "server_load_reporting_filter",
+        grpc_metadata_batch_filter(calld->recv_initial_metadata_,
+                                   RecvInitialMetadataFilter, elem,
+                                   "recv_initial_metadata filtering error"));
+    // If the LB token was not found in the recv_initial_metadata, only the
+    // client IP part will be recorded (with an empty LB token).
+    if (calld->client_ip_and_lr_token_ == nullptr) {
+      calld->StoreClientIpAndLrToken(nullptr, 0);
+    }
+    opencensus::stats::Record(
+        {{::grpc::load_reporter::MeasureStartCount(), 1}},
+        {{::grpc::load_reporter::TagKeyToken(),
+          {calld->client_ip_and_lr_token_, calld->client_ip_and_lr_token_len_}},
+         {::grpc::load_reporter::TagKeyHost(),
+          {calld->target_host_, calld->target_host_len_}},
+         {::grpc::load_reporter::TagKeyUserId(),
+          {chand->peer_identity(), chand->peer_identity_len()}}});
+  }
+  GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready_,
+                   GRPC_ERROR_REF(err));
+}
+
+grpc_error* ServerLoadReportingCallData::Init(
+    grpc_call_element* elem, const grpc_call_element_args* args) {
+  service_method_ = grpc_empty_slice();
+  GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReady,
+                    elem, grpc_schedule_on_exec_ctx);
+  return GRPC_ERROR_NONE;
+}
+
+grpc_filtered_mdelem ServerLoadReportingCallData::SendTrailingMetadataFilter(
+    void* user_data, grpc_mdelem md) {
+  grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data);
+  ServerLoadReportingCallData* calld =
+      reinterpret_cast<ServerLoadReportingCallData*>(elem->call_data);
+  ServerLoadReportingChannelData* chand =
+      reinterpret_cast<ServerLoadReportingChannelData*>(elem->channel_data);
+  if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_LB_COST_BIN)) {
+    const grpc_slice value = GRPC_MDVALUE(md);
+    const size_t cost_entry_size = GRPC_SLICE_LENGTH(value);
+    if (cost_entry_size < sizeof(double)) {
+      gpr_log(GPR_ERROR,
+              "Cost metadata value too small (%zu bytes) to hold valid data. "
+              "Ignoring.",
+              cost_entry_size);
+      return GRPC_FILTERED_REMOVE();
+    }
+    const double* cost_entry_ptr =
+        reinterpret_cast<const double*>(GRPC_SLICE_START_PTR(value));
+    double cost_value = *cost_entry_ptr++;
+    const char* cost_name = reinterpret_cast<const char*>(cost_entry_ptr);
+    const size_t cost_name_len = cost_entry_size - sizeof(double);
+    opencensus::stats::Record(
+        {{::grpc::load_reporter::MeasureOtherCallMetric(), cost_value}},
+        {{::grpc::load_reporter::TagKeyToken(),
+          {calld->client_ip_and_lr_token_, calld->client_ip_and_lr_token_len_}},
+         {::grpc::load_reporter::TagKeyHost(),
+          {calld->target_host_, calld->target_host_len_}},
+         {::grpc::load_reporter::TagKeyUserId(),
+          {chand->peer_identity(), chand->peer_identity_len()}},
+         {::grpc::load_reporter::TagKeyMetricName(),
+          {cost_name, cost_name_len}}});
+    return GRPC_FILTERED_REMOVE();
+  }
+  return GRPC_FILTERED_MDELEM(md);
+}
+
+const char* ServerLoadReportingCallData::GetStatusTagForStatus(
+    grpc_status_code status) {
+  switch (status) {
+    case GRPC_STATUS_OK:
+      return ::grpc::load_reporter::kCallStatusOk;
+    case GRPC_STATUS_UNKNOWN:
+    case GRPC_STATUS_DEADLINE_EXCEEDED:
+    case GRPC_STATUS_UNIMPLEMENTED:
+    case GRPC_STATUS_INTERNAL:
+    case GRPC_STATUS_UNAVAILABLE:
+    case GRPC_STATUS_DATA_LOSS:
+      return ::grpc::load_reporter::kCallStatusServerError;
+    default:
+      return ::grpc::load_reporter::kCallStatusClientError;
+  }
+}
+
+namespace {
+bool MaybeAddServerLoadReportingFilter(const grpc_channel_args& args) {
+  return grpc_channel_arg_get_bool(
+      grpc_channel_args_find(&args, GRPC_ARG_ENABLE_LOAD_REPORTING), false);
+}
+}  // namespace
+
+// TODO(juanlishen): We should register the filter during grpc initialization
+// time once OpenCensus is compatible with our build system. For now, we force
+// registration of the server load reporting filter at static initialization
+// time if we build with the filter target.
+struct ServerLoadReportingFilterStaticRegistrar {
+  ServerLoadReportingFilterStaticRegistrar() {
+    static grpc_core::Atomic<bool> registered{false};
+    if (registered.Load(grpc_core::MemoryOrder::ACQUIRE)) return;
+    RegisterChannelFilter<ServerLoadReportingChannelData,
+                          ServerLoadReportingCallData>(
+        "server_load_reporting", GRPC_SERVER_CHANNEL, INT_MAX,
+        MaybeAddServerLoadReportingFilter);
+    // Access measures to ensure they are initialized. Otherwise, we can't
+    // create any valid view before the first RPC.
+    ::grpc::load_reporter::MeasureStartCount();
+    ::grpc::load_reporter::MeasureEndCount();
+    ::grpc::load_reporter::MeasureEndBytesSent();
+    ::grpc::load_reporter::MeasureEndBytesReceived();
+    ::grpc::load_reporter::MeasureEndLatencyMs();
+    ::grpc::load_reporter::MeasureOtherCallMetric();
+    registered.Store(true, grpc_core::MemoryOrder::RELEASE);
+  }
+} server_load_reporting_filter_static_registrar;
+
+}  // namespace grpc