Skip to content

Commit 923d833

Browse files
committed
Add multipartupload lifecycle tracking (#4061)
1 parent e4d9733 commit 923d833

File tree

4 files changed

+343
-10
lines changed

4 files changed

+343
-10
lines changed
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
{
2+
"services": [
3+
{
4+
"serviceName": "S3",
5+
"type": "minor",
6+
"changeLogMessages": [
7+
"Added UploadInitiatedEvent, UploadCompletedEvent, and UploadFailedEvent for multipart uploads."
8+
]
9+
}
10+
]
11+
}

sdk/src/Services/S3/Custom/Transfer/Internal/MultipartUploadCommand.cs

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -375,10 +375,46 @@ private void UploadPartProgressEventCallback(object sender, UploadProgressArgs e
375375
long transferredBytes = Interlocked.Add(ref _totalTransferredBytes, e.IncrementTransferred - e.CompensationForRetry);
376376

377377
var progressArgs = new UploadProgressArgs(e.IncrementTransferred, transferredBytes, this._contentLength,
378-
e.CompensationForRetry, this._fileTransporterRequest.FilePath);
378+
e.CompensationForRetry, this._fileTransporterRequest.FilePath, this._fileTransporterRequest);
379379
this._fileTransporterRequest.OnRaiseProgressEvent(progressArgs);
380380
}
381381

382+
private void FireTransferInitiatedEvent()
383+
{
384+
var initiatedArgs = new UploadInitiatedEventArgs(
385+
request: _fileTransporterRequest,
386+
totalBytes: _contentLength,
387+
filePath: _fileTransporterRequest.FilePath
388+
);
389+
390+
_fileTransporterRequest.OnRaiseTransferInitiatedEvent(initiatedArgs);
391+
}
392+
393+
private void FireTransferCompletedEvent(TransferUtilityUploadResponse response)
394+
{
395+
var completedArgs = new UploadCompletedEventArgs(
396+
request: _fileTransporterRequest,
397+
filePath: _fileTransporterRequest.FilePath,
398+
response: response,
399+
transferredBytes: Interlocked.Read(ref _totalTransferredBytes),
400+
totalBytes: _contentLength
401+
);
402+
403+
_fileTransporterRequest.OnRaiseTransferCompletedEvent(completedArgs);
404+
}
405+
406+
private void FireTransferFailedEvent()
407+
{
408+
var failedArgs = new UploadFailedEventArgs(
409+
request: _fileTransporterRequest,
410+
filePath: _fileTransporterRequest.FilePath,
411+
transferredBytes: Interlocked.Read(ref _totalTransferredBytes),
412+
totalBytes: _contentLength
413+
);
414+
415+
_fileTransporterRequest.OnRaiseTransferFailedEvent(failedArgs);
416+
}
417+
382418
/// <summary>
383419
/// <para>
384420
/// If a checksum algorithm was not specified, we MUST add the default value used by the SDK (as the individual part

sdk/src/Services/S3/Custom/Transfer/Internal/_async/MultipartUploadCommand.async.cs

Lines changed: 42 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,21 +33,33 @@ internal partial class MultipartUploadCommand : BaseCommand
3333

