--- /dev/null
+/*
+ *
+ * Copyright 2015-2016 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/surface/server.h"
+
+#include <limits.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+
+#include <utility>
+
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/channel/channelz.h"
+#include "src/core/lib/channel/connected_channel.h"
+#include "src/core/lib/debug/stats.h"
+#include "src/core/lib/gpr/mpscq.h"
+#include "src/core/lib/gpr/spinlock.h"
+#include "src/core/lib/gpr/string.h"
+#include "src/core/lib/iomgr/executor.h"
+#include "src/core/lib/iomgr/iomgr.h"
+#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/lib/surface/api_trace.h"
+#include "src/core/lib/surface/call.h"
+#include "src/core/lib/surface/channel.h"
+#include "src/core/lib/surface/completion_queue.h"
+#include "src/core/lib/surface/init.h"
+#include "src/core/lib/transport/metadata.h"
+#include "src/core/lib/transport/static_metadata.h"
+
// Trace flag (name "server_channel", off by default) gating per-channel
// server logging, e.g. the disconnect message in destroy_channel().
grpc_core::TraceFlag grpc_server_channel_trace(false, "server_channel");

// Forward declarations for the closures the server filter installs on
// recv_initial_metadata / recv_trailing_metadata paths (defined below).
static void server_on_recv_initial_metadata(void* ptr, grpc_error* error);
static void server_recv_trailing_metadata_ready(void* user_data,
                                                grpc_error* error);
+
namespace {
// One registered transport listener. start/destroy callbacks are supplied
// by the transport-specific listener implementation.
struct listener {
  void* arg;
  void (*start)(grpc_server* server, void* arg, grpc_pollset** pollsets,
                size_t pollset_count);
  void (*destroy)(grpc_server* server, void* arg, grpc_closure* closure);
  struct listener* next;
  intptr_t socket_uuid;
  grpc_closure destroy_done;
};

// Distinguishes a request for any (unregistered) call from a request bound
// to a specific registered method.
enum requested_call_type { BATCH_CALL, REGISTERED_CALL };

struct registered_method;

// An application's parked request for an incoming call; queued on a
// request_matcher until a new RPC arrives to fulfill it.
struct requested_call {
  // Intrusive MPSC queue link; cast back and forth with requested_call*,
  // hence the layout requirement below.
  gpr_mpscq_node request_link; /* must be first */
  requested_call_type type;
  size_t cq_idx;
  void* tag;
  grpc_server* server;
  grpc_completion_queue* cq_bound_to_call;
  grpc_call** call;
  grpc_cq_completion completion;
  grpc_metadata_array* initial_metadata;
  union {
    struct {
      grpc_call_details* details;
    } batch;
    struct {
      registered_method* method;
      gpr_timespec* deadline;
      grpc_byte_buffer** optional_payload;
    } registered;
  } data;
};

// One slot of a channel's open-addressed hash table of registered methods
// (probed linearly in start_new_rpc).
struct channel_registered_method {
  registered_method* server_registered_method;
  uint32_t flags;
  bool has_host;
  grpc_slice method;
  grpc_slice host;
};

// Per-channel state for the server filter.
struct channel_data {
  grpc_server* server;
  grpc_connectivity_state connectivity_state;
  grpc_channel* channel;
  size_t cq_idx;
  /* linked list of all channels on a server */
  channel_data* next;
  channel_data* prev;
  channel_registered_method* registered_methods;
  uint32_t registered_method_slots;
  uint32_t registered_method_max_probes;
  grpc_closure finish_destroy_channel_closure;
  grpc_closure channel_connectivity_changed;
  intptr_t channelz_socket_uuid;
};

// Bookkeeping for one (tag, cq) pair to notify when shutdown completes.
typedef struct shutdown_tag {
  void* tag;
  grpc_completion_queue* cq;
  grpc_cq_completion completion;
} shutdown_tag;

// Lifecycle of an incoming call; stored in call_data::state (a gpr_atm).
typedef enum {
  /* waiting for metadata */
  NOT_STARTED,
  /* initial metadata read, not flow controlled in yet */
  PENDING,
  /* flow controlled in, on completion queue */
  ACTIVATED,
  /* cancelled before being queued */
  ZOMBIED
} call_state;

typedef struct request_matcher request_matcher;

// Per-call state for the server filter.
struct call_data {
  call_data(grpc_call_element* elem, const grpc_call_element_args& args)
      : call(grpc_call_from_top_element(elem)),
        call_combiner(args.call_combiner) {
    // '::' disambiguates the free function from the member closure below.
    GRPC_CLOSURE_INIT(&server_on_recv_initial_metadata,
                      ::server_on_recv_initial_metadata, elem,
                      grpc_schedule_on_exec_ctx);
    GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready,
                      server_recv_trailing_metadata_ready, elem,
                      grpc_schedule_on_exec_ctx);
  }
  ~call_data() {
    // A PENDING call is still linked on a request_matcher's pending list;
    // destroying it now would leave that list dangling.
    GPR_ASSERT(state != PENDING);
    GRPC_ERROR_UNREF(recv_initial_metadata_error);
    if (host_set) {
      grpc_slice_unref_internal(host);
    }
    if (path_set) {
      grpc_slice_unref_internal(path);
    }
    grpc_metadata_array_destroy(&initial_metadata);
    grpc_byte_buffer_destroy(payload);
  }

  grpc_call* call;

  // Holds a call_state value; accessed via gpr_atm loads/stores and CAS.
  gpr_atm state = NOT_STARTED;

  bool path_set = false;
  bool host_set = false;
  grpc_slice path;  // valid only when path_set
  grpc_slice host;  // valid only when host_set
  grpc_millis deadline = GRPC_MILLIS_INF_FUTURE;

  // Completion queue the call was published on (set in publish_call).
  grpc_completion_queue* cq_new = nullptr;

  grpc_metadata_batch* recv_initial_metadata = nullptr;
  uint32_t recv_initial_metadata_flags = 0;
  grpc_metadata_array initial_metadata =
      grpc_metadata_array();  // Zero-initialize the C struct.

  request_matcher* matcher = nullptr;
  grpc_byte_buffer* payload = nullptr;

  grpc_closure got_initial_metadata;
  grpc_closure server_on_recv_initial_metadata;
  grpc_closure kill_zombie_closure;
  grpc_closure* on_done_recv_initial_metadata;
  grpc_closure recv_trailing_metadata_ready;
  grpc_error* recv_initial_metadata_error = GRPC_ERROR_NONE;
  grpc_closure* original_recv_trailing_metadata_ready;
  grpc_error* recv_trailing_metadata_error = GRPC_ERROR_NONE;
  bool seen_recv_trailing_metadata_ready = false;

  grpc_closure publish;

  // Intrusive link for a request_matcher's pending list.
  call_data* pending_next = nullptr;
  grpc_core::CallCombiner* call_combiner;
};

// Matches incoming calls to parked application requests: one locked MPSC
// request queue per server completion queue, plus a list of calls that
// arrived before any request was available.
struct request_matcher {
  grpc_server* server;
  call_data* pending_head;
  call_data* pending_tail;
  gpr_locked_mpscq* requests_per_cq;  // array of length server->cq_count
};

// A method registered with the server ahead of time.
struct registered_method {
  char* method;
  char* host;
  grpc_server_register_method_payload_handling payload_handling;
  uint32_t flags;
  /* one request matcher per method */
  request_matcher matcher;
  registered_method* next;
};

// Snapshot of all live server channels, taken under the server lock so
// shutdown ops can be broadcast after the lock is released.
typedef struct {
  grpc_channel** channels;
  size_t num_channels;
} channel_broadcaster;
}  // namespace
+
struct grpc_server {
  grpc_channel_args* channel_args;

  grpc_resource_user* default_resource_user;

  grpc_completion_queue** cqs;
  grpc_pollset** pollsets;
  size_t cq_count;
  size_t pollset_count;
  // Set once the server has started; request matchers are only initialized
  // (and therefore only destroyed) for started servers.
  bool started;

  /* The two following mutexes control access to server-state
     mu_global controls access to non-call-related state (e.g., channel state)
     mu_call controls access to call-related state (e.g., the call lists)

     If they are ever required to be nested, you must lock mu_global
     before mu_call. This is currently used in shutdown processing
     (grpc_server_shutdown_and_notify and maybe_finish_shutdown) */
  gpr_mu mu_global; /* mutex for server and channel state */
  gpr_mu mu_call;   /* mutex for call-specific state */

  /* startup synchronization: flag is protected by mu_global, signals whether
     we are doing the listener start routine or not */
  bool starting;
  gpr_cv starting_cv;

