3 * Copyright 2016 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/port_platform.h>
23 #include <grpc/grpc_security.h>
24 #include <grpc/slice.h>
25 #include <grpc/support/alloc.h>
26 #include <grpc/support/log.h>
27 #include <grpc/support/string_util.h>
29 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h"
30 #include "src/core/ext/filters/client_channel/parse_address.h"
31 #include "src/core/ext/filters/load_reporting/registered_opencensus_objects.h"
32 #include "src/core/ext/filters/load_reporting/server_load_reporting_filter.h"
33 #include "src/core/lib/channel/channel_args.h"
34 #include "src/core/lib/channel/context.h"
35 #include "src/core/lib/iomgr/resolve_address.h"
36 #include "src/core/lib/iomgr/sockaddr.h"
37 #include "src/core/lib/iomgr/socket_utils.h"
38 #include "src/core/lib/security/context/security_context.h"
39 #include "src/core/lib/slice/slice_internal.h"
40 #include "src/core/lib/surface/call.h"
41 #include "src/core/lib/uri/uri_parser.h"
45 constexpr char kEncodedIpv4AddressLengthString[] = "08";
46 constexpr char kEncodedIpv6AddressLengthString[] = "32";
47 constexpr char kEmptyAddressLengthString[] = "00";
48 constexpr size_t kLengthPrefixSize = 2;
50 grpc_error* ServerLoadReportingChannelData::Init(
51 grpc_channel_element* /* elem */, grpc_channel_element_args* args) {
52 GPR_ASSERT(!args->is_last);
53 // Find and record the peer_identity.
54 const grpc_auth_context* auth_context =
55 grpc_find_auth_context_in_args(args->channel_args);
56 if (auth_context != nullptr &&
57 grpc_auth_context_peer_is_authenticated(auth_context)) {
58 grpc_auth_property_iterator auth_it =
59 grpc_auth_context_peer_identity(auth_context);
60 const grpc_auth_property* auth_property =
61 grpc_auth_property_iterator_next(&auth_it);
62 if (auth_property != nullptr) {
63 peer_identity_ = auth_property->value;
64 peer_identity_len_ = auth_property->value_length;
67 return GRPC_ERROR_NONE;
70 void ServerLoadReportingCallData::Destroy(
71 grpc_call_element* elem, const grpc_call_final_info* final_info,
72 grpc_closure* then_call_closure) {
73 ServerLoadReportingChannelData* chand =
74 reinterpret_cast<ServerLoadReportingChannelData*>(elem->channel_data);
75 // Only record an end if we've recorded its corresponding start, which is
76 // indicated by a non-null client_ip_and_lr_token_. Note that it's possible
77 // that we attempt to record the call end before we have recorded the call
78 // start, because the data needed for recording the start comes from the
79 // initial metadata, which may not be ready before the call finishes.
80 if (client_ip_and_lr_token_ != nullptr) {
81 opencensus::stats::Record(
82 {{::grpc::load_reporter::MeasureEndCount(), 1},
83 {::grpc::load_reporter::MeasureEndBytesSent(),
84 final_info->stats.transport_stream_stats.outgoing.data_bytes},
85 {::grpc::load_reporter::MeasureEndBytesReceived(),
86 final_info->stats.transport_stream_stats.incoming.data_bytes},
87 {::grpc::load_reporter::MeasureEndLatencyMs(),
88 gpr_time_to_millis(final_info->stats.latency)}},
89 {{::grpc::load_reporter::TagKeyToken(),
90 {client_ip_and_lr_token_, client_ip_and_lr_token_len_}},
91 {::grpc::load_reporter::TagKeyHost(),
92 {target_host_, target_host_len_}},
93 {::grpc::load_reporter::TagKeyUserId(),
94 {chand->peer_identity(), chand->peer_identity_len()}},
95 {::grpc::load_reporter::TagKeyStatus(),
96 GetStatusTagForStatus(final_info->final_status)}});
97 gpr_free(client_ip_and_lr_token_);
99 gpr_free(target_host_);
100 grpc_slice_unref_internal(service_method_);
103 void ServerLoadReportingCallData::StartTransportStreamOpBatch(
104 grpc_call_element* elem, TransportStreamOpBatch* op) {
105 GPR_TIMER_SCOPE("lr_start_transport_stream_op", 0);
106 if (op->recv_initial_metadata() != nullptr) {
107 // Save some fields to use when initial metadata is ready.
108 peer_string_ = op->get_peer_string();
109 recv_initial_metadata_ =
110 op->op()->payload->recv_initial_metadata.recv_initial_metadata;
111 original_recv_initial_metadata_ready_ = op->recv_initial_metadata_ready();
112 // Substitute the original closure for the wrapper closure.
113 op->set_recv_initial_metadata_ready(&recv_initial_metadata_ready_);
114 } else if (op->send_trailing_metadata() != nullptr) {
116 "server_load_reporting_filter",
117 grpc_metadata_batch_filter(op->send_trailing_metadata()->batch(),
118 SendTrailingMetadataFilter, elem,
119 "send_trailing_metadata filtering error"));
121 grpc_call_next_op(elem, op->op());
124 void ServerLoadReportingCallData::GetCensusSafeClientIpString(
125 char** client_ip_string, size_t* size) {
126 // Find the client URI string.
127 const char* client_uri_str =
128 reinterpret_cast<const char*>(gpr_atm_acq_load(peer_string_));
129 if (client_uri_str == nullptr) {
131 "Unable to extract client URI string (peer string) from gRPC "
133 *client_ip_string = nullptr;
137 // Parse the client URI string into grpc_uri.
138 grpc_uri* client_uri = grpc_uri_parse(client_uri_str, true);
139 if (client_uri == nullptr) {
141 "Unable to parse the client URI string (peer string) to a client "
143 *client_ip_string = nullptr;
147 // Parse the client URI into grpc_resolved_address.
148 grpc_resolved_address resolved_address;
149 bool success = grpc_parse_uri(client_uri, &resolved_address);
150 grpc_uri_destroy(client_uri);
153 "Unable to parse client URI into a grpc_resolved_address.");
154 *client_ip_string = nullptr;
158 // Convert the socket address in the grpc_resolved_address into a hex string
159 // according to the address family.
160 grpc_sockaddr* addr = reinterpret_cast<grpc_sockaddr*>(resolved_address.addr);
161 if (addr->sa_family == GRPC_AF_INET) {
162 grpc_sockaddr_in* addr4 = reinterpret_cast<grpc_sockaddr_in*>(addr);
163 gpr_asprintf(client_ip_string, "%08x", grpc_ntohl(addr4->sin_addr.s_addr));
165 } else if (addr->sa_family == GRPC_AF_INET6) {
166 grpc_sockaddr_in6* addr6 = reinterpret_cast<grpc_sockaddr_in6*>(addr);
167 *client_ip_string = static_cast<char*>(gpr_malloc(32 + 1));
168 uint32_t* addr6_next_long = reinterpret_cast<uint32_t*>(&addr6->sin6_addr);
169 for (size_t i = 0; i < 4; ++i) {
170 snprintf(*client_ip_string + 8 * i, 8 + 1, "%08x",
171 grpc_ntohl(*addr6_next_long++));
175 GPR_UNREACHABLE_CODE();
179 void ServerLoadReportingCallData::StoreClientIpAndLrToken(const char* lr_token,
180 size_t lr_token_len) {
182 size_t client_ip_len;
183 GetCensusSafeClientIpString(&client_ip, &client_ip_len);
184 client_ip_and_lr_token_len_ =
185 kLengthPrefixSize + client_ip_len + lr_token_len;
186 client_ip_and_lr_token_ = static_cast<char*>(
187 gpr_zalloc(client_ip_and_lr_token_len_ * sizeof(char)));
188 char* cur_pos = client_ip_and_lr_token_;
189 // Store the IP length prefix.
190 if (client_ip_len == 0) {
191 strncpy(cur_pos, kEmptyAddressLengthString, kLengthPrefixSize);
192 } else if (client_ip_len == 8) {
193 strncpy(cur_pos, kEncodedIpv4AddressLengthString, kLengthPrefixSize);
194 } else if (client_ip_len == 32) {
195 strncpy(cur_pos, kEncodedIpv6AddressLengthString, kLengthPrefixSize);
197 GPR_UNREACHABLE_CODE();
199 cur_pos += kLengthPrefixSize;
201 if (client_ip_len != 0) {
202 strncpy(cur_pos, client_ip, client_ip_len);
205 cur_pos += client_ip_len;
206 // Store the LR token.
207 if (lr_token_len != 0) {
208 strncpy(cur_pos, lr_token, lr_token_len);
210 GPR_ASSERT(cur_pos + lr_token_len - client_ip_and_lr_token_ ==
211 client_ip_and_lr_token_len_);
214 grpc_filtered_mdelem ServerLoadReportingCallData::RecvInitialMetadataFilter(
215 void* user_data, grpc_mdelem md) {
216 grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data);
217 ServerLoadReportingCallData* calld =
218 reinterpret_cast<ServerLoadReportingCallData*>(elem->call_data);
219 if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_PATH)) {
220 calld->service_method_ = grpc_slice_ref_internal(GRPC_MDVALUE(md));
221 } else if (calld->target_host_ == nullptr &&
222 grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_AUTHORITY)) {
223 grpc_slice target_host_slice = GRPC_MDVALUE(md);
224 calld->target_host_len_ = GRPC_SLICE_LENGTH(target_host_slice);
225 calld->target_host_ =
226 reinterpret_cast<char*>(gpr_zalloc(calld->target_host_len_));
227 for (size_t i = 0; i < calld->target_host_len_; ++i) {
228 calld->target_host_[i] = static_cast<char>(
229 tolower(GRPC_SLICE_START_PTR(target_host_slice)[i]));
231 } else if (grpc_slice_str_cmp(GRPC_MDKEY(md),
232 grpc_core::kGrpcLbLbTokenMetadataKey) == 0) {
233 if (calld->client_ip_and_lr_token_ == nullptr) {
234 calld->StoreClientIpAndLrToken(
235 reinterpret_cast<const char*> GRPC_SLICE_START_PTR(GRPC_MDVALUE(md)),
236 GRPC_SLICE_LENGTH(GRPC_MDVALUE(md)));
238 return GRPC_FILTERED_REMOVE();
240 return GRPC_FILTERED_MDELEM(md);
243 void ServerLoadReportingCallData::RecvInitialMetadataReady(void* arg,
245 grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(arg);
246 ServerLoadReportingCallData* calld =
247 reinterpret_cast<ServerLoadReportingCallData*>(elem->call_data);
248 ServerLoadReportingChannelData* chand =
249 reinterpret_cast<ServerLoadReportingChannelData*>(elem->channel_data);
250 if (err == GRPC_ERROR_NONE) {
252 "server_load_reporting_filter",
253 grpc_metadata_batch_filter(calld->recv_initial_metadata_,
254 RecvInitialMetadataFilter, elem,
255 "recv_initial_metadata filtering error"));
256 // If the LB token was not found in the recv_initial_metadata, only the
257 // client IP part will be recorded (with an empty LB token).
258 if (calld->client_ip_and_lr_token_ == nullptr) {
259 calld->StoreClientIpAndLrToken(nullptr, 0);
261 opencensus::stats::Record(
262 {{::grpc::load_reporter::MeasureStartCount(), 1}},
263 {{::grpc::load_reporter::TagKeyToken(),
264 {calld->client_ip_and_lr_token_, calld->client_ip_and_lr_token_len_}},
265 {::grpc::load_reporter::TagKeyHost(),
266 {calld->target_host_, calld->target_host_len_}},
267 {::grpc::load_reporter::TagKeyUserId(),
268 {chand->peer_identity(), chand->peer_identity_len()}}});
270 GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready_,
271 GRPC_ERROR_REF(err));
274 grpc_error* ServerLoadReportingCallData::Init(
275 grpc_call_element* elem, const grpc_call_element_args* args) {
276 service_method_ = grpc_empty_slice();
277 GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReady,
278 elem, grpc_schedule_on_exec_ctx);
279 return GRPC_ERROR_NONE;
282 grpc_filtered_mdelem ServerLoadReportingCallData::SendTrailingMetadataFilter(
283 void* user_data, grpc_mdelem md) {
284 grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data);
285 ServerLoadReportingCallData* calld =
286 reinterpret_cast<ServerLoadReportingCallData*>(elem->call_data);
287 ServerLoadReportingChannelData* chand =
288 reinterpret_cast<ServerLoadReportingChannelData*>(elem->channel_data);
289 if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_LB_COST_BIN)) {
290 const grpc_slice value = GRPC_MDVALUE(md);
291 const size_t cost_entry_size = GRPC_SLICE_LENGTH(value);
292 if (cost_entry_size < sizeof(double)) {
294 "Cost metadata value too small (%zu bytes) to hold valid data. "
297 return GRPC_FILTERED_REMOVE();
299 const double* cost_entry_ptr =
300 reinterpret_cast<const double*>(GRPC_SLICE_START_PTR(value));
301 double cost_value = *cost_entry_ptr++;
302 const char* cost_name = reinterpret_cast<const char*>(cost_entry_ptr);
303 const size_t cost_name_len = cost_entry_size - sizeof(double);
304 opencensus::stats::Record(
305 {{::grpc::load_reporter::MeasureOtherCallMetric(), cost_value}},
306 {{::grpc::load_reporter::TagKeyToken(),
307 {calld->client_ip_and_lr_token_, calld->client_ip_and_lr_token_len_}},
308 {::grpc::load_reporter::TagKeyHost(),
309 {calld->target_host_, calld->target_host_len_}},
310 {::grpc::load_reporter::TagKeyUserId(),
311 {chand->peer_identity(), chand->peer_identity_len()}},
312 {::grpc::load_reporter::TagKeyMetricName(),
313 {cost_name, cost_name_len}}});
314 return GRPC_FILTERED_REMOVE();
316 return GRPC_FILTERED_MDELEM(md);
319 const char* ServerLoadReportingCallData::GetStatusTagForStatus(
320 grpc_status_code status) {
323 return ::grpc::load_reporter::kCallStatusOk;
324 case GRPC_STATUS_UNKNOWN:
325 case GRPC_STATUS_DEADLINE_EXCEEDED:
326 case GRPC_STATUS_UNIMPLEMENTED:
327 case GRPC_STATUS_INTERNAL:
328 case GRPC_STATUS_UNAVAILABLE:
329 case GRPC_STATUS_DATA_LOSS:
330 return ::grpc::load_reporter::kCallStatusServerError;
332 return ::grpc::load_reporter::kCallStatusClientError;
337 bool MaybeAddServerLoadReportingFilter(const grpc_channel_args& args) {
338 return grpc_channel_arg_get_bool(
339 grpc_channel_args_find(&args, GRPC_ARG_ENABLE_LOAD_REPORTING), false);
343 // TODO(juanlishen): We should register the filter during grpc initialization
344 // time once OpenCensus is compatible with our build system. For now, we force
345 // registration of the server load reporting filter at static initialization
346 // time if we build with the filter target.
347 struct ServerLoadReportingFilterStaticRegistrar {
348 ServerLoadReportingFilterStaticRegistrar() {
349 static grpc_core::Atomic<bool> registered{false};
350 if (registered.Load(grpc_core::MemoryOrder::ACQUIRE)) return;
351 RegisterChannelFilter<ServerLoadReportingChannelData,
352 ServerLoadReportingCallData>(
353 "server_load_reporting", GRPC_SERVER_CHANNEL, INT_MAX,
354 MaybeAddServerLoadReportingFilter);
355 // Access measures to ensure they are initialized. Otherwise, we can't
356 // create any valid view before the first RPC.
357 ::grpc::load_reporter::MeasureStartCount();
358 ::grpc::load_reporter::MeasureEndCount();
359 ::grpc::load_reporter::MeasureEndBytesSent();
360 ::grpc::load_reporter::MeasureEndBytesReceived();
361 ::grpc::load_reporter::MeasureEndLatencyMs();
362 ::grpc::load_reporter::MeasureOtherCallMetric();
363 registered.Store(true, grpc_core::MemoryOrder::RELEASE);
365 } server_load_reporting_filter_static_registrar;