Skip to content

Commit 7c7699c

Browse files
committed
multipart upload event lifecycles
add dev config fix unseekable stream failure initial request update try catch fix null pointer add comment stack-info: PR: #4061, branch: GarrettBeatty/stacked/3
1 parent ff96820 commit 7c7699c

File tree

4 files changed

+358
-10
lines changed

4 files changed

+358
-10
lines changed
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
{
2+
"services": [
3+
{
4+
"serviceName": "S3",
5+
"type": "patch",
6+
"changeLogMessages": [
7+
"Added progress tracking events to multipart upload",
8+
"Added CompleteMultipartUploadResponse to TransferUtilityUploadResponse mapping"
9+
]
10+
}
11+
]
12+
}

sdk/src/Services/S3/Custom/Transfer/Internal/MultipartUploadCommand.cs

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -372,10 +372,46 @@ private void UploadPartProgressEventCallback(object sender, UploadProgressArgs e
372372
long transferredBytes = Interlocked.Add(ref _totalTransferredBytes, e.IncrementTransferred - e.CompensationForRetry);
373373

374374
var progressArgs = new UploadProgressArgs(e.IncrementTransferred, transferredBytes, this._contentLength,
375-
e.CompensationForRetry, this._fileTransporterRequest.FilePath);
375+
e.CompensationForRetry, this._fileTransporterRequest.FilePath, this._fileTransporterRequest);
376376
this._fileTransporterRequest.OnRaiseProgressEvent(progressArgs);
377377
}
378378

379+
private void FireTransferInitiatedEvent()
380+
{
381+
var initiatedArgs = new UploadInitiatedEventArgs(
382+
request: _fileTransporterRequest,
383+
filePath: _fileTransporterRequest.FilePath,
384+
totalBytes: _contentLength
385+
);
386+
387+
_fileTransporterRequest.OnRaiseTransferInitiatedEvent(initiatedArgs);
388+
}
389+
390+
private void FireTransferCompletedEvent(TransferUtilityUploadResponse response)
391+
{
392+
var completedArgs = new UploadCompletedEventArgs(
393+
request: _fileTransporterRequest,
394+
response: response,
395+
filePath: _fileTransporterRequest.FilePath,
396+
transferredBytes: Interlocked.Read(ref _totalTransferredBytes),
397+
totalBytes: _contentLength
398+
);
399+
400+
_fileTransporterRequest.OnRaiseTransferCompletedEvent(completedArgs);
401+
}
402+
403+
private void FireTransferFailedEvent()
404+
{
405+
var failedArgs = new UploadFailedEventArgs(
406+
request: _fileTransporterRequest,
407+
filePath: _fileTransporterRequest.FilePath,
408+
transferredBytes: Interlocked.Read(ref _totalTransferredBytes),
409+
totalBytes: _contentLength
410+
);
411+
412+
_fileTransporterRequest.OnRaiseTransferFailedEvent(failedArgs);
413+
}
414+
379415
/// <summary>
380416
/// <para>
381417
/// If a checksum algorithm was not specified, we MUST add the default value used by the SDK (as the individual part

sdk/src/Services/S3/Custom/Transfer/Internal/_async/MultipartUploadCommand.async.cs

Lines changed: 42 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,21 +33,33 @@ internal partial class MultipartUploadCommand : BaseCommand
3333