  registered_method* registered_methods;
  /** one request matcher for unregistered methods */
  request_matcher unregistered_request_matcher;

  // Read with acquire loads on hot paths; nonzero once shutdown has begun.
  gpr_atm shutdown_flag;
  uint8_t shutdown_published;
  size_t num_shutdown_tags;
  shutdown_tag* shutdown_tags;

  // Dummy head of the circular doubly-linked list of channel_data.
  channel_data root_channel_data;

  listener* listeners;
  int listeners_destroyed;
  grpc_core::RefCount internal_refcount;

  /** when did we print the last shutdown progress message */
  gpr_timespec last_shutdown_message_time;

  grpc_core::RefCountedPtr<grpc_core::channelz::ServerNode> channelz_server;
};
+
// Recovers the owning grpc_server from a call element of this filter.
#define SERVER_FROM_CALL_ELEM(elem) \
  (((channel_data*)(elem)->channel_data)->server)

static void publish_new_rpc(void* calld, grpc_error* error);
static void fail_call(grpc_server* server, size_t cq_idx, requested_call* rc,
                      grpc_error* error);
/* Before calling maybe_finish_shutdown, we must hold mu_global and not
   hold mu_call */
static void maybe_finish_shutdown(grpc_server* server);
+
+/*
+ * channel broadcaster
+ */
+
+/* assumes server locked */
+static void channel_broadcaster_init(grpc_server* s, channel_broadcaster* cb) {
+ channel_data* c;
+ size_t count = 0;
+ for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
+ count++;
+ }
+ cb->num_channels = count;
+ cb->channels = static_cast<grpc_channel**>(
+ gpr_malloc(sizeof(*cb->channels) * cb->num_channels));
+ count = 0;
+ for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
+ cb->channels[count++] = c->channel;
+ GRPC_CHANNEL_INTERNAL_REF(c->channel, "broadcast");
+ }
+}
+
// Heap-allocated context for one shutdown transport op; freed (together
// with its slice) by shutdown_cleanup once the op completes.
struct shutdown_cleanup_args {
  grpc_closure closure;
  grpc_slice slice;
};

