2323import static software .amazon .awssdk .http .auth .aws .internal .signer .util .SignerConstant .STREAMING_SIGNED_PAYLOAD_TRAILER ;
2424import static software .amazon .awssdk .http .auth .aws .internal .signer .util .SignerConstant .STREAMING_UNSIGNED_PAYLOAD_TRAILER ;
2525import static software .amazon .awssdk .http .auth .aws .internal .signer .util .SignerConstant .X_AMZ_CONTENT_SHA256 ;
26+ import static software .amazon .awssdk .http .auth .aws .internal .signer .util .SignerConstant .X_AMZ_DECODED_CONTENT_LENGTH ;
2627import static software .amazon .awssdk .http .auth .aws .internal .signer .util .SignerConstant .X_AMZ_TRAILER ;
2728import static software .amazon .awssdk .http .auth .aws .internal .signer .util .SignerUtils .moveContentLength ;
2829
2930import java .nio .ByteBuffer ;
3031import java .nio .charset .StandardCharsets ;
3132import java .util .ArrayList ;
32- import java .util .Collections ;
3333import java .util .List ;
34+ import java .util .Optional ;
35+ import java .util .concurrent .CompletableFuture ;
3436import org .reactivestreams .Publisher ;
3537import software .amazon .awssdk .annotations .SdkInternalApi ;
3638import software .amazon .awssdk .checksums .SdkChecksum ;
3739import software .amazon .awssdk .checksums .spi .ChecksumAlgorithm ;
3840import software .amazon .awssdk .http .ContentStreamProvider ;
3941import software .amazon .awssdk .http .Header ;
4042import software .amazon .awssdk .http .SdkHttpRequest ;
43+ import software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .AsyncChunkEncodedPayload ;
4144import software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .ChecksumTrailerProvider ;
4245import software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .ChunkedEncodedInputStream ;
46+ import software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .ChunkedEncodedPayload ;
47+ import software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .ChunkedEncodedPublisher ;
4348import software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .SigV4ChunkExtensionProvider ;
4449import software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .SigV4TrailerProvider ;
50+ import software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .SyncChunkEncodedPayload ;
4551import software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .TrailerProvider ;
46- import software .amazon .awssdk .http .auth .aws .internal .signer .io .ChecksumInputStream ;
4752import software .amazon .awssdk .http .auth .aws .internal .signer .io .ResettableContentStreamProvider ;
4853import software .amazon .awssdk .utils .BinaryUtils ;
4954import software .amazon .awssdk .utils .Pair ;
@@ -73,51 +78,67 @@ public static Builder builder() {
7378
7479 @ Override
7580 public ContentStreamProvider sign (ContentStreamProvider payload , V4RequestSigningResult requestSigningResult ) {
76- SdkHttpRequest .Builder request = requestSigningResult .getSignedRequest ();
77-
78- String checksum = request .firstMatchingHeader (X_AMZ_CONTENT_SHA256 ).orElseThrow (
79- () -> new IllegalArgumentException (X_AMZ_CONTENT_SHA256 + " must be set!" )
80- );
81-
8281 ChunkedEncodedInputStream .Builder chunkedEncodedInputStreamBuilder = ChunkedEncodedInputStream
8382 .builder ()
8483 .inputStream (payload .newStream ())
8584 .chunkSize (chunkSize )
8685 .header (chunk -> Integer .toHexString (chunk .remaining ()).getBytes (StandardCharsets .UTF_8 ));
8786
88- preExistingTrailers .forEach (trailer -> chunkedEncodedInputStreamBuilder .addTrailer (() -> trailer ));
87+ SyncChunkEncodedPayload chunkedPayload = new SyncChunkEncodedPayload (chunkedEncodedInputStreamBuilder );
88+ signCommon (chunkedPayload , requestSigningResult );
89+
90+ return new ResettableContentStreamProvider (chunkedEncodedInputStreamBuilder ::build );
91+ }
92+
93+ @ Override
94+ public Publisher <ByteBuffer > signAsync (Publisher <ByteBuffer > payload , V4RequestSigningResult requestSigningResult ) {
95+ ChunkedEncodedPublisher .Builder chunkedStreamBuilder = ChunkedEncodedPublisher .builder ()
96+ .publisher (payload )
97+ .chunkSize (chunkSize )
98+ .addEmptyTrailingChunk (true );
99+
100+ AsyncChunkEncodedPayload checksumPayload = new AsyncChunkEncodedPayload (chunkedStreamBuilder );
101+ signCommon (checksumPayload , requestSigningResult );
102+
103+ return chunkedStreamBuilder .build ();
104+ }
105+
106+ private void signCommon (ChunkedEncodedPayload payload , V4RequestSigningResult requestSigningResult ) {
107+ preExistingTrailers .forEach (t -> payload .addTrailer (() -> t ));
108+
109+ SdkHttpRequest .Builder request = requestSigningResult .getSignedRequest ();
110+
111+ payload .decodedContentLength (request .firstMatchingHeader (X_AMZ_DECODED_CONTENT_LENGTH )
112+ .map (Long ::parseLong )
113+ .orElse (0L ));
114+
115+ String checksum = request .firstMatchingHeader (X_AMZ_CONTENT_SHA256 ).orElseThrow (
116+ () -> new IllegalArgumentException (X_AMZ_CONTENT_SHA256 + " must be set!" )
117+ );
89118
90119 switch (checksum ) {
91120 case STREAMING_SIGNED_PAYLOAD : {
92121 RollingSigner rollingSigner = new RollingSigner (requestSigningResult .getSigningKey (),
93122 requestSigningResult .getSignature ());
94- chunkedEncodedInputStreamBuilder .addExtension (new SigV4ChunkExtensionProvider (rollingSigner , credentialScope ));
123+ payload .addExtension (new SigV4ChunkExtensionProvider (rollingSigner , credentialScope ));
95124 break ;
96125 }
97126 case STREAMING_UNSIGNED_PAYLOAD_TRAILER :
98- setupChecksumTrailerIfNeeded (chunkedEncodedInputStreamBuilder );
127+ setupChecksumTrailerIfNeeded (payload );
99128 break ;
100129 case STREAMING_SIGNED_PAYLOAD_TRAILER : {
130+ setupChecksumTrailerIfNeeded (payload );
101131 RollingSigner rollingSigner = new RollingSigner (requestSigningResult .getSigningKey (),
102132 requestSigningResult .getSignature ());
103- chunkedEncodedInputStreamBuilder .addExtension (new SigV4ChunkExtensionProvider (rollingSigner , credentialScope ));
104- setupChecksumTrailerIfNeeded (chunkedEncodedInputStreamBuilder );
105- chunkedEncodedInputStreamBuilder .addTrailer (
106- new SigV4TrailerProvider (chunkedEncodedInputStreamBuilder .trailers (), rollingSigner , credentialScope )
133+ payload .addExtension (new SigV4ChunkExtensionProvider (rollingSigner , credentialScope ));
134+ payload .addTrailer (
135+ new SigV4TrailerProvider (payload .trailers (), rollingSigner , credentialScope )
107136 );
108137 break ;
109138 }
110139 default :
111140 throw new UnsupportedOperationException ();
112141 }
113-
114- return new ResettableContentStreamProvider (chunkedEncodedInputStreamBuilder ::build );
115- }
116-
117- @ Override
118- public Publisher <ByteBuffer > signAsync (Publisher <ByteBuffer > payload , V4RequestSigningResult requestSigningResult ) {
119- // TODO(sra-identity-and-auth): implement this first and remove addFlexibleChecksumInTrailer logic in HttpChecksumStage
120- throw new UnsupportedOperationException ();
121142 }
122143
123144 @ Override
@@ -127,27 +148,66 @@ public void beforeSigning(SdkHttpRequest.Builder request, ContentStreamProvider
127148 setupPreExistingTrailers (request );
128149
129150 // pre-existing trailers
151+ encodedContentLength = calculateEncodedContentLength (request , contentLength );
152+
153+ if (checksumAlgorithm != null ) {
154+ String checksumHeaderName = checksumHeaderName (checksumAlgorithm );
155+ request .appendHeader (X_AMZ_TRAILER , checksumHeaderName );
156+ }
157+ request .putHeader (Header .CONTENT_LENGTH , Long .toString (encodedContentLength ));
158+ request .appendHeader (CONTENT_ENCODING , AWS_CHUNKED );
159+ }
160+
161+ @ Override
162+ public CompletableFuture <Pair <SdkHttpRequest .Builder , Optional <Publisher <ByteBuffer >>>> beforeSigningAsync (
163+ SdkHttpRequest .Builder request , Publisher <ByteBuffer > payload ) {
164+ return moveContentLength (request , payload )
165+ .thenApply (p -> {
166+ SdkHttpRequest .Builder requestBuilder = p .left ();
167+ setupPreExistingTrailers (requestBuilder );
168+
169+ long decodedContentLength = requestBuilder .firstMatchingHeader (X_AMZ_DECODED_CONTENT_LENGTH )
170+ .map (Long ::parseLong )
171+ // should not happen, this header is added by moveContentLength
172+ .orElseThrow (() -> new RuntimeException (X_AMZ_DECODED_CONTENT_LENGTH
173+ + " header not present" ));
174+
175+ long encodedContentLength = calculateEncodedContentLength (request , decodedContentLength );
176+
177+ if (checksumAlgorithm != null ) {
178+ String checksumHeaderName = checksumHeaderName (checksumAlgorithm );
179+ request .appendHeader (X_AMZ_TRAILER , checksumHeaderName );
180+ }
181+ request .putHeader (Header .CONTENT_LENGTH , Long .toString (encodedContentLength ));
182+ request .appendHeader (CONTENT_ENCODING , AWS_CHUNKED );
183+ return Pair .of (requestBuilder , p .right ());
184+ });
185+ }
186+
187+ private long calculateEncodedContentLength (SdkHttpRequest .Builder requestBuilder , long decodedContentLength ) {
188+ long encodedContentLength = 0 ;
189+
130190 encodedContentLength += calculateExistingTrailersLength ();
131191
132- String checksum = request .firstMatchingHeader (X_AMZ_CONTENT_SHA256 ).orElseThrow (
192+ String checksum = requestBuilder .firstMatchingHeader (X_AMZ_CONTENT_SHA256 ).orElseThrow (
133193 () -> new IllegalArgumentException (X_AMZ_CONTENT_SHA256 + " must be set!" )
134194 );
135195
136196 switch (checksum ) {
137197 case STREAMING_SIGNED_PAYLOAD : {
138198 long extensionsLength = 81 ; // ;chunk-signature:<sigv4 hex signature, 64 bytes>
139- encodedContentLength += calculateChunksLength (contentLength , extensionsLength );
199+ encodedContentLength += calculateChunksLength (decodedContentLength , extensionsLength );
140200 break ;
141201 }
142202 case STREAMING_UNSIGNED_PAYLOAD_TRAILER :
143203 if (checksumAlgorithm != null ) {
144204 encodedContentLength += calculateChecksumTrailerLength (checksumHeaderName (checksumAlgorithm ));
145205 }
146- encodedContentLength += calculateChunksLength (contentLength , 0 );
206+ encodedContentLength += calculateChunksLength (decodedContentLength , 0 );
147207 break ;
148208 case STREAMING_SIGNED_PAYLOAD_TRAILER : {
149209 long extensionsLength = 81 ; // ;chunk-signature:<sigv4 hex signature, 64 bytes>
150- encodedContentLength += calculateChunksLength (contentLength , extensionsLength );
210+ encodedContentLength += calculateChunksLength (decodedContentLength , extensionsLength );
151211 if (checksumAlgorithm != null ) {
152212 encodedContentLength += calculateChecksumTrailerLength (checksumHeaderName (checksumAlgorithm ));
153213 }
@@ -161,12 +221,7 @@ public void beforeSigning(SdkHttpRequest.Builder request, ContentStreamProvider
161221 // terminating \r\n
162222 encodedContentLength += 2 ;
163223
164- if (checksumAlgorithm != null ) {
165- String checksumHeaderName = checksumHeaderName (checksumAlgorithm );
166- request .appendHeader (X_AMZ_TRAILER , checksumHeaderName );
167- }
168- request .putHeader (Header .CONTENT_LENGTH , Long .toString (encodedContentLength ));
169- request .appendHeader (CONTENT_ENCODING , AWS_CHUNKED );
224+ return encodedContentLength ;
170225 }
171226
172227 /**
@@ -250,25 +305,17 @@ private long calculateChecksumTrailerLength(String checksumHeaderName) {
250305 return lengthInBytes + 2 ;
251306 }
252307
253- /**
254- * Add the checksum as a trailer to the chunk-encoded stream.
255- * <p>
256- * If the checksum-algorithm is not present, then nothing is done.
257- */
258- private void setupChecksumTrailerIfNeeded (ChunkedEncodedInputStream .Builder builder ) {
308+ private void setupChecksumTrailerIfNeeded (ChunkedEncodedPayload payload ) {
259309 if (checksumAlgorithm == null ) {
260310 return ;
261311 }
262312 String checksumHeaderName = checksumHeaderName (checksumAlgorithm );
263313 SdkChecksum sdkChecksum = fromChecksumAlgorithm (checksumAlgorithm );
264- ChecksumInputStream checksumInputStream = new ChecksumInputStream (
265- builder .inputStream (),
266- Collections .singleton (sdkChecksum )
267- );
268314
269315 TrailerProvider checksumTrailer = new ChecksumTrailerProvider (sdkChecksum , checksumHeaderName );
270316
271- builder .inputStream (checksumInputStream ).addTrailer (checksumTrailer );
317+ payload .checksumPayload (sdkChecksum );
318+ payload .addTrailer (checksumTrailer );
272319 }
273320
274321 static class Builder {
0 commit comments