3 * Copyright 2018 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/port_platform.h>
21 #include "pb_decode.h"
22 #include "pb_encode.h"
23 #include "src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.h"
25 #include <grpc/support/alloc.h>
// nanopb decode callback installed by xds_grpclb_response_parse_serverlist()
// for its first ("count number of servers") pass. `*arg` is the
// xds_grpclb_serverlist handed to that pass. Each invocation decodes one
// grpc_lb_v1_Server from `stream` into a stack-local `server`; a decode
// failure is logged at GPR_ERROR together with nanopb's error string.
27 /* invoked once for every Server in ServerList */
28 static bool count_serverlist(pb_istream_t* stream, const pb_field_t* field,
30 xds_grpclb_serverlist* sl = static_cast<xds_grpclb_serverlist*>(*arg);
31 xds_grpclb_server server;
32 if (GPR_UNLIKELY(!pb_decode(stream, grpc_lb_v1_Server_fields, &server))) {
33 gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream));
// State passed to decode_serverlist() through the nanopb callback `arg`
// pointer: the serverlist being filled in plus the next slot to write
// (decode_serverlist() reads and advances `decoding_idx`).
40 typedef struct decode_serverlist_arg {
41 /* The decoding callback is invoked once per server in serverlist. Remember
42 * which index of the serverlist we are currently decoding */
44 /* The decoded serverlist */
45 xds_grpclb_serverlist* serverlist;
46 } decode_serverlist_arg;
// nanopb decode callback installed by xds_grpclb_response_parse_serverlist()
// for its second ("populate servers") pass. `*arg` is a
// decode_serverlist_arg. Each invocation allocates a zeroed
// xds_grpclb_server, decodes one grpc_lb_v1_Server into it, and stores the
// pointer at serverlist->servers[decoding_idx], post-incrementing the index.
// A decode failure is logged at GPR_ERROR with nanopb's error string.
48 /* invoked once for every Server in ServerList */
49 static bool decode_serverlist(pb_istream_t* stream, const pb_field_t* field,
51 decode_serverlist_arg* dec_arg = static_cast<decode_serverlist_arg*>(*arg);
// NOTE(review): this bound admits decoding_idx == num_servers, yet the store
// below writes servers[decoding_idx] and the caller allocates exactly
// num_servers slots. A strict `>` looks like the intended check -- confirm.
52 GPR_ASSERT(dec_arg->serverlist->num_servers >= dec_arg->decoding_idx);
53 xds_grpclb_server* server =
54 static_cast<xds_grpclb_server*>(gpr_zalloc(sizeof(xds_grpclb_server)));
55 if (GPR_UNLIKELY(!pb_decode(stream, grpc_lb_v1_Server_fields, server))) {
57 gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream));
// The serverlist takes over the pointer; xds_grpclb_destroy_serverlist()
// gpr_free()s every entry of `servers`.
60 dec_arg->serverlist->servers[dec_arg->decoding_idx++] = server;
// Allocates an xds_grpclb_request that carries only an initial request.
// The request name is copied from `lb_service_name`, at most
// XDS_SERVICE_NAME_MAX_LENGTH bytes; `has_client_stats` is cleared.
//
// NOTE(review): the struct comes from gpr_malloc() whereas
// xds_grpclb_load_report_request_create_locked() uses gpr_zalloc();
// presumably fields not assigned here are left uninitialized -- confirm
// that nothing reads them.
64 xds_grpclb_request* xds_grpclb_request_create(const char* lb_service_name) {
65 xds_grpclb_request* req =
66 static_cast<xds_grpclb_request*>(gpr_malloc(sizeof(xds_grpclb_request)));
67 req->has_client_stats = false;
68 req->has_initial_request = true;
69 req->initial_request.has_name = true;
70 // GCC warns (-Wstringop-truncation) because the destination
71 // buffer size is identical to max-size, leading to a potential
72 // char[] with no null terminator. nanopb can handle it fine,
73 // and parentheses around strncpy silence that compiler warning.
74 (strncpy(req->initial_request.name, lb_service_name,
75 XDS_SERVICE_NAME_MAX_LENGTH));
// Copies a gpr_timespec into a protobuf timestamp message: tv_sec goes to
// `seconds`, tv_nsec to `nanos`, and both has_* presence flags are set.
79 static void populate_timestamp(gpr_timespec timestamp,
80 xds_grpclb_timestamp* timestamp_pb) {
81 timestamp_pb->has_seconds = true;
82 timestamp_pb->seconds = timestamp.tv_sec;
83 timestamp_pb->has_nanos = true;
84 timestamp_pb->nanos = timestamp.tv_nsec;
// nanopb encode callback for a string field. `*arg` is treated as a
// NUL-terminated C string (its length is taken with strlen). Writes the field
// tag, then the string bytes. Returns false if the tag cannot be written,
// otherwise the result of pb_encode_string().
87 static bool encode_string(pb_ostream_t* stream, const pb_field_t* field,
89 char* str = static_cast<char*>(*arg);
90 if (!pb_encode_tag_for_field(stream, field)) return false;
91 return pb_encode_string(stream, reinterpret_cast<uint8_t*>(str), strlen(str));
// nanopb encode callback for the repeated per-token drop counts. `*arg` is a
// grpc_core::XdsLbClientStats::DroppedCallCounts*. For every entry it writes
// the field tag and then a grpc_lb_v1_ClientStatsPerToken submessage built
// from that entry's token and count. Returns false as soon as a tag write
// fails.
94 static bool encode_drops(pb_ostream_t* stream, const pb_field_t* field,
96 grpc_core::XdsLbClientStats::DroppedCallCounts* drop_entries =
97 static_cast<grpc_core::XdsLbClientStats::DroppedCallCounts*>(*arg);
// No drop counts recorded: nothing to encode, which is not an error.
98 if (drop_entries == nullptr) return true;
99 for (size_t i = 0; i < drop_entries->size(); ++i) {
100 if (!pb_encode_tag_for_field(stream, field)) return false;
101 grpc_lb_v1_ClientStatsPerToken drop_message;
// The token itself is a callback field; encode_string() emits it from the
// raw pointer stored in `arg`.
102 drop_message.load_balance_token.funcs.encode = encode_string;
103 drop_message.load_balance_token.arg = (*drop_entries)[i].token.get();
104 drop_message.has_num_calls = true;
105 drop_message.num_calls = (*drop_entries)[i].count;
106 if (!pb_encode_submessage(stream, grpc_lb_v1_ClientStatsPerToken_fields,
// Builds a load-report request from `client_stats`.
//
// The request is zero-allocated, stamped with gpr_now(GPR_CLOCK_REALTIME),
// and its call counters are filled in by client_stats->GetLocked(). The drop
// counts returned by GetLocked() are released into the
// calls_finished_with_drop callback argument, where encode_drops() reads them.
114 xds_grpclb_request* xds_grpclb_load_report_request_create_locked(
115 grpc_core::XdsLbClientStats* client_stats) {
116 xds_grpclb_request* req =
117 static_cast<xds_grpclb_request*>(gpr_zalloc(sizeof(xds_grpclb_request)));
118 req->has_client_stats = true;
119 req->client_stats.has_timestamp = true;
120 populate_timestamp(gpr_now(GPR_CLOCK_REALTIME), &req->client_stats.timestamp);
121 req->client_stats.has_num_calls_started = true;
122 req->client_stats.has_num_calls_finished = true;
// NOTE(review): the next two statements are identical. The repeat is
// harmless, but it may stand in for a different has_* flag -- confirm against
// the generated ClientStats struct.
123 req->client_stats.has_num_calls_finished_with_client_failed_to_send = true;
124 req->client_stats.has_num_calls_finished_with_client_failed_to_send = true;
125 req->client_stats.has_num_calls_finished_known_received = true;
126 req->client_stats.calls_finished_with_drop.funcs.encode = encode_drops;
127 grpc_core::UniquePtr<grpc_core::XdsLbClientStats::DroppedCallCounts>
129 client_stats->GetLocked(
130 &req->client_stats.num_calls_started,
131 &req->client_stats.num_calls_finished,
132 &req->client_stats.num_calls_finished_with_client_failed_to_send,
133 &req->client_stats.num_calls_finished_known_received, &drop_counts);
134 // Will be deleted in xds_grpclb_request_destroy().
135 req->client_stats.calls_finished_with_drop.arg = drop_counts.release();
// Serializes `request` as a grpc_lb_v1_LoadBalanceRequest into a newly
// allocated grpc_slice.
//
// Encoding runs twice: first into a zeroed pb_ostream_t, used only to read
// back `bytes_written`, then into a slice buffer of exactly that size. The
// second encode is wrapped in GPR_ASSERT.
139 grpc_slice xds_grpclb_request_encode(const xds_grpclb_request* request) {
140 size_t encoded_length;
141 pb_ostream_t sizestream;
142 pb_ostream_t outputstream;
144 memset(&sizestream, 0, sizeof(pb_ostream_t));
// NOTE(review): the result of this sizing pass is ignored; only the asserted
// second pass would surface an encoding failure.
145 pb_encode(&sizestream, grpc_lb_v1_LoadBalanceRequest_fields, request);
146 encoded_length = sizestream.bytes_written;
148 slice = GRPC_SLICE_MALLOC(encoded_length);
150 pb_ostream_from_buffer(GRPC_SLICE_START_PTR(slice), encoded_length);
151 GPR_ASSERT(pb_encode(&outputstream, grpc_lb_v1_LoadBalanceRequest_fields,
// Releases resources attached to `request`. When the request carries client
// stats, the DroppedCallCounts stored in calls_finished_with_drop.arg (placed
// there by xds_grpclb_load_report_request_create_locked()) is deleted.
156 void xds_grpclb_request_destroy(xds_grpclb_request* request) {
157 if (request->has_client_stats) {
158 grpc_core::XdsLbClientStats::DroppedCallCounts* drop_entries =
159 static_cast<grpc_core::XdsLbClientStats::DroppedCallCounts*>(
160 request->client_stats.calls_finished_with_drop.arg);
161 grpc_core::Delete(drop_entries);
// Local shorthand for grpc_lb_v1_LoadBalanceResponse, the message type the
// response-parsing functions in this file decode into.
166 typedef grpc_lb_v1_LoadBalanceResponse xds_grpclb_response;
// Decodes `encoded_xds_grpclb_response` as a LoadBalanceResponse and extracts
// its initial response.
//
// Returns nullptr when the decoded message has no initial_response.
// Otherwise returns a gpr_malloc()ed copy of the initial_response submessage.
// A decode failure is logged at GPR_ERROR with nanopb's error string.
167 xds_grpclb_initial_response* xds_grpclb_initial_response_parse(
168 const grpc_slice& encoded_xds_grpclb_response) {
// pb_istream_from_buffer() is given a non-const pointer, hence the
// const_cast on the slice's start pointer.
169 pb_istream_t stream = pb_istream_from_buffer(
170 const_cast<uint8_t*>(GRPC_SLICE_START_PTR(encoded_xds_grpclb_response)),
171 GRPC_SLICE_LENGTH(encoded_xds_grpclb_response));
172 xds_grpclb_response res;
173 memset(&res, 0, sizeof(xds_grpclb_response));
175 !pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, &res))) {
176 gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream));
180 if (!res.has_initial_response) return nullptr;
// `res` lives on the stack, so the submessage is copied into heap storage
// before being handed out.
182 xds_grpclb_initial_response* initial_res =
183 static_cast<xds_grpclb_initial_response*>(
184 gpr_malloc(sizeof(xds_grpclb_initial_response)));
185 memcpy(initial_res, &res.initial_response,
186 sizeof(xds_grpclb_initial_response));
// Decodes the server list carried by `encoded_xds_grpclb_response`.
//
// The slice is decoded twice over the same bytes: the first pass runs
// count_serverlist() as the per-server callback, the second pass (only when
// num_servers > 0) allocates `servers` and runs decode_serverlist() to fill
// it. Decode failures are logged at GPR_ERROR; a second-pass failure also
// destroys the partially built list.
191 xds_grpclb_serverlist* xds_grpclb_response_parse_serverlist(
192 const grpc_slice& encoded_xds_grpclb_response) {
193 pb_istream_t stream = pb_istream_from_buffer(
194 const_cast<uint8_t*>(GRPC_SLICE_START_PTR(encoded_xds_grpclb_response)),
195 GRPC_SLICE_LENGTH(encoded_xds_grpclb_response));
// Keep an untouched copy of the stream so the second pass can restart from
// the beginning of the buffer.
196 pb_istream_t stream_at_start = stream;
197 xds_grpclb_serverlist* sl = static_cast<xds_grpclb_serverlist*>(
198 gpr_zalloc(sizeof(xds_grpclb_serverlist)));
199 xds_grpclb_response res;
200 memset(&res, 0, sizeof(xds_grpclb_response));
201 // First pass: count number of servers.
202 res.server_list.servers.funcs.decode = count_serverlist;
203 res.server_list.servers.arg = sl;
204 bool status = pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, &res);
205 if (GPR_UNLIKELY(!status)) {
207 gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream));
210 // Second pass: populate servers.
211 if (sl->num_servers > 0) {
// Zero-filled so that slots never reached by the decoder stay nullptr.
212 sl->servers = static_cast<xds_grpclb_server**>(
213 gpr_zalloc(sizeof(xds_grpclb_server*) * sl->num_servers));
214 decode_serverlist_arg decode_arg;
215 memset(&decode_arg, 0, sizeof(decode_arg));
216 decode_arg.serverlist = sl;
217 res.server_list.servers.funcs.decode = decode_serverlist;
218 res.server_list.servers.arg = &decode_arg;
219 status = pb_decode(&stream_at_start, grpc_lb_v1_LoadBalanceResponse_fields,
221 if (GPR_UNLIKELY(!status)) {
222 xds_grpclb_destroy_serverlist(sl);
// NOTE(review): this pass decoded from `stream_at_start`, but the message is
// read from `stream` (the first-pass stream) -- likely the wrong stream;
// confirm.
223 gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream));
// Frees a serverlist: every entry of `servers`, then the `servers` array,
// then the list struct itself. A nullptr `serverlist` is checked for up
// front.
230 void xds_grpclb_destroy_serverlist(xds_grpclb_serverlist* serverlist) {
231 if (serverlist == nullptr) {
234 for (size_t i = 0; i < serverlist->num_servers; i++) {
235 gpr_free(serverlist->servers[i]);
237 gpr_free(serverlist->servers);
238 gpr_free(serverlist);
// Returns a deep copy of `sl`: a new list struct, a new `servers` array of
// the same length, and a separately allocated bytewise copy of every server.
// `sl` is dereferenced without a null check.
241 xds_grpclb_serverlist* xds_grpclb_serverlist_copy(
242 const xds_grpclb_serverlist* sl) {
243 xds_grpclb_serverlist* copy = static_cast<xds_grpclb_serverlist*>(
244 gpr_zalloc(sizeof(xds_grpclb_serverlist)));
245 copy->num_servers = sl->num_servers;
246 copy->servers = static_cast<xds_grpclb_server**>(
247 gpr_malloc(sizeof(xds_grpclb_server*) * sl->num_servers));
248 for (size_t i = 0; i < sl->num_servers; i++) {
250 static_cast<xds_grpclb_server*>(gpr_malloc(sizeof(xds_grpclb_server)));
251 memcpy(copy->servers[i], sl->servers[i], sizeof(xds_grpclb_server));
// Compares two serverlists. A nullptr operand and a differing num_servers are
// each handled before the element loop; the loop then compares servers
// pairwise, in order, with xds_grpclb_server_equals().
256 bool xds_grpclb_serverlist_equals(const xds_grpclb_serverlist* lhs,
257 const xds_grpclb_serverlist* rhs) {
258 if (lhs == nullptr || rhs == nullptr) {
261 if (lhs->num_servers != rhs->num_servers) {
264 for (size_t i = 0; i < lhs->num_servers; i++) {
265 if (!xds_grpclb_server_equals(lhs->servers[i], rhs->servers[i])) {
// Returns true when the two server structs are bytewise identical.
//
// NOTE(review): memcmp covers every byte of the struct, including any padding
// or unused trailing bytes. This presumably relies on servers being created
// zero-filled (decode_serverlist() uses gpr_zalloc) or copied with memcpy --
// confirm for servers built elsewhere.
272 bool xds_grpclb_server_equals(const xds_grpclb_server* lhs,
273 const xds_grpclb_server* rhs) {
274 return memcmp(lhs, rhs, sizeof(xds_grpclb_server)) == 0;
// Three-way comparison of two durations; both pointers must be non-null
// (asserted).
//
// `seconds` is compared first. When both sides have the field, a smaller lhs
// returns -1 and a larger lhs returns 1. The cases where only one side has
// the field are handled by the else-if branches. `nanos` is then compared the
// same way.
277 int xds_grpclb_duration_compare(const xds_grpclb_duration* lhs,
278 const xds_grpclb_duration* rhs) {
279 GPR_ASSERT(lhs && rhs);
280 if (lhs->has_seconds && rhs->has_seconds) {
281 if (lhs->seconds < rhs->seconds) return -1;
282 if (lhs->seconds > rhs->seconds) return 1;
283 } else if (lhs->has_seconds) {
285 } else if (rhs->has_seconds) {
// Invariant on reaching this point: the seconds fields are equal, so only
// nanos can still order the two durations.
289 GPR_ASSERT(lhs->seconds == rhs->seconds);
290 if (lhs->has_nanos && rhs->has_nanos) {
291 if (lhs->nanos < rhs->nanos) return -1;
292 if (lhs->nanos > rhs->nanos) return 1;
293 } else if (lhs->has_nanos) {
295 } else if (rhs->has_nanos) {
// Converts a protobuf duration to grpc_millis as
// seconds * GPR_MS_PER_SEC + nanos / GPR_NS_PER_MS, treating a field whose
// has_* flag is unset as 0. `duration_pb` is dereferenced without a null
// check.
// NOTE(review): the division presumably truncates sub-millisecond nanos
// (integer arithmetic) -- confirm the field types.
302 grpc_millis xds_grpclb_duration_to_millis(xds_grpclb_duration* duration_pb) {
303 return static_cast<grpc_millis>(
304 (duration_pb->has_seconds ? duration_pb->seconds : 0) * GPR_MS_PER_SEC +
305 (duration_pb->has_nanos ? duration_pb->nanos : 0) / GPR_NS_PER_MS);
308 void xds_grpclb_initial_response_destroy(
309 xds_grpclb_initial_response* response) {