// Closure run when a shutdown op finishes: releases the op's resources.
static void shutdown_cleanup(void* arg, grpc_error* error) {
  struct shutdown_cleanup_args* a =
      static_cast<struct shutdown_cleanup_args*>(arg);
  grpc_slice_unref_internal(a->slice);
  gpr_free(a);
}
+
// Starts a shutdown transport op on one channel: optionally a GOAWAY
// carrying status OK, plus a disconnect error (ownership of
// send_disconnect transfers to the op).
static void send_shutdown(grpc_channel* channel, bool send_goaway,
                          grpc_error* send_disconnect) {
  struct shutdown_cleanup_args* sc =
      static_cast<struct shutdown_cleanup_args*>(gpr_malloc(sizeof(*sc)));
  GRPC_CLOSURE_INIT(&sc->closure, shutdown_cleanup, sc,
                    grpc_schedule_on_exec_ctx);
  grpc_transport_op* op = grpc_make_transport_op(&sc->closure);
  grpc_channel_element* elem;

  op->goaway_error =
      send_goaway ? grpc_error_set_int(
                        GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server shutdown"),
                        GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_OK)
                  : GRPC_ERROR_NONE;
  // set_accept_stream with no handler installed — presumably clears the
  // transport's accept-stream callback so no new streams start; confirm
  // against grpc_transport_op semantics.
  op->set_accept_stream = true;
  sc->slice = grpc_slice_from_copied_string("Server shutdown");
  op->disconnect_with_error = send_disconnect;

  // Deliver the op to the top element of the channel stack.
  elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
  elem->filter->start_transport_op(elem, op);
}
+
// Sends the shutdown op to every snapshotted channel, dropping the refs
// taken by channel_broadcaster_init and consuming force_disconnect.
static void channel_broadcaster_shutdown(channel_broadcaster* cb,
                                         bool send_goaway,
                                         grpc_error* force_disconnect) {
  size_t i;

  for (i = 0; i < cb->num_channels; i++) {
    // Each channel gets its own ref on the disconnect error; the original
    // is unreffed after the loop.
    send_shutdown(cb->channels[i], send_goaway,
                  GRPC_ERROR_REF(force_disconnect));
    GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast");
  }
  gpr_free(cb->channels);
  GRPC_ERROR_UNREF(force_disconnect);
}
+
+/*
+ * request_matcher
+ */
+
// Initializes a matcher with an empty pending list and one locked MPSC
// request queue per server completion queue.
static void request_matcher_init(request_matcher* rm, grpc_server* server) {
  rm->server = server;
  rm->pending_head = rm->pending_tail = nullptr;
  rm->requests_per_cq = static_cast<gpr_locked_mpscq*>(
      gpr_malloc(sizeof(*rm->requests_per_cq) * server->cq_count));
  for (size_t i = 0; i < server->cq_count; i++) {
    gpr_locked_mpscq_init(&rm->requests_per_cq[i]);
  }
}
+
// Tears down a matcher's per-cq queues; all queues must already be empty
// (requests are failed via request_matcher_kill_requests before this).
static void request_matcher_destroy(request_matcher* rm) {
  for (size_t i = 0; i < rm->server->cq_count; i++) {
    GPR_ASSERT(gpr_locked_mpscq_pop(&rm->requests_per_cq[i]) == nullptr);
    gpr_locked_mpscq_destroy(&rm->requests_per_cq[i]);
  }
  gpr_free(rm->requests_per_cq);
}
+
+static void kill_zombie(void* elem, grpc_error* error) {
+ grpc_call_unref(
+ grpc_call_from_top_element(static_cast<grpc_call_element*>(elem)));
+}
+
// Marks every call on the matcher's pending list as ZOMBIED and schedules
// its destruction; used when the matcher is being shut down.
static void request_matcher_zombify_all_pending_calls(request_matcher* rm) {
  while (rm->pending_head) {
    call_data* calld = rm->pending_head;
    rm->pending_head = calld->pending_next;
    gpr_atm_no_barrier_store(&calld->state, ZOMBIED);
    GRPC_CLOSURE_INIT(
        &calld->kill_zombie_closure, kill_zombie,
        grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
        grpc_schedule_on_exec_ctx);
    GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE);
  }
}
+
// Drains every queued application request from the matcher and fails it
// with (a ref of) error; consumes the caller's ref on error.
static void request_matcher_kill_requests(grpc_server* server,
                                          request_matcher* rm,
                                          grpc_error* error) {
  requested_call* rc;
  for (size_t i = 0; i < server->cq_count; i++) {
    while ((rc = reinterpret_cast<requested_call*>(
                gpr_locked_mpscq_pop(&rm->requests_per_cq[i]))) != nullptr) {
      fail_call(server, i, rc, GRPC_ERROR_REF(error));
    }
  }
  GRPC_ERROR_UNREF(error);
}
+
+/*
+ * server proper
+ */
+
+static void server_ref(grpc_server* server) { server->internal_refcount.Ref(); }
+
// Frees all server state; runs once the last internal ref is released.
static void server_delete(grpc_server* server) {
  registered_method* rm;
  size_t i;
  server->channelz_server.reset();
  grpc_channel_args_destroy(server->channel_args);
  gpr_mu_destroy(&server->mu_global);
  gpr_mu_destroy(&server->mu_call);
  gpr_cv_destroy(&server->starting_cv);
  while ((rm = server->registered_methods) != nullptr) {
    server->registered_methods = rm->next;
    // Matchers are only initialized once the server starts, so only
    // destroy them for started servers.
    if (server->started) {
      request_matcher_destroy(&rm->matcher);
    }
    gpr_free(rm->method);
    gpr_free(rm->host);
    gpr_free(rm);
  }
  if (server->started) {
    request_matcher_destroy(&server->unregistered_request_matcher);
  }
  for (i = 0; i < server->cq_count; i++) {
    GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server");
  }
  gpr_free(server->cqs);
  gpr_free(server->pollsets);
  gpr_free(server->shutdown_tags);
  gpr_free(server);
}
+
+static void server_unref(grpc_server* server) {
+ if (GPR_UNLIKELY(server->internal_refcount.Unref())) {
+ server_delete(server);
+ }
+}
+
+static int is_channel_orphaned(channel_data* chand) {
+ return chand->next == chand;
+}
+
// Unlinks the channel from the server's circular list and makes it
// self-referential (the "orphaned" state tested above).
static void orphan_channel(channel_data* chand) {
  chand->next->prev = chand->prev;
  chand->prev->next = chand->next;
  chand->next = chand->prev = chand;
}
+
// Closure run when a channel's destroy op completes: releases the refs
// taken in destroy_channel (channel "server" ref and server ref).
static void finish_destroy_channel(void* cd, grpc_error* error) {
  channel_data* chand = static_cast<channel_data*>(cd);
  grpc_server* server = chand->server;
  GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "server");
  server_unref(server);
}
+
// Removes a channel from the server and starts its teardown. Consumes the
// caller's ref on error. No-op if the channel was already orphaned.
static void destroy_channel(channel_data* chand, grpc_error* error) {
  if (is_channel_orphaned(chand)) return;
  GPR_ASSERT(chand->server != nullptr);
  orphan_channel(chand);
  // Keep the server alive until finish_destroy_channel runs.
  server_ref(chand->server);
  maybe_finish_shutdown(chand->server);
  GRPC_CLOSURE_INIT(&chand->finish_destroy_channel_closure,
                    finish_destroy_channel, chand, grpc_schedule_on_exec_ctx);

  if (GRPC_TRACE_FLAG_ENABLED(grpc_server_channel_trace) &&
      error != GRPC_ERROR_NONE) {
    const char* msg = grpc_error_string(error);
    gpr_log(GPR_INFO, "Disconnected client: %s", msg);
  }
  GRPC_ERROR_UNREF(error);

  // Send a transport op down the stack; finish_destroy_channel runs once
  // the op completes.
  grpc_transport_op* op =
      grpc_make_transport_op(&chand->finish_destroy_channel_closure);
  op->set_accept_stream = true;
  grpc_channel_next_op(grpc_channel_stack_element(
                           grpc_channel_get_channel_stack(chand->channel), 0),
                       op);
}
+
+static void done_request_event(void* req, grpc_cq_completion* c) {
+ gpr_free(req);
+}
+
// Hands a matched incoming call to the application: binds the call to the
// request's completion queue, fills the request's output parameters, and
// posts the completion event on the server cq at cq_idx.
static void publish_call(grpc_server* server, call_data* calld, size_t cq_idx,
                         requested_call* rc) {
  grpc_call_set_completion_queue(calld->call, rc->cq_bound_to_call);
  grpc_call* call = calld->call;
  *rc->call = call;
  calld->cq_new = server->cqs[cq_idx];
  // Swap the received initial metadata into the caller's array; the old
  // contents are destroyed with the call_data.
  GPR_SWAP(grpc_metadata_array, *rc->initial_metadata, calld->initial_metadata);
  switch (rc->type) {
    case BATCH_CALL:
      GPR_ASSERT(calld->host_set);
      GPR_ASSERT(calld->path_set);
      rc->data.batch.details->host = grpc_slice_ref_internal(calld->host);
      rc->data.batch.details->method = grpc_slice_ref_internal(calld->path);
      rc->data.batch.details->deadline =
          grpc_millis_to_timespec(calld->deadline, GPR_CLOCK_MONOTONIC);
      rc->data.batch.details->flags = calld->recv_initial_metadata_flags;
      break;
    case REGISTERED_CALL:
      *rc->data.registered.deadline =
          grpc_millis_to_timespec(calld->deadline, GPR_CLOCK_MONOTONIC);
      if (rc->data.registered.optional_payload) {
        // Payload ownership moves to the application.
        *rc->data.registered.optional_payload = calld->payload;
        calld->payload = nullptr;
      }
      break;
    default:
      GPR_UNREACHABLE_CODE(return );
  }

  // done_request_event frees rc once the application is done with the
  // completion.
  grpc_cq_end_op(calld->cq_new, rc->tag, GRPC_ERROR_NONE, done_request_event,
                 rc, &rc->completion, true);
}
+
// Attempts to match a newly-arrived RPC against a parked application
// request; failing that, parks the call on the matcher's pending list.
static void publish_new_rpc(void* arg, grpc_error* error) {
  grpc_call_element* call_elem = static_cast<grpc_call_element*>(arg);
  call_data* calld = static_cast<call_data*>(call_elem->call_data);
  channel_data* chand = static_cast<channel_data*>(call_elem->channel_data);
  request_matcher* rm = calld->matcher;
  grpc_server* server = rm->server;

  // Errored or shutting-down: zombify the call instead of publishing it.
  if (error != GRPC_ERROR_NONE || gpr_atm_acq_load(&server->shutdown_flag)) {
    gpr_atm_no_barrier_store(&calld->state, ZOMBIED);
    GRPC_CLOSURE_INIT(
        &calld->kill_zombie_closure, kill_zombie,
        grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
        grpc_schedule_on_exec_ctx);
    GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_REF(error));
    return;
  }

  // Fast path: lock-free try_pop over the per-cq request queues, starting
  // at the channel's home cq to spread load.
  for (size_t i = 0; i < server->cq_count; i++) {
    size_t cq_idx = (chand->cq_idx + i) % server->cq_count;
    requested_call* rc = reinterpret_cast<requested_call*>(
        gpr_locked_mpscq_try_pop(&rm->requests_per_cq[cq_idx]));
    if (rc == nullptr) {
      continue;
    } else {
      GRPC_STATS_INC_SERVER_CQS_CHECKED(i);
      gpr_atm_no_barrier_store(&calld->state, ACTIVATED);
      publish_call(server, calld, cq_idx, rc);
      return; /* early out */
    }
  }

  /* no cq to take the request found: queue it on the slow list */
  GRPC_STATS_INC_SERVER_SLOWPATH_REQUESTS_QUEUED();
  gpr_mu_lock(&server->mu_call);

  // We need to ensure that all the queues are empty. We do this under
  // the server mu_call lock to ensure that if something is added to
  // an empty request queue, it will block until the call is actually
  // added to the pending list.
  for (size_t i = 0; i < server->cq_count; i++) {
    size_t cq_idx = (chand->cq_idx + i) % server->cq_count;
    requested_call* rc = reinterpret_cast<requested_call*>(
        gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]));
    if (rc == nullptr) {
      continue;
    } else {
      gpr_mu_unlock(&server->mu_call);
      GRPC_STATS_INC_SERVER_CQS_CHECKED(i + server->cq_count);
      gpr_atm_no_barrier_store(&calld->state, ACTIVATED);
      publish_call(server, calld, cq_idx, rc);
      return; /* early out */
    }
  }

  // Still no request: append the call to the matcher's pending list, still
  // under mu_call.
  gpr_atm_no_barrier_store(&calld->state, PENDING);
  if (rm->pending_head == nullptr) {
    rm->pending_tail = rm->pending_head = calld;
  } else {
    rm->pending_tail->pending_next = calld;
    rm->pending_tail = calld;
  }
  calld->pending_next = nullptr;
  gpr_mu_unlock(&server->mu_call);
}
+
// Completes dispatch of a new RPC to the chosen matcher. If the method
// expects an up-front payload, reads the first message before publishing.
static void finish_start_new_rpc(
    grpc_server* server, grpc_call_element* elem, request_matcher* rm,
    grpc_server_register_method_payload_handling payload_handling) {
  call_data* calld = static_cast<call_data*>(elem->call_data);

  // Server already shutting down: zombify instead of dispatching.
  if (gpr_atm_acq_load(&server->shutdown_flag)) {
    gpr_atm_no_barrier_store(&calld->state, ZOMBIED);
    GRPC_CLOSURE_INIT(&calld->kill_zombie_closure, kill_zombie, elem,
                      grpc_schedule_on_exec_ctx);
    GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE);
    return;
  }

  calld->matcher = rm;

  switch (payload_handling) {
    case GRPC_SRM_PAYLOAD_NONE:
      publish_new_rpc(elem, GRPC_ERROR_NONE);
      break;
    case GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER: {
      // Receive the first message into calld->payload, then publish from
      // the calld->publish closure.
      grpc_op op;
      op.op = GRPC_OP_RECV_MESSAGE;
      op.flags = 0;
      op.reserved = nullptr;
      op.data.recv_message.recv_message = &calld->payload;
      GRPC_CLOSURE_INIT(&calld->publish, publish_new_rpc, elem,
                        grpc_schedule_on_exec_ctx);
      grpc_call_start_batch_and_execute(calld->call, &op, 1, &calld->publish);
      break;
    }
  }
}
+
// Routes a new RPC to its request matcher: first an exact host+method
// lookup in the channel's registered-method hash table, then a wildcard
// (host-less) method lookup, and finally the unregistered-method matcher.
static void start_new_rpc(grpc_call_element* elem) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  grpc_server* server = chand->server;
  uint32_t i;
  uint32_t hash;
  channel_registered_method* rm;

  if (chand->registered_methods && calld->path_set && calld->host_set) {
    /* TODO(ctiller): unify these two searches */
    /* check for an exact match with host */
    hash = GRPC_MDSTR_KV_HASH(grpc_slice_hash_internal(calld->host),
                              grpc_slice_hash_internal(calld->path));
    // Linear probing; an empty slot ends the probe sequence.
    for (i = 0; i <= chand->registered_method_max_probes; i++) {
      rm = &chand->registered_methods[(hash + i) %
                                      chand->registered_method_slots];
      if (rm->server_registered_method == nullptr) break;
      if (!rm->has_host) continue;
      if (!grpc_slice_eq(rm->host, calld->host)) continue;
      if (!grpc_slice_eq(rm->method, calld->path)) continue;
      // Methods registered as idempotent-only are skipped unless the
      // request carried the idempotent flag.
      if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) &&
          0 == (calld->recv_initial_metadata_flags &
                GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST)) {
        continue;
      }
      finish_start_new_rpc(server, elem, &rm->server_registered_method->matcher,
                           rm->server_registered_method->payload_handling);
      return;
    }
    /* check for a wildcard method definition (no host set) */
    hash = GRPC_MDSTR_KV_HASH(0, grpc_slice_hash_internal(calld->path));
    for (i = 0; i <= chand->registered_method_max_probes; i++) {
      rm = &chand->registered_methods[(hash + i) %
                                      chand->registered_method_slots];
      if (rm->server_registered_method == nullptr) break;
      if (rm->has_host) continue;
      if (!grpc_slice_eq(rm->method, calld->path)) continue;
      if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) &&
          0 == (calld->recv_initial_metadata_flags &
                GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST)) {
        continue;
      }
      finish_start_new_rpc(server, elem, &rm->server_registered_method->matcher,
                           rm->server_registered_method->payload_handling);
      return;
    }
  }
  // No registered method matched (or no path/host available): fall back.
  finish_start_new_rpc(server, elem, &server->unregistered_request_matcher,
                       GRPC_SRM_PAYLOAD_NONE);
}
+
+static int num_listeners(grpc_server* server) {
+ listener* l;
+ int n = 0;
+ for (l = server->listeners; l; l = l->next) {
+ n++;
+ }
+ return n;
+}
+
+static void done_shutdown_event(void* server, grpc_cq_completion* completion) {
+ server_unref(static_cast<grpc_server*>(server));
+}
+
+static int num_channels(grpc_server* server) {
+ channel_data* chand;
+ int n = 0;
+ for (chand = server->root_channel_data.next;
+ chand != &server->root_channel_data; chand = chand->next) {
+ n++;
+ }
+ return n;
+}
+
// Fails every outstanding application request and zombifies every pending
// call across all matchers. Consumes the caller's ref on error. Matchers
// only exist once the server has started, hence the guard.
static void kill_pending_work_locked(grpc_server* server, grpc_error* error) {
  if (server->started) {
    request_matcher_kill_requests(server, &server->unregistered_request_matcher,
                                  GRPC_ERROR_REF(error));
    request_matcher_zombify_all_pending_calls(
        &server->unregistered_request_matcher);
    for (registered_method* rm = server->registered_methods; rm;
         rm = rm->next) {
      request_matcher_kill_requests(server, &rm->matcher,
                                    GRPC_ERROR_REF(error));
      request_matcher_zombify_all_pending_calls(&rm->matcher);
    }
  }
  GRPC_ERROR_UNREF(error);
}
+
// Publishes the shutdown completion events once shutdown has been
// requested AND all channels and listeners are gone. Caller holds
// mu_global but not mu_call (see the declaration comment above).
static void maybe_finish_shutdown(grpc_server* server) {
  size_t i;
  if (!gpr_atm_acq_load(&server->shutdown_flag) || server->shutdown_published) {
    return;
  }

  gpr_mu_lock(&server->mu_call);
  kill_pending_work_locked(
      server, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown"));
  gpr_mu_unlock(&server->mu_call);

  // Still waiting on channels or listeners: log progress at most once per
  // second and try again later.
  if (server->root_channel_data.next != &server->root_channel_data ||
      server->listeners_destroyed < num_listeners(server)) {
    if (gpr_time_cmp(gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME),
                                  server->last_shutdown_message_time),
                     gpr_time_from_seconds(1, GPR_TIMESPAN)) >= 0) {
      server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);
      gpr_log(GPR_DEBUG,
              "Waiting for %d channels and %d/%d listeners to be destroyed"
              " before shutting down server",
              num_channels(server),
              num_listeners(server) - server->listeners_destroyed,
              num_listeners(server));
    }
    return;
  }
  server->shutdown_published = 1;
  for (i = 0; i < server->num_shutdown_tags; i++) {
    // Ref held until done_shutdown_event runs for this tag.
    server_ref(server);
    grpc_cq_end_op(server->shutdown_tags[i].cq, server->shutdown_tags[i].tag,
                   GRPC_ERROR_NONE, done_shutdown_event, server,
                   &server->shutdown_tags[i].completion);
  }
}
+
// Intercepts completion of recv_initial_metadata: extracts :path and
// :authority into call_data, captures the op deadline, then chains to the
// original callback (and restarts a deferred trailing-metadata callback).
static void server_on_recv_initial_metadata(void* ptr, grpc_error* error) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(ptr);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  grpc_millis op_deadline;

  if (error == GRPC_ERROR_NONE) {
    GPR_DEBUG_ASSERT(calld->recv_initial_metadata->idx.named.path != nullptr);
    GPR_DEBUG_ASSERT(calld->recv_initial_metadata->idx.named.authority !=
                     nullptr);
    calld->path = grpc_slice_ref_internal(
        GRPC_MDVALUE(calld->recv_initial_metadata->idx.named.path->md));
    calld->host = grpc_slice_ref_internal(
        GRPC_MDVALUE(calld->recv_initial_metadata->idx.named.authority->md));
    calld->path_set = true;
    calld->host_set = true;
    // The filter consumed :path and :authority; remove them from the batch
    // passed up the stack.
    grpc_metadata_batch_remove(calld->recv_initial_metadata, GRPC_BATCH_PATH);
    grpc_metadata_batch_remove(calld->recv_initial_metadata,
                               GRPC_BATCH_AUTHORITY);
  } else {
    // Hold our own ref; `error` is passed onward via GRPC_CLOSURE_RUN.
    GRPC_ERROR_REF(error);
  }
  op_deadline = calld->recv_initial_metadata->deadline;
  if (op_deadline != GRPC_MILLIS_INF_FUTURE) {
    calld->deadline = op_deadline;
  }
  if (calld->host_set && calld->path_set) {
    /* do nothing */
  } else {
    /* Pass the error reference to calld->recv_initial_metadata_error */
    grpc_error* src_error = error;
    error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
        "Missing :authority or :path", &src_error, 1);
    GRPC_ERROR_UNREF(src_error);
    calld->recv_initial_metadata_error = GRPC_ERROR_REF(error);
  }
  grpc_closure* closure = calld->on_done_recv_initial_metadata;
  // Clearing this signals server_recv_trailing_metadata_ready that initial
  // metadata processing has finished.
  calld->on_done_recv_initial_metadata = nullptr;
  if (calld->seen_recv_trailing_metadata_ready) {
    GRPC_CALL_COMBINER_START(calld->call_combiner,
                             &calld->recv_trailing_metadata_ready,
                             calld->recv_trailing_metadata_error,
                             "continue server_recv_trailing_metadata_ready");
  }
  GRPC_CLOSURE_RUN(closure, error);
}
+
// Intercepts completion of recv_trailing_metadata. If initial metadata is
// still being processed, defers itself (re-armed by
// server_on_recv_initial_metadata); otherwise forwards to the original
// callback with the initial-metadata error attached as a child.
static void server_recv_trailing_metadata_ready(void* user_data,
                                                grpc_error* error) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (calld->on_done_recv_initial_metadata != nullptr) {
    calld->recv_trailing_metadata_error = GRPC_ERROR_REF(error);
    calld->seen_recv_trailing_metadata_ready = true;
    GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready,
                      server_recv_trailing_metadata_ready, elem,
                      grpc_schedule_on_exec_ctx);
    GRPC_CALL_COMBINER_STOP(calld->call_combiner,
                            "deferring server_recv_trailing_metadata_ready "
                            "until after server_on_recv_initial_metadata");
    return;
  }
  error =
      grpc_error_add_child(GRPC_ERROR_REF(error),
                           GRPC_ERROR_REF(calld->recv_initial_metadata_error));
  GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, error);
}
+
// Rewrites a stream op batch so this filter's closures intercept the
// recv_initial_metadata and recv_trailing_metadata completions, saving the
// original callbacks to chain to later.
static void server_mutate_op(grpc_call_element* elem,
                             grpc_transport_stream_op_batch* op) {
  call_data* calld = static_cast<call_data*>(elem->call_data);

  if (op->recv_initial_metadata) {
    // This filter is the only writer of recv_flags.
    GPR_ASSERT(op->payload->recv_initial_metadata.recv_flags == nullptr);
    calld->recv_initial_metadata =
        op->payload->recv_initial_metadata.recv_initial_metadata;
    calld->on_done_recv_initial_metadata =
        op->payload->recv_initial_metadata.recv_initial_metadata_ready;
    op->payload->recv_initial_metadata.recv_initial_metadata_ready =
        &calld->server_on_recv_initial_metadata;
    op->payload->recv_initial_metadata.recv_flags =
        &calld->recv_initial_metadata_flags;
  }
  if (op->recv_trailing_metadata) {
    calld->original_recv_trailing_metadata_ready =
        op->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
    op->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
        &calld->recv_trailing_metadata_ready;
  }
}
+
// Filter entry point for stream ops: install interceptors, then pass the
// batch down the stack.
static void server_start_transport_stream_op_batch(
    grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
  server_mutate_op(elem, op);
  grpc_call_next_op(elem, op);
}
+
// Closure run once the initial metadata batch for a new call completes:
// on success, dispatch the RPC; on failure, zombify the call (immediately
// if NOT_STARTED, or lazily via the pending list if already PENDING).
static void got_initial_metadata(void* ptr, grpc_error* error) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(ptr);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (error == GRPC_ERROR_NONE) {
    start_new_rpc(elem);
  } else {
    if (gpr_atm_full_cas(&calld->state, NOT_STARTED, ZOMBIED)) {
      GRPC_CLOSURE_INIT(&calld->kill_zombie_closure, kill_zombie, elem,
                        grpc_schedule_on_exec_ctx);
      GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE);
    } else if (gpr_atm_full_cas(&calld->state, PENDING, ZOMBIED)) {
      /* zombied call will be destroyed when it's removed from the pending
         queue... later */
    }
  }
}
+
// Transport callback for a new incoming stream: creates the server-side
// call object and kicks off a batch to receive its initial metadata.
static void accept_stream(void* cd, grpc_transport* transport,
                          const void* transport_server_data) {
  channel_data* chand = static_cast<channel_data*>(cd);
  /* create a call */
  grpc_call_create_args args;
  args.channel = chand->channel;
  args.server = chand->server;
  args.parent = nullptr;
  args.propagation_mask = 0;
  args.cq = nullptr;
  args.pollset_set_alternative = nullptr;
  args.server_transport_data = transport_server_data;
  args.add_initial_metadata = nullptr;
  args.add_initial_metadata_count = 0;
  args.send_deadline = GRPC_MILLIS_INF_FUTURE;
  grpc_call* call;
  grpc_error* error = grpc_call_create(&args, &call);
  grpc_call_element* elem =
      grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
  if (error != GRPC_ERROR_NONE) {
    // Route the failure through got_initial_metadata so the call is
    // zombified through the normal path.
    got_initial_metadata(elem, error);
    GRPC_ERROR_UNREF(error);
    return;
  }
  call_data* calld = static_cast<call_data*>(elem->call_data);
  grpc_op op;
  op.op = GRPC_OP_RECV_INITIAL_METADATA;
  op.flags = 0;
  op.reserved = nullptr;
  op.data.recv_initial_metadata.recv_initial_metadata =
      &calld->initial_metadata;
  GRPC_CLOSURE_INIT(&calld->got_initial_metadata, got_initial_metadata, elem,
                    grpc_schedule_on_exec_ctx);
  grpc_call_start_batch_and_execute(call, &op, 1, &calld->got_initial_metadata);
}
+
// Connectivity watcher for a server channel: re-arms itself until the
// channel reaches SHUTDOWN, then destroys the channel under mu_global and
// drops the "connectivity" ref.
static void channel_connectivity_changed(void* cd, grpc_error* error) {
  channel_data* chand = static_cast<channel_data*>(cd);
  grpc_server* server = chand->server;
  if (chand->connectivity_state != GRPC_CHANNEL_SHUTDOWN) {
    grpc_transport_op* op = grpc_make_transport_op(nullptr);
    op->on_connectivity_state_change = &chand->channel_connectivity_changed;
    op->connectivity_state = &chand->connectivity_state;
    grpc_channel_next_op(grpc_channel_stack_element(
                             grpc_channel_get_channel_stack(chand->channel), 0),
                         op);
  } else {
    gpr_mu_lock(&server->mu_global);
    destroy_channel(chand, GRPC_ERROR_REF(error));
    gpr_mu_unlock(&server->mu_global);
    GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "connectivity");
  }
}
+
// Filter call-element constructor: holds a server ref for the call's
// lifetime and placement-constructs call_data in the element's storage.
static grpc_error* init_call_elem(grpc_call_element* elem,
                                  const grpc_call_element_args* args) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  server_ref(chand->server);
  new (elem->call_data) call_data(elem, *args);
  return GRPC_ERROR_NONE;
}
+
// Filter call-element destructor: explicitly destroys the placement-
// constructed call_data and releases the ref taken in init_call_elem.
static void destroy_call_elem(grpc_call_element* elem,
                              const grpc_call_final_info* final_info,
                              grpc_closure* ignored) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  calld->~call_data();
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  server_unref(chand->server);
}
+
// Filter channel-element constructor. server/channel are filled in later
// (they are nullptr here); the element starts unlinked (self-referential).
static grpc_error* init_channel_elem(grpc_channel_element* elem,
                                     grpc_channel_element_args* args) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  // The server filter must sit at the top of the stack, and never alone.
  GPR_ASSERT(args->is_first);
  GPR_ASSERT(!args->is_last);
  chand->server = nullptr;
  chand->channel = nullptr;
  chand->next = chand->prev = chand;
  chand->registered_methods = nullptr;
  chand->connectivity_state = GRPC_CHANNEL_IDLE;
  GRPC_CLOSURE_INIT(&chand->channel_connectivity_changed,
                    channel_connectivity_changed, chand,
                    grpc_schedule_on_exec_ctx);
  return GRPC_ERROR_NONE;
}
+
// Filter channel-element destructor: frees the registered-method table,
// detaches the channelz socket, unlinks the channel from the server list
// under mu_global, and drops the channel's server ref.
static void destroy_channel_elem(grpc_channel_element* elem) {
  size_t i;
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  if (chand->registered_methods) {
    for (i = 0; i < chand->registered_method_slots; i++) {
      grpc_slice_unref_internal(chand->registered_methods[i].method);
      if (chand->registered_methods[i].has_host) {
        grpc_slice_unref_internal(chand->registered_methods[i].host);
      }
    }
    gpr_free(chand->registered_methods);
  }
  if (chand->server) {
    if (chand->server->channelz_server != nullptr &&
        chand->channelz_socket_uuid != 0) {
      chand->server->channelz_server->RemoveChildSocket(
          chand->channelz_socket_uuid);
    }
    gpr_mu_lock(&chand->server->mu_global);
    // Unlink from the circular channel list (no-op if already orphaned).
    chand->next->prev = chand->prev;
    chand->prev->next = chand->next;
    chand->next = chand->prev = chand;
    // Removing a channel may be the last obstacle to finishing shutdown.
    maybe_finish_shutdown(chand->server);
    gpr_mu_unlock(&chand->server->mu_global);
    server_unref(chand->server);
  }
}
+
// Channel-filter vtable for the server's top-of-stack filter; wires the
// functions above into the channel stack machinery.
const grpc_channel_filter grpc_server_top_filter = {
    server_start_transport_stream_op_batch,
    grpc_channel_next_op,
    sizeof(call_data),
    init_call_elem,
    grpc_call_stack_ignore_set_pollset_or_pollset_set,
    destroy_call_elem,
    sizeof(channel_data),
    init_channel_elem,
    destroy_channel_elem,
    grpc_channel_next_get_info,
    "server",
};
+
+static void register_completion_queue(grpc_server* server,
+ grpc_completion_queue* cq,
+ void* reserved) {
+ size_t i, n;
+ GPR_ASSERT(!reserved);
+ for (i = 0; i < server->cq_count; i++) {
+ if (server->cqs[i] == cq) return;
+ }
+
+ GRPC_CQ_INTERNAL_REF(cq, "server");
+ n = server->cq_count++;
+ server->cqs = static_cast<grpc_completion_queue**>(gpr_realloc(
+ server->cqs, server->cq_count * sizeof(grpc_completion_queue*)));
+ server->cqs[n] = cq;
+}
+
+void grpc_server_register_completion_queue(grpc_server* server,
+ grpc_completion_queue* cq,
+ void* reserved) {
+ GRPC_API_TRACE(
+ "grpc_server_register_completion_queue(server=%p, cq=%p, reserved=%p)", 3,
+ (server, cq, reserved));
+
+ auto cq_type = grpc_get_cq_completion_type(cq);
+ if (cq_type != GRPC_CQ_NEXT && cq_type != GRPC_CQ_CALLBACK) {
+ gpr_log(GPR_INFO,
+ "Completion queue of type %d is being registered as a "
+ "server-completion-queue",
+ static_cast<int>(cq_type));
+ /* Ideally we should log an error and abort but ruby-wrapped-language API
+ calls grpc_completion_queue_pluck() on server completion queues */
+ }
+
+ register_completion_queue(server, cq, reserved);
+}
+
/* Allocates and initializes a new grpc_server.  The returned server holds
   one internal ref (released by grpc_server_destroy), a copy of the channel
   args, and — depending on args — a channelz node and a default resource
   user. */
