diff --git a/packages/grpc-js/package.json b/packages/grpc-js/package.json index b157dc37c..84b742fcb 100644 --- a/packages/grpc-js/package.json +++ b/packages/grpc-js/package.json @@ -1,6 +1,6 @@ { "name": "@grpc/grpc-js", - "version": "1.14.2", + "version": "1.14.3", "description": "gRPC Library for Node - pure JS implementation", "homepage": "https://grpc.io/", "repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js", diff --git a/packages/grpc-js/src/retrying-call.ts b/packages/grpc-js/src/retrying-call.ts index 1d49ad337..61ff58fa1 100644 --- a/packages/grpc-js/src/retrying-call.ts +++ b/packages/grpc-js/src/retrying-call.ts @@ -760,11 +760,10 @@ export class RetryingCall implements Call, DeadlineInfoProvider { this.maybeStartHedgingTimer(); } - private handleChildWriteCompleted(childIndex: number) { - const childCall = this.underlyingCalls[childIndex]; - const messageIndex = childCall.nextMessageToSend; + private handleChildWriteCompleted(childIndex: number, messageIndex: number) { this.getBufferEntry(messageIndex).callback?.(); this.clearSentMessages(); + const childCall = this.underlyingCalls[childIndex]; childCall.nextMessageToSend += 1; this.sendNextChildMessage(childIndex); } @@ -774,19 +773,33 @@ export class RetryingCall implements Call, DeadlineInfoProvider { if (childCall.state === 'COMPLETED') { return; } - if (this.getBufferEntry(childCall.nextMessageToSend)) { - const bufferEntry = this.getBufferEntry(childCall.nextMessageToSend); + const messageIndex = childCall.nextMessageToSend; + if (this.getBufferEntry(messageIndex)) { + const bufferEntry = this.getBufferEntry(messageIndex); switch (bufferEntry.entryType) { case 'MESSAGE': childCall.call.sendMessageWithContext( { callback: error => { // Ignore error - this.handleChildWriteCompleted(childIndex); + this.handleChildWriteCompleted(childIndex, messageIndex); }, }, bufferEntry.message!.message ); + // Optimization: if the next entry is HALF_CLOSE, send it immediately + // without waiting for the message callback. This is safe because the message + // has already been passed to the underlying transport. + const nextEntry = this.getBufferEntry(messageIndex + 1); + if (nextEntry.entryType === 'HALF_CLOSE') { + this.trace( + 'Sending halfClose immediately after message to child [' + + childCall.call.getCallNumber() + + '] - optimizing for unary/final message' + ); + childCall.nextMessageToSend += 1; + childCall.call.halfClose(); + } break; case 'HALF_CLOSE': childCall.nextMessageToSend += 1; @@ -813,7 +826,11 @@ export class RetryingCall implements Call, DeadlineInfoProvider { }; this.writeBuffer.push(bufferEntry); if (bufferEntry.allocated) { - context.callback?.(); + // Run this in next tick to avoid suspending the current execution context + // otherwise it might cause half closing the call before sending message + process.nextTick(() => { + context.callback?.(); + }); for (const [callIndex, call] of this.underlyingCalls.entries()) { if ( call.state === 'ACTIVE' && @@ -823,7 +840,7 @@ export class RetryingCall implements Call, DeadlineInfoProvider { { callback: error => { // Ignore error - this.handleChildWriteCompleted(callIndex); + this.handleChildWriteCompleted(callIndex, messageIndex); }, }, message @@ -843,7 +860,7 @@ export class RetryingCall implements Call, DeadlineInfoProvider { { callback: error => { // Ignore error - this.handleChildWriteCompleted(this.committedCallIndex!); + this.handleChildWriteCompleted(this.committedCallIndex!, messageIndex); }, }, message @@ -868,12 +885,21 @@ export class RetryingCall implements Call, DeadlineInfoProvider { allocated: false, }); for (const call of this.underlyingCalls) { - if ( - call?.state === 'ACTIVE' && - call.nextMessageToSend === halfCloseIndex - ) { - call.nextMessageToSend += 1; - call.call.halfClose(); + if (call?.state === 'ACTIVE') { + // Send halfClose to call when either: + // - nextMessageToSend === halfCloseIndex - 1: last message sent, callback pending (optimization) + // - nextMessageToSend === halfCloseIndex: all messages sent and acknowledged + if (call.nextMessageToSend === halfCloseIndex + || call.nextMessageToSend === halfCloseIndex - 1) { + this.trace( + 'Sending halfClose immediately to child [' + + call.call.getCallNumber() + + '] - all messages already sent' + ); + call.nextMessageToSend += 1; + call.call.halfClose(); + } + // Otherwise, halfClose will be sent by sendNextChildMessage when message callbacks complete } } } @@ -895,4 +921,4 @@ export class RetryingCall implements Call, DeadlineInfoProvider { return null; } } -} +} \ No newline at end of file diff --git a/packages/grpc-js/test/test-end-to-end.ts b/packages/grpc-js/test/test-end-to-end.ts index c7de2d6a6..676b4cff0 100644 --- a/packages/grpc-js/test/test-end-to-end.ts +++ b/packages/grpc-js/test/test-end-to-end.ts @@ -18,7 +18,7 @@ import * as assert from 'assert'; import * as path from 'path'; import { loadProtoFile } from './common'; -import { Metadata, Server, ServerDuplexStream, ServerUnaryCall, ServiceClientConstructor, ServiceError, experimental, sendUnaryData } from '../src'; +import { Metadata, Server, ServerCredentials, ServerDuplexStream, ServerReadableStream, ServerUnaryCall, ServiceClientConstructor, ServiceError, credentials, experimental, sendUnaryData } from '../src'; import { ServiceClient } from '../src/make-client'; const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto'); @@ -36,6 +36,15 @@ const echoServiceImplementation = { call.end(); }); }, + echoClientStream(call: ServerReadableStream, callback: sendUnaryData) { + const messages: any[] = []; + call.on('data', (message: any) => { + messages.push(message); + }); + call.on('end', () => { + callback(null, { value: messages.map(m => m.value).join(','), value2: messages.length }); + }); + }, }; describe('Client should successfully communicate with server', () => { @@ -77,4 +86,20 @@ describe('Client should successfully communicate with server', () => { }); }); }).timeout(5000); + + it('Client streaming with one message should work', done => { + server = new Server(); + server.addService(EchoService.service, echoServiceImplementation); + server.bindAsync('localhost:0', ServerCredentials.createInsecure(), (error, port) => { + assert.ifError(error); + client = new EchoService(`localhost:${port}`, credentials.createInsecure()); + const call = client.echoClientStream((error: ServiceError, response: any) => { + assert.ifError(error); + assert.deepStrictEqual(response, { value: 'test value', value2: 1 }); + done(); + }); + call.write({ value: 'test value', value2: 42 }); + call.end(); + }); + }); });