Built motion from commit 6a09e18b.|2.6.11
[motion2.git] / legacy-libs / grpc / deps / grpc / src / core / ext / filters / deadline / deadline_filter.cc
diff --git a/legacy-libs/grpc/deps/grpc/src/core/ext/filters/deadline/deadline_filter.cc b/legacy-libs/grpc/deps/grpc/src/core/ext/filters/deadline/deadline_filter.cc
new file mode 100644 (file)
index 0000000..20a2953
--- /dev/null
@@ -0,0 +1,384 @@
+//
+// Copyright 2016 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/ext/filters/deadline/deadline_filter.h"
+
+#include <stdbool.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/sync.h>
+#include <grpc/support/time.h>
+
+#include "src/core/lib/channel/channel_stack_builder.h"
+#include "src/core/lib/gprpp/memory.h"
+#include "src/core/lib/iomgr/timer.h"
+#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/lib/surface/channel_init.h"
+
+//
+// grpc_deadline_state
+//
+
+// The on_complete callback used when sending a cancel_error batch down the
+// filter stack.  Yields the call combiner when the batch returns.
+static void yield_call_combiner(void* arg, grpc_error* ignored) {
+  grpc_deadline_state* deadline_state = static_cast<grpc_deadline_state*>(arg);
+  GRPC_CALL_COMBINER_STOP(deadline_state->call_combiner,
+                          "got on_complete from cancel_stream batch");
+  GRPC_CALL_STACK_UNREF(deadline_state->call_stack, "deadline_timer");
+}
+
+// This is called via the call combiner, so access to deadline_state is
+// synchronized.
+static void send_cancel_op_in_call_combiner(void* arg, grpc_error* error) {
+  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
+  grpc_deadline_state* deadline_state =
+      static_cast<grpc_deadline_state*>(elem->call_data);
+  grpc_transport_stream_op_batch* batch = grpc_make_transport_stream_op(
+      GRPC_CLOSURE_INIT(&deadline_state->timer_callback, yield_call_combiner,
+                        deadline_state, grpc_schedule_on_exec_ctx));
+  batch->cancel_stream = true;
+  batch->payload->cancel_stream.cancel_error = GRPC_ERROR_REF(error);
+  elem->filter->start_transport_stream_op_batch(elem, batch);
+}
+
+// Timer callback.
+static void timer_callback(void* arg, grpc_error* error) {
+  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
+  grpc_deadline_state* deadline_state =
+      static_cast<grpc_deadline_state*>(elem->call_data);
+  if (error != GRPC_ERROR_CANCELLED) {
+    error = grpc_error_set_int(
+        GRPC_ERROR_CREATE_FROM_STATIC_STRING("Deadline Exceeded"),
+        GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_DEADLINE_EXCEEDED);
+    deadline_state->call_combiner->Cancel(GRPC_ERROR_REF(error));
+    GRPC_CLOSURE_INIT(&deadline_state->timer_callback,
+                      send_cancel_op_in_call_combiner, elem,
+                      grpc_schedule_on_exec_ctx);
+    GRPC_CALL_COMBINER_START(deadline_state->call_combiner,
+                             &deadline_state->timer_callback, error,
+                             "deadline exceeded -- sending cancel_stream op");
+  } else {
+    GRPC_CALL_STACK_UNREF(deadline_state->call_stack, "deadline_timer");
+  }
+}
+
+// Starts the deadline timer.
+// This is called via the call combiner, so access to deadline_state is
+// synchronized.
+static void start_timer_if_needed(grpc_call_element* elem,
+                                  grpc_millis deadline) {
+  if (deadline == GRPC_MILLIS_INF_FUTURE) {
+    return;
+  }
+  grpc_deadline_state* deadline_state =
+      static_cast<grpc_deadline_state*>(elem->call_data);
+  grpc_closure* closure = nullptr;
+  switch (deadline_state->timer_state) {
+    case GRPC_DEADLINE_STATE_PENDING:
+      // Note: We do not start the timer if there is already a timer
+      return;
+    case GRPC_DEADLINE_STATE_FINISHED:
+      deadline_state->timer_state = GRPC_DEADLINE_STATE_PENDING;
+      // If we've already created and destroyed a timer, we always create a
+      // new closure: we have no other guarantee that the inlined closure is
+      // not in use (it may hold a pending call to timer_callback)
+      closure =
+          GRPC_CLOSURE_CREATE(timer_callback, elem, grpc_schedule_on_exec_ctx);
+      break;
+    case GRPC_DEADLINE_STATE_INITIAL:
+      deadline_state->timer_state = GRPC_DEADLINE_STATE_PENDING;
+      closure =
+          GRPC_CLOSURE_INIT(&deadline_state->timer_callback, timer_callback,
+                            elem, grpc_schedule_on_exec_ctx);
+      break;
+  }
+  GPR_ASSERT(closure != nullptr);
+  GRPC_CALL_STACK_REF(deadline_state->call_stack, "deadline_timer");
+  grpc_timer_init(&deadline_state->timer, deadline, closure);
+}
+
+// Cancels the deadline timer.
+// This is called via the call combiner, so access to deadline_state is
+// synchronized.
+static void cancel_timer_if_needed(grpc_deadline_state* deadline_state) {
+  if (deadline_state->timer_state == GRPC_DEADLINE_STATE_PENDING) {
+    deadline_state->timer_state = GRPC_DEADLINE_STATE_FINISHED;
+    grpc_timer_cancel(&deadline_state->timer);
+  } else {
+    // timer was either in STATE_INITIAL (nothing to cancel)
+    // OR in STATE_FINISHED (again nothing to cancel)
+  }
+}
+
+// Callback run when we receive trailing metadata.
+static void recv_trailing_metadata_ready(void* arg, grpc_error* error) {
+  grpc_deadline_state* deadline_state = static_cast<grpc_deadline_state*>(arg);
+  cancel_timer_if_needed(deadline_state);
+  // Invoke the original callback.
+  GRPC_CLOSURE_RUN(deadline_state->original_recv_trailing_metadata_ready,
+                   GRPC_ERROR_REF(error));
+}
+
+// Inject our own recv_trailing_metadata_ready callback into op.
+static void inject_recv_trailing_metadata_ready(
+    grpc_deadline_state* deadline_state, grpc_transport_stream_op_batch* op) {
+  deadline_state->original_recv_trailing_metadata_ready =
+      op->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
+  GRPC_CLOSURE_INIT(&deadline_state->recv_trailing_metadata_ready,
+                    recv_trailing_metadata_ready, deadline_state,
+                    grpc_schedule_on_exec_ctx);
+  op->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
+      &deadline_state->recv_trailing_metadata_ready;
+}
+
+// Callback and associated state for starting the timer after call stack
+// initialization has been completed.
+struct start_timer_after_init_state {
+  start_timer_after_init_state(grpc_call_element* elem, grpc_millis deadline)
+      : elem(elem), deadline(deadline) {}
+  ~start_timer_after_init_state() { start_timer_if_needed(elem, deadline); }
+
+  bool in_call_combiner = false;
+  grpc_call_element* elem;
+  grpc_millis deadline;
+  grpc_closure closure;
+};
+static void start_timer_after_init(void* arg, grpc_error* error) {
+  struct start_timer_after_init_state* state =
+      static_cast<struct start_timer_after_init_state*>(arg);
+  grpc_deadline_state* deadline_state =
+      static_cast<grpc_deadline_state*>(state->elem->call_data);
+  if (!state->in_call_combiner) {
+    // We are initially called without holding the call combiner, so we
+    // need to bounce ourselves into it.
+    state->in_call_combiner = true;
+    GRPC_CALL_COMBINER_START(deadline_state->call_combiner, &state->closure,
+                             GRPC_ERROR_REF(error),
+                             "scheduling deadline timer");
+    return;
+  }
+  grpc_core::Delete(state);
+  GRPC_CALL_COMBINER_STOP(deadline_state->call_combiner,
+                          "done scheduling deadline timer");
+}
+
+grpc_deadline_state::grpc_deadline_state(grpc_call_element* elem,
+                                         grpc_call_stack* call_stack,
+                                         grpc_core::CallCombiner* call_combiner,
+                                         grpc_millis deadline)
+    : call_stack(call_stack), call_combiner(call_combiner) {
+  // Deadline will always be infinite on servers, so the timer will only be
+  // set on clients with a finite deadline.
+  if (deadline != GRPC_MILLIS_INF_FUTURE) {
+    // When the deadline passes, we indicate the failure by sending down
+    // an op with cancel_error set.  However, we can't send down any ops
+    // until after the call stack is fully initialized.  If we start the
+    // timer here, we have no guarantee that the timer won't pop before
+    // call stack initialization is finished.  To avoid that problem, we
+    // create a closure to start the timer, and we schedule that closure
+    // to be run after call stack initialization is done.
+    struct start_timer_after_init_state* state =
+        grpc_core::New<start_timer_after_init_state>(elem, deadline);
+    GRPC_CLOSURE_INIT(&state->closure, start_timer_after_init, state,
+                      grpc_schedule_on_exec_ctx);
+    GRPC_CLOSURE_SCHED(&state->closure, GRPC_ERROR_NONE);
+  }
+}
+
+grpc_deadline_state::~grpc_deadline_state() { cancel_timer_if_needed(this); }
+
+void grpc_deadline_state_reset(grpc_call_element* elem,
+                               grpc_millis new_deadline) {
+  grpc_deadline_state* deadline_state =
+      static_cast<grpc_deadline_state*>(elem->call_data);
+  cancel_timer_if_needed(deadline_state);
+  start_timer_if_needed(elem, new_deadline);
+}
+
+void grpc_deadline_state_client_start_transport_stream_op_batch(
+    grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
+  grpc_deadline_state* deadline_state =
+      static_cast<grpc_deadline_state*>(elem->call_data);
+  if (op->cancel_stream) {
+    cancel_timer_if_needed(deadline_state);
+  } else {
+    // Make sure we know when the call is complete, so that we can cancel
+    // the timer.
+    if (op->recv_trailing_metadata) {
+      inject_recv_trailing_metadata_ready(deadline_state, op);
+    }
+  }
+}
+
+//
+// filter code
+//
+
+// Constructor for channel_data.  Used for both client and server filters.
+static grpc_error* init_channel_elem(grpc_channel_element* elem,
+                                     grpc_channel_element_args* args) {
+  GPR_ASSERT(!args->is_last);
+  return GRPC_ERROR_NONE;
+}
+
+// Destructor for channel_data.  Used for both client and server filters.
+static void destroy_channel_elem(grpc_channel_element* elem) {}
+
+// Call data used for both client and server filter.
+typedef struct base_call_data {
+  grpc_deadline_state deadline_state;
+} base_call_data;
+
+// Additional call data used only for the server filter.
+typedef struct server_call_data {
+  base_call_data base;  // Must be first.
+  // The closure for receiving initial metadata.
+  grpc_closure recv_initial_metadata_ready;
+  // Received initial metadata batch.
+  grpc_metadata_batch* recv_initial_metadata;
+  // The original recv_initial_metadata_ready closure, which we chain to
+  // after our own closure is invoked.
+  grpc_closure* next_recv_initial_metadata_ready;
+} server_call_data;
+
+// Constructor for call_data.  Used for both client and server filters.
+static grpc_error* init_call_elem(grpc_call_element* elem,
+                                  const grpc_call_element_args* args) {
+  new (elem->call_data) grpc_deadline_state(
+      elem, args->call_stack, args->call_combiner, args->deadline);
+  return GRPC_ERROR_NONE;
+}
+
+// Destructor for call_data.  Used for both client and server filters.
+static void destroy_call_elem(grpc_call_element* elem,
+                              const grpc_call_final_info* final_info,
+                              grpc_closure* ignored) {
+  grpc_deadline_state* deadline_state =
+      static_cast<grpc_deadline_state*>(elem->call_data);
+  deadline_state->~grpc_deadline_state();
+}
+
+// Method for starting a call op for client filter.
+static void client_start_transport_stream_op_batch(
+    grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
+  grpc_deadline_state_client_start_transport_stream_op_batch(elem, op);
+  // Chain to next filter.
+  grpc_call_next_op(elem, op);
+}
+
+// Callback for receiving initial metadata on the server.
+static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
+  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
+  server_call_data* calld = static_cast<server_call_data*>(elem->call_data);
+  start_timer_if_needed(elem, calld->recv_initial_metadata->deadline);
+  // Invoke the next callback.
+  GRPC_CLOSURE_RUN(calld->next_recv_initial_metadata_ready,
+                   GRPC_ERROR_REF(error));
+}
+
+// Method for starting a call op for server filter.
+static void server_start_transport_stream_op_batch(
+    grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
+  server_call_data* calld = static_cast<server_call_data*>(elem->call_data);
+  if (op->cancel_stream) {
+    cancel_timer_if_needed(&calld->base.deadline_state);
+  } else {
+    // If we're receiving initial metadata, we need to get the deadline
+    // from the recv_initial_metadata_ready callback.  So we inject our
+    // own callback into that hook.
+    if (op->recv_initial_metadata) {
+      calld->next_recv_initial_metadata_ready =
+          op->payload->recv_initial_metadata.recv_initial_metadata_ready;
+      calld->recv_initial_metadata =
+          op->payload->recv_initial_metadata.recv_initial_metadata;
+      GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
+                        recv_initial_metadata_ready, elem,
+                        grpc_schedule_on_exec_ctx);
+      op->payload->recv_initial_metadata.recv_initial_metadata_ready =
+          &calld->recv_initial_metadata_ready;
+    }
+    // Make sure we know when the call is complete, so that we can cancel
+    // the timer.
+    // Note that we trigger this on recv_trailing_metadata, even though
+    // the client never sends trailing metadata, because this is the
+    // hook that tells us when the call is complete on the server side.
+    if (op->recv_trailing_metadata) {
+      inject_recv_trailing_metadata_ready(&calld->base.deadline_state, op);
+    }
+  }
+  // Chain to next filter.
+  grpc_call_next_op(elem, op);
+}
+
+const grpc_channel_filter grpc_client_deadline_filter = {
+    client_start_transport_stream_op_batch,
+    grpc_channel_next_op,
+    sizeof(base_call_data),
+    init_call_elem,
+    grpc_call_stack_ignore_set_pollset_or_pollset_set,
+    destroy_call_elem,
+    0,  // sizeof(channel_data)
+    init_channel_elem,
+    destroy_channel_elem,
+    grpc_channel_next_get_info,
+    "deadline",
+};
+
+const grpc_channel_filter grpc_server_deadline_filter = {
+    server_start_transport_stream_op_batch,
+    grpc_channel_next_op,
+    sizeof(server_call_data),
+    init_call_elem,
+    grpc_call_stack_ignore_set_pollset_or_pollset_set,
+    destroy_call_elem,
+    0,  // sizeof(channel_data)
+    init_channel_elem,
+    destroy_channel_elem,
+    grpc_channel_next_get_info,
+    "deadline",
+};
+
+bool grpc_deadline_checking_enabled(const grpc_channel_args* channel_args) {
+  return grpc_channel_arg_get_bool(
+      grpc_channel_args_find(channel_args, GRPC_ARG_ENABLE_DEADLINE_CHECKS),
+      !grpc_channel_args_want_minimal_stack(channel_args));
+}
+
+static bool maybe_add_deadline_filter(grpc_channel_stack_builder* builder,
+                                      void* arg) {
+  return grpc_deadline_checking_enabled(
+             grpc_channel_stack_builder_get_channel_arguments(builder))
+             ? grpc_channel_stack_builder_prepend_filter(
+                   builder, static_cast<const grpc_channel_filter*>(arg),
+                   nullptr, nullptr)
+             : true;
+}
+
+void grpc_deadline_filter_init(void) {
+  grpc_channel_init_register_stage(
+      GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
+      maybe_add_deadline_filter, (void*)&grpc_client_deadline_filter);
+  grpc_channel_init_register_stage(
+      GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
+      maybe_add_deadline_filter, (void*)&grpc_server_deadline_filter);
+}
+
+void grpc_deadline_filter_shutdown(void) {}