grpc_server* grpc_server_create(const grpc_channel_args* args, void* reserved) {
  grpc_core::ExecCtx exec_ctx;
  GRPC_API_TRACE("grpc_server_create(%p, %p)", 2, (args, reserved));

  grpc_server* server =
      static_cast<grpc_server*>(gpr_zalloc(sizeof(grpc_server)));

  gpr_mu_init(&server->mu_global);
  gpr_mu_init(&server->mu_call);
  gpr_cv_init(&server->starting_cv);

  /* decremented by grpc_server_destroy */
  new (&server->internal_refcount) grpc_core::RefCount();
  // Empty doubly-linked channel list: the sentinel points at itself.
  server->root_channel_data.next = server->root_channel_data.prev =
      &server->root_channel_data;

  server->channel_args = grpc_channel_args_copy(args);

  const grpc_arg* arg = grpc_channel_args_find(args, GRPC_ARG_ENABLE_CHANNELZ);
  if (grpc_channel_arg_get_bool(arg, GRPC_ENABLE_CHANNELZ_DEFAULT)) {
    // Channelz enabled: create the server node with the configured trace
    // memory budget and record a creation event.
    arg = grpc_channel_args_find(
        args, GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE);
    size_t channel_tracer_max_memory = grpc_channel_arg_get_integer(
        arg,
        {GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT, 0, INT_MAX});
    server->channelz_server =
        grpc_core::MakeRefCounted<grpc_core::channelz::ServerNode>(
            server, channel_tracer_max_memory);
    server->channelz_server->AddTraceEvent(
        grpc_core::channelz::ChannelTrace::Severity::Info,
        grpc_slice_from_static_string("Server created"));
  }

  if (args != nullptr) {
    // If the args carry an existing resource quota (create=false: do not
    // make a new one), hang a default resource user for this server off it.
    grpc_resource_quota* resource_quota =
        grpc_resource_quota_from_channel_args(args, false /* create */);
    if (resource_quota != nullptr) {
      server->default_resource_user =
          grpc_resource_user_create(resource_quota, "default");
    }
  }

  return server;
}
+
/* NULL-tolerant string equality: two NULLs compare equal, a NULL never
   matches a non-NULL string, otherwise defer to strcmp. */