3434
public override async Task ExecuteAsync(CancellationToken cancellationToken)
3535
{
36+
// Fire transfer initiated event FIRST, before choosing path
37+
FireTransferInitiatedEvent();
38+
3639
if ( (this._fileTransporterRequest.InputStream != null && !this._fileTransporterRequest.InputStream.CanSeek) || this._fileTransporterRequest.ContentLength == -1)
3740
{
3841
await UploadUnseekableStreamAsync(this._fileTransporterRequest, cancellationToken).ConfigureAwait(false);
3942
}
4043
else
4144
{
42-
var initRequest = ConstructInitiateMultipartUploadRequest();
43-
var initResponse = await _s3Client.InitiateMultipartUploadAsync(initRequest, cancellationToken)
45+
InitiateMultipartUploadResponse initResponse = null;
46+
try
47+
{
48+
var initRequest = ConstructInitiateMultipartUploadRequest();
49+
initResponse = await _s3Client.InitiateMultipartUploadAsync(initRequest, cancellationToken)
4450
.ConfigureAwait(continueOnCapturedContext: false);
45-
Logger.DebugFormat("Initiated upload: {0}", initResponse.UploadId);
51+
Logger.DebugFormat("Initiated upload: {0}", initResponse.UploadId);
52+
}
53+
catch (Exception)
54+
{
55+
FireTransferFailedEvent();
56+
throw;
57+
}
4658

4759
var pendingUploadPartTasks = new List<Task<UploadPartResponse>>();
48-
4960
SemaphoreSlim localThrottler = null;
5061
CancellationTokenSource internalCts = null;
62+
5163
try
5264
{
5365
Logger.DebugFormat("Queue up the UploadPartRequests to be executed");
@@ -101,14 +113,19 @@ await localThrottler.WaitAsync(cancellationToken)
101113

102114
Logger.DebugFormat("Beginning completing multipart. ({0})", initResponse.UploadId);
103115
var compRequest = ConstructCompleteMultipartUploadRequest(initResponse);
104-
await this._s3Client.CompleteMultipartUploadAsync(compRequest, cancellationToken)
116+
var completeResponse = await this._s3Client.CompleteMultipartUploadAsync(compRequest, cancellationToken)
105117
.ConfigureAwait(continueOnCapturedContext: false);
106118
Logger.DebugFormat("Done completing multipart. ({0})", initResponse.UploadId);
107119

120+
var mappedResponse = ResponseMapper.MapCompleteMultipartUploadResponse(completeResponse);
121+
FireTransferCompletedEvent(mappedResponse);
108122
}
109123
catch (Exception e)
110124
{
111-
Logger.Error(e, "Exception while uploading. ({0})", initResponse.UploadId);
125+
Logger.Error(e, "Exception while uploading. ({0})", initResponse?.UploadId ?? "unknown");
126+
127+
FireTransferFailedEvent();
128+
112129
// Can't do async invocation in the catch block, doing cleanup synchronously.
113130
Cleanup(initResponse.UploadId, pendingUploadPartTasks);
114131
throw;
@@ -201,8 +218,19 @@ private void AbortMultipartUpload(string uploadId)
201218
}
202219
};
203220

204-
var initiateRequest = ConstructInitiateMultipartUploadRequest(requestEventHandler);
205-
var initiateResponse = await _s3Client.InitiateMultipartUploadAsync(initiateRequest, cancellationToken).ConfigureAwait(false);
221+
InitiateMultipartUploadResponse initiateResponse = null;
222+
223+
try
224+
{
225+
var initiateRequest = ConstructInitiateMultipartUploadRequest(requestEventHandler);
226+
initiateResponse = await _s3Client.InitiateMultipartUploadAsync(initiateRequest, cancellationToken).ConfigureAwait(false);
227+
}
228+
catch (Exception ex)
229+
{
230+
FireTransferFailedEvent();
231+
Logger.Error(ex, "Failed to initiate multipart upload for unseekable stream");
232+
throw;
233+
}
206234

207235
try
208236
{
@@ -270,12 +298,17 @@ private void AbortMultipartUpload(string uploadId)
270298

271299
this._uploadResponses = uploadPartResponses;
272300
CompleteMultipartUploadRequest compRequest = ConstructCompleteMultipartUploadRequest(initiateResponse, true, requestEventHandler);
273-
await _s3Client.CompleteMultipartUploadAsync(compRequest, cancellationToken).ConfigureAwait(false);
301+
var completeResponse = await _s3Client.CompleteMultipartUploadAsync(compRequest, cancellationToken).ConfigureAwait(false);
274302
Logger.DebugFormat("Completed multi part upload. (Part count: {0}, Upload Id: {1})", uploadPartResponses.Count, initiateResponse.UploadId);
303+
304+
var mappedResponse = ResponseMapper.MapCompleteMultipartUploadResponse(completeResponse);
305+
FireTransferCompletedEvent(mappedResponse);
275306
}
276307
}
277308
catch (Exception ex)
278309
{
310+
FireTransferFailedEvent();
311+
279312
await _s3Client.AbortMultipartUploadAsync(new AbortMultipartUploadRequest()
280313
{
281314
BucketName = request.BucketName,

0 commit comments

Comments
 (0)