2 Object.defineProperty(exports, "__esModule", { value: true });
3 const events_1 = require("events");
4 const stream_1 = require("stream");
5 const constants_1 = require("./constants");
6 class ClientUnaryCallImpl extends events_1.EventEmitter {
10 call.on('metadata', (metadata) => {
11 this.emit('metadata', metadata);
13 call.on('status', (status) => {
14 this.emit('status', status);
18 this.call.cancelWithStatus(constants_1.Status.CANCELLED, 'Cancelled on client');
21 return this.call.getPeer();
24 exports.ClientUnaryCallImpl = ClientUnaryCallImpl;
25 function setUpReadableStream(stream, call, deserialize) {
26 let statusEmitted = false;
27 call.on('data', (data) => {
30 deserialized = deserialize(data);
33 call.cancelWithStatus(constants_1.Status.INTERNAL, 'Failed to parse server response');
36 if (!stream.push(deserialized)) {
40 call.on('end', () => {
45 call.once('status', () => {
50 call.on('status', (status) => {
51 if (status.code !== constants_1.Status.OK) {
52 const error = Object.assign(new Error(status.details), status);
53 stream.emit('error', error);
55 stream.emit('status', status);
60 class ClientReadableStreamImpl extends stream_1.Readable {
61 constructor(call, deserialize) {
62 super({ objectMode: true });
64 this.deserialize = deserialize;
65 call.on('metadata', (metadata) => {
66 this.emit('metadata', metadata);
68 setUpReadableStream(this, call, deserialize);
71 this.call.cancelWithStatus(constants_1.Status.CANCELLED, 'Cancelled on client');
74 return this.call.getPeer();
80 exports.ClientReadableStreamImpl = ClientReadableStreamImpl;
81 function tryWrite(call, serialize, chunk, encoding, cb) {
83 const flags = Number(encoding);
85 message = serialize(chunk);
88 call.cancelWithStatus(constants_1.Status.INTERNAL, 'Serialization failure');
92 const writeObj = { message };
93 if (!Number.isNaN(flags)) {
94 writeObj.flags = flags;
96 call.write(writeObj, cb);
98 class ClientWritableStreamImpl extends stream_1.Writable {
99 constructor(call, serialize) {
100 super({ objectMode: true });
102 this.serialize = serialize;
103 call.on('metadata', (metadata) => {
104 this.emit('metadata', metadata);
106 call.on('status', (status) => {
107 this.emit('status', status);
111 this.call.cancelWithStatus(constants_1.Status.CANCELLED, 'Cancelled on client');
114 return this.call.getPeer();
116 _write(chunk, encoding, cb) {
117 tryWrite(this.call, this.serialize, chunk, encoding, cb);
124 exports.ClientWritableStreamImpl = ClientWritableStreamImpl;
125 class ClientDuplexStreamImpl extends stream_1.Duplex {
126 constructor(call, serialize, deserialize) {
127 super({ objectMode: true });
129 this.serialize = serialize;
130 this.deserialize = deserialize;
131 call.on('metadata', (metadata) => {
132 this.emit('metadata', metadata);
134 setUpReadableStream(this, call, deserialize);
137 this.call.cancelWithStatus(constants_1.Status.CANCELLED, 'Cancelled on client');
140 return this.call.getPeer();
145 _write(chunk, encoding, cb) {
146 tryWrite(this.call, this.serialize, chunk, encoding, cb);
153 exports.ClientDuplexStreamImpl = ClientDuplexStreamImpl;
154 //# sourceMappingURL=call.js.map