static int streq(const char* a, const char* b) {
  if (a == nullptr || b == nullptr) return a == b;
  return strcmp(a, b) == 0;
}
+
+void* grpc_server_register_method(
+ grpc_server* server, const char* method, const char* host,
+ grpc_server_register_method_payload_handling payload_handling,
+ uint32_t flags) {
+ registered_method* m;
+ GRPC_API_TRACE(
+ "grpc_server_register_method(server=%p, method=%s, host=%s, "
+ "flags=0x%08x)",
+ 4, (server, method, host, flags));
+ if (!method) {
+ gpr_log(GPR_ERROR,
+ "grpc_server_register_method method string cannot be NULL");
+ return nullptr;
+ }
+ for (m = server->registered_methods; m; m = m->next) {
+ if (streq(m->method, method) && streq(m->host, host)) {
+ gpr_log(GPR_ERROR, "duplicate registration for %s@%s", method,
+ host ? host : "*");
+ return nullptr;
+ }
+ }
+ if ((flags & ~GRPC_INITIAL_METADATA_USED_MASK) != 0) {
+ gpr_log(GPR_ERROR, "grpc_server_register_method invalid flags 0x%08x",
+ flags);
+ return nullptr;
+ }
+ m = static_cast<registered_method*>(gpr_zalloc(sizeof(registered_method)));
+ m->method = gpr_strdup(method);
+ m->host = gpr_strdup(host);
+ m->next = server->registered_methods;
+ m->payload_handling = payload_handling;
+ m->flags = flags;
+ server->registered_methods = m;
+ return m;
+}
+
/* Starts the server: snapshots pollsets from listenable completion queues,
   initializes request matchers, and starts every listener.  'starting' is
   held true (under mu_global) while listeners start so that
   grpc_server_shutdown_and_notify can wait on starting_cv for startup to
   finish. */