3434
public override async Task ExecuteAsync(CancellationToken cancellationToken)
3535
{
36+
// Fire transfer initiated event FIRST, before choosing path
37+
FireTransferInitiatedEvent();
38+
3639
if ( (this._fileTransporterRequest.InputStream != null && !this._fileTransporterRequest.InputStream.CanSeek) || this._fileTransporterRequest.ContentLength == -1)
3740
{
3841
await UploadUnseekableStreamAsync(this._fileTransporterRequest, cancellationToken).ConfigureAwait(false);
3942
}
4043
else
4144
{
42-
var initRequest = ConstructInitiateMultipartUploadRequest();
43-
var initResponse = await _s3Client.InitiateMultipartUploadAsync(initRequest, cancellationToken)
45+
InitiateMultipartUploadResponse initResponse = null;
46+
try
47+
{
48+
var initRequest = ConstructInitiateMultipartUploadRequest();
49+
initResponse = await _s3Client.InitiateMultipartUploadAsync(initRequest, cancellationToken)
4450
.ConfigureAwait(continueOnCapturedContext: false);
45-
Logger.DebugFormat("Initiated upload: {0}", initResponse.UploadId);
51+
Logger.DebugFormat("Initiated upload: {0}", initResponse.UploadId);
52+
}
53+
catch (Exception)
54+
{
55+
FireTransferFailedEvent();
56+
throw;
57+
}
4658

4759
var pendingUploadPartTasks = new List<Task<UploadPartResponse>>();
48-
4960
SemaphoreSlim localThrottler = null;
5061
CancellationTokenSource internalCts = null;
62+
5163
try
5264
{
5365
Logger.DebugFormat("Queue up the UploadPartRequests to be executed");
@@ -101,14 +113,19 @@ await localThrottler.WaitAsync(cancellationToken)
101113

102114
Logger.DebugFormat("Beginning completing multipart. ({0})", initResponse.UploadId);
103115
var compRequest = ConstructCompleteMultipartUploadRequest(initResponse);
104-
await this._s3Client.CompleteMultipartUploadAsync(compRequest, cancellationToken)
116+
var completeResponse = await this._s3Client.CompleteMultipartUploadAsync(compRequest, cancellationToken)
105117
.ConfigureAwait(continueOnCapturedContext: false);
106118
Logger.DebugFormat("Done completing multipart. ({0})", initResponse.UploadId);
107119

120+
var mappedResponse = ResponseMapper.MapCompleteMultipartUploadResponse(completeResponse);
121+
FireTransferCompletedEvent(mappedResponse);
108122
}
109123
catch (Exception e)
110124
{
111-
Logger.Error(e, "Exception while uploading. ({0})", initResponse.UploadId);
125+
Logger.Error(e, "Exception while uploading. ({0})", initResponse?.UploadId ?? "unknown");
126+
127+
FireTransferFailedEvent();
128+
112129
// Can't do async invocation in the catch block, doing cleanup synchronously.
113130
Cleanup(initResponse.UploadId, pendingUploadPartTasks);
114131
throw;
@@ -207,8 +224,19 @@ private void AbortMultipartUpload(string uploadId)
207224
}
208225
};
209226

210-
var initiateRequest = ConstructInitiateMultipartUploadRequest(requestEventHandler);
211-
var initiateResponse = await _s3Client.InitiateMultipartUploadAsync(initiateRequest, cancellationToken).ConfigureAwait(false);
227+
InitiateMultipartUploadResponse initiateResponse = null;
228+
229+
try
230+
{
231+
var initiateRequest = ConstructInitiateMultipartUploadRequest(requestEventHandler);
232+
initiateResponse = await _s3Client.InitiateMultipartUploadAsync(initiateRequest, cancellationToken).ConfigureAwait(false);
233+
}
234+
catch (Exception ex)
235+
{
236+
FireTransferFailedEvent();
237+
Logger.Error(ex, "Failed to initiate multipart upload for unseekable stream");
238+
throw;
239+
}
212240

213241
try
214242
{
@@ -276,12 +304,17 @@ private void AbortMultipartUpload(string uploadId)
276304

277305
this._uploadResponses = uploadPartResponses;
278306
CompleteMultipartUploadRequest compRequest = ConstructCompleteMultipartUploadRequest(initiateResponse, true, requestEventHandler);
279-
await _s3Client.CompleteMultipartUploadAsync(compRequest, cancellationToken).ConfigureAwait(false);
307+
var completeResponse = await _s3Client.CompleteMultipartUploadAsync(compRequest, cancellationToken).ConfigureAwait(false);
280308
Logger.DebugFormat("Completed multi part upload. (Part count: {0}, Upload Id: {1})", uploadPartResponses.Count, initiateResponse.UploadId);
309+
310+
var mappedResponse = ResponseMapper.MapCompleteMultipartUploadResponse(completeResponse);
311+
FireTransferCompletedEvent(mappedResponse);
281312
}
282313
}
283314
catch (Exception ex)
284315
{
316+
FireTransferFailedEvent();
317+
285318
await _s3Client.AbortMultipartUploadAsync(new AbortMultipartUploadRequest()
286319
{
287320
BucketName = request.BucketName,

0 commit comments

Comments
 (0)