--- /dev/null
+/**
+ * @license
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/**
+ * Client Interceptors
+ *
+ * This module describes the interceptor framework for clients.
+ * An interceptor is a function which takes an options object and a nextCall
+ * function and returns an InterceptingCall:
+ *
+ * ```
+ * var interceptor = function(options, nextCall) {
+ * return new InterceptingCall(nextCall(options));
+ * }
+ * ```
+ *
+ * The interceptor function must return an InterceptingCall object. Returning
+ * `new InterceptingCall(nextCall(options))` will satisfy the contract (but
+ * provide no interceptor functionality). `nextCall` is a function which will
+ * generate the next interceptor in the chain.
+ *
+ * To implement interceptor functionality, create a requester and pass it to
+ * the InterceptingCall constructor:
+ *
+ * `return new InterceptingCall(nextCall(options), requester);`
+ *
+ * A requester is a POJO with zero or more of the following methods:
+ *
+ * `start(metadata, listener, next)`
+ * * To continue, call next(metadata, listener). Listeners are described
+ * * below.
+ *
+ * `sendMessage(message, next)`
+ * * To continue, call next(message).
+ *
+ * `halfClose(next)`
+ * * To continue, call next().
+ *
+ * `cancel(message, next)`
+ * * To continue, call next().
+ *
+ * A listener is a POJO with one or more of the following methods:
+ *
+ * `onReceiveMetadata(metadata, next)`
+ * * To continue, call next(metadata)
+ *
+ * `onReceiveMessage(message, next)`
+ * * To continue, call next(message)
+ *
+ * `onReceiveStatus(status, next)`
+ * * To continue, call next(status)
+ *
+ * A listener is provided by the requester's `start` method. The provided
+ * listener implements all the inbound interceptor methods, which can be called
+ * to short-circuit the gRPC call.
+ *
+ * Three usage patterns are supported for listeners:
+ * 1) Pass the listener along without modification: `next(metadata, listener)`.
+ * In this case the interceptor declines to intercept any inbound operations.
+ * 2) Create a new listener with one or more inbound interceptor methods and
+ * pass it to `next`. In this case the interceptor will fire on the inbound
+ * operations implemented in the new listener.
+ * 3) Make direct inbound calls to the provided listener's methods. This
+ * short-circuits the interceptor stack.
+ *
+ * Do not modify the listener passed in. Either pass it along unmodified,
+ * ignore it, or call methods on it to short-circuit the call.
+ *
+ * To intercept errors, implement the `onReceiveStatus` method and test for
+ * `status.code !== grpc.status.OK`.
+ *
+ * To intercept trailers, examine `status.metadata` in the `onReceiveStatus`
+ * method.
+ *
+ * This is a trivial implementation of all interceptor methods:
+ * var interceptor = function(options, nextCall) {
+ * return new InterceptingCall(nextCall(options), {
+ * start: function(metadata, listener, next) {
+ * next(metadata, {
+ * onReceiveMetadata: function (metadata, next) {
+ * next(metadata);
+ * },
+ * onReceiveMessage: function (message, next) {
+ * next(message);
+ * },
+ * onReceiveStatus: function (status, next) {
+ * next(status);
+ * },
+ * });
+ * },
+ * sendMessage: function(message, next) {
+ * next(message);
+ * },
+ * halfClose: function(next) {
+ * next();
+ * },
+ * cancel: function(message, next) {
+ * next();
+ * }
+ * });
+ * };
+ *
+ * This is an interceptor with a single method:
+ * var interceptor = function(options, nextCall) {
+ * return new InterceptingCall(nextCall(options), {
+ * sendMessage: function(message, next) {
+ * next(message);
+ * }
+ * });
+ * };
+ *
+ * Builders are provided for convenience: StatusBuilder, ListenerBuilder,
+ * and RequesterBuilder
+ *
+ * gRPC client operations use this mapping to interceptor methods:
+ *
+ * grpc.opType.SEND_INITIAL_METADATA -> start
+ * grpc.opType.SEND_MESSAGE -> sendMessage
+ * grpc.opType.SEND_CLOSE_FROM_CLIENT -> halfClose
+ * grpc.opType.RECV_INITIAL_METADATA -> onReceiveMetadata
+ * grpc.opType.RECV_MESSAGE -> onReceiveMessage
+ * grpc.opType.RECV_STATUS_ON_CLIENT -> onReceiveStatus
+ *
+ * @module
+ */
+
+'use strict';
+
+var grpc = require('./grpc_extension');
+var Metadata = require('./metadata');
+var constants = require('./constants');
+var common = require('./common');
+var methodTypes = constants.methodTypes;
+var EventEmitter = require('events').EventEmitter;
+
+/**
+ * A custom error thrown when interceptor configuration fails.
+ * @param {string} message The error message
+ * @param {object=} extra
+ * @constructor
+ */
+var InterceptorConfigurationError =
+ function InterceptorConfigurationError(message, extra) {
+ Error.captureStackTrace(this, this.constructor);
+ this.name = this.constructor.name;
+ this.message = message;
+ this.extra = extra;
+ };
+
+require('util').inherits(InterceptorConfigurationError, Error);
+
+/**
+ * A builder for gRPC status objects.
+ * @constructor
+ */
+function StatusBuilder() {
+ this.code = null;
+ this.details = null;
+ this.metadata = null;
+}
+
+/**
+ * Adds a status code to the builder.
+ * @param {number} code The status code.
+ * @return {StatusBuilder}
+ */
+StatusBuilder.prototype.withCode = function(code) {
+ this.code = code;
+ return this;
+};
+
+/**
+ * Adds details to the builder.
+ * @param {string} details A status message.
+ * @return {StatusBuilder}
+ */
+StatusBuilder.prototype.withDetails = function(details) {
+ this.details = details;
+ return this;
+};
+
+/**
+ * Adds metadata to the builder.
+ * @param {Metadata} metadata The gRPC status metadata.
+ * @return {StatusBuilder}
+ */
+StatusBuilder.prototype.withMetadata = function(metadata) {
+ this.metadata = metadata;
+ return this;
+};
+
+/**
+ * Builds the status object.
+ * @return {grpc~StatusObject} A gRPC status.
+ */
+StatusBuilder.prototype.build = function() {
+ var status = {};
+ if (this.code !== undefined) {
+ status.code = this.code;
+ }
+ if (this.details) {
+ status.details = this.details;
+ }
+ if (this.metadata) {
+ status.metadata = this.metadata;
+ }
+ return status;
+};
+
+/**
+ * A builder for listener interceptors.
+ * @constructor
+ */
+function ListenerBuilder() {
+ this.metadata = null;
+ this.message = null;
+ this.status = null;
+}
+
+/**
+ * Adds an onReceiveMetadata method to the builder.
+ * @param {MetadataListener} on_receive_metadata A listener method for
+ * receiving metadata.
+ * @return {ListenerBuilder}
+ */
+ListenerBuilder.prototype.withOnReceiveMetadata =
+ function(on_receive_metadata) {
+ this.metadata = on_receive_metadata;
+ return this;
+ };
+
+/**
+ * Adds an onReceiveMessage method to the builder.
+ * @param {MessageListener} on_receive_message A listener method for receiving
+ * messages.
+ * @return {ListenerBuilder}
+ */
+ListenerBuilder.prototype.withOnReceiveMessage = function(on_receive_message) {
+ this.message = on_receive_message;
+ return this;
+};
+
+/**
+ * Adds an onReceiveStatus method to the builder.
+ * @param {StatusListener} on_receive_status A listener method for receiving
+ * status.
+ * @return {ListenerBuilder}
+ */
+ListenerBuilder.prototype.withOnReceiveStatus = function(on_receive_status) {
+ this.status = on_receive_status;
+ return this;
+};
+
+/**
+ * Builds the call listener.
+ * @return {grpc~Listener}
+ */
+ListenerBuilder.prototype.build = function() {
+ var self = this;
+ var listener = {};
+ listener.onReceiveMetadata = self.metadata;
+ listener.onReceiveMessage = self.message;
+ listener.onReceiveStatus = self.status;
+ return listener;
+};
+
+/**
+ * A builder for the outbound methods of an interceptor.
+ * @constructor
+ */
+function RequesterBuilder() {
+ this.start = null;
+ this.message = null;
+ this.half_close = null;
+ this.cancel = null;
+}
+
+/**
+ * Add a metadata requester to the builder.
+ * @param {MetadataRequester} start A requester method for handling metadata.
+ * @return {RequesterBuilder}
+ */
+RequesterBuilder.prototype.withStart = function(start) {
+ this.start = start;
+ return this;
+};
+
+/**
+ * Add a message requester to the builder.
+ * @param {MessageRequester} send_message A requester method for handling
+ * messages.
+ * @return {RequesterBuilder}
+ */
+RequesterBuilder.prototype.withSendMessage = function(send_message) {
+ this.message = send_message;
+ return this;
+};
+
+/**
+ * Add a close requester to the builder.
+ * @param {CloseRequester} half_close A requester method for handling client
+ * close.
+ * @return {RequesterBuilder}
+ */
+RequesterBuilder.prototype.withHalfClose = function(half_close) {
+ this.half_close = half_close;
+ return this;
+};
+
+/**
+ * Add a cancel requester to the builder.
+ * @param {CancelRequester} cancel A requester method for handling `cancel`
+ * @return {RequesterBuilder}
+ */
+RequesterBuilder.prototype.withCancel = function(cancel) {
+ this.cancel = cancel;
+ return this;
+};
+
+/**
+ * Builds the requester's interceptor methods.
+ * @return {grpc~Requester}
+ */
+RequesterBuilder.prototype.build = function() {
+ var requester = {};
+ requester.start = this.start;
+ requester.sendMessage = this.message;
+ requester.halfClose = this.half_close;
+ requester.cancel = this.cancel;
+ return requester;
+};
+
+/**
+ * Transforms a list of interceptor providers into interceptors.
+ * @param {InterceptorProvider[]} providers
+ * @param {grpc~MethodDefinition} method_definition
+ * @return {null|Interceptor[]}
+ */
+var resolveInterceptorProviders = function(providers, method_definition) {
+ if (!Array.isArray(providers)) {
+ return null;
+ }
+ var interceptors = [];
+ for (var i = 0; i < providers.length; i++) {
+ var provider = providers[i];
+ var interceptor = provider(method_definition);
+ if (interceptor) {
+ interceptors.push(interceptor);
+ }
+ }
+ return interceptors;
+};
+
+/**
+ * A chainable gRPC call proxy which will delegate to an optional requester
+ * object. By default, interceptor methods will chain to next_call. If a
+ * requester is provided which implements an interceptor method, that
+ * requester method will be executed as part of the chain.
+ * @param {InterceptingCall|null} next_call The next call in the chain
+ * @param {grpc~Requester=} requester Interceptor methods to handle request
+ * operations.
+ * @constructor
+ */
+function InterceptingCall(next_call, requester) {
+ this.next_call = next_call;
+ this.requester = requester;
+}
+
+const emptyNext = function() {};
+
+/**
+ * Get the next method in the chain or a no-op function if we are at the end
+ * of the chain
+ * @param {string} method_name
+ * @return {function} The next method in the chain
+ * @private
+ */
+InterceptingCall.prototype._getNextCall = function(method_name) {
+ return this.next_call ?
+ this.next_call[method_name].bind(this.next_call) :
+ emptyNext;
+};
+
+/**
+ * Call the next method in the chain. This will either be on the next
+ * InterceptingCall (next_call), or the requester if the requester
+ * implements the method.
+ * @param {string} method_name The name of the interceptor method
+ * @param {array=} args Payload arguments for the operation
+ * @param {function=} next The next InterceptingCall's method
+ * @return {null}
+ * @private
+ */
+InterceptingCall.prototype._callNext = function(method_name, args, next) {
+ var args_array = args || [];
+ var next_call = next ? next : this._getNextCall(method_name);
+ if (this.requester && this.requester[method_name]) {
+ // Avoid using expensive `apply` calls
+ var num_args = args_array.length;
+ switch (num_args) {
+ case 0:
+ return this.requester[method_name](next_call);
+ case 1:
+ return this.requester[method_name](args_array[0], next_call);
+ case 2:
+ return this.requester[method_name](args_array[0], args_array[1],
+ next_call);
+ }
+ } else {
+ if (next_call === emptyNext) {
+ throw new Error('Interceptor call chain terminated unexpectedly');
+ }
+ return next_call(args_array[0], args_array[1]);
+ }
+};
+
+/**
+ * Starts a call through the outbound interceptor chain and adds an element to
+ * the reciprocal inbound listener chain.
+ * @param {grpc.Metadata} metadata The outgoing metadata.
+ * @param {grpc~Listener} listener An intercepting listener for inbound
+ * operations.
+ */
+InterceptingCall.prototype.start = function(metadata, listener) {
+ var self = this;
+
+ // If the listener provided is an InterceptingListener, use it. Otherwise, we
+ // must be at the end of the listener chain, and any listener operations
+ // should be terminated in an EndListener.
+ var next_listener = _getInterceptingListener(listener, new EndListener());
+
+ // Build the next method in the interceptor chain
+ var next = function(metadata, current_listener) {
+ // If there is a next call in the chain, run it. Otherwise do nothing.
+ if (self.next_call) {
+ // Wire together any listener provided with the next listener
+ var listener = _getInterceptingListener(current_listener, next_listener);
+ self.next_call.start(metadata, listener);
+ }
+ };
+ this._callNext('start', [metadata, next_listener], next);
+};
+
+/**
+ * Pass a message through the interceptor chain.
+ * @param {jspb.Message} message
+ */
+InterceptingCall.prototype.sendMessage = function(message) {
+ this._callNext('sendMessage', [message]);
+};
+
+/**
+ * Run a close operation through the interceptor chain
+ */
+InterceptingCall.prototype.halfClose = function() {
+ this._callNext('halfClose');
+};
+
+/**
+ * Run a cancel operation through the interceptor chain
+ */
+InterceptingCall.prototype.cancel = function() {
+ this._callNext('cancel');
+};
+
+/**
+ * Run a cancelWithStatus operation through the interceptor chain.
+ * @param {number} code
+ * @param {string} details
+ */
+InterceptingCall.prototype.cancelWithStatus = function(code, details) {
+ this._callNext('cancelWithStatus', [code, details]);
+};
+
+/**
+ * Pass a getPeer call down to the base gRPC call (should not be intercepted)
+ * @return {object}
+ */
+InterceptingCall.prototype.getPeer = function() {
+ return this._callNext('getPeer');
+};
+
+/**
+ * For streaming calls, we need to transparently pass the stream's context
+ * through the interceptor chain. Passes the context between InterceptingCalls
+ * but hides it from any requester implementations.
+ * @param {object} context Carries objects needed for streaming operations.
+ * @param {jspb.Message} message The message to send.
+ */
+InterceptingCall.prototype.sendMessageWithContext = function(context, message) {
+ var next = this.next_call ?
+ this.next_call.sendMessageWithContext.bind(this.next_call, context) :
+ context;
+ this._callNext('sendMessage', [message], next);
+};
+
+/**
+ * For receiving streaming messages, we need to seed the base interceptor with
+ * the streaming context to create a RECV_MESSAGE batch.
+ * @param {object} context Carries objects needed for streaming operations
+ */
+InterceptingCall.prototype.recvMessageWithContext = function(context) {
+ this._callNext('recvMessageWithContext', [context]);
+};
+
+/**
+ * A chain-able listener object which will delegate to a custom listener when
+ * appropriate.
+ * @param {InterceptingListener|null} next_listener The next
+ * InterceptingListener in the chain
+ * @param {grpc~Listener=} delegate A custom listener object which may implement
+ * specific operations
+ * @constructor
+ */
+function InterceptingListener(next_listener, delegate) {
+ this.delegate = delegate || {};
+ this.next_listener = next_listener;
+}
+
+/**
+ * Get the next method in the chain or a no-op function if we are at the end
+ * of the chain.
+ * @param {string} method_name The name of the listener method.
+ * @return {function} The next method in the chain
+ * @private
+ */
+InterceptingListener.prototype._getNextListener = function(method_name) {
+ return this.next_listener ?
+ this.next_listener[method_name].bind(this.next_listener) :
+ function(){};
+};
+
+/**
+ * Call the next method in the chain. This will either be on the next
+ * InterceptingListener (next_listener), or the requester if the requester
+ * implements the method.
+ * @param {string} method_name The name of the interceptor method
+ * @param {array=} args Payload arguments for the operation
+ * @param {function=} next The next InterceptingListener's method
+ * @return {null}
+ * @private
+ */
+InterceptingListener.prototype._callNext = function(method_name, args, next) {
+ var args_array = args || [];
+ var next_listener = next ? next : this._getNextListener(method_name);
+ if (this.delegate && this.delegate[method_name]) {
+ // Avoid using expensive `apply` calls
+ var num_args = args_array.length;
+ switch (num_args) {
+ case 0:
+ return this.delegate[method_name](next_listener);
+ case 1:
+ return this.delegate[method_name](args_array[0], next_listener);
+ case 2:
+ return this.delegate[method_name](args_array[0], args_array[1],
+ next_listener);
+ }
+ } else {
+ return next_listener(args_array[0], args_array[1]);
+ }
+};
+/**
+ * Inbound metadata receiver.
+ * @param {Metadata} metadata
+ */
+InterceptingListener.prototype.onReceiveMetadata = function(metadata) {
+ this._callNext('onReceiveMetadata', [metadata]);
+};
+
+/**
+ * Inbound message receiver.
+ * @param {jspb.Message} message
+ */
+InterceptingListener.prototype.onReceiveMessage = function(message) {
+ this._callNext('onReceiveMessage', [message]);
+};
+
+/**
+ * When intercepting streaming message, we need to pass the streaming context
+ * transparently along the chain. Hides the context from the delegate listener
+ * methods.
+ * @param {object} context Carries objects needed for streaming operations.
+ * @param {jspb.Message} message The message received.
+ */
+InterceptingListener.prototype.recvMessageWithContext = function(context,
+ message) {
+ var fallback = this.next_listener.recvMessageWithContext;
+ var next_method = this.next_listener ?
+ fallback.bind(this.next_listener, context) :
+ context;
+ if (this.delegate.onReceiveMessage) {
+ this.delegate.onReceiveMessage(message, next_method, context);
+ } else {
+ next_method(message);
+ }
+};
+
+/**
+ * Inbound status receiver.
+ * @param {grpc~StatusObject} status
+ */
+InterceptingListener.prototype.onReceiveStatus = function(status) {
+ this._callNext('onReceiveStatus', [status]);
+};
+
+/**
+ * A dead-end listener used to terminate a call chain. Used when an interceptor
+ * creates a branch chain, when the branch returns the listener chain will
+ * terminate here.
+ * @constructor
+ */
+function EndListener() {}
+EndListener.prototype.onReceiveMetadata = function(){};
+EndListener.prototype.onReceiveMessage = function(){};
+EndListener.prototype.onReceiveStatus = function(){};
+EndListener.prototype.recvMessageWithContext = function(){};
+
+/**
+ * Get a call object built with the provided options.
+ * @param {grpc.Channel} channel
+ * @param {string} path
+ * @param {grpc.Client~CallOptions=} options Options object.
+ */
+function getCall(channel, path, options) {
+ var deadline;
+ var host;
+ var parent;
+ var propagate_flags;
+ var credentials;
+ if (options) {
+ deadline = options.deadline;
+ host = options.host;
+ parent = options.parent ? options.parent.call : undefined;
+ propagate_flags = options.propagate_flags;
+ credentials = options.credentials;
+ }
+ if (deadline === undefined) {
+ deadline = Infinity;
+ }
+ var call = channel.createCall(path, deadline, host,
+ parent, propagate_flags);
+ if (credentials) {
+ call.setCredentials(credentials);
+ }
+ return call;
+}
+
+var OP_DEPENDENCIES = {
+ [grpc.opType.SEND_MESSAGE]: [grpc.opType.SEND_INITIAL_METADATA],
+ [grpc.opType.SEND_CLOSE_FROM_CLIENT]: [grpc.opType.SEND_MESSAGE],
+ [grpc.opType.RECV_MESSAGE]: [grpc.opType.SEND_INITIAL_METADATA]
+};
+
+/**
+ * Produces a callback triggered by streaming response messages.
+ * @private
+ * @param {EventEmitter} emitter
+ * @param {grpc.internal~Call} call
+ * @param {function} get_listener Returns a grpc~Listener.
+ * @param {grpc~deserialize} deserialize
+ * @return {Function}
+ */
+function _getStreamReadCallback(emitter, call, get_listener, deserialize) {
+ return function (err, response) {
+ if (err) {
+ // Something has gone wrong. Stop reading and wait for status
+ emitter.finished = true;
+ emitter._readsDone();
+ return;
+ }
+ var data = response.read;
+ var deserialized;
+ try {
+ deserialized = deserialize(data);
+ } catch (e) {
+ common.log(constants.logVerbosity.ERROR, `Response deserialization failed: ${e.message}`);
+ emitter._readsDone({
+ code: constants.status.INTERNAL,
+ details: 'Failed to parse server response'
+ });
+ return;
+ }
+ if (data === null) {
+ emitter._readsDone();
+ return;
+ }
+ var listener = get_listener();
+ var context = {
+ call: call,
+ listener: listener
+ };
+ listener.recvMessageWithContext(context, deserialized);
+ };
+}
+
+/**
+ * Tests whether a batch can be started.
+ * @private
+ * @param {number[]} batch_ops The operations in the batch we are checking.
+ * @param {number[]} completed_ops Previously completed operations.
+ * @return {boolean}
+ */
+function _areBatchRequirementsMet(batch_ops, completed_ops) {
+ var dependencies = common.flatMap(batch_ops, function(op) {
+ return OP_DEPENDENCIES[op] || [];
+ });
+ for (var i = 0; i < dependencies.length; i++) {
+ var required_dep = dependencies[i];
+ if (batch_ops.indexOf(required_dep) === -1 &&
+ completed_ops.indexOf(required_dep) === -1) {
+ return false;
+ }
+ }
+ return true;
+}
+
+/**
+ * Enforces the order of operations for synchronous requests. If a batch's
+ * operations cannot be started because required operations have not started
+ * yet, the batch is deferred until requirements are met.
+ * @private
+ * @param {grpc.Client~Call} call
+ * @param {object} batch
+ * @param {object} batch_state
+ * @param {number[]} [batch_state.completed_ops] The ops already sent.
+ * @param {object} [batch_state.deferred_batches] Batches to be sent after
+ * their dependencies are fulfilled.
+ * @param {function} callback
+ * @return {object}
+ */
+function _startBatchIfReady(call, batch, batch_state, callback) {
+ var completed_ops = batch_state.completed_ops;
+ var deferred_batches = batch_state.deferred_batches;
+ var batch_ops = Object.keys(batch).map(Number);
+ if (_areBatchRequirementsMet(batch_ops, completed_ops)) {
+ // Dependencies are met, start the batch and any deferred batches whose
+ // dependencies are met as a result.
+ call.startBatch(batch, callback);
+ completed_ops = Array.from(new Set(completed_ops.concat(batch_ops)));
+ deferred_batches = common.flatMap(deferred_batches, function(deferred_batch) {
+ var deferred_batch_ops = Object.keys(deferred_batch).map(Number);
+ if (_areBatchRequirementsMet(deferred_batch_ops, completed_ops)) {
+ call.startBatch(deferred_batch.batch, deferred_batch.callback);
+ return [];
+ }
+ return [deferred_batch];
+ });
+ } else {
+ // Dependencies are not met, defer the batch
+ deferred_batches = deferred_batches.concat({
+ batch: batch,
+ callback: callback
+ });
+ }
+ return {
+ completed_ops: completed_ops,
+ deferred_batches: deferred_batches
+ };
+}
+
+/**
+ * Produces an interceptor which will start gRPC batches for unary calls.
+ * @private
+ * @param {grpc~MethodDefinition} method_definition
+ * @param {grpc.Channel} channel
+ * @param {EventEmitter} emitter
+ * @param {function} callback
+ * @return {Interceptor}
+ */
+function _getUnaryInterceptor(method_definition, channel, emitter, callback) {
+ var serialize = method_definition.requestSerialize;
+ var deserialize = method_definition.responseDeserialize;
+ return function (options) {
+ var call = getCall(channel, method_definition.path, options);
+ var first_listener;
+ var final_requester = {};
+ var batch_state = {
+ completed_ops: [],
+ deferred_batches: []
+ };
+ final_requester.start = function (metadata, listener) {
+ var batch = {
+ [grpc.opType.SEND_INITIAL_METADATA]:
+ metadata._getCoreRepresentation(),
+ };
+ first_listener = listener;
+ batch_state = _startBatchIfReady(call, batch, batch_state,
+ function() {});
+ };
+ final_requester.sendMessage = function (message) {
+ var batch = {
+ [grpc.opType.SEND_MESSAGE]: serialize(message),
+ };
+ batch_state = _startBatchIfReady(call, batch, batch_state,
+ function() {});
+ };
+ final_requester.halfClose = function () {
+ var batch = {
+ [grpc.opType.SEND_CLOSE_FROM_CLIENT]: true,
+ [grpc.opType.RECV_INITIAL_METADATA]: true,
+ [grpc.opType.RECV_MESSAGE]: true,
+ [grpc.opType.RECV_STATUS_ON_CLIENT]: true
+ };
+ var callback = function (err, response) {
+ response.status.metadata = Metadata._fromCoreRepresentation(
+ response.status.metadata);
+ var status = response.status;
+ var deserialized;
+ if (status.code === constants.status.OK) {
+ if (err) {
+ // Got a batch error, but OK status. Something went wrong
+ callback(err);
+ return;
+ } else {
+ try {
+ deserialized = deserialize(response.read);
+ } catch (e) {
+ common.log(constants.logVerbosity.ERROR, `Response deserialization failed: ${e.message}`);
+ /* Change status to indicate bad server response. This
+ * will result in passing an error to the callback */
+ status = {
+ code: constants.status.INTERNAL,
+ details: 'Failed to parse server response'
+ };
+ }
+ }
+ }
+ response.metadata =
+ Metadata._fromCoreRepresentation(response.metadata);
+ first_listener.onReceiveMetadata(response.metadata);
+ first_listener.onReceiveMessage(deserialized);
+ first_listener.onReceiveStatus(status);
+ };
+ batch_state = _startBatchIfReady(call, batch, batch_state, callback);
+ };
+ final_requester.cancel = function () {
+ call.cancel();
+ };
+ final_requester.cancelWithStatus = function(code, details) {
+ call.cancelWithStatus(code, details)
+ };
+ final_requester.getPeer = function () {
+ return call.getPeer();
+ };
+ return new InterceptingCall(null, final_requester);
+ };
+}
+
+/**
+ * Produces an interceptor which will start gRPC batches for client streaming
+ * calls.
+ * @private
+ * @param {grpc~MethodDefinition} method_definition
+ * @param {grpc.Channel} channel
+ * @param {EventEmitter} emitter
+ * @param {function} callback
+ * @return {Interceptor}
+ */
+function _getClientStreamingInterceptor(method_definition, channel, emitter,
+ callback) {
+ var serialize = common.wrapIgnoreNull(method_definition.requestSerialize);
+ var deserialize = method_definition.responseDeserialize;
+ return function (options) {
+ var first_listener;
+ var call = getCall(channel, method_definition.path, options);
+ var final_requester = {};
+ final_requester.start = function (metadata, listener) {
+ var metadata_batch = {
+ [grpc.opType.SEND_INITIAL_METADATA]: metadata._getCoreRepresentation(),
+ [grpc.opType.RECV_INITIAL_METADATA]: true
+ };
+ first_listener = listener;
+ call.startBatch(metadata_batch, function (err, response) {
+ if (err) {
+ // The call has stopped for some reason. A non-OK status will arrive
+ // in the other batch.
+ return;
+ }
+ response.metadata = Metadata._fromCoreRepresentation(response.metadata);
+ listener.onReceiveMetadata(response.metadata);
+ });
+ var recv_batch = {};
+ recv_batch[grpc.opType.RECV_MESSAGE] = true;
+ recv_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
+ call.startBatch(recv_batch, function (err, response) {
+ response.status.metadata = Metadata._fromCoreRepresentation(
+ response.status.metadata);
+ var status = response.status;
+ var deserialized;
+ if (status.code === constants.status.OK) {
+ if (err) {
+ // Got a batch error, but OK status. Something went wrong
+ callback(err);
+ return;
+ } else {
+ try {
+ deserialized = deserialize(response.read);
+ } catch (e) {
+ common.log(constants.logVerbosity.ERROR, `Response deserialization failed: ${e.message}`);
+ /* Change status to indicate bad server response. This will result
+ * in passing an error to the callback */
+ status = {
+ code: constants.status.INTERNAL,
+ details: 'Failed to parse server response'
+ };
+ }
+ }
+ }
+ listener.onReceiveMessage(deserialized);
+ listener.onReceiveStatus(status);
+ });
+ };
+ final_requester.sendMessage = function (chunk, context) {
+ var message;
+ var callback = (context && context.callback) ?
+ context.callback :
+ function () { };
+ var encoding = (context && context.encoding) ?
+ context.encoding :
+ '';
+ try {
+ message = serialize(chunk);
+ } catch (e) {
+ common.log(constants.logVerbosity.ERROR, `Request serialization failed: ${e.message}`);
+ /* Sending this error to the server and emitting it immediately on the
+ client may put the call in a slightly weird state on the client side,
+ but passing an object that causes a serialization failure is a misuse
+ of the API anyway, so that's OK. The primary purpose here is to give
+ the programmer a useful error and to stop the stream properly */
+ call.cancelWithStatus(constants.status.INTERNAL,
+ 'Serialization failure');
+ callback(e);
+ return;
+ }
+ if (Number.isFinite(encoding)) {
+ /* Attach the encoding if it is a finite number. This is the closest we
+ * can get to checking that it is valid flags */
+ message.grpcWriteFlags = encoding;
+ }
+ var batch = {
+ [grpc.opType.SEND_MESSAGE]: message
+ };
+ call.startBatch(batch, function (err, event) {
+ callback(err, event);
+ });
+ };
+ final_requester.halfClose = function () {
+ var batch = {
+ [grpc.opType.SEND_CLOSE_FROM_CLIENT]: true
+ };
+ call.startBatch(batch, function () { });
+ };
+ final_requester.cancel = function () {
+ call.cancel();
+ };
+ final_requester.cancelWithStatus = function(code, details) {
+ call.cancelWithStatus(code, details)
+ };
+ final_requester.getPeer = function() {
+ return call.getPeer();
+ };
+ return new InterceptingCall(null, final_requester);
+ };
+}
+
+/**
+ * Produces an interceptor which will start gRPC batches for server streaming
+ * calls.
+ * @private
+ * @param {grpc~MethodDefinition} method_definition
+ * @param {grpc.Channel} channel
+ * @param {EventEmitter} emitter
+ * @return {Interceptor}
+ */
+function _getServerStreamingInterceptor(method_definition, channel, emitter) {
+ var deserialize = common.wrapIgnoreNull(
+ method_definition.responseDeserialize);
+ var serialize = method_definition.requestSerialize;
+ return function (options) {
+ var batch_state = {
+ completed_ops: [],
+ deferred_batches: []
+ };
+ var call = getCall(channel, method_definition.path, options);
+ var final_requester = {};
+ var first_listener;
+ var get_listener = function() {
+ return first_listener;
+ };
+ final_requester.start = function(metadata, listener) {
+ first_listener = listener;
+ metadata = metadata.clone();
+ var metadata_batch = {
+ [grpc.opType.SEND_INITIAL_METADATA]: metadata._getCoreRepresentation(),
+ [grpc.opType.RECV_INITIAL_METADATA]: true
+ };
+ var callback = function(err, response) {
+ if (err) {
+ // The call has stopped for some reason. A non-OK status will arrive
+ // in the other batch.
+ return;
+ }
+ first_listener.onReceiveMetadata(
+ Metadata._fromCoreRepresentation(response.metadata));
+ };
+ batch_state = _startBatchIfReady(call, metadata_batch, batch_state,
+ callback);
+ var status_batch = {
+ [grpc.opType.RECV_STATUS_ON_CLIENT]: true
+ };
+ call.startBatch(status_batch, function(err, response) {
+ if (err) {
+ emitter.emit('error', err);
+ return;
+ }
+ response.status.metadata = Metadata._fromCoreRepresentation(
+ response.status.metadata);
+ first_listener.onReceiveStatus(response.status);
+ });
+ };
+ final_requester.sendMessage = function(argument) {
+ var message = serialize(argument);
+ if (options) {
+ message.grpcWriteFlags = options.flags;
+ }
+ var send_batch = {
+ [grpc.opType.SEND_MESSAGE]: message
+ };
+ var callback = function(err, response) {
+ if (err) {
+ // The call has stopped for some reason. A non-OK status will arrive
+ // in the other batch.
+ return;
+ }
+ };
+ batch_state = _startBatchIfReady(call, send_batch, batch_state, callback);
+ };
+ final_requester.halfClose = function() {
+ var batch = {
+ [grpc.opType.SEND_CLOSE_FROM_CLIENT]: true
+ };
+ batch_state = _startBatchIfReady(call, batch, batch_state, function() {});
+ };
+ final_requester.recvMessageWithContext = function(context) {
+ var recv_batch = {
+ [grpc.opType.RECV_MESSAGE]: true
+ };
+ var callback = _getStreamReadCallback(emitter, call,
+ get_listener, deserialize);
+ batch_state = _startBatchIfReady(call, recv_batch, batch_state, callback);
+ };
+ final_requester.cancel = function() {
+ call.cancel();
+ };
+ final_requester.cancelWithStatus = function(code, details) {
+ call.cancelWithStatus(code, details)
+ };
+ final_requester.getPeer = function() {
+ return call.getPeer();
+ };
+ return new InterceptingCall(null, final_requester);
+ };
+}
+
+/**
+ * Produces an interceptor which will start gRPC batches for bi-directional
+ * calls.
+ * @private
+ * @param {grpc~MethodDefinition} method_definition
+ * @param {grpc.Channel} channel
+ * @param {EventEmitter} emitter
+ * @return {Interceptor}
+ */
+function _getBidiStreamingInterceptor(method_definition, channel, emitter) {
+ var serialize = common.wrapIgnoreNull(method_definition.requestSerialize);
+ var deserialize = common.wrapIgnoreNull(
+ method_definition.responseDeserialize);
+ return function (options) {
+ var first_listener;
+ var get_listener = function() {
+ return first_listener;
+ };
+ var call = getCall(channel, method_definition.path, options);
+ var final_requester = {};
+ final_requester.start = function (metadata, listener) {
+ var metadata_batch = {
+ [grpc.opType.SEND_INITIAL_METADATA]: metadata._getCoreRepresentation(),
+ [grpc.opType.RECV_INITIAL_METADATA]: true
+ };
+ first_listener = listener;
+ call.startBatch(metadata_batch, function (err, response) {
+ if (err) {
+ // The call has stopped for some reason. A non-OK status will arrive
+ // in the other batch.
+ return;
+ }
+ response.metadata = Metadata._fromCoreRepresentation(response.metadata);
+ listener.onReceiveMetadata(response.metadata);
+ });
+ var recv_batch = {};
+ recv_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
+ call.startBatch(recv_batch, function (err, response) {
+ var status = response.status;
+ if (status.code === constants.status.OK) {
+ if (err) {
+ emitter.emit('error', err);
+ return;
+ }
+ }
+ response.status.metadata = Metadata._fromCoreRepresentation(
+ response.status.metadata);
+ listener.onReceiveStatus(status);
+ });
+ };
+ final_requester.sendMessage = function (chunk, context) {
+ var message;
+ var callback = (context && context.callback) ?
+ context.callback :
+ function() {};
+ var encoding = (context && context.encoding) ?
+ context.encoding :
+ '';
+ try {
+ message = serialize(chunk);
+ } catch (e) {
+ common.log(constants.logVerbosity.ERROR, `Request serialization failed: ${e.message}`);
+ /* Sending this error to the server and emitting it immediately on the
+ client may put the call in a slightly weird state on the client side,
+ but passing an object that causes a serialization failure is a misuse
+ of the API anyway, so that's OK. The primary purpose here is to give
+ the programmer a useful error and to stop the stream properly */
+ call.cancelWithStatus(constants.status.INTERNAL,
+ 'Serialization failure');
+ callback(e);
+ return;
+ }
+ if (Number.isFinite(encoding)) {
+ /* Attach the encoding if it is a finite number. This is the closest we
+ * can get to checking that it is valid flags */
+ message.grpcWriteFlags = encoding;
+ }
+ var batch = {
+ [grpc.opType.SEND_MESSAGE]: message
+ };
+ call.startBatch(batch, function (err, event) {
+ callback(err, event);
+ });
+ };
+ final_requester.halfClose = function () {
+ var batch = {
+ [grpc.opType.SEND_CLOSE_FROM_CLIENT]: true
+ };
+ call.startBatch(batch, function () { });
+ };
+ final_requester.recvMessageWithContext = function(context) {
+ var recv_batch = {
+ [grpc.opType.RECV_MESSAGE]: true
+ };
+ call.startBatch(recv_batch, _getStreamReadCallback(emitter, call,
+ get_listener, deserialize));
+ };
+ final_requester.cancel = function() {
+ call.cancel();
+ };
+ final_requester.cancelWithStatus = function(code, details) {
+ call.cancelWithStatus(code, details)
+ };
+ final_requester.getPeer = function() {
+ return call.getPeer();
+ };
+ return new InterceptingCall(null, final_requester);
+ };
+}
+
+/**
+ * Produces a listener for responding to callers of unary RPCs.
+ * @private
+ * @param {grpc~MethodDefinition} method_definition
+ * @param {EventEmitter} emitter
+ * @param {function} callback
+ * @return {grpc~Listener}
+ */
+function _getUnaryListener(method_definition, emitter, callback) {
+ var resultMessage;
+ return {
+ onReceiveMetadata: function (metadata) {
+ emitter.emit('metadata', metadata);
+ },
+ onReceiveMessage: function (message) {
+ resultMessage = message;
+ },
+ onReceiveStatus: function (status) {
+ if (status.code !== constants.status.OK) {
+ var error = common.createStatusError(status);
+ callback(error);
+ } else {
+ callback(null, resultMessage);
+ }
+ emitter.emit('status', status);
+ }
+ };
+}
+
+/**
+ * Produces a listener for responding to callers of client streaming RPCs.
+ * @private
+ * @param {grpc~MethodDefinition} method_definition
+ * @param {EventEmitter} emitter
+ * @param {function} callback
+ * @return {grpc~Listener}
+ */
+function _getClientStreamingListener(method_definition, emitter, callback) {
+ var resultMessage;
+ return {
+ onReceiveMetadata: function (metadata) {
+ emitter.emit('metadata', metadata);
+ },
+ onReceiveMessage: function (message) {
+ resultMessage = message;
+ },
+ onReceiveStatus: function (status) {
+ if (status.code !== constants.status.OK) {
+ var error = common.createStatusError(status);
+ callback(error);
+ } else {
+ callback(null, resultMessage);
+ }
+ emitter.emit('status', status);
+ }
+ };
+}
+
+/**
+ * Produces a listener for responding to callers of server streaming RPCs.
+ * @private
+ * @param {grpc~MethodDefinition} method_definition
+ * @param {EventEmitter} emitter
+ * @return {grpc~Listener}
+ */
+function _getServerStreamingListener(method_definition, emitter) {
+ var deserialize = common.wrapIgnoreNull(
+ method_definition.responseDeserialize);
+ return {
+ onReceiveMetadata: function (metadata) {
+ emitter.emit('metadata', metadata);
+ },
+ onReceiveMessage: function(message, next, context) {
+ if (emitter.push(message) && message !== null) {
+ var call = context.call;
+ var get_listener = function() {
+ return context.listener;
+ };
+ var read_batch = {};
+ read_batch[grpc.opType.RECV_MESSAGE] = true;
+ call.startBatch(read_batch, _getStreamReadCallback(emitter, call,
+ get_listener, deserialize));
+ } else {
+ emitter.reading = false;
+ }
+ },
+ onReceiveStatus: function (status) {
+ emitter._receiveStatus(status);
+ }
+ };
+}
+
+/**
+ * Produces a listener for responding to callers of bi-directional RPCs.
+ * @private
+ * @param {grpc~MethodDefinition} method_definition
+ * @param {EventEmitter} emitter
+ * @return {grpc~Listener}
+ */
+function _getBidiStreamingListener(method_definition, emitter) {
+ var deserialize = common.wrapIgnoreNull(
+ method_definition.responseDeserialize);
+ return {
+ onReceiveMetadata: function (metadata) {
+ emitter.emit('metadata', metadata);
+ },
+ onReceiveMessage: function(message, next, context) {
+ if (emitter.push(message) && message !== null) {
+ var call = context.call;
+ var get_listener = function() {
+ return context.listener;
+ };
+ var read_batch = {};
+ read_batch[grpc.opType.RECV_MESSAGE] = true;
+ call.startBatch(read_batch, _getStreamReadCallback(emitter, call,
+ get_listener, deserialize));
+ } else {
+ emitter.reading = false;
+ }
+ },
+ onReceiveStatus: function (status) {
+ emitter._receiveStatus(status);
+ }
+ };
+}
+
+var interceptorGenerators = {
+ [methodTypes.UNARY]: _getUnaryInterceptor,
+ [methodTypes.CLIENT_STREAMING]: _getClientStreamingInterceptor,
+ [methodTypes.SERVER_STREAMING]: _getServerStreamingInterceptor,
+ [methodTypes.BIDI_STREAMING]: _getBidiStreamingInterceptor
+};
+
+var listenerGenerators = {
+ [methodTypes.UNARY]: _getUnaryListener,
+ [methodTypes.CLIENT_STREAMING]: _getClientStreamingListener,
+ [methodTypes.SERVER_STREAMING]: _getServerStreamingListener,
+ [methodTypes.BIDI_STREAMING]: _getBidiStreamingListener
+};
+
+/**
+ * Creates the last listener in an interceptor stack.
+ * @param {grpc~MethodDefinition} method_definition
+ * @param {EventEmitter} emitter
+ * @param {function=} callback
+ * @return {grpc~Listener}
+ */
+function getLastListener(method_definition, emitter, callback) {
+ if (emitter instanceof Function) {
+ callback = emitter;
+ callback = function() {};
+ }
+ if (!(callback instanceof Function)) {
+ callback = function() {};
+ }
+ if (!((emitter instanceof EventEmitter) &&
+ (callback instanceof Function))) {
+ throw new Error('Argument mismatch in getLastListener');
+ }
+ var method_type = common.getMethodType(method_definition);
+ var generator = listenerGenerators[method_type];
+ return generator(method_definition, emitter, callback);
+}
+
+/**
+ *
+ * @param {grpc~MethodDefinition} method_definition
+ * @param {grpc.Client~CallOptions} options
+ * @param {Interceptor[]} interceptors
+ * @param {grpc.Channel} channel
+ * @param {function|EventEmitter} responder
+ */
+function getInterceptingCall(method_definition, options,
+ interceptors, channel, responder) {
+ var last_interceptor = _getLastInterceptor(method_definition, channel,
+ responder);
+ var all_interceptors = interceptors.concat(last_interceptor);
+ return _buildChain(all_interceptors, options);
+}
+
+/**
+ * Creates the last interceptor in an interceptor stack.
+ * @private
+ * @param {grpc~MethodDefinition} method_definition
+ * @param {grpc.Channel} channel
+ * @param {function|EventEmitter} responder
+ * @return {Interceptor}
+ */
+function _getLastInterceptor(method_definition, channel, responder) {
+ var callback = (responder instanceof Function) ? responder : function() {};
+ var emitter = (responder instanceof EventEmitter) ? responder :
+ new EventEmitter();
+ var method_type = common.getMethodType(method_definition);
+ var generator = interceptorGenerators[method_type];
+ return generator(method_definition, channel, emitter, callback);
+}
+
+/**
+ * Chain a list of interceptors together and return the first InterceptingCall.
+ * @private
+ * @param {Interceptor[]} interceptors An interceptor stack.
+ * @param {grpc.Client~CallOptions} options Call options.
+ * @return {InterceptingCall}
+ */
+function _buildChain(interceptors, options) {
+ var next = function(interceptors) {
+ if (interceptors.length === 0) {
+ return function (options) {};
+ }
+ var head_interceptor = interceptors[0];
+ var rest_interceptors = interceptors.slice(1);
+ return function (options) {
+ return head_interceptor(options, next(rest_interceptors));
+ };
+ };
+ var chain = next(interceptors)(options);
+ return new InterceptingCall(chain);
+}
+
+/**
+ * Wraps a plain listener object in an InterceptingListener if it isn't an
+ * InterceptingListener already.
+ * @param {InterceptingListener|object|null} current_listener
+ * @param {InterceptingListener|EndListener} next_listener
+ * @return {InterceptingListener|null}
+ * @private
+ */
+function _getInterceptingListener(current_listener, next_listener) {
+ if (!_isInterceptingListener(current_listener)) {
+ return new InterceptingListener(next_listener, current_listener);
+ }
+ return current_listener;
+}
+
+/**
+ * Test if the listener exists and is an InterceptingListener.
+ * @param listener
+ * @return {boolean}
+ * @private
+ */
+function _isInterceptingListener(listener) {
+ return listener && listener.constructor.name === 'InterceptingListener';
+}
+
+exports.resolveInterceptorProviders = resolveInterceptorProviders;
+
+exports.InterceptingCall = InterceptingCall;
+exports.ListenerBuilder = ListenerBuilder;
+exports.RequesterBuilder = RequesterBuilder;
+exports.StatusBuilder = StatusBuilder;
+
+exports.InterceptorConfigurationError = InterceptorConfigurationError;
+
+exports.getInterceptingCall = getInterceptingCall;
+exports.getLastListener = getLastListener;