void grpc_server_start(grpc_server* server) {
  size_t i;
  grpc_core::ExecCtx exec_ctx;

  GRPC_API_TRACE("grpc_server_start(server=%p)", 1, (server));

  server->started = true;
  server->pollset_count = 0;
  // Sized for the worst case (every cq listenable); only listenable queues
  // contribute a pollset.
  server->pollsets = static_cast<grpc_pollset**>(
      gpr_malloc(sizeof(grpc_pollset*) * server->cq_count));
  for (i = 0; i < server->cq_count; i++) {
    if (grpc_cq_can_listen(server->cqs[i])) {
      server->pollsets[server->pollset_count++] =
          grpc_cq_pollset(server->cqs[i]);
    }
  }
  // One matcher for unregistered calls plus one per registered method.
  request_matcher_init(&server->unregistered_request_matcher, server);
  for (registered_method* rm = server->registered_methods; rm; rm = rm->next) {
    request_matcher_init(&rm->matcher, server);
  }

  gpr_mu_lock(&server->mu_global);
  server->starting = true;
  gpr_mu_unlock(&server->mu_global);

  for (listener* l = server->listeners; l; l = l->next) {
    l->start(server, l->arg, server->pollsets, server->pollset_count);
  }

  // Wake anyone blocked in shutdown waiting for startup to complete.
  gpr_mu_lock(&server->mu_global);
  server->starting = false;
  gpr_cv_signal(&server->starting_cv);
  gpr_mu_unlock(&server->mu_global);
}
+
+void grpc_server_get_pollsets(grpc_server* server, grpc_pollset*** pollsets,
+ size_t* pollset_count) {
+ *pollset_count = server->pollset_count;
+ *pollsets = server->pollsets;
+}
+
/* Wires a newly-accepted transport into the server: creates the server
   channel, picks a completion queue for publishing new calls, builds a
   per-channel hash table of registered methods, links the channel_data into
   the server's channel list, and sends the transport an op installing the
   accept-stream and connectivity callbacks. */
