Built motion from commit 6a09e18b.|2.6.11
[motion2.git] / legacy-libs / grpc / src / server.js
1 /**
2  * @license
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 'use strict';
20
21 var grpc = require('./grpc_extension');
22
23 var common = require('./common');
24
25 var Metadata = require('./metadata');
26
27 var constants = require('./constants');
28
29 var stream = require('stream');
30
31 var Readable = stream.Readable;
32 var Writable = stream.Writable;
33 var Duplex = stream.Duplex;
34 var util = require('util');
35
36 var EventEmitter = require('events').EventEmitter;
37
38 /**
39  * Handle an error on a call by sending it as a status
40  * @private
41  * @param {grpc.internal~Call} call The call to send the error on
42  * @param {(Object|Error)} error The error object
43  */
44 function handleError(call, error) {
45   var statusMetadata = new Metadata();
46   var status = {
47     code: constants.status.UNKNOWN,
48     details: 'Unknown Error'
49   };
50   if (error.hasOwnProperty('message')) {
51     status.details = error.message;
52   }
53   if (error.hasOwnProperty('code') && Number.isInteger(error.code)) {
54     status.code = error.code;
55     if (error.hasOwnProperty('details')) {
56       status.details = error.details;
57     }
58     if (status.code == constants.status.INTERNAL) {
59       common.log(constants.logVerbosity.ERROR, error);
60     }
61   }
62   if (error.hasOwnProperty('metadata')) {
63     statusMetadata = error.metadata;
64   }
65   status.metadata = statusMetadata._getCoreRepresentation();
66   var error_batch = {};
67   if (!call.metadataSent) {
68     error_batch[grpc.opType.SEND_INITIAL_METADATA] =
69         (new Metadata())._getCoreRepresentation();
70   }
71   error_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = status;
72   call.startBatch(error_batch, function(){});
73 }
74
75 /**
76  * Send a response to a unary or client streaming call.
77  * @private
78  * @param {grpc.Call} call The call to respond on
79  * @param {*} value The value to respond with
80  * @param {grpc~serialize} serialize Serialization function for the
81  *     response
82  * @param {grpc.Metadata=} metadata Optional trailing metadata to send with
83  *     status
84  * @param {number=} [flags=0] Flags for modifying how the message is sent.
85  */
86 function sendUnaryResponse(call, value, serialize, metadata, flags) {
87   var end_batch = {};
88   var statusMetadata = new Metadata();
89   var status = {
90     code: constants.status.OK,
91     details: 'OK'
92   };
93   if (metadata) {
94     statusMetadata = metadata;
95   }
96   var message;
97   try {
98     message = serialize(value);
99   } catch (e) {
100     e.code = constants.status.INTERNAL;
101     handleError(call, e);
102     return;
103   }
104   status.metadata = statusMetadata._getCoreRepresentation();
105   if (!call.metadataSent) {
106     end_batch[grpc.opType.SEND_INITIAL_METADATA] =
107         (new Metadata())._getCoreRepresentation();
108     call.metadataSent = true;
109   }
110   message.grpcWriteFlags = flags;
111   end_batch[grpc.opType.SEND_MESSAGE] = message;
112   end_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = status;
113   call.startBatch(end_batch, function (){});
114 }
115
116 /**
117  * Initialize a writable stream. This is used for both the writable and duplex
118  * stream constructors.
119  * @private
120  * @param {Writable} stream The stream to set up
121  * @param {function(*):Buffer=} Serialization function for responses
122  */
123 function setUpWritable(stream, serialize) {
124   stream.finished = false;
125   stream.status = {
126     code : constants.status.OK,
127     details : 'OK',
128     metadata : new Metadata()
129   };
130   stream.serialize = common.wrapIgnoreNull(serialize);
131   function sendStatus() {
132     var batch = {};
133     if (!stream.call.metadataSent) {
134       stream.call.metadataSent = true;
135       batch[grpc.opType.SEND_INITIAL_METADATA] =
136           (new Metadata())._getCoreRepresentation();
137     }
138
139     if (stream.status.metadata) {
140       stream.status.metadata = stream.status.metadata._getCoreRepresentation();
141     }
142     batch[grpc.opType.SEND_STATUS_FROM_SERVER] = stream.status;
143     stream.call.startBatch(batch, function(){});
144   }
145   stream.on('finish', sendStatus);
146   /**
147    * Set the pending status to a given error status. If the error does not have
148    * code or details properties, the code will be set to grpc.status.UNKNOWN
149    * and the details will be set to 'Unknown Error'.
150    * @param {Error} err The error object
151    */
152   function setStatus(err) {
153     var code = constants.status.UNKNOWN;
154     var details = 'Unknown Error';
155     var metadata = new Metadata();
156     if (err.hasOwnProperty('message')) {
157       details = err.message;
158     }
159     if (err.hasOwnProperty('code')) {
160       code = err.code;
161       if (err.hasOwnProperty('details')) {
162         details = err.details;
163       }
164     }
165     if (err.hasOwnProperty('metadata')) {
166       metadata = err.metadata;
167     }
168     stream.status = {code: code, details: details, metadata: metadata};
169   }
170   /**
171    * Terminate the call. This includes indicating that reads are done, draining
172    * all pending writes, and sending the given error as a status
173    * @param {Error} err The error object
174    * @this GrpcServerStream
175    */
176   function terminateCall(err) {
177     // Drain readable data
178     setStatus(err);
179     stream.end();
180   }
181   stream.on('error', terminateCall);
182   /**
183    * Override of Writable#end method that allows for sending metadata with a
184    * success status.
185    * @param {Metadata=} metadata Metadata to send with the status
186    */
187   stream.end = function(metadata) {
188     if (metadata) {
189       stream.status.metadata = metadata;
190     }
191     Writable.prototype.end.call(this);
192   };
193 }
194
195 /**
196  * Initialize a readable stream. This is used for both the readable and duplex
197  * stream constructors.
198  * @private
199  * @param {Readable} stream The stream to initialize
200  * @param {grpc~deserialize} deserialize Deserialization function for
201  *     incoming data.
202  */
203 function setUpReadable(stream, deserialize) {
204   stream.deserialize = common.wrapIgnoreNull(deserialize);
205   stream.finished = false;
206   stream.reading = false;
207
208   stream.terminate = function() {
209     stream.finished = true;
210     stream.on('data', function() {});
211   };
212
213   stream.on('cancelled', function() {
214     stream.terminate();
215   });
216 }
217
218 /**
219  * Emitted when the call has been cancelled. After this has been emitted, the
220  * call's `cancelled` property will be set to `true`.
221  * @event grpc~ServerUnaryCall~cancelled
222  */
223
224 util.inherits(ServerUnaryCall, EventEmitter);
225
226 /**
227  * An EventEmitter. Used for unary calls.
228  * @constructor grpc~ServerUnaryCall
229  * @extends external:EventEmitter
230  * @param {grpc.internal~Call} call The call object associated with the request
231  * @param {grpc.Metadata} metadata The request metadata from the client
232  */
233 function ServerUnaryCall(call, metadata) {
234   EventEmitter.call(this);
235   this.call = call;
236   /**
237    * Indicates if the call has been cancelled
238    * @member {boolean} grpc~ServerUnaryCall#cancelled
239    */
240   this.cancelled = false;
241   /**
242    * The request metadata from the client
243    * @member {grpc.Metadata} grpc~ServerUnaryCall#metadata
244    */
245   this.metadata = metadata;
246   /**
247    * The request message from the client
248    * @member {*} grpc~ServerUnaryCall#request
249    */
250   this.request = undefined;
251 }
252
253 /**
254  * Emitted when the call has been cancelled. After this has been emitted, the
255  * call's `cancelled` property will be set to `true`.
256  * @event grpc~ServerWritableStream~cancelled
257  */
258
259 util.inherits(ServerWritableStream, Writable);
260
261 /**
262  * A stream that the server can write to. Used for calls that are streaming from
263  * the server side.
264  * @constructor grpc~ServerWritableStream
265  * @extends external:Writable
266  * @borrows grpc~ServerUnaryCall#sendMetadata as
267  *     grpc~ServerWritableStream#sendMetadata
268  * @borrows grpc~ServerUnaryCall#getPeer as grpc~ServerWritableStream#getPeer
269  * @param {grpc.internal~Call} call The call object to send data with
270  * @param {grpc.Metadata} metadata The request metadata from the client
271  * @param {grpc~serialize} serialize Serialization function for writes
272  */
273 function ServerWritableStream(call, metadata, serialize) {
274   Writable.call(this, {objectMode: true});
275   this.call = call;
276
277   this.finished = false;
278   setUpWritable(this, serialize);
279   /**
280    * Indicates if the call has been cancelled
281    * @member {boolean} grpc~ServerWritableStream#cancelled
282    */
283   this.cancelled = false;
284   /**
285    * The request metadata from the client
286    * @member {grpc.Metadata} grpc~ServerWritableStream#metadata
287    */
288   this.metadata = metadata;
289   /**
290    * The request message from the client
291    * @member {*} grpc~ServerWritableStream#request
292    */
293   this.request = undefined;
294 }
295
296 /**
297  * Start writing a chunk of data. This is an implementation of a method required
298  * for implementing stream.Writable.
299  * @private
300  * @param {Buffer} chunk The chunk of data to write
301  * @param {string} encoding Used to pass write flags
302  * @param {function(Error=)} callback Callback to indicate that the write is
303  *     complete
304  */
305 function _write(chunk, encoding, callback) {
306   /* jshint validthis: true */
307   var batch = {};
308   var self = this;
309   var message;
310   try {
311     message = this.serialize(chunk);
312   } catch (e) {
313     e.code = constants.status.INTERNAL;
314     callback(e);
315     return;
316   }
317   if (!this.call.metadataSent) {
318     batch[grpc.opType.SEND_INITIAL_METADATA] =
319         (new Metadata())._getCoreRepresentation();
320     this.call.metadataSent = true;
321   }
322   if (Number.isFinite(encoding)) {
323     /* Attach the encoding if it is a finite number. This is the closest we
324      * can get to checking that it is valid flags */
325     message.grpcWriteFlags = encoding;
326   }
327   batch[grpc.opType.SEND_MESSAGE] = message;
328   this.call.startBatch(batch, function(err, value) {
329     if (err) {
330       self.emit('error', err);
331       return;
332     }
333     callback();
334   });
335 }
336
337 ServerWritableStream.prototype._write = _write;
338
339 /**
340  * Emitted when the call has been cancelled. After this has been emitted, the
341  * call's `cancelled` property will be set to `true`.
342  * @event grpc~ServerReadableStream~cancelled
343  */
344
345 util.inherits(ServerReadableStream, Readable);
346
347 /**
348  * A stream that the server can read from. Used for calls that are streaming
349  * from the client side.
350  * @constructor grpc~ServerReadableStream
351  * @extends external:Readable
352  * @borrows grpc~ServerUnaryCall#sendMetadata as
353  *     grpc~ServerReadableStream#sendMetadata
354  * @borrows grpc~ServerUnaryCall#getPeer as grpc~ServerReadableStream#getPeer
355  * @param {grpc.internal~Call} call The call object to read data with
356  * @param {grpc.Metadata} metadata The request metadata from the client
357  * @param {grpc~deserialize} deserialize Deserialization function for reads
358  */
359 function ServerReadableStream(call, metadata, deserialize) {
360   Readable.call(this, {objectMode: true});
361   this.call = call;
362   setUpReadable(this, deserialize);
363   /**
364    * Indicates if the call has been cancelled
365    * @member {boolean} grpc~ServerReadableStream#cancelled
366    */
367   this.cancelled = false;
368   /**
369    * The request metadata from the client
370    * @member {grpc.Metadata} grpc~ServerReadableStream#metadata
371    */
372   this.metadata = metadata;
373 }
374
375 /**
376  * Start reading from the gRPC data source. This is an implementation of a
377  * method required for implementing stream.Readable
378  * @access private
379  * @param {number} size Ignored
380  */
381 function _read(size) {
382   /* jshint validthis: true */
383   var self = this;
384   /**
385    * Callback to be called when a READ event is received. Pushes the data onto
386    * the read queue and starts reading again if applicable
387    * @param {grpc.Event} event READ event object
388    */
389   function readCallback(err, event) {
390     if (err) {
391       self.terminate();
392       return;
393     }
394     if (self.finished) {
395       self.push(null);
396       return;
397     }
398     var data = event.read;
399     var deserialized;
400     try {
401       deserialized = self.deserialize(data);
402     } catch (e) {
403       e.code = constants.status.INTERNAL;
404       self.emit('error', e);
405       return;
406     }
407     if (self.push(deserialized) && data !== null) {
408       var read_batch = {};
409       read_batch[grpc.opType.RECV_MESSAGE] = true;
410       self.call.startBatch(read_batch, readCallback);
411     } else {
412       self.reading = false;
413     }
414   }
415   if (self.finished) {
416     self.push(null);
417   } else {
418     if (!self.reading) {
419       self.reading = true;
420       var batch = {};
421       batch[grpc.opType.RECV_MESSAGE] = true;
422       self.call.startBatch(batch, readCallback);
423     }
424   }
425 }
426
427 ServerReadableStream.prototype._read = _read;
428
429 /**
430  * Emitted when the call has been cancelled. After this has been emitted, the
431  * call's `cancelled` property will be set to `true`.
432  * @event grpc~ServerDuplexStream~cancelled
433  */
434
435 util.inherits(ServerDuplexStream, Duplex);
436
437 /**
438  * A stream that the server can read from or write to. Used for calls with
439  * duplex streaming.
440  * @constructor grpc~ServerDuplexStream
441  * @extends external:Duplex
442  * @borrows grpc~ServerUnaryCall#sendMetadata as
443  *     grpc~ServerDuplexStream#sendMetadata
444  * @borrows grpc~ServerUnaryCall#getPeer as grpc~ServerDuplexStream#getPeer
445  * @param {grpc.internal~Call} call Call object to proxy
446  * @param {grpc.Metadata} metadata The request metadata from the client
447  * @param {grpc~serialize} serialize Serialization function for requests
448  * @param {grpc~deserialize} deserialize Deserialization function for
449  *     responses
450  */
451 function ServerDuplexStream(call, metadata, serialize, deserialize) {
452   Duplex.call(this, {objectMode: true});
453   this.call = call;
454   setUpWritable(this, serialize);
455   setUpReadable(this, deserialize);
456   /**
457    * Indicates if the call has been cancelled
458    * @member {boolean} grpc~ServerReadableStream#cancelled
459    */
460   this.cancelled = false;
461   /**
462    * The request metadata from the client
463    * @member {grpc.Metadata} grpc~ServerReadableStream#metadata
464    */
465   this.metadata = metadata;
466 }
467
468 ServerDuplexStream.prototype._read = _read;
469 ServerDuplexStream.prototype._write = _write;
470
471 /**
472  * Send the initial metadata for a writable stream.
473  * @alias grpc~ServerUnaryCall#sendMetadata
474  * @param {grpc.Metadata} responseMetadata Metadata to send
475  */
476 function sendMetadata(responseMetadata) {
477   /* jshint validthis: true */
478   var self = this;
479   if (!this.call.metadataSent) {
480     this.call.metadataSent = true;
481     var batch = {};
482     batch[grpc.opType.SEND_INITIAL_METADATA] =
483         responseMetadata._getCoreRepresentation();
484     this.call.startBatch(batch, function(err) {
485       if (err) {
486         self.emit('error', err);
487         return;
488       }
489     });
490   }
491 }
492
493 ServerUnaryCall.prototype.sendMetadata = sendMetadata;
494 ServerWritableStream.prototype.sendMetadata = sendMetadata;
495 ServerReadableStream.prototype.sendMetadata = sendMetadata;
496 ServerDuplexStream.prototype.sendMetadata = sendMetadata;
497
498 /**
499  * Get the endpoint this call/stream is connected to.
500  * @alias grpc~ServerUnaryCall#getPeer
501  * @return {string} The URI of the endpoint
502  */
503 function getPeer() {
504   /* jshint validthis: true */
505   return this.call.getPeer();
506 }
507
508 ServerUnaryCall.prototype.getPeer = getPeer;
509 ServerReadableStream.prototype.getPeer = getPeer;
510 ServerWritableStream.prototype.getPeer = getPeer;
511 ServerDuplexStream.prototype.getPeer = getPeer;
512
513 /**
514  * Wait for the client to close, then emit a cancelled event if the client
515  * cancelled.
516  * @private
517  */
518 function waitForCancel() {
519   /* jshint validthis: true */
520   var self = this;
521   var cancel_batch = {};
522   cancel_batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true;
523   self.call.startBatch(cancel_batch, function(err, result) {
524     if (err) {
525       self.emit('error', err);
526     }
527     if (result.cancelled) {
528       self.cancelled = true;
529       self.emit('cancelled');
530     }
531   });
532 }
533
534 ServerUnaryCall.prototype.waitForCancel = waitForCancel;
535 ServerReadableStream.prototype.waitForCancel = waitForCancel;
536 ServerWritableStream.prototype.waitForCancel = waitForCancel;
537 ServerDuplexStream.prototype.waitForCancel = waitForCancel;
538
539 /**
540  * Callback function passed to server handlers that handle methods with unary
541  * responses.
542  * @callback grpc.Server~sendUnaryData
543  * @param {grpc~ServiceError} error An error, if the call failed
544  * @param {*} value The response value. Must be a valid argument to the
545  *     `responseSerialize` method of the method that is being handled
546  * @param {grpc.Metadata=} trailer Trailing metadata to send, if applicable
547  * @param {grpc.writeFlags=} flags Flags to modify writing the response
548  */
549
550 /**
551  * User-provided method to handle unary requests on a server
552  * @callback grpc.Server~handleUnaryCall
553  * @param {grpc~ServerUnaryCall} call The call object
554  * @param {grpc.Server~sendUnaryData} callback The callback to call to respond
555  *     to the request
556  */
557
558 /**
559  * Fully handle a unary call
560  * @private
561  * @param {grpc.internal~Call} call The call to handle
562  * @param {Object} handler Request handler object for the method that was called
563  * @param {grpc~Server.handleUnaryCall} handler.func The handler function
564  * @param {grpc~deserialize} handler.deserialize The deserialization function
565  *     for request data
566  * @param {grpc~serialize} handler.serialize The serialization function for
567  *     response data
568  * @param {grpc.Metadata} metadata Metadata from the client
569  */
570 function handleUnary(call, handler, metadata) {
571   var emitter = new ServerUnaryCall(call, metadata);
572   emitter.on('error', function(error) {
573     handleError(call, error);
574   });
575   emitter.waitForCancel();
576   var batch = {};
577   batch[grpc.opType.RECV_MESSAGE] = true;
578   call.startBatch(batch, function(err, result) {
579     if (err) {
580       handleError(call, err);
581       return;
582     }
583     try {
584       emitter.request = handler.deserialize(result.read);
585     } catch (e) {
586       e.code = constants.status.INTERNAL;
587       handleError(call, e);
588       return;
589     }
590     if (emitter.cancelled) {
591       return;
592     }
593     handler.func(emitter, function sendUnaryData(err, value, trailer, flags) {
594       if (err) {
595         if (trailer) {
596           err.metadata = trailer;
597         }
598         handleError(call, err);
599       } else {
600         sendUnaryResponse(call, value, handler.serialize, trailer, flags);
601       }
602     });
603   });
604 }
605
606 /**
607  * User provided method to handle server streaming methods on the server.
608  * @callback grpc.Server~handleServerStreamingCall
609  * @param {grpc~ServerWritableStream} call The call object
610  */
611
612 /**
613  * Fully handle a server streaming call
614  * @private
615  * @param {grpc.internal~Call} call The call to handle
616  * @param {Object} handler Request handler object for the method that was called
617  * @param {grpc~Server.handleServerStreamingCall} handler.func The handler
618  *     function
619  * @param {grpc~deserialize} handler.deserialize The deserialization function
620  *     for request data
621  * @param {grpc~serialize} handler.serialize The serialization function for
622  *     response data
623  * @param {grpc.Metadata} metadata Metadata from the client
624  */
625 function handleServerStreaming(call, handler, metadata) {
626   var stream = new ServerWritableStream(call, metadata, handler.serialize);
627   stream.waitForCancel();
628   var batch = {};
629   batch[grpc.opType.RECV_MESSAGE] = true;
630   call.startBatch(batch, function(err, result) {
631     if (err) {
632       stream.emit('error', err);
633       return;
634     }
635     try {
636       stream.request = handler.deserialize(result.read);
637     } catch (e) {
638       e.code = constants.status.INTERNAL;
639       stream.emit('error', e);
640       return;
641     }
642     handler.func(stream);
643   });
644 }
645
646 /**
647  * User provided method to handle client streaming methods on the server.
648  * @callback grpc.Server~handleClientStreamingCall
649  * @param {grpc~ServerReadableStream} call The call object
650  * @param {grpc.Server~sendUnaryData} callback The callback to call to respond
651  *     to the request
652  */
653
654 /**
655  * Fully handle a client streaming call
656  * @access private
657  * @param {grpc.internal~Call} call The call to handle
658  * @param {Object} handler Request handler object for the method that was called
659  * @param {grpc~Server.handleClientStreamingCall} handler.func The handler
660  *     function
661  * @param {grpc~deserialize} handler.deserialize The deserialization function
662  *     for request data
663  * @param {grpc~serialize} handler.serialize The serialization function for
664  *     response data
665  * @param {grpc.Metadata} metadata Metadata from the client
666  */
667 function handleClientStreaming(call, handler, metadata) {
668   var stream = new ServerReadableStream(call, metadata, handler.deserialize);
669   stream.on('error', function(error) {
670     handleError(call, error);
671   });
672   stream.waitForCancel();
673   handler.func(stream, function(err, value, trailer, flags) {
674     stream.terminate();
675     if (err) {
676       if (trailer) {
677         err.metadata = trailer;
678       }
679       handleError(call, err);
680     } else {
681       sendUnaryResponse(call, value, handler.serialize, trailer, flags);
682     }
683   });
684 }
685
686 /**
687  * User provided method to handle bidirectional streaming calls on the server.
688  * @callback grpc.Server~handleBidiStreamingCall
689  * @param {grpc~ServerDuplexStream} call The call object
690  */
691
692 /**
693  * Fully handle a bidirectional streaming call
694  * @private
695  * @param {grpc.internal~Call} call The call to handle
696  * @param {Object} handler Request handler object for the method that was called
697  * @param {grpc~Server.handleBidiStreamingCall} handler.func The handler
698  *     function
699  * @param {grpc~deserialize} handler.deserialize The deserialization function
700  *     for request data
701  * @param {grpc~serialize} handler.serialize The serialization function for
702  *     response data
703  * @param {Metadata} metadata Metadata from the client
704  */
705 function handleBidiStreaming(call, handler, metadata) {
706   var stream = new ServerDuplexStream(call, metadata, handler.serialize,
707                                       handler.deserialize);
708   stream.waitForCancel();
709   handler.func(stream);
710 }
711
712 var streamHandlers = {
713   unary: handleUnary,
714   server_stream: handleServerStreaming,
715   client_stream: handleClientStreaming,
716   bidi: handleBidiStreaming
717 };
718
719 /**
720  * Constructs a server object that stores request handlers and delegates
721  * incoming requests to those handlers
722  * @memberof grpc
723  * @constructor
724  * @param {Object=} options Options that should be passed to the internal server
725  *     implementation. The available options are listed in
726  *     [this document]{@link https://grpc.github.io/grpc/core/group__grpc__arg__keys.html}.
727  * @example
728  * var server = new grpc.Server();
729  * server.addProtoService(protobuf_service_descriptor, service_implementation);
730  * server.bind('address:port', server_credential);
731  * server.start();
732  */
733 function Server(options) {
734   this.handlers = {};
735   var server = new grpc.Server(options);
736   this._server = server;
737   this.started = false;
738 }
739
740 /**
741  * Start the server and begin handling requests
742  */
743 Server.prototype.start = function() {
744   if (this.started) {
745     throw new Error('Server is already running');
746   }
747   var self = this;
748   this.started = true;
749   this._server.start();
750   /**
751    * Handles the SERVER_RPC_NEW event. If there is a handler associated with
752    * the requested method, use that handler to respond to the request. Then
753    * wait for the next request
754    * @param {grpc.internal~Event} event The event to handle with tag
755    *     SERVER_RPC_NEW
756    */
757   function handleNewCall(err, event) {
758     if (err) {
759       return;
760     }
761     var details = event.new_call;
762     var call = details.call;
763     var method = details.method;
764     var metadata = Metadata._fromCoreRepresentation(details.metadata);
765     if (method === null) {
766       return;
767     }
768     self._server.requestCall(handleNewCall);
769     var handler;
770     if (self.handlers.hasOwnProperty(method)) {
771       handler = self.handlers[method];
772     } else {
773       var batch = {};
774       batch[grpc.opType.SEND_INITIAL_METADATA] =
775           (new Metadata())._getCoreRepresentation();
776       batch[grpc.opType.SEND_STATUS_FROM_SERVER] = {
777         code: constants.status.UNIMPLEMENTED,
778         details: 'RPC method not implemented ' + method,
779         metadata: (new Metadata())._getCoreRepresentation()
780       };
781       batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true;
782       call.startBatch(batch, function() {});
783       return;
784     }
785     streamHandlers[handler.type](call, handler, metadata);
786   }
787   this._server.requestCall(handleNewCall);
788 };
789
790 /**
791  * Unified type for application handlers for all types of calls
792  * @typedef {(grpc.Server~handleUnaryCall
793  *            |grpc.Server~handleClientStreamingCall
794  *            |grpc.Server~handleServerStreamingCall
795  *            |grpc.Server~handleBidiStreamingCall)} grpc.Server~handleCall
796  */
797
798 /**
799  * Registers a handler to handle the named method. Fails if there already is
800  * a handler for the given method. Returns true on success
801  * @param {string} name The name of the method that the provided function should
802  *     handle/respond to.
803  * @param {grpc.Server~handleCall} handler Function that takes a stream of
804  *     request values and returns a stream of response values
805  * @param {grpc~serialize} serialize Serialization function for responses
806  * @param {grpc~deserialize} deserialize Deserialization function for requests
807  * @param {('unary'|'client_stream'|'server_stream'|'bidi')} type The streaming type of method that this handles
808  * @return {boolean} True if the handler was set. False if a handler was already
809  *     set for that name.
810  */
811 Server.prototype.register = function(name, handler, serialize, deserialize,
812                                      type) {
813   if (this.handlers.hasOwnProperty(name)) {
814     return false;
815   }
816   this.handlers[name] = {
817     func: handler,
818     serialize: serialize,
819     deserialize: deserialize,
820     type: type
821   };
822   return true;
823 };
824
825 /**
826  * Gracefully shuts down the server. The server will stop receiving new calls,
827  * and any pending calls will complete. The callback will be called when all
828  * pending calls have completed and the server is fully shut down. This method
829  * is idempotent with itself and forceShutdown.
830  * @param {function()} callback The shutdown complete callback
831  */
832 Server.prototype.tryShutdown = function(callback) {
833   this._server.tryShutdown(callback);
834 };
835
836 /**
837  * Forcibly shuts down the server. The server will stop receiving new calls
838  * and cancel all pending calls. When it returns, the server has shut down.
839  * This method is idempotent with itself and tryShutdown, and it will trigger
840  * any outstanding tryShutdown callbacks.
841  */
842 Server.prototype.forceShutdown = function() {
843   this._server.forceShutdown();
844 };
845
846 var unimplementedStatusResponse = {
847   code: constants.status.UNIMPLEMENTED,
848   details: 'The server does not implement this method'
849 };
850
851 var defaultHandler = {
852   unary: function(call, callback) {
853     callback(unimplementedStatusResponse);
854   },
855   client_stream: function(call, callback) {
856     callback(unimplementedStatusResponse);
857   },
858   server_stream: function(call) {
859     call.emit('error', unimplementedStatusResponse);
860   },
861   bidi: function(call) {
862     call.emit('error', unimplementedStatusResponse);
863   }
864 };
865
866 function isObject(thing) {
867   return (typeof thing === 'object' || typeof thing === 'function') && thing !== null;
868 }
869
870 /**
871  * Add a service to the server, with a corresponding implementation.
872  * @param {grpc~ServiceDefinition} service The service descriptor
873  * @param {Object<String, grpc.Server~handleCall>} implementation Map of method
874  *     names to method implementation for the provided service.
875  */
876 Server.prototype.addService = function(service, implementation) {
877   if (!isObject(service) || !isObject(implementation)) {
878     throw new Error('addService requires two objects as arguments');
879   }
880   if (Object.keys(service).length === 0) {
881     throw new Error('Cannot add an empty service to a server');
882   }
883   if (this.started) {
884     throw new Error('Can\'t add a service to a started server.');
885   }
886   var self = this;
887   Object.keys(service).forEach(name => {
888     const attrs = service[name];
889     var method_type;
890     if (attrs.requestStream) {
891       if (attrs.responseStream) {
892         method_type = 'bidi';
893       } else {
894         method_type = 'client_stream';
895       }
896     } else {
897       if (attrs.responseStream) {
898         method_type = 'server_stream';
899       } else {
900         method_type = 'unary';
901       }
902     }
903     var impl;
904     if (implementation[name] === undefined) {
905       /* Handle the case where the method is passed with the name exactly as
906          written in the proto file, instead of using JavaScript function
907          naming style */
908       if (implementation[attrs.originalName] === undefined) {
909         common.log(constants.logVerbosity.ERROR, 'Method handler ' + name +
910             ' for ' + attrs.path + ' expected but not provided');
911         impl = defaultHandler[method_type];
912       } else {
913         impl = implementation[attrs.originalName].bind(implementation);
914       }
915     } else {
916       impl = implementation[name].bind(implementation);
917     }
918     var serialize = attrs.responseSerialize;
919     var deserialize = attrs.requestDeserialize;
920     var register_success = self.register(attrs.path, impl, serialize,
921                                          deserialize, method_type);
922     if (!register_success) {
923       throw new Error('Method handler for ' + attrs.path +
924           ' already provided.');
925     }
926   });
927 };
928
929 /**
930  * Add a proto service to the server, with a corresponding implementation
931  * @deprecated Use {@link grpc.Server#addService} instead
932  * @param {Protobuf.Reflect.Service} service The proto service descriptor
933  * @param {Object<String, grpc.Server~handleCall>} implementation Map of method
934  *     names to method implementation for the provided service.
935  */
936 Server.prototype.addProtoService = util.deprecate(function(service,
937                                                            implementation) {
938   var options;
939   var protobuf_js_5_common = require('./protobuf_js_5_common');
940   var protobuf_js_6_common = require('./protobuf_js_6_common');
941   if (protobuf_js_5_common.isProbablyProtobufJs5(service)) {
942     options = Object.assign({}, common.defaultGrpcOptions, service.grpc_options);
943     this.addService(
944         protobuf_js_5_common.getProtobufServiceAttrs(service, options),
945         implementation);
946   } else if (protobuf_js_6_common.isProbablyProtobufJs6(service)) {
947     options = Object.assign({}, common.defaultGrpcOptions, service.grpc_options);
948     this.addService(
949         protobuf_js_6_common.getProtobufServiceAttrs(service, options),
950         implementation);
951   } else {
952     // We assume that this is a service attributes object
953     this.addService(service, implementation);
954   }
955 }, 'Server#addProtoService: Use Server#addService instead');
956
957 /**
958  * Binds the server to the given port, with SSL disabled if creds is an
959  * insecure credentials object
960  * @param {string} port The port that the server should bind on, in the format
961  *     "address:port"
962  * @param {grpc.ServerCredentials} creds Server credential object to be used for
963  *     SSL. Pass an insecure credentials object for an insecure port.
964  * @return {number} The bound port number. Negative if binding the port failed.
965  */
966 Server.prototype.bind = function(port, creds) {
967   if (this.started) {
968     throw new Error('Can\'t bind an already running server to an address');
969   }
970   return this._server.addHttp2Port(port, creds);
971 };
972
973 /**
974  * Called with the result of attempting to bind a port
975  * @callback grpc.Server~bindCallback
976  * @param {Error=} error If non-null, indicates that binding the port failed.
977  * @param {number} port The bound port number. If binding the port fails, this
978  *     will be negative to match the output of bind.
979  */
980
981 /**
982  * Binds the server to the given port, with SSL disabled if creds is an
983  * insecure credentials object. Provides the result asynchronously.
984  * @param {string} port The port that the server should bind on, in the format
985  *     "address:port"
986  * @param {grpc.ServerCredentials} creds Server credential object to be used for
987  *     SSL. Pass an insecure credentials object for an insecure port.
988  */
989 Server.prototype.bindAsync = function(port, creds, callback) {
990   /* This can throw. We do not try to catch that error because it indicates an
991    * incorrect use of the function, which should not be surfaced asynchronously
992    */
993   const result = this.bind(port, creds)
994   if (result < 0) {
995     setImmediate(callback, new Error('Failed to bind port'), result);
996   } else {
997     setImmediate(callback, null, result);
998   }
999 }
1000
1001 exports.Server = Server;