@@ -93,12 +93,15 @@ export interface FormattedSubsequentIncrementalExecutionResult<
9393 extensions ?: TExtensions ;
9494}
9595
96+ interface RawDeferResult < TData = ObjMap < unknown > > {
97+ errors ?: ReadonlyArray < GraphQLError > ;
98+ data : TData ;
99+ }
100+
96101export interface IncrementalDeferResult <
97102 TData = ObjMap < unknown > ,
98103 TExtensions = ObjMap < unknown > ,
99- > {
100- errors ?: ReadonlyArray < GraphQLError > ;
101- data : TData ;
104+ > extends RawDeferResult < TData > {
102105 id : string ;
103106 subPath ?: ReadonlyArray < string | number > ;
104107 extensions ?: TExtensions ;
@@ -115,12 +118,15 @@ export interface FormattedIncrementalDeferResult<
115118 extensions ?: TExtensions ;
116119}
117120
118- export interface IncrementalStreamResult <
119- TData = Array < unknown > ,
120- TExtensions = ObjMap < unknown > ,
121- > {
121+ interface RawStreamItemsResult < TData = ReadonlyArray < unknown > > {
122122 errors ?: ReadonlyArray < GraphQLError > ;
123123 items : TData ;
124+ }
125+
126+ export interface IncrementalStreamResult <
127+ TData = ReadonlyArray < unknown > ,
128+ TExtensions = ObjMap < unknown > ,
129+ > extends RawStreamItemsResult < TData > {
124130 id : string ;
125131 subPath ?: ReadonlyArray < string | number > ;
126132 extensions ?: TExtensions ;
@@ -166,23 +172,27 @@ export interface FormattedCompletedResult {
166172}
167173
168174export function buildIncrementalResponse (
175+ context : IncrementalPublisherContext ,
169176 result : ObjMap < unknown > ,
170- errors : ReadonlyArray < GraphQLError > ,
177+ errors : ReadonlyArray < GraphQLError > | undefined ,
171178 futures : ReadonlyArray < Future > ,
172- cancellableStreams : Set < StreamRecord > ,
173179) : ExperimentalIncrementalExecutionResults {
174- const incrementalPublisher = new IncrementalPublisher ( cancellableStreams ) ;
180+ const incrementalPublisher = new IncrementalPublisher ( context ) ;
175181 return incrementalPublisher . buildResponse ( result , errors , futures ) ;
176182}
177183
184+ interface IncrementalPublisherContext {
185+ cancellableStreams ?: Set < StreamRecord > | undefined ;
186+ }
187+
178188/**
179189 * This class is used to publish incremental results to the client, enabling semi-concurrent
180190 * execution while preserving result order.
181191 *
182192 * @internal
183193 */
184194class IncrementalPublisher {
185- private _cancellableStreams : Set < StreamRecord > ;
195+ private _context : IncrementalPublisherContext ;
186196 private _nextId : number ;
187197 private _pending : Set < SubsequentResultRecord > ;
188198 private _completedResultQueue : Array < FutureResult > ;
@@ -193,8 +203,8 @@ class IncrementalPublisher {
193203 private _signalled ! : Promise < unknown > ;
194204 private _resolve ! : ( ) => void ;
195205
196- constructor ( cancellableStreams : Set < StreamRecord > ) {
197- this . _cancellableStreams = cancellableStreams ;
206+ constructor ( context : IncrementalPublisherContext ) {
207+ this . _context = context ;
198208 this . _nextId = 0 ;
199209 this . _pending = new Set ( ) ;
200210 this . _completedResultQueue = [ ] ;
@@ -206,7 +216,7 @@ class IncrementalPublisher {
206216
207217 buildResponse (
208218 data : ObjMap < unknown > ,
209- errors : ReadonlyArray < GraphQLError > ,
219+ errors : ReadonlyArray < GraphQLError > | undefined ,
210220 futures : ReadonlyArray < Future > ,
211221 ) : ExperimentalIncrementalExecutionResults {
212222 this . _addFutures ( futures ) ;
@@ -215,7 +225,7 @@ class IncrementalPublisher {
215225 const pending = this . _pendingSourcesToResults ( ) ;
216226
217227 const initialResult : InitialIncrementalExecutionResult =
218- errors . length === 0
228+ errors === undefined
219229 ? { data, pending, hasNext : true }
220230 : { errors, data, pending, hasNext : true } ;
221231
@@ -425,8 +435,12 @@ class IncrementalPublisher {
425435 } ;
426436
427437 const returnStreamIterators = async ( ) : Promise < void > => {
438+ const cancellableStreams = this . _context . cancellableStreams ;
439+ if ( cancellableStreams === undefined ) {
440+ return ;
441+ }
428442 const promises : Array < Promise < unknown > > = [ ] ;
429- for ( const streamRecord of this . _cancellableStreams ) {
443+ for ( const streamRecord of cancellableStreams ) {
430444 if ( streamRecord . earlyReturn !== undefined ) {
431445 promises . push ( streamRecord . earlyReturn ( ) ) ;
432446 }
@@ -475,27 +489,36 @@ class IncrementalPublisher {
475489 }
476490
477491 private _handleCompletedDeferredGroupedFieldSet (
478- result : DeferredGroupedFieldSetResult ,
492+ deferredGroupedFieldSetResult : DeferredGroupedFieldSetResult ,
479493 ) : void {
480- if ( ! isReconcilableDeferredGroupedFieldSetResult ( result ) ) {
481- for ( const deferredFragmentRecord of result . deferredFragmentRecords ) {
494+ if (
495+ isNonReconcilableDeferredGroupedFieldSetResult (
496+ deferredGroupedFieldSetResult ,
497+ )
498+ ) {
499+ for ( const deferredFragmentRecord of deferredGroupedFieldSetResult . deferredFragmentRecords ) {
482500 const id = deferredFragmentRecord . id ;
483501 if ( id !== undefined ) {
484- this . _completed . push ( { id, errors : result . errors } ) ;
502+ this . _completed . push ( {
503+ id,
504+ errors : deferredGroupedFieldSetResult . errors ,
505+ } ) ;
485506 this . _pending . delete ( deferredFragmentRecord ) ;
486507 }
487508 }
488509 return ;
489510 }
490- for ( const deferredFragmentRecord of result . deferredFragmentRecords ) {
491- deferredFragmentRecord . reconcilableResults . push ( result ) ;
511+ for ( const deferredFragmentRecord of deferredGroupedFieldSetResult . deferredFragmentRecords ) {
512+ deferredFragmentRecord . reconcilableResults . push (
513+ deferredGroupedFieldSetResult ,
514+ ) ;
492515 }
493516
494- if ( result . futures ) {
495- this . _addFutures ( result . futures ) ;
517+ if ( deferredGroupedFieldSetResult . futures ) {
518+ this . _addFutures ( deferredGroupedFieldSetResult . futures ) ;
496519 }
497520
498- for ( const deferredFragmentRecord of result . deferredFragmentRecords ) {
521+ for ( const deferredFragmentRecord of deferredGroupedFieldSetResult . deferredFragmentRecords ) {
499522 const id = deferredFragmentRecord . id ;
500523 // TODO: add test case for this.
501524 // Presumably, this can occur if an error causes a fragment to be completed early,
@@ -522,12 +545,9 @@ class IncrementalPublisher {
522545 fragmentResult ,
523546 ) ;
524547 const incrementalEntry : IncrementalDeferResult = {
525- data : fragmentResult . data ,
548+ ... fragmentResult . result ,
526549 id : bestId ,
527550 } ;
528- if ( result . errors . length > 0 ) {
529- incrementalEntry . errors = fragmentResult . errors ;
530- }
531551 if ( subPath !== undefined ) {
532552 incrementalEntry . subPath = subPath ;
533553 }
@@ -548,44 +568,48 @@ class IncrementalPublisher {
548568 this . _pruneEmpty ( ) ;
549569 }
550570
551- private _handleCompletedStreamItems ( result : StreamItemsResult ) : void {
552- const streamRecord = result . streamRecord ;
571+ private _handleCompletedStreamItems (
572+ streamItemsResult : StreamItemsResult ,
573+ ) : void {
574+ const streamRecord = streamItemsResult . streamRecord ;
553575 const id = streamRecord . id ;
554576 // TODO: Consider adding invariant or non-null assertion, as this should never happen. Since the stream is converted into a linked list
555577 // for ordering purposes, if an entry errors, additional entries will not be processed.
556578 /* c8 ignore next 3 */
557579 if ( id === undefined ) {
558580 return ;
559581 }
560- if ( result . items === undefined ) {
582+ if ( streamItemsResult . result === undefined ) {
561583 this . _completed . push ( { id } ) ;
562584 this . _pending . delete ( streamRecord ) ;
563- this . _cancellableStreams . delete ( streamRecord ) ;
564- } else if ( result . items === null ) {
585+ const cancellableStreams = this . _context . cancellableStreams ;
586+ if ( cancellableStreams !== undefined ) {
587+ cancellableStreams . delete ( streamRecord ) ;
588+ }
589+ } else if ( streamItemsResult . result === null ) {
565590 this . _completed . push ( {
566591 id,
567- errors : result . errors ,
592+ errors : streamItemsResult . errors ,
568593 } ) ;
569594 this . _pending . delete ( streamRecord ) ;
570- this . _cancellableStreams . delete ( streamRecord ) ;
595+ const cancellableStreams = this . _context . cancellableStreams ;
596+ if ( cancellableStreams !== undefined ) {
597+ cancellableStreams . delete ( streamRecord ) ;
598+ }
571599 streamRecord . earlyReturn ?.( ) . catch ( ( ) => {
572600 /* c8 ignore next 1 */
573601 // ignore error
574602 } ) ;
575603 } else {
576604 const incrementalEntry : IncrementalStreamResult = {
577605 id,
578- items : result . items as Array < unknown > , // FIX!
606+ ... streamItemsResult . result ,
579607 } ;
580608
581- if ( result . errors !== undefined && result . errors . length > 0 ) {
582- incrementalEntry . errors = result . errors ;
583- }
584-
585609 this . _incremental . push ( incrementalEntry ) ;
586610
587- if ( result . futures ) {
588- this . _addFutures ( result . futures ) ;
611+ if ( streamItemsResult . futures ) {
612+ this . _addFutures ( streamItemsResult . futures ) ;
589613 this . _pruneEmpty ( ) ;
590614 }
591615 }
@@ -639,35 +663,39 @@ export function isDeferredGroupedFieldSetRecord(
639663export interface IncrementalContext {
640664 deferUsageSet : DeferUsageSet | undefined ;
641665 path : Path | undefined ;
642- errors : Array < GraphQLError > ;
643- errorPaths : Set < Path > ;
644- futures : Array < Future > ;
666+ errors ?: Map < Path | undefined , GraphQLError > | undefined ;
667+ futures ?: Array < Future > | undefined ;
645668}
646669
647- export interface DeferredGroupedFieldSetResult {
648- deferredFragmentRecords : ReadonlyArray < DeferredFragmentRecord > ;
649- path : Array < string | number > ;
650- data : ObjMap < unknown > | null ;
651- futures ?: ReadonlyArray < Future > ;
652- errors : ReadonlyArray < GraphQLError > ;
653- }
670+ export type DeferredGroupedFieldSetResult =
671+ | ReconcilableDeferredGroupedFieldSetResult
672+ | NonReconcilableDeferredGroupedFieldSetResult ;
654673
655674export function isDeferredGroupedFieldSetResult (
656675 subsequentResult : DeferredGroupedFieldSetResult | StreamItemsResult ,
657676) : subsequentResult is DeferredGroupedFieldSetResult {
658677 return 'deferredFragmentRecords' in subsequentResult ;
659678}
660679
661- interface ReconcilableDeferredGroupedFieldSetResult
662- extends DeferredGroupedFieldSetResult {
663- data : ObjMap < unknown > ;
680+ interface ReconcilableDeferredGroupedFieldSetResult {
681+ deferredFragmentRecords : ReadonlyArray < DeferredFragmentRecord > ;
682+ path : Array < string | number > ;
683+ result : RawDeferResult ;
684+ futures ?: ReadonlyArray < Future > | undefined ;
664685 sent ?: true | undefined ;
665686}
666687
667- export function isReconcilableDeferredGroupedFieldSetResult (
688+ interface NonReconcilableDeferredGroupedFieldSetResult {
689+ result : null ;
690+ errors : ReadonlyArray < GraphQLError > ;
691+ deferredFragmentRecords : ReadonlyArray < DeferredFragmentRecord > ;
692+ path : Array < string | number > ;
693+ }
694+
695+ export function isNonReconcilableDeferredGroupedFieldSetResult (
668696 deferredGroupedFieldSetResult : DeferredGroupedFieldSetResult ,
669- ) : deferredGroupedFieldSetResult is ReconcilableDeferredGroupedFieldSetResult {
670- return deferredGroupedFieldSetResult . data ! == null ;
697+ ) : deferredGroupedFieldSetResult is NonReconcilableDeferredGroupedFieldSetResult {
698+ return deferredGroupedFieldSetResult . result = == null ;
671699}
672700
673701/** @internal */
@@ -691,9 +719,6 @@ export class DeferredGroupedFieldSetRecord {
691719 const incrementalContext : IncrementalContext = {
692720 deferUsageSet,
693721 path,
694- errors : [ ] ,
695- errorPaths : new Set ( ) ,
696- futures : [ ] ,
697722 } ;
698723
699724 for ( const deferredFragmentRecord of deferredFragmentRecords ) {
@@ -752,24 +777,36 @@ export class StreamRecord {
752777 }
753778}
754779
755- interface NonTerminatingStreamItemsResult {
780+ interface NonReconcilableStreamItemsResult {
756781 streamRecord : StreamRecord ;
757- items : ReadonlyArray < unknown > | null ;
758- futures ?: ReadonlyArray < Future > ;
782+ result : null ;
759783 errors : ReadonlyArray < GraphQLError > ;
760784}
761785
786+ interface NonTerminatingStreamItemsResult {
787+ streamRecord : StreamRecord ;
788+ result : RawStreamItemsResult ;
789+ futures ?: ReadonlyArray < Future > | undefined ;
790+ }
791+
762792interface TerminatingStreamItemsResult {
763793 streamRecord : StreamRecord ;
764- items ?: never ;
794+ result ?: never ;
765795 futures ?: never ;
766796 errors ?: never ;
767797}
768798
769799export type StreamItemsResult =
800+ | NonReconcilableStreamItemsResult
770801 | NonTerminatingStreamItemsResult
771802 | TerminatingStreamItemsResult ;
772803
804+ export function isNonTerminatingStreamItemsResult (
805+ streamItemsResult : StreamItemsResult ,
806+ ) : streamItemsResult is NonTerminatingStreamItemsResult {
807+ return streamItemsResult . result != null ;
808+ }
809+
773810/** @internal */
774811export class StreamItemsRecord {
775812 streamRecord : StreamRecord ;
@@ -789,9 +826,6 @@ export class StreamItemsRecord {
789826 const incrementalContext : IncrementalContext = {
790827 deferUsageSet : undefined ,
791828 path : itemPath ,
792- errors : [ ] ,
793- errorPaths : new Set ( ) ,
794- futures : [ ] ,
795829 } ;
796830
797831 this . _result = executor ( incrementalContext ) ;
@@ -810,12 +844,13 @@ export class StreamItemsRecord {
810844 private _prependNextStreamItems (
811845 result : StreamItemsResult ,
812846 ) : StreamItemsResult {
813- return this . nextStreamItems === undefined
814- ? result
815- : {
847+ return isNonTerminatingStreamItemsResult ( result ) &&
848+ this . nextStreamItems !== undefined
849+ ? {
816850 ...result ,
817851 futures : [ this . nextStreamItems , ...( result . futures ?? [ ] ) ] ,
818- } ;
852+ }
853+ : result ;
819854 }
820855}
821856
0 commit comments