void grpc_server_setup_transport(
    grpc_server* s, grpc_transport* transport, grpc_pollset* accepting_pollset,
    const grpc_channel_args* args,
    const grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode>&
        socket_node,
    grpc_resource_user* resource_user) {
  size_t num_registered_methods;
  size_t alloc;
  registered_method* rm;
  channel_registered_method* crm;
  grpc_channel* channel;
  channel_data* chand;
  uint32_t hash;
  size_t slots;
  uint32_t probes;
  uint32_t max_probes = 0;
  grpc_transport_op* op = nullptr;

  channel = grpc_channel_create(nullptr, args, GRPC_SERVER_CHANNEL, transport,
                                resource_user);
  // Element 0 of the stack is this file's filter (see init_channel_elem's
  // is_first assertion), so its channel_data is our channel_data.
  chand = static_cast<channel_data*>(
      grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0)
          ->channel_data);
  chand->server = s;
  server_ref(s);  // dropped in destroy_channel_elem
  chand->channel = channel;
  if (socket_node != nullptr) {
    chand->channelz_socket_uuid = socket_node->uuid();
    s->channelz_server->AddChildSocket(socket_node);
  } else {
    chand->channelz_socket_uuid = 0;
  }

  // Prefer the completion queue whose pollset accepted this connection.
  size_t cq_idx;
  for (cq_idx = 0; cq_idx < s->cq_count; cq_idx++) {
    if (grpc_cq_pollset(s->cqs[cq_idx]) == accepting_pollset) break;
  }
  if (cq_idx == s->cq_count) {
    /* completion queue not found: pick a random one to publish new calls to */
    cq_idx = static_cast<size_t>(rand()) % s->cq_count;
  }
  chand->cq_idx = cq_idx;

  num_registered_methods = 0;
  for (rm = s->registered_methods; rm; rm = rm->next) {
    num_registered_methods++;
  }
  /* build a lookup table phrased in terms of mdstr's in this channels context
     to quickly find registered methods */
  if (num_registered_methods > 0) {
    // Open-addressed table with linear probing, at 50% max load factor.
    slots = 2 * num_registered_methods;
    alloc = sizeof(channel_registered_method) * slots;
    chand->registered_methods =
        static_cast<channel_registered_method*>(gpr_zalloc(alloc));
    for (rm = s->registered_methods; rm; rm = rm->next) {
      grpc_slice host;
      bool has_host;
      grpc_slice method;
      if (rm->host != nullptr) {
        host = grpc_slice_from_static_string(rm->host);
        has_host = true;
      } else {
        has_host = false;
      }
      method = grpc_slice_from_static_string(rm->method);
      hash = GRPC_MDSTR_KV_HASH(has_host ? grpc_slice_hash_internal(host) : 0,
                                grpc_slice_hash_internal(method));
      // Probe linearly for the first empty slot.
      for (probes = 0; chand->registered_methods[(hash + probes) % slots]
                           .server_registered_method != nullptr;
           probes++)
        ;
      // Track the longest probe sequence so lookups know when to stop.
      if (probes > max_probes) max_probes = probes;
      crm = &chand->registered_methods[(hash + probes) % slots];
      crm->server_registered_method = rm;
      crm->flags = rm->flags;
      crm->has_host = has_host;
      if (has_host) {
        crm->host = host;
      }
      crm->method = method;
    }
    GPR_ASSERT(slots <= UINT32_MAX);
    chand->registered_method_slots = static_cast<uint32_t>(slots);
    chand->registered_method_max_probes = max_probes;
  }

  // Publish the channel on the server's channel list.
  gpr_mu_lock(&s->mu_global);
  chand->next = &s->root_channel_data;
  chand->prev = chand->next->prev;
  chand->next->prev = chand->prev->next = chand;
  gpr_mu_unlock(&s->mu_global);

  GRPC_CHANNEL_INTERNAL_REF(channel, "connectivity");
  op = grpc_make_transport_op(nullptr);
  op->set_accept_stream = true;
  op->set_accept_stream_fn = accept_stream;
  op->set_accept_stream_user_data = chand;
  op->on_connectivity_state_change = &chand->channel_connectivity_changed;
  op->connectivity_state = &chand->connectivity_state;
  // If shutdown already started, disconnect the new transport immediately.
  if (gpr_atm_acq_load(&s->shutdown_flag) != 0) {
    op->disconnect_with_error =
        GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server shutdown");
  }
  grpc_transport_perform_op(transport, op);
}
+
/* Completion callback used when shutdown was already published: the
   grpc_cq_completion was heap-allocated, so just free it. */
