Built motion from commit 6a09e18b.|2.6.11
[motion2.git] / legacy-libs / grpc / deps / grpc / src / core / ext / filters / http / message_compress / message_compress_filter.cc
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <grpc/support/port_platform.h>
20
21 #include <assert.h>
22 #include <string.h>
23
24 #include <grpc/compression.h>
25 #include <grpc/slice_buffer.h>
26 #include <grpc/support/alloc.h>
27 #include <grpc/support/log.h>
28
29 #include "src/core/ext/filters/http/message_compress/message_compress_filter.h"
30 #include "src/core/lib/channel/channel_args.h"
31 #include "src/core/lib/compression/algorithm_metadata.h"
32 #include "src/core/lib/compression/compression_args.h"
33 #include "src/core/lib/compression/compression_internal.h"
34 #include "src/core/lib/compression/message_compress.h"
35 #include "src/core/lib/gpr/string.h"
36 #include "src/core/lib/gprpp/manual_constructor.h"
37 #include "src/core/lib/profiling/timers.h"
38 #include "src/core/lib/slice/slice_internal.h"
39 #include "src/core/lib/slice/slice_string_helpers.h"
40 #include "src/core/lib/surface/call.h"
41 #include "src/core/lib/transport/static_metadata.h"
42
43 static void start_send_message_batch(void* arg, grpc_error* unused);
44 static void send_message_on_complete(void* arg, grpc_error* error);
45 static void on_send_message_next_done(void* arg, grpc_error* error);
46
47 namespace {
48
49 struct channel_data {
50   /** The default, channel-level, compression algorithm */
51   grpc_compression_algorithm default_compression_algorithm;
52   /** Bitset of enabled compression algorithms */
53   uint32_t enabled_compression_algorithms_bitset;
54   /** Bitset of enabled message compression algorithms */
55   uint32_t enabled_message_compression_algorithms_bitset;
56   /** Bitset of enabled stream compression algorithms */
57   uint32_t enabled_stream_compression_algorithms_bitset;
58 };
59
60 struct call_data {
61   call_data(grpc_call_element* elem, const grpc_call_element_args& args)
62       : call_combiner(args.call_combiner) {
63     channel_data* channeld = static_cast<channel_data*>(elem->channel_data);
64     // The call's message compression algorithm is set to channel's default
65     // setting. It can be overridden later by initial metadata.
66     if (GPR_LIKELY(GPR_BITGET(channeld->enabled_compression_algorithms_bitset,
67                               channeld->default_compression_algorithm))) {
68       message_compression_algorithm =
69           grpc_compression_algorithm_to_message_compression_algorithm(
70               channeld->default_compression_algorithm);
71     }
72     GRPC_CLOSURE_INIT(&start_send_message_batch_in_call_combiner,
73                       start_send_message_batch, elem,
74                       grpc_schedule_on_exec_ctx);
75   }
76
77   ~call_data() {
78     if (state_initialized) {
79       grpc_slice_buffer_destroy_internal(&slices);
80     }
81     GRPC_ERROR_UNREF(cancel_error);
82   }
83
84   grpc_core::CallCombiner* call_combiner;
85   grpc_message_compression_algorithm message_compression_algorithm =
86       GRPC_MESSAGE_COMPRESS_NONE;
87   grpc_error* cancel_error = GRPC_ERROR_NONE;
88   grpc_transport_stream_op_batch* send_message_batch = nullptr;
89   bool seen_initial_metadata = false;
90   /* Set to true, if the fields below are initialized. */
91   bool state_initialized = false;
92   grpc_closure start_send_message_batch_in_call_combiner;
93   /* The fields below are only initialized when we compress the payload.
94    * Keep them at the bottom of the struct, so they don't pollute the
95    * cache-lines. */
96   grpc_linked_mdelem message_compression_algorithm_storage;
97   grpc_linked_mdelem stream_compression_algorithm_storage;
98   grpc_linked_mdelem accept_encoding_storage;
99   grpc_linked_mdelem accept_stream_encoding_storage;
100   grpc_slice_buffer slices; /**< Buffers up input slices to be compressed */
101   grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream>
102       replacement_stream;
103   grpc_closure* original_send_message_on_complete;
104   grpc_closure send_message_on_complete;
105   grpc_closure on_send_message_next_done;
106 };
107
108 }  // namespace
109
110 // Returns true if we should skip message compression for the current message.
111 static bool skip_message_compression(grpc_call_element* elem) {
112   call_data* calld = static_cast<call_data*>(elem->call_data);
113   // If the flags of this message indicate that it shouldn't be compressed, we
114   // skip message compression.
115   uint32_t flags =
116       calld->send_message_batch->payload->send_message.send_message->flags();
117   if (flags & (GRPC_WRITE_NO_COMPRESS | GRPC_WRITE_INTERNAL_COMPRESS)) {
118     return true;
119   }
120   // If this call doesn't have any message compression algorithm set, skip
121   // message compression.
122   return calld->message_compression_algorithm == GRPC_MESSAGE_COMPRESS_NONE;
123 }
124
125 // Determines the compression algorithm from the initial metadata and the
126 // channel's default setting.
127 static grpc_compression_algorithm find_compression_algorithm(
128     grpc_metadata_batch* initial_metadata, channel_data* channeld) {
129   if (initial_metadata->idx.named.grpc_internal_encoding_request == nullptr) {
130     return channeld->default_compression_algorithm;
131   }
132   grpc_compression_algorithm compression_algorithm;
133   // Parse the compression algorithm from the initial metadata.
134   grpc_mdelem md =
135       initial_metadata->idx.named.grpc_internal_encoding_request->md;
136   GPR_ASSERT(grpc_compression_algorithm_parse(GRPC_MDVALUE(md),
137                                               &compression_algorithm));
138   // Remove this metadata since it's an internal one (i.e., it won't be
139   // transmitted out).
140   grpc_metadata_batch_remove(initial_metadata,
141                              GRPC_BATCH_GRPC_INTERNAL_ENCODING_REQUEST);
142   // Check if that algorithm is enabled. Note that GRPC_COMPRESS_NONE is always
143   // enabled.
144   // TODO(juanlishen): Maybe use channel default or abort() if the algorithm
145   // from the initial metadata is disabled.
146   if (GPR_LIKELY(GPR_BITGET(channeld->enabled_compression_algorithms_bitset,
147                             compression_algorithm))) {
148     return compression_algorithm;
149   }
150   const char* algorithm_name;
151   GPR_ASSERT(
152       grpc_compression_algorithm_name(compression_algorithm, &algorithm_name));
153   gpr_log(GPR_ERROR,
154           "Invalid compression algorithm from initial metadata: '%s' "
155           "(previously disabled). "
156           "Will not compress.",
157           algorithm_name);
158   return GRPC_COMPRESS_NONE;
159 }
160
161 static void initialize_state(grpc_call_element* elem, call_data* calld) {
162   GPR_DEBUG_ASSERT(!calld->state_initialized);
163   calld->state_initialized = true;
164   grpc_slice_buffer_init(&calld->slices);
165   GRPC_CLOSURE_INIT(&calld->send_message_on_complete,
166                     ::send_message_on_complete, elem,
167                     grpc_schedule_on_exec_ctx);
168   GRPC_CLOSURE_INIT(&calld->on_send_message_next_done,
169                     ::on_send_message_next_done, elem,
170                     grpc_schedule_on_exec_ctx);
171 }
172
173 static grpc_error* process_send_initial_metadata(
174     grpc_call_element* elem,
175     grpc_metadata_batch* initial_metadata) GRPC_MUST_USE_RESULT;
176 static grpc_error* process_send_initial_metadata(
177     grpc_call_element* elem, grpc_metadata_batch* initial_metadata) {
178   call_data* calld = static_cast<call_data*>(elem->call_data);
179   channel_data* channeld = static_cast<channel_data*>(elem->channel_data);
180   // Find the compression algorithm.
181   grpc_compression_algorithm compression_algorithm =
182       find_compression_algorithm(initial_metadata, channeld);
183   // Note that at most one of the following algorithms can be set.
184   calld->message_compression_algorithm =
185       grpc_compression_algorithm_to_message_compression_algorithm(
186           compression_algorithm);
187   grpc_stream_compression_algorithm stream_compression_algorithm =
188       grpc_compression_algorithm_to_stream_compression_algorithm(
189           compression_algorithm);
190   // Hint compression algorithm.
191   grpc_error* error = GRPC_ERROR_NONE;
192   if (calld->message_compression_algorithm != GRPC_MESSAGE_COMPRESS_NONE) {
193     initialize_state(elem, calld);
194     error = grpc_metadata_batch_add_tail(
195         initial_metadata, &calld->message_compression_algorithm_storage,
196         grpc_message_compression_encoding_mdelem(
197             calld->message_compression_algorithm),
198         GRPC_BATCH_GRPC_ENCODING);
199   } else if (stream_compression_algorithm != GRPC_STREAM_COMPRESS_NONE) {
200     initialize_state(elem, calld);
201     error = grpc_metadata_batch_add_tail(
202         initial_metadata, &calld->stream_compression_algorithm_storage,
203         grpc_stream_compression_encoding_mdelem(stream_compression_algorithm),
204         GRPC_BATCH_CONTENT_ENCODING);
205   }
206   if (error != GRPC_ERROR_NONE) return error;
207   // Convey supported compression algorithms.
208   error = grpc_metadata_batch_add_tail(
209       initial_metadata, &calld->accept_encoding_storage,
210       GRPC_MDELEM_ACCEPT_ENCODING_FOR_ALGORITHMS(
211           channeld->enabled_message_compression_algorithms_bitset),
212       GRPC_BATCH_GRPC_ACCEPT_ENCODING);
213   if (error != GRPC_ERROR_NONE) return error;
214   // Do not overwrite accept-encoding header if it already presents (e.g., added
215   // by some proxy).
216   if (!initial_metadata->idx.named.accept_encoding) {
217     error = grpc_metadata_batch_add_tail(
218         initial_metadata, &calld->accept_stream_encoding_storage,
219         GRPC_MDELEM_ACCEPT_STREAM_ENCODING_FOR_ALGORITHMS(
220             channeld->enabled_stream_compression_algorithms_bitset),
221         GRPC_BATCH_ACCEPT_ENCODING);
222   }
223   return error;
224 }
225
226 static void send_message_on_complete(void* arg, grpc_error* error) {
227   grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
228   call_data* calld = static_cast<call_data*>(elem->call_data);
229   grpc_slice_buffer_reset_and_unref_internal(&calld->slices);
230   GRPC_CLOSURE_RUN(calld->original_send_message_on_complete,
231                    GRPC_ERROR_REF(error));
232 }
233
234 static void send_message_batch_continue(grpc_call_element* elem) {
235   call_data* calld = static_cast<call_data*>(elem->call_data);
236   // Note: The call to grpc_call_next_op() results in yielding the
237   // call combiner, so we need to clear calld->send_message_batch
238   // before we do that.
239   grpc_transport_stream_op_batch* send_message_batch =
240       calld->send_message_batch;
241   calld->send_message_batch = nullptr;
242   grpc_call_next_op(elem, send_message_batch);
243 }
244
245 static void finish_send_message(grpc_call_element* elem) {
246   call_data* calld = static_cast<call_data*>(elem->call_data);
247   GPR_DEBUG_ASSERT(calld->message_compression_algorithm !=
248                    GRPC_MESSAGE_COMPRESS_NONE);
249   // Compress the data if appropriate.
250   grpc_slice_buffer tmp;
251   grpc_slice_buffer_init(&tmp);
252   uint32_t send_flags =
253       calld->send_message_batch->payload->send_message.send_message->flags();
254   bool did_compress = grpc_msg_compress(calld->message_compression_algorithm,
255                                         &calld->slices, &tmp);
256   if (did_compress) {
257     if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) {
258       const char* algo_name;
259       const size_t before_size = calld->slices.length;
260       const size_t after_size = tmp.length;
261       const float savings_ratio = 1.0f - static_cast<float>(after_size) /
262                                              static_cast<float>(before_size);
263       GPR_ASSERT(grpc_message_compression_algorithm_name(
264           calld->message_compression_algorithm, &algo_name));
265       gpr_log(GPR_INFO,
266               "Compressed[%s] %" PRIuPTR " bytes vs. %" PRIuPTR
267               " bytes (%.2f%% savings)",
268               algo_name, before_size, after_size, 100 * savings_ratio);
269     }
270     grpc_slice_buffer_swap(&calld->slices, &tmp);
271     send_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
272   } else {
273     if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) {
274       const char* algo_name;
275       GPR_ASSERT(grpc_message_compression_algorithm_name(
276           calld->message_compression_algorithm, &algo_name));
277       gpr_log(GPR_INFO,
278               "Algorithm '%s' enabled but decided not to compress. Input size: "
279               "%" PRIuPTR,
280               algo_name, calld->slices.length);
281     }
282   }
283   grpc_slice_buffer_destroy_internal(&tmp);
284   // Swap out the original byte stream with our new one and send the
285   // batch down.
286   calld->replacement_stream.Init(&calld->slices, send_flags);
287   calld->send_message_batch->payload->send_message.send_message.reset(
288       calld->replacement_stream.get());
289   calld->original_send_message_on_complete =
290       calld->send_message_batch->on_complete;
291   calld->send_message_batch->on_complete = &calld->send_message_on_complete;
292   send_message_batch_continue(elem);
293 }
294
295 static void fail_send_message_batch_in_call_combiner(void* arg,
296                                                      grpc_error* error) {
297   call_data* calld = static_cast<call_data*>(arg);
298   if (calld->send_message_batch != nullptr) {
299     grpc_transport_stream_op_batch_finish_with_failure(
300         calld->send_message_batch, GRPC_ERROR_REF(error), calld->call_combiner);
301     calld->send_message_batch = nullptr;
302   }
303 }
304
305 // Pulls a slice from the send_message byte stream and adds it to calld->slices.
306 static grpc_error* pull_slice_from_send_message(call_data* calld) {
307   grpc_slice incoming_slice;
308   grpc_error* error =
309       calld->send_message_batch->payload->send_message.send_message->Pull(
310           &incoming_slice);
311   if (error == GRPC_ERROR_NONE) {
312     grpc_slice_buffer_add(&calld->slices, incoming_slice);
313   }
314   return error;
315 }
316
317 // Reads as many slices as possible from the send_message byte stream.
318 // If all data has been read, invokes finish_send_message().  Otherwise,
319 // an async call to ByteStream::Next() has been started, which will
320 // eventually result in calling on_send_message_next_done().
321 static void continue_reading_send_message(grpc_call_element* elem) {
322   call_data* calld = static_cast<call_data*>(elem->call_data);
323   while (calld->send_message_batch->payload->send_message.send_message->Next(
324       ~static_cast<size_t>(0), &calld->on_send_message_next_done)) {
325     grpc_error* error = pull_slice_from_send_message(calld);
326     if (error != GRPC_ERROR_NONE) {
327       // Closure callback; does not take ownership of error.
328       fail_send_message_batch_in_call_combiner(calld, error);
329       GRPC_ERROR_UNREF(error);
330       return;
331     }
332     if (calld->slices.length == calld->send_message_batch->payload->send_message
333                                     .send_message->length()) {
334       finish_send_message(elem);
335       break;
336     }
337   }
338 }
339
340 // Async callback for ByteStream::Next().
341 static void on_send_message_next_done(void* arg, grpc_error* error) {
342   grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
343   call_data* calld = static_cast<call_data*>(elem->call_data);
344   if (error != GRPC_ERROR_NONE) {
345     // Closure callback; does not take ownership of error.
346     fail_send_message_batch_in_call_combiner(calld, error);
347     return;
348   }
349   error = pull_slice_from_send_message(calld);
350   if (error != GRPC_ERROR_NONE) {
351     // Closure callback; does not take ownership of error.
352     fail_send_message_batch_in_call_combiner(calld, error);
353     GRPC_ERROR_UNREF(error);
354     return;
355   }
356   if (calld->slices.length ==
357       calld->send_message_batch->payload->send_message.send_message->length()) {
358     finish_send_message(elem);
359   } else {
360     continue_reading_send_message(elem);
361   }
362 }
363
364 static void start_send_message_batch(void* arg, grpc_error* unused) {
365   grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
366   if (skip_message_compression(elem)) {
367     send_message_batch_continue(elem);
368   } else {
369     continue_reading_send_message(elem);
370   }
371 }
372
373 static void compress_start_transport_stream_op_batch(
374     grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
375   GPR_TIMER_SCOPE("compress_start_transport_stream_op_batch", 0);
376   call_data* calld = static_cast<call_data*>(elem->call_data);
377   // Handle cancel_stream.
378   if (batch->cancel_stream) {
379     GRPC_ERROR_UNREF(calld->cancel_error);
380     calld->cancel_error =
381         GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
382     if (calld->send_message_batch != nullptr) {
383       if (!calld->seen_initial_metadata) {
384         GRPC_CALL_COMBINER_START(
385             calld->call_combiner,
386             GRPC_CLOSURE_CREATE(fail_send_message_batch_in_call_combiner, calld,
387                                 grpc_schedule_on_exec_ctx),
388             GRPC_ERROR_REF(calld->cancel_error), "failing send_message op");
389       } else {
390         calld->send_message_batch->payload->send_message.send_message->Shutdown(
391             GRPC_ERROR_REF(calld->cancel_error));
392       }
393     }
394   } else if (calld->cancel_error != GRPC_ERROR_NONE) {
395     grpc_transport_stream_op_batch_finish_with_failure(
396         batch, GRPC_ERROR_REF(calld->cancel_error), calld->call_combiner);
397     return;
398   }
399   // Handle send_initial_metadata.
400   if (batch->send_initial_metadata) {
401     GPR_ASSERT(!calld->seen_initial_metadata);
402     grpc_error* error = process_send_initial_metadata(
403         elem, batch->payload->send_initial_metadata.send_initial_metadata);
404     if (error != GRPC_ERROR_NONE) {
405       grpc_transport_stream_op_batch_finish_with_failure(batch, error,
406                                                          calld->call_combiner);
407       return;
408     }
409     calld->seen_initial_metadata = true;
410     // If we had previously received a batch containing a send_message op,
411     // handle it now.  Note that we need to re-enter the call combiner
412     // for this, since we can't send two batches down while holding the
413     // call combiner, since the connected_channel filter (at the bottom of
414     // the call stack) will release the call combiner for each batch it sees.
415     if (calld->send_message_batch != nullptr) {
416       GRPC_CALL_COMBINER_START(
417           calld->call_combiner,
418           &calld->start_send_message_batch_in_call_combiner, GRPC_ERROR_NONE,
419           "starting send_message after send_initial_metadata");
420     }
421   }
422   // Handle send_message.
423   if (batch->send_message) {
424     GPR_ASSERT(calld->send_message_batch == nullptr);
425     calld->send_message_batch = batch;
426     // If we have not yet seen send_initial_metadata, then we have to
427     // wait.  We save the batch in calld and then drop the call
428     // combiner, which we'll have to pick up again later when we get
429     // send_initial_metadata.
430     if (!calld->seen_initial_metadata) {
431       GRPC_CALL_COMBINER_STOP(
432           calld->call_combiner,
433           "send_message batch pending send_initial_metadata");
434       return;
435     }
436     start_send_message_batch(elem, GRPC_ERROR_NONE);
437   } else {
438     // Pass control down the stack.
439     grpc_call_next_op(elem, batch);
440   }
441 }
442
443 /* Constructor for call_data */
444 static grpc_error* init_call_elem(grpc_call_element* elem,
445                                   const grpc_call_element_args* args) {
446   new (elem->call_data) call_data(elem, *args);
447   return GRPC_ERROR_NONE;
448 }
449
450 /* Destructor for call_data */
451 static void destroy_call_elem(grpc_call_element* elem,
452                               const grpc_call_final_info* final_info,
453                               grpc_closure* ignored) {
454   call_data* calld = static_cast<call_data*>(elem->call_data);
455   calld->~call_data();
456 }
457
458 /* Constructor for channel_data */
459 static grpc_error* init_channel_elem(grpc_channel_element* elem,
460                                      grpc_channel_element_args* args) {
461   channel_data* channeld = static_cast<channel_data*>(elem->channel_data);
462   // Get the enabled and the default algorithms from channel args.
463   channeld->enabled_compression_algorithms_bitset =
464       grpc_channel_args_compression_algorithm_get_states(args->channel_args);
465   channeld->default_compression_algorithm =
466       grpc_channel_args_get_channel_default_compression_algorithm(
467           args->channel_args);
468   // Make sure the default is enabled.
469   if (!GPR_BITGET(channeld->enabled_compression_algorithms_bitset,
470                   channeld->default_compression_algorithm)) {
471     const char* name;
472     GPR_ASSERT(grpc_compression_algorithm_name(
473                    channeld->default_compression_algorithm, &name) == 1);
474     gpr_log(GPR_ERROR,
475             "default compression algorithm %s not enabled: switching to none",
476             name);
477     channeld->default_compression_algorithm = GRPC_COMPRESS_NONE;
478   }
479   channeld->enabled_message_compression_algorithms_bitset =
480       grpc_compression_bitset_to_message_bitset(
481           channeld->enabled_compression_algorithms_bitset);
482   channeld->enabled_stream_compression_algorithms_bitset =
483       grpc_compression_bitset_to_stream_bitset(
484           channeld->enabled_compression_algorithms_bitset);
485   GPR_ASSERT(!args->is_last);
486   return GRPC_ERROR_NONE;
487 }
488
489 /* Destructor for channel data */
490 static void destroy_channel_elem(grpc_channel_element* elem) {}
491
492 const grpc_channel_filter grpc_message_compress_filter = {
493     compress_start_transport_stream_op_batch,
494     grpc_channel_next_op,
495     sizeof(call_data),
496     init_call_elem,
497     grpc_call_stack_ignore_set_pollset_or_pollset_set,
498     destroy_call_elem,
499     sizeof(channel_data),
500     init_channel_elem,
501     destroy_channel_elem,
502     grpc_channel_next_get_info,
503     "message_compress"};