3  * Copyright 2016 gRPC authors.
 
   5  * Licensed under the Apache License, Version 2.0 (the "License");
 
   6  * you may not use this file except in compliance with the License.
 
   7  * You may obtain a copy of the License at
 
   9  *     http://www.apache.org/licenses/LICENSE-2.0
 
  11  * Unless required by applicable law or agreed to in writing, software
 
  12  * distributed under the License is distributed on an "AS IS" BASIS,
 
  13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  14  * See the License for the specific language governing permissions and
 
  15  * limitations under the License.
 
  19 #include <grpc/support/port_platform.h>
 
  21 #include "pb_decode.h"
 
  22 #include "pb_encode.h"
 
  23 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h"
 
  25 #include <grpc/support/alloc.h>
 
  27 /* invoked once for every Server in ServerList */
 
  // First-pass nanopb decode callback (installed for the counting pass of
  // grpc_grpclb_response_parse_serverlist). Decodes one Server submessage from
  // `stream` into a throwaway local and logs the nanopb error on failure.
  // NOTE(review): listing lines 29 and 34-38 are missing from this copy (the
  // end of the parameter list and the rest of the body), so the counter update
  // and the return statements are not visible here -- restore from upstream.
  28 static bool count_serverlist(pb_istream_t* stream, const pb_field_t* field,
 
  // `*arg` carries the serverlist whose servers are being counted.
  30   grpc_grpclb_serverlist* sl = static_cast<grpc_grpclb_serverlist*>(*arg);
 
  // Scratch destination: the decoded server itself is not kept in this pass.
  31   grpc_grpclb_server server;
 
  32   if (GPR_UNLIKELY(!pb_decode(stream, grpc_lb_v1_Server_fields, &server))) {
 
  33     gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream));
 
  // State handed to decode_serverlist() through the nanopb callback `arg`.
  // NOTE(review): listing line 43 is missing from this copy; the index member
  // described by the first comment below (referenced elsewhere in this file as
  // `decoding_idx`) is presumably declared there -- confirm against upstream.
  40 typedef struct decode_serverlist_arg {
 
  41   /* The decoding callback is invoked once per server in serverlist. Remember
 
  42    * which index of the serverlist are we currently decoding */
 
  44   /* The decoded serverlist */
 
  45   grpc_grpclb_serverlist* serverlist;
 
  46 } decode_serverlist_arg;
 
  48 /* invoked once for every Server in ServerList */
 
  49 static bool decode_serverlist(pb_istream_t* stream, const pb_field_t* field,
 
  51   decode_serverlist_arg* dec_arg = static_cast<decode_serverlist_arg*>(*arg);
 
  52   GPR_ASSERT(dec_arg->serverlist->num_servers >= dec_arg->decoding_idx);
 
  53   grpc_grpclb_server* server =
 
  54       static_cast<grpc_grpclb_server*>(gpr_zalloc(sizeof(grpc_grpclb_server)));
 
  55   if (GPR_UNLIKELY(!pb_decode(stream, grpc_lb_v1_Server_fields, server))) {
 
  57     gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream));
 
  60   dec_arg->serverlist->servers[dec_arg->decoding_idx++] = server;
 
  64 grpc_grpclb_request* grpc_grpclb_request_create(const char* lb_service_name) {
 
  65   grpc_grpclb_request* req = static_cast<grpc_grpclb_request*>(
 
  66       gpr_malloc(sizeof(grpc_grpclb_request)));
 
  67   req->has_client_stats = false;
 
  68   req->has_initial_request = true;
 
  69   req->initial_request.has_name = true;
 
  70   // GCC warns (-Wstringop-truncation) because the destination
 
  71   // buffer size is identical to max-size, leading to a potential
 
  72   // char[] with no null terminator.  nanopb can handle it fine,
 
  73   // and parantheses around strncpy silence that compiler warning.
 
  74   (strncpy(req->initial_request.name, lb_service_name,
 
  75            GRPC_GRPCLB_SERVICE_NAME_MAX_LENGTH));
 
  79 static void populate_timestamp(gpr_timespec timestamp,
 
  80                                grpc_grpclb_timestamp* timestamp_pb) {
 
  81   timestamp_pb->has_seconds = true;
 
  82   timestamp_pb->seconds = timestamp.tv_sec;
 
  83   timestamp_pb->has_nanos = true;
 
  84   timestamp_pb->nanos = timestamp.tv_nsec;
 
  // nanopb encode callback for a string field: emits the tag for `field`, then
  // the bytes of the C string that `*arg` points to (length taken via strlen).
  // Returns false as soon as the tag cannot be written; otherwise returns the
  // result of pb_encode_string.
  // NOTE(review): listing lines 88 and 92 are missing from this copy (the end
  // of the parameter list and the closing brace).
  87 static bool encode_string(pb_ostream_t* stream, const pb_field_t* field,
 
  89   char* str = static_cast<char*>(*arg);
 
  90   if (!pb_encode_tag_for_field(stream, field)) return false;
 
  91   return pb_encode_string(stream, reinterpret_cast<uint8_t*>(str), strlen(str));
 
  // nanopb encode callback for the per-token drop counts. `*arg` is a
  // DroppedCallCounts container (or nullptr, in which case nothing is encoded
  // and true is returned). For every entry it writes the field tag and then a
  // ClientStatsPerToken submessage holding the entry's token and count.
  // NOTE(review): listing lines 95 and 107-112 are missing from this copy (the
  // end of the parameter list and the tail of the loop/function), so the
  // handling of a failed pb_encode_submessage is not visible here.
  94 static bool encode_drops(pb_ostream_t* stream, const pb_field_t* field,
 
  96   grpc_core::GrpcLbClientStats::DroppedCallCounts* drop_entries =
 
  97       static_cast<grpc_core::GrpcLbClientStats::DroppedCallCounts*>(*arg);
 
  98   if (drop_entries == nullptr) return true;
 
 99   for (size_t i = 0; i < drop_entries->size(); ++i) {
 
 100     if (!pb_encode_tag_for_field(stream, field)) return false;
 
 // Only the members assigned below are initialized on this stack object.
 101     grpc_lb_v1_ClientStatsPerToken drop_message;
 
 // The token is itself encoded through a callback (encode_string).
 102     drop_message.load_balance_token.funcs.encode = encode_string;
 
 103     drop_message.load_balance_token.arg = (*drop_entries)[i].token.get();
 
 104     drop_message.has_num_calls = true;
 
 105     drop_message.num_calls = (*drop_entries)[i].count;
 
 106     if (!pb_encode_submessage(stream, grpc_lb_v1_ClientStatsPerToken_fields,
 
 // Builds a load-report request from `client_stats`: zero-allocates the
 // request, marks the client_stats section, its timestamp and its call
 // counters as present, stamps it with the current GPR_CLOCK_REALTIME time and
 // installs encode_drops as the encoder for calls_finished_with_drop.
 // NOTE(review): listing lines 128-129 and 136-137 are missing from this copy
 // (the `drop_counts` declaration tail, the call that receives the counter
 // pointers below, and the end of the function) -- restore from upstream.
 114 grpc_grpclb_request* grpc_grpclb_load_report_request_create(
 
 115     grpc_core::GrpcLbClientStats* client_stats) {
 
 116   grpc_grpclb_request* req = static_cast<grpc_grpclb_request*>(
 
 117       gpr_zalloc(sizeof(grpc_grpclb_request)));
 
 118   req->has_client_stats = true;
 
 119   req->client_stats.has_timestamp = true;
 
 120   populate_timestamp(gpr_now(GPR_CLOCK_REALTIME), &req->client_stats.timestamp);
 
 121   req->client_stats.has_num_calls_started = true;
 
 122   req->client_stats.has_num_calls_finished = true;
 
 123   req->client_stats.has_num_calls_finished_with_client_failed_to_send = true;
 
 // NOTE(review): the next line repeats the assignment directly above it. It is
 // harmless but redundant; confirm no other has_* flag was intended, then drop.
 124   req->client_stats.has_num_calls_finished_with_client_failed_to_send = true;
 
 125   req->client_stats.has_num_calls_finished_known_received = true;
 
 126   req->client_stats.calls_finished_with_drop.funcs.encode = encode_drops;
 
 127   grpc_core::UniquePtr<grpc_core::GrpcLbClientStats::DroppedCallCounts>
 
 // Output locations for the four call counters plus the drop-count container.
 130       &req->client_stats.num_calls_started,
 
 131       &req->client_stats.num_calls_finished,
 
 132       &req->client_stats.num_calls_finished_with_client_failed_to_send,
 
 133       &req->client_stats.num_calls_finished_known_received, &drop_counts);
 
 134   // Will be deleted in grpc_grpclb_request_destroy().
 
 // Ownership of the container moves from the UniquePtr to the raw callback arg.
 135   req->client_stats.calls_finished_with_drop.arg = drop_counts.release();
 
 // Serializes `request` into a slice in two passes: a first pb_encode into a
 // zeroed pb_ostream_t whose bytes_written gives the encoded length, then a
 // second pb_encode into a slice allocated with exactly that length.
 // NOTE(review): listing lines 143, 147, 149 and 152-154 are missing from this
 // copy (the `slice` declaration, the `outputstream =` assignment head, the
 // end of the assert and the return).
 139 grpc_slice grpc_grpclb_request_encode(const grpc_grpclb_request* request) {
 
 140   size_t encoded_length;
 
 141   pb_ostream_t sizestream;
 
 142   pb_ostream_t outputstream;
 
 // presumably an all-zero stream makes nanopb count bytes without writing --
 // confirm against the nanopb documentation.
 144   memset(&sizestream, 0, sizeof(pb_ostream_t));
 
 // NOTE(review): the result of this sizing pass is ignored; only the second
 // pass below is wrapped in GPR_ASSERT.
 145   pb_encode(&sizestream, grpc_lb_v1_LoadBalanceRequest_fields, request);
 
 146   encoded_length = sizestream.bytes_written;
 
 148   slice = GRPC_SLICE_MALLOC(encoded_length);
 
 150       pb_ostream_from_buffer(GRPC_SLICE_START_PTR(slice), encoded_length);
 
 151   GPR_ASSERT(pb_encode(&outputstream, grpc_lb_v1_LoadBalanceRequest_fields,
 
 // Releases resources held by `request`. When the request carries client
 // stats, the DroppedCallCounts container stored in the
 // calls_finished_with_drop callback arg is deleted with grpc_core::Delete.
 // NOTE(review): listing lines 162-164 are missing from this copy, so whatever
 // follows the `if` block (presumably freeing `request` itself -- confirm) is
 // not visible here.
 156 void grpc_grpclb_request_destroy(grpc_grpclb_request* request) {
 
 157   if (request->has_client_stats) {
 
 158     grpc_core::GrpcLbClientStats::DroppedCallCounts* drop_entries =
 
 159         static_cast<grpc_core::GrpcLbClientStats::DroppedCallCounts*>(
 
 160             request->client_stats.calls_finished_with_drop.arg);
 
 161     grpc_core::Delete(drop_entries);
 
 // File-local alias for the nanopb-generated LoadBalanceResponse message.
 166 typedef grpc_lb_v1_LoadBalanceResponse grpc_grpclb_response;
 
 // Decodes a LoadBalanceResponse from the slice and, if it contains an
 // initial_response, returns a heap-allocated copy of that sub-struct.
 // Returns nullptr when the response has no initial_response; a decode failure
 // is logged as a nanopb error.
 // NOTE(review): listing lines 174, 177-179, 181 and 187-189 are missing from
 // this copy (the head of the decode `if`, its failure return, and the final
 // return), so those return paths are not visible here.
 167 grpc_grpclb_initial_response* grpc_grpclb_initial_response_parse(
 
 168     const grpc_slice& encoded_grpc_grpclb_response) {
 
 169   pb_istream_t stream = pb_istream_from_buffer(
 
 170       const_cast<uint8_t*>(GRPC_SLICE_START_PTR(encoded_grpc_grpclb_response)),
 
 171       GRPC_SLICE_LENGTH(encoded_grpc_grpclb_response));
 
 172   grpc_grpclb_response res;
 
 173   memset(&res, 0, sizeof(grpc_grpclb_response));
 
 175           !pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, &res))) {
 
 176     gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream));
 
 180   if (!res.has_initial_response) return nullptr;
 
 182   grpc_grpclb_initial_response* initial_res =
 
 183       static_cast<grpc_grpclb_initial_response*>(
 
 184           gpr_malloc(sizeof(grpc_grpclb_initial_response)));
 
 // Plain byte copy of the decoded sub-struct out of the stack-local response.
 185   memcpy(initial_res, &res.initial_response,
 
 186          sizeof(grpc_grpclb_initial_response));
 
 191 grpc_grpclb_serverlist* grpc_grpclb_response_parse_serverlist(
 
 192     const grpc_slice& encoded_grpc_grpclb_response) {
 
 193   pb_istream_t stream = pb_istream_from_buffer(
 
 194       const_cast<uint8_t*>(GRPC_SLICE_START_PTR(encoded_grpc_grpclb_response)),
 
 195       GRPC_SLICE_LENGTH(encoded_grpc_grpclb_response));
 
 196   pb_istream_t stream_at_start = stream;
 
 197   grpc_grpclb_serverlist* sl = static_cast<grpc_grpclb_serverlist*>(
 
 198       gpr_zalloc(sizeof(grpc_grpclb_serverlist)));
 
 199   grpc_grpclb_response res;
 
 200   memset(&res, 0, sizeof(grpc_grpclb_response));
 
 201   // First pass: count number of servers.
 
 202   res.server_list.servers.funcs.decode = count_serverlist;
 
 203   res.server_list.servers.arg = sl;
 
 204   bool status = pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, &res);
 
 205   if (GPR_UNLIKELY(!status)) {
 
 207     gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream));
 
 210   // Second pass: populate servers.
 
 211   if (sl->num_servers > 0) {
 
 212     sl->servers = static_cast<grpc_grpclb_server**>(
 
 213         gpr_zalloc(sizeof(grpc_grpclb_server*) * sl->num_servers));
 
 214     decode_serverlist_arg decode_arg;
 
 215     memset(&decode_arg, 0, sizeof(decode_arg));
 
 216     decode_arg.serverlist = sl;
 
 217     res.server_list.servers.funcs.decode = decode_serverlist;
 
 218     res.server_list.servers.arg = &decode_arg;
 
 219     status = pb_decode(&stream_at_start, grpc_lb_v1_LoadBalanceResponse_fields,
 
 221     if (GPR_UNLIKELY(!status)) {
 
 222       grpc_grpclb_destroy_serverlist(sl);
 
 223       gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream));
 
 230 void grpc_grpclb_destroy_serverlist(grpc_grpclb_serverlist* serverlist) {
 
 231   if (serverlist == nullptr) {
 
 234   for (size_t i = 0; i < serverlist->num_servers; i++) {
 
 235     gpr_free(serverlist->servers[i]);
 
 237   gpr_free(serverlist->servers);
 
 238   gpr_free(serverlist);
 
 241 grpc_grpclb_serverlist* grpc_grpclb_serverlist_copy(
 
 242     const grpc_grpclb_serverlist* sl) {
 
 243   grpc_grpclb_serverlist* copy = static_cast<grpc_grpclb_serverlist*>(
 
 244       gpr_zalloc(sizeof(grpc_grpclb_serverlist)));
 
 245   copy->num_servers = sl->num_servers;
 
 246   copy->servers = static_cast<grpc_grpclb_server**>(
 
 247       gpr_malloc(sizeof(grpc_grpclb_server*) * sl->num_servers));
 
 248   for (size_t i = 0; i < sl->num_servers; i++) {
 
 249     copy->servers[i] = static_cast<grpc_grpclb_server*>(
 
 250         gpr_malloc(sizeof(grpc_grpclb_server)));
 
 251     memcpy(copy->servers[i], sl->servers[i], sizeof(grpc_grpclb_server));
 
 // Compares two serverlists: checks for null arguments, then for differing
 // server counts, then compares the entries pairwise with
 // grpc_grpclb_server_equals.
 // NOTE(review): listing lines 259-260, 262-263 and 266-270 are missing from
 // this copy, i.e. every return statement of this function. The result for a
 // null argument in particular cannot be determined from here -- restore from
 // upstream rather than guessing.
 256 bool grpc_grpclb_serverlist_equals(const grpc_grpclb_serverlist* lhs,
 
 257                                    const grpc_grpclb_serverlist* rhs) {
 
 258   if (lhs == nullptr || rhs == nullptr) {
 
 261   if (lhs->num_servers != rhs->num_servers) {
 
 264   for (size_t i = 0; i < lhs->num_servers; i++) {
 
 265     if (!grpc_grpclb_server_equals(lhs->servers[i], rhs->servers[i])) {
 
 272 bool grpc_grpclb_server_equals(const grpc_grpclb_server* lhs,
 
 273                                const grpc_grpclb_server* rhs) {
 
 274   return memcmp(lhs, rhs, sizeof(grpc_grpclb_server)) == 0;
 
 // Three-way comparison of two durations, seconds first and nanos second.
 // Both pointers must be non-null (asserted). When both sides carry a field
 // and the values differ, returns -1 if lhs is smaller and 1 if it is larger.
 // NOTE(review): listing lines 284, 286-288, 294, 296 and 297-300 are missing
 // from this copy: the return values for the branches where only one side
 // carries a field, and the final return, are not visible here -- restore from
 // upstream.
 277 int grpc_grpclb_duration_compare(const grpc_grpclb_duration* lhs,
 
 278                                  const grpc_grpclb_duration* rhs) {
 
 279   GPR_ASSERT(lhs && rhs);
 
 280   if (lhs->has_seconds && rhs->has_seconds) {
 
 281     if (lhs->seconds < rhs->seconds) return -1;
 
 282     if (lhs->seconds > rhs->seconds) return 1;
 
 283   } else if (lhs->has_seconds) {
 
 285   } else if (rhs->has_seconds) {
 
 // NOTE(review): this assert is also reached when neither side has_seconds, so
 // it relies on unset `seconds` values being equal (e.g. both zeroed) --
 // confirm against how the callers initialize the structs.
 289   GPR_ASSERT(lhs->seconds == rhs->seconds);
 
 290   if (lhs->has_nanos && rhs->has_nanos) {
 
 291     if (lhs->nanos < rhs->nanos) return -1;
 
 292     if (lhs->nanos > rhs->nanos) return 1;
 
 293   } else if (lhs->has_nanos) {
 
 295   } else if (rhs->has_nanos) {
 
 302 grpc_millis grpc_grpclb_duration_to_millis(grpc_grpclb_duration* duration_pb) {
 
 303   return static_cast<grpc_millis>(
 
 304       (duration_pb->has_seconds ? duration_pb->seconds : 0) * GPR_MS_PER_SEC +
 
 305       (duration_pb->has_nanos ? duration_pb->nanos : 0) / GPR_NS_PER_MS);
 
 308 void grpc_grpclb_initial_response_destroy(
 
 309     grpc_grpclb_initial_response* response) {