void done_published_shutdown(void* done_arg, grpc_cq_completion* storage) {
  (void)done_arg;
  gpr_free(storage);
}
+
+static void listener_destroy_done(void* s, grpc_error* error) {
+ grpc_server* server = static_cast<grpc_server*>(s);
+ gpr_mu_lock(&server->mu_global);
+ server->listeners_destroyed++;
+ maybe_finish_shutdown(server);
+ gpr_mu_unlock(&server->mu_global);
+}
+
+/*
 - Kills all pending requests-for-incoming-RPC-calls (i.e. the requests made
   via grpc_server_request_call and grpc_server_request_registered_call will
   now be cancelled). See 'kill_pending_work_locked()'
+
+ - Shuts down the listeners (i.e the server will no longer listen on the port
+ for new incoming channels).
+
+ - Iterates through all channels on the server and sends shutdown msg (see
+ 'channel_broadcaster_shutdown()' for details) to the clients via the
+ transport layer. The transport layer then guarantees the following:
+ -- Sends shutdown to the client (for eg: HTTP2 transport sends GOAWAY)
+ -- If the server has outstanding calls that are in the process, the
+ connection is NOT closed until the server is done with all those calls
 -- Once there are no more calls in progress, the channel is closed
+ */
void grpc_server_shutdown_and_notify(grpc_server* server,
                                     grpc_completion_queue* cq, void* tag) {
  listener* l;
  shutdown_tag* sdt;
  channel_broadcaster broadcaster;
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;

  GRPC_API_TRACE("grpc_server_shutdown_and_notify(server=%p, cq=%p, tag=%p)", 3,
                 (server, cq, tag));

  /* wait for startup to be finished: locks mu_global */
  gpr_mu_lock(&server->mu_global);
  while (server->starting) {
    gpr_cv_wait(&server->starting_cv, &server->mu_global,
                gpr_inf_future(GPR_CLOCK_MONOTONIC));
  }

  /* stay locked, and gather up some stuff to do */
  GPR_ASSERT(grpc_cq_begin_op(cq, tag));
  if (server->shutdown_published) {
    // Shutdown already completed: notify this tag immediately.
    grpc_cq_end_op(cq, tag, GRPC_ERROR_NONE, done_published_shutdown, nullptr,
                   static_cast<grpc_cq_completion*>(
                       gpr_malloc(sizeof(grpc_cq_completion))));
    gpr_mu_unlock(&server->mu_global);
    return;
  }
  // Record the (cq, tag) pair to be signalled when shutdown finishes.
  server->shutdown_tags = static_cast<shutdown_tag*>(
      gpr_realloc(server->shutdown_tags,
                  sizeof(shutdown_tag) * (server->num_shutdown_tags + 1)));
  sdt = &server->shutdown_tags[server->num_shutdown_tags++];
  sdt->tag = tag;
  sdt->cq = cq;
  // A shutdown already in flight will signal the tag recorded above;
  // nothing more to do here.
  if (gpr_atm_acq_load(&server->shutdown_flag)) {
    gpr_mu_unlock(&server->mu_global);
    return;
  }

  server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);

  // Snapshot all live channels so they can be shut down outside the lock.
  channel_broadcaster_init(server, &broadcaster);

  gpr_atm_rel_store(&server->shutdown_flag, 1);

  /* collect all unregistered then registered calls */
  gpr_mu_lock(&server->mu_call);
  kill_pending_work_locked(
      server, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown"));
  gpr_mu_unlock(&server->mu_call);

  maybe_finish_shutdown(server);
  gpr_mu_unlock(&server->mu_global);

  /* Shutdown listeners */
  for (l = server->listeners; l; l = l->next) {
    GRPC_CLOSURE_INIT(&l->destroy_done, listener_destroy_done, server,
                      grpc_schedule_on_exec_ctx);
    l->destroy(server, l->arg, &l->destroy_done);
    if (server->channelz_server != nullptr && l->socket_uuid != 0) {
      server->channelz_server->RemoveChildListenSocket(l->socket_uuid);
    }
  }

  // Send goaway to every snapshotted channel.
  channel_broadcaster_shutdown(&broadcaster, true /* send_goaway */,
                               GRPC_ERROR_NONE);

  if (server->default_resource_user != nullptr) {
    // Release the quota/user set up in grpc_server_create.
    grpc_resource_quota_unref(
        grpc_resource_user_quota(server->default_resource_user));
    grpc_resource_user_shutdown(server->default_resource_user);
    grpc_resource_user_unref(server->default_resource_user);
  }
}
+
+void grpc_server_cancel_all_calls(grpc_server* server) {
+ channel_broadcaster broadcaster;
+ grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
+ grpc_core::ExecCtx exec_ctx;
+
+ GRPC_API_TRACE("grpc_server_cancel_all_calls(server=%p)", 1, (server));
+
+ gpr_mu_lock(&server->mu_global);
+ channel_broadcaster_init(server, &broadcaster);
+ gpr_mu_unlock(&server->mu_global);
+
+ channel_broadcaster_shutdown(
+ &broadcaster, false /* send_goaway */,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Cancelling all calls"));
+}
+
+void grpc_server_destroy(grpc_server* server) {
+ listener* l;
+ grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
+ grpc_core::ExecCtx exec_ctx;
+
+ GRPC_API_TRACE("grpc_server_destroy(server=%p)", 1, (server));
+
+ gpr_mu_lock(&server->mu_global);
+ GPR_ASSERT(gpr_atm_acq_load(&server->shutdown_flag) || !server->listeners);
+ GPR_ASSERT(server->listeners_destroyed == num_listeners(server));
+
+ while (server->listeners) {
+ l = server->listeners;
+ server->listeners = l->next;
+ gpr_free(l);
+ }
+
+ gpr_mu_unlock(&server->mu_global);
+
+ server_unref(server);
+}
+
+void grpc_server_add_listener(
+ grpc_server* server, void* listener_arg,
+ void (*start)(grpc_server* server, void* arg, grpc_pollset** pollsets,
+ size_t pollset_count),
+ void (*destroy)(grpc_server* server, void* arg, grpc_closure* on_done),
+ grpc_core::RefCountedPtr<grpc_core::channelz::ListenSocketNode> node) {
+ listener* l = static_cast<listener*>(gpr_malloc(sizeof(listener)));
+ l->arg = listener_arg;
+ l->start = start;
+ l->destroy = destroy;
+ l->socket_uuid = 0;
+ if (node != nullptr) {
+ l->socket_uuid = node->uuid();
+ if (server->channelz_server != nullptr) {
+ server->channelz_server->AddChildListenSocket(std::move(node));
+ }
+ }
+ l->next = server->listeners;
+ server->listeners = l;
+}
+
/* Queues a requested call on the appropriate request matcher for cq_idx.
   If the server is shutting down, fails the request immediately.  If this
   push made the queue non-empty, drains the matcher's pending-call list by
   pairing pending calls with queued requests. */
static grpc_call_error queue_call_request(grpc_server* server, size_t cq_idx,
                                          requested_call* rc) {
  call_data* calld = nullptr;
  request_matcher* rm = nullptr;
  if (gpr_atm_acq_load(&server->shutdown_flag)) {
    fail_call(server, cq_idx, rc,
              GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown"));
    return GRPC_CALL_OK;
  }
  // One shared matcher for unregistered calls; per-method matcher otherwise.
  switch (rc->type) {
    case BATCH_CALL:
      rm = &server->unregistered_request_matcher;
      break;
    case REGISTERED_CALL:
      rm = &rc->data.registered.method->matcher;
      break;
  }
  if (gpr_locked_mpscq_push(&rm->requests_per_cq[cq_idx], &rc->request_link)) {
    /* this was the first queued request: we need to lock and start
       matching calls */
    gpr_mu_lock(&server->mu_call);
    while ((calld = rm->pending_head) != nullptr) {
      // Pop one queued request; if none remain, leave the rest pending.
      rc = reinterpret_cast<requested_call*>(
          gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]));
      if (rc == nullptr) break;
      rm->pending_head = calld->pending_next;
      // Publish outside mu_call; re-acquire before the next iteration.
      gpr_mu_unlock(&server->mu_call);
      // PENDING -> ACTIVATED; a failed CAS means the call was zombied
      // concurrently and must be cleaned up instead of published.
      if (!gpr_atm_full_cas(&calld->state, PENDING, ACTIVATED)) {
        // Zombied Call
        GRPC_CLOSURE_INIT(
            &calld->kill_zombie_closure, kill_zombie,
            grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
            grpc_schedule_on_exec_ctx);
        GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE);
      } else {
        publish_call(server, calld, cq_idx, rc);
      }
      gpr_mu_lock(&server->mu_call);
    }
    gpr_mu_unlock(&server->mu_call);
  }
  return GRPC_CALL_OK;
}
+
+grpc_call_error grpc_server_request_call(
+ grpc_server* server, grpc_call** call, grpc_call_details* details,
+ grpc_metadata_array* initial_metadata,
+ grpc_completion_queue* cq_bound_to_call,
+ grpc_completion_queue* cq_for_notification, void* tag) {
+ grpc_call_error error;
+ grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
+ grpc_core::ExecCtx exec_ctx;
+ requested_call* rc = static_cast<requested_call*>(gpr_malloc(sizeof(*rc)));
+ GRPC_STATS_INC_SERVER_REQUESTED_CALLS();
+ GRPC_API_TRACE(
+ "grpc_server_request_call("
+ "server=%p, call=%p, details=%p, initial_metadata=%p, "
+ "cq_bound_to_call=%p, cq_for_notification=%p, tag=%p)",
+ 7,
+ (server, call, details, initial_metadata, cq_bound_to_call,
+ cq_for_notification, tag));
+ size_t cq_idx;
+ for (cq_idx = 0; cq_idx < server->cq_count; cq_idx++) {
+ if (server->cqs[cq_idx] == cq_for_notification) {
+ break;
+ }
+ }
+ if (cq_idx == server->cq_count) {
+ gpr_free(rc);
+ error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
+ goto done;
+ }
+ if (grpc_cq_begin_op(cq_for_notification, tag) == false) {
+ gpr_free(rc);
+ error = GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN;
+ goto done;
+ }
+ details->reserved = nullptr;
+ rc->cq_idx = cq_idx;
+ rc->type = BATCH_CALL;
+ rc->server = server;
+ rc->tag = tag;
+ rc->cq_bound_to_call = cq_bound_to_call;
+ rc->call = call;
+ rc->data.batch.details = details;
+ rc->initial_metadata = initial_metadata;
+ error = queue_call_request(server, cq_idx, rc);
+done:
+
+ return error;
+}
+
+grpc_call_error grpc_server_request_registered_call(
+ grpc_server* server, void* rmp, grpc_call** call, gpr_timespec* deadline,
+ grpc_metadata_array* initial_metadata, grpc_byte_buffer** optional_payload,
+ grpc_completion_queue* cq_bound_to_call,
+ grpc_completion_queue* cq_for_notification, void* tag) {
+ grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
+ grpc_core::ExecCtx exec_ctx;
+ GRPC_STATS_INC_SERVER_REQUESTED_CALLS();
+ requested_call* rc = static_cast<requested_call*>(gpr_malloc(sizeof(*rc)));
+ registered_method* rm = static_cast<registered_method*>(rmp);
+ GRPC_API_TRACE(
+ "grpc_server_request_registered_call("
+ "server=%p, rmp=%p, call=%p, deadline=%p, initial_metadata=%p, "
+ "optional_payload=%p, cq_bound_to_call=%p, cq_for_notification=%p, "
+ "tag=%p)",
+ 9,
+ (server, rmp, call, deadline, initial_metadata, optional_payload,
+ cq_bound_to_call, cq_for_notification, tag));
+
+ size_t cq_idx;
+ for (cq_idx = 0; cq_idx < server->cq_count; cq_idx++) {
+ if (server->cqs[cq_idx] == cq_for_notification) {
+ break;
+ }
+ }
+ if (cq_idx == server->cq_count) {
+ gpr_free(rc);
+ return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
+ }
+ if ((optional_payload == nullptr) !=
+ (rm->payload_handling == GRPC_SRM_PAYLOAD_NONE)) {
+ gpr_free(rc);
+ return GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH;
+ }
+
+ if (grpc_cq_begin_op(cq_for_notification, tag) == false) {
+ gpr_free(rc);
+ return GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN;
+ }
+ rc->cq_idx = cq_idx;
+ rc->type = REGISTERED_CALL;
+ rc->server = server;
+ rc->tag = tag;
+ rc->cq_bound_to_call = cq_bound_to_call;
+ rc->call = call;
+ rc->data.registered.method = rm;
+ rc->data.registered.deadline = deadline;
+ rc->initial_metadata = initial_metadata;
+ rc->data.registered.optional_payload = optional_payload;
+ return queue_call_request(server, cq_idx, rc);
+}
+
+static void fail_call(grpc_server* server, size_t cq_idx, requested_call* rc,
+ grpc_error* error) {
+ *rc->call = nullptr;
+ rc->initial_metadata->count = 0;
+ GPR_ASSERT(error != GRPC_ERROR_NONE);
+
+ grpc_cq_end_op(server->cqs[cq_idx], rc->tag, error, done_request_event, rc,
+ &rc->completion);
+}
+
/* Returns the server's copy of its creation channel args (still owned by
   the server). */
const grpc_channel_args* grpc_server_get_channel_args(grpc_server* server) {
  return server->channel_args;
}
+
/* Returns the server's default resource user, or nullptr if no resource
   quota was present in the channel args at creation time. */
grpc_resource_user* grpc_server_get_default_resource_user(grpc_server* server) {
  return server->default_resource_user;
}
+
+int grpc_server_has_open_connections(grpc_server* server) {
+ int r;
+ gpr_mu_lock(&server->mu_global);
+ r = server->root_channel_data.next != &server->root_channel_data;
+ gpr_mu_unlock(&server->mu_global);
+ return r;
+}
+
+grpc_core::channelz::ServerNode* grpc_server_get_channelz_node(
+ grpc_server* server) {
+ if (server == nullptr) {
+ return nullptr;
+ }
+ return server->channelz_server.get();
+}