diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index 3016b28d69a..ed180153f30 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -2205,6 +2205,84 @@ static int blob_request_pre_signed_url(struct flb_s3 *context, return ret; } +int s3_parse_presigned_url(struct flb_s3 *ctx, const char *url, + flb_sds_t *out_host, flb_sds_t *out_uri, + int *out_port) +{ + int ret; + char *scheme = NULL; + char *host = NULL; + char *port = NULL; + char *uri = NULL; + int resolved_port = 0; + flb_sds_t host_sds = NULL; + flb_sds_t uri_sds = NULL; + + if (out_host == NULL || out_uri == NULL || url == NULL) { + return -1; + } + + ret = flb_utils_url_split(url, &scheme, &host, &port, &uri); + if (ret == -1 || host == NULL || uri == NULL) { + flb_plg_error(ctx->ins, "Invalid pre signed URL: %s", url ? url : "(null)"); + goto error; + } + + if (port != NULL) { + resolved_port = (int) strtoul(port, NULL, 10); + } + else if (scheme != NULL) { + if (strcasecmp(scheme, "https") == 0) { + resolved_port = 443; + } + else { + resolved_port = 80; + } + } + + host_sds = flb_sds_create(host); + if (host_sds == NULL) { + goto error; + } + + uri_sds = flb_sds_create(uri); + if (uri_sds == NULL) { + flb_sds_destroy(host_sds); + host_sds = NULL; + goto error; + } + + if (out_port != NULL) { + *out_port = resolved_port; + } + + *out_host = host_sds; + *out_uri = uri_sds; + + flb_free(scheme); + flb_free(host); + flb_free(port); + flb_free(uri); + + return 0; + +error: + if (scheme) { + flb_free(scheme); + } + if (host) { + flb_free(host); + } + if (port) { + flb_free(port); + } + if (uri) { + flb_free(uri); + } + + return -1; +} + static int blob_fetch_pre_signed_url(struct flb_s3 *context, flb_sds_t *result_url, char *format, @@ -2467,8 +2545,12 @@ static int put_blob_object(struct flb_s3 *ctx, int ret; int num_headers = 0; char *final_key; - flb_sds_t uri; + flb_sds_t uri = NULL; flb_sds_t tmp; + flb_sds_t presigned_full = NULL; + flb_sds_t presigned_host = NULL; + const char *original_host = NULL; + int presigned_port = 0; char final_body_md5[25]; if (ctx->authorization_endpoint_url == NULL) { @@ -2499,13 +2581,29 @@ static int put_blob_object(struct flb_s3 *ctx, uri = tmp; } else { - uri = NULL; + ret = blob_fetch_put_object_pre_signed_url(ctx, &presigned_full, (char *) tag, ctx->bucket, (char *) path); + + if (ret != 0) { + return -1; + } - ret = blob_fetch_put_object_pre_signed_url(ctx, &uri, (char *) tag, ctx->bucket, (char *) path); + ret = s3_parse_presigned_url(ctx, presigned_full, &presigned_host, &uri, &presigned_port); + flb_sds_destroy(presigned_full); + presigned_full = NULL; if (ret != 0) { return -1; } + + if (presigned_port != 0 && presigned_port != ctx->port) { + flb_plg_error(ctx->ins, "Pre signed URL uses unexpected port %d", presigned_port); + flb_sds_destroy(presigned_host); + flb_sds_destroy(uri); + return -1; + } + + original_host = ctx->s3_client->host; + ctx->s3_client->host = presigned_host; } memset(final_body_md5, 0, sizeof(final_body_md5)); @@ -2514,8 +2612,8 @@ static int put_blob_object(struct flb_s3 *ctx, final_body_md5, sizeof(final_body_md5)); if (ret != 0) { flb_plg_error(ctx->ins, "Failed to create Content-MD5 header"); - flb_sds_destroy(uri); - return -1; + ret = -1; + goto cleanup; } } @@ -2527,8 +2625,7 @@ static int put_blob_object(struct flb_s3 *ctx, ret = create_headers(ctx, final_body_md5, &headers, &num_headers, FLB_FALSE); if (ret == -1) { flb_plg_error(ctx->ins, "Failed to create headers"); - flb_sds_destroy(uri); - return -1; + goto cleanup; } c = s3_client->client_vtable->request(s3_client, FLB_HTTP_PUT, @@ -2545,10 +2642,9 @@ static int put_blob_object(struct flb_s3 *ctx, */ final_key = uri + strlen(ctx->bucket) + 1; flb_plg_info(ctx->ins, "Successfully uploaded object %s", final_key); - flb_sds_destroy(uri); flb_http_client_destroy(c); - - return 0; + ret = 0; + goto cleanup; } flb_aws_print_xml_error(c->resp.payload, c->resp.payload_size, "PutObject", ctx->ins); @@ -2559,9 +2655,19 @@ static int put_blob_object(struct flb_s3 *ctx, } flb_plg_error(ctx->ins, "PutObject request failed"); - flb_sds_destroy(uri); + ret = -1; - return -1; +cleanup: + if (original_host != NULL) { + ctx->s3_client->host = original_host; + flb_sds_destroy(presigned_host); + } + + if (uri != NULL) { + flb_sds_destroy(uri); + } + + return ret; } static int abort_blob_upload(struct flb_s3 *ctx, diff --git a/plugins/out_s3/s3.h b/plugins/out_s3/s3.h index fc30ff81ff7..ba13915e445 100644 --- a/plugins/out_s3/s3.h +++ b/plugins/out_s3/s3.h @@ -218,6 +218,10 @@ void multipart_upload_destroy(struct multipart_upload *m_upload); struct flb_http_client *mock_s3_call(char *error_env_var, char *api); int s3_plugin_under_test(); +int s3_parse_presigned_url(struct flb_s3 *ctx, const char *url, + flb_sds_t *out_host, flb_sds_t *out_uri, + int *out_port); + int get_md5_base64(char *buf, size_t buf_size, char *md5_str, size_t md5_str_size); int create_headers(struct flb_s3 *ctx, char *body_md5, diff --git a/plugins/out_s3/s3_multipart.c b/plugins/out_s3/s3_multipart.c index 7ad7b2095b1..6aad7660909 100644 --- a/plugins/out_s3/s3_multipart.c +++ b/plugins/out_s3/s3_multipart.c @@ -406,13 +406,17 @@ int complete_multipart_upload(struct flb_s3 *ctx, struct multipart_upload *m_upload, char *pre_signed_url) { - char *body; + char *body = NULL; size_t size; flb_sds_t uri = NULL; flb_sds_t tmp; int ret; struct flb_http_client *c = NULL; struct flb_aws_client *s3_client; + flb_sds_t presigned_host = NULL; + const char *original_host = NULL; + int presigned_port = 0; + int result = -1; if (!m_upload->upload_id) { flb_plg_error(ctx->ins, "Cannot complete multipart upload for key %s: " @@ -420,31 +424,41 @@ int complete_multipart_upload(struct flb_s3 *ctx, return -1; } - uri = flb_sds_create_size(flb_sds_len(m_upload->s3_key) + 11 + - flb_sds_len(m_upload->upload_id)); - if (!uri) { - flb_errno(); - return -1; - } - if (pre_signed_url != NULL) { - tmp = flb_sds_copy(uri, pre_signed_url, strlen(pre_signed_url)); + ret = s3_parse_presigned_url(ctx, pre_signed_url, &presigned_host, &uri, &presigned_port); + if (ret != 0) { + return -1; + } + + if (presigned_port != 0 && presigned_port != ctx->port) { + flb_plg_error(ctx->ins, "Pre signed URL uses unexpected port %d", presigned_port); + flb_sds_destroy(presigned_host); + flb_sds_destroy(uri); + return -1; + } + + original_host = ctx->s3_client->host; + ctx->s3_client->host = presigned_host; } else { + uri = flb_sds_create_size(flb_sds_len(m_upload->s3_key) + 11 + + flb_sds_len(m_upload->upload_id)); + if (!uri) { + flb_errno(); + return -1; + } + tmp = flb_sds_printf(&uri, "/%s%s?uploadId=%s", ctx->bucket, m_upload->s3_key, m_upload->upload_id); + if (!tmp) { + goto cleanup; + } + uri = tmp; } - if (!tmp) { - flb_sds_destroy(uri); - return -1; - } - uri = tmp; - ret = complete_multipart_upload_payload(ctx, m_upload, &body, &size); if (ret < 0) { - flb_sds_destroy(uri); - return -1; + goto cleanup; } s3_client = ctx->s3_client; @@ -456,8 +470,9 @@ int complete_multipart_upload(struct flb_s3 *ctx, uri, body, size, NULL, 0); } - flb_sds_destroy(uri); flb_free(body); + body = NULL; + if (c) { flb_plg_debug(ctx->ins, "CompleteMultipartUpload http status=%d", c->resp.status); @@ -468,7 +483,8 @@ int complete_multipart_upload(struct flb_s3 *ctx, flb_http_client_destroy(c); /* remove this upload from the file system */ remove_upload_from_fs(ctx, m_upload); - return 0; + result = 0; + goto cleanup; } flb_aws_print_xml_error(c->resp.payload, c->resp.payload_size, "CompleteMultipartUpload", ctx->ins); @@ -480,7 +496,23 @@ int complete_multipart_upload(struct flb_s3 *ctx, } flb_plg_error(ctx->ins, "CompleteMultipartUpload request failed"); - return -1; + result = -1; + +cleanup: + if (body != NULL) { + flb_free(body); + } + + if (original_host != NULL) { + ctx->s3_client->host = original_host; + flb_sds_destroy(presigned_host); + } + + if (uri != NULL) { + flb_sds_destroy(uri); + } + + return result; } int abort_multipart_upload(struct flb_s3 *ctx, @@ -491,6 +523,11 @@ int abort_multipart_upload(struct flb_s3 *ctx, flb_sds_t tmp; struct flb_http_client *c = NULL; struct flb_aws_client *s3_client; + flb_sds_t presigned_host = NULL; + const char *original_host = NULL; + int presigned_port = 0; + int result = -1; + int ret; if (!m_upload->upload_id) { flb_plg_error(ctx->ins, "Cannot complete multipart upload for key %s: " @@ -498,27 +535,38 @@ int abort_multipart_upload(struct flb_s3 *ctx, return -1; } - uri = flb_sds_create_size(flb_sds_len(m_upload->s3_key) + 11 + - flb_sds_len(m_upload->upload_id)); - if (!uri) { - flb_errno(); - return -1; - } - if (pre_signed_url != NULL) { - tmp = flb_sds_copy(uri, pre_signed_url, strlen(pre_signed_url)); + ret = s3_parse_presigned_url(ctx, pre_signed_url, &presigned_host, &uri, &presigned_port); + if (ret != 0) { + return -1; + } + + if (presigned_port != 0 && presigned_port != ctx->port) { + flb_plg_error(ctx->ins, "Pre signed URL uses unexpected port %d", presigned_port); + flb_sds_destroy(presigned_host); + flb_sds_destroy(uri); + return -1; + } + + original_host = ctx->s3_client->host; + ctx->s3_client->host = presigned_host; } else { + uri = flb_sds_create_size(flb_sds_len(m_upload->s3_key) + 11 + + flb_sds_len(m_upload->upload_id)); + if (!uri) { + flb_errno(); + return -1; + } + tmp = flb_sds_printf(&uri, "/%s%s?uploadId=%s", ctx->bucket, m_upload->s3_key, m_upload->upload_id); + if (!tmp) { + goto abort_cleanup; + } + uri = tmp; } - if (!tmp) { - flb_sds_destroy(uri); - return -1; - } - uri = tmp; - s3_client = ctx->s3_client; if (s3_plugin_under_test() == FLB_TRUE) { c = mock_s3_call("TEST_ABORT_MULTIPART_UPLOAD_ERROR", "AbortMultipartUpload"); @@ -528,8 +576,6 @@ int abort_multipart_upload(struct flb_s3 *ctx, uri, NULL, 0, NULL, 0); } - flb_sds_destroy(uri); - if (c) { flb_plg_debug(ctx->ins, "AbortMultipartUpload http status=%d", c->resp.status); @@ -540,7 +586,8 @@ int abort_multipart_upload(struct flb_s3 *ctx, flb_http_client_destroy(c); /* remove this upload from the file system */ remove_upload_from_fs(ctx, m_upload); - return 0; + result = 0; + goto abort_cleanup; } flb_aws_print_xml_error(c->resp.payload, c->resp.payload_size, "AbortMultipartUpload", ctx->ins); @@ -552,7 +599,19 @@ int abort_multipart_upload(struct flb_s3 *ctx, } flb_plg_error(ctx->ins, "AbortMultipartUpload request failed"); - return -1; + result = -1; + +abort_cleanup: + if (original_host != NULL) { + ctx->s3_client->host = original_host; + flb_sds_destroy(presigned_host); + } + + if (uri != NULL) { + flb_sds_destroy(uri); + } + + return result; } int create_multipart_upload(struct flb_s3 *ctx, @@ -566,25 +625,40 @@ int create_multipart_upload(struct flb_s3 *ctx, struct flb_aws_header *headers = NULL; int num_headers = 0; int ret; - - uri = flb_sds_create_size(flb_sds_len(m_upload->s3_key) + 8); - if (!uri) { - flb_errno(); - return -1; - } + flb_sds_t presigned_host = NULL; + const char *original_host = NULL; + int presigned_port = 0; if (pre_signed_url != NULL) { - tmp = flb_sds_copy(uri, pre_signed_url, strlen(pre_signed_url)); + ret = s3_parse_presigned_url(ctx, pre_signed_url, &presigned_host, &uri, &presigned_port); + if (ret != 0) { + return -1; + } + + if (presigned_port != 0 && presigned_port != ctx->port) { + flb_plg_error(ctx->ins, "Pre signed URL uses unexpected port %d", presigned_port); + flb_sds_destroy(presigned_host); + flb_sds_destroy(uri); + return -1; + } + + original_host = ctx->s3_client->host; + ctx->s3_client->host = presigned_host; } else { - tmp = flb_sds_printf(&uri, "/%s%s?uploads=", ctx->bucket, m_upload->s3_key); - } + uri = flb_sds_create_size(flb_sds_len(m_upload->s3_key) + 8); + if (!uri) { + flb_errno(); + return -1; + } - if (!tmp) { - flb_sds_destroy(uri); - return -1; + tmp = flb_sds_printf(&uri, "/%s%s?uploads=", ctx->bucket, m_upload->s3_key); + if (!tmp) { + ret = -1; + goto cleanup; + } + uri = tmp; } - uri = tmp; s3_client = ctx->s3_client; if (s3_plugin_under_test() == FLB_TRUE) { @@ -594,8 +668,7 @@ int create_multipart_upload(struct flb_s3 *ctx, ret = create_headers(ctx, NULL, &headers, &num_headers, FLB_TRUE); if (ret == -1) { flb_plg_error(ctx->ins, "Failed to create headers"); - flb_sds_destroy(uri); - return -1; + goto cleanup; } c = s3_client->client_vtable->request(s3_client, FLB_HTTP_POST, uri, NULL, 0, headers, num_headers); @@ -603,7 +676,18 @@ int create_multipart_upload(struct flb_s3 *ctx, flb_free(headers); } } - flb_sds_destroy(uri); + ret = -1; + +cleanup: + if (original_host != NULL) { + ctx->s3_client->host = original_host; + flb_sds_destroy(presigned_host); + } + + if (uri != NULL) { + flb_sds_destroy(uri); + } + if (c) { flb_plg_debug(ctx->ins, "CreateMultipartUpload http status=%d", c->resp.status); @@ -635,7 +719,7 @@ int create_multipart_upload(struct flb_s3 *ctx, } flb_plg_error(ctx->ins, "CreateMultipartUpload request failed"); - return -1; + return ret; } /* gets the ETag value from response headers */ @@ -693,44 +777,56 @@ int upload_part(struct flb_s3 *ctx, struct multipart_upload *m_upload, struct flb_aws_header *headers = NULL; int num_headers = 0; char body_md5[25]; - - uri = flb_sds_create_size(flb_sds_len(m_upload->s3_key) + 8); - if (!uri) { - flb_errno(); - return -1; - } + flb_sds_t presigned_host = NULL; + const char *original_host = NULL; + int presigned_port = 0; + int result = -1; if (pre_signed_url != NULL) { - tmp = flb_sds_copy(uri, pre_signed_url, strlen(pre_signed_url)); + ret = s3_parse_presigned_url(ctx, pre_signed_url, &presigned_host, &uri, &presigned_port); + if (ret != 0) { + return -1; + } + + if (presigned_port != 0 && presigned_port != ctx->port) { + flb_plg_error(ctx->ins, "Pre signed URL uses unexpected port %d", presigned_port); + flb_sds_destroy(presigned_host); + flb_sds_destroy(uri); + return -1; + } + + original_host = ctx->s3_client->host; + ctx->s3_client->host = presigned_host; } else { - tmp = flb_sds_printf(&uri, "/%s%s?partNumber=%d&uploadId=%s", - ctx->bucket, m_upload->s3_key, m_upload->part_number, - m_upload->upload_id); - } + uri = flb_sds_create_size(flb_sds_len(m_upload->s3_key) + 8); + if (!uri) { + flb_errno(); + return -1; + } - if (!tmp) { - flb_errno(); - flb_sds_destroy(uri); - return -1; + tmp = flb_sds_printf(&uri, "/%s%s?partNumber=%d&uploadId=%s", + ctx->bucket, m_upload->s3_key, m_upload->part_number, + m_upload->upload_id); + if (!tmp) { + goto cleanup; + } + uri = tmp; } - uri = tmp; memset(body_md5, 0, sizeof(body_md5)); if (ctx->send_content_md5 == FLB_TRUE) { ret = get_md5_base64(body, body_size, body_md5, sizeof(body_md5)); if (ret != 0) { flb_plg_error(ctx->ins, "Failed to create Content-MD5 header"); - flb_sds_destroy(uri); - return -1; + goto cleanup; } num_headers = 1; headers = flb_malloc(sizeof(struct flb_aws_header) * num_headers); if (headers == NULL) { flb_errno(); - flb_sds_destroy(uri); - return -1; + goto cleanup; } headers[0].key = "Content-MD5"; @@ -748,8 +844,11 @@ int upload_part(struct flb_s3 *ctx, struct multipart_upload *m_upload, uri, body, body_size, headers, num_headers); } - flb_free(headers); - flb_sds_destroy(uri); + if (headers != NULL) { + flb_free(headers); + headers = NULL; + } + if (c) { flb_plg_info(ctx->ins, "UploadPart http status=%d", c->resp.status); @@ -761,7 +860,7 @@ int upload_part(struct flb_s3 *ctx, struct multipart_upload *m_upload, flb_plg_debug(ctx->ins, "Raw UploadPart response: %s", c->resp.payload); flb_http_client_destroy(c); - return -1; + goto cleanup; } m_upload->etags[m_upload->part_number - 1] = tmp; flb_plg_info(ctx->ins, "Successfully uploaded part #%d " @@ -783,7 +882,8 @@ int upload_part(struct flb_s3 *ctx, struct multipart_upload *m_upload, "could be lost, UploadId=%s, ETag=%s", m_upload->upload_id, tmp); } - return 0; + result = 0; + goto cleanup; } flb_aws_print_xml_error(c->resp.payload, c->resp.payload_size, "UploadPart", ctx->ins); @@ -795,5 +895,16 @@ int upload_part(struct flb_s3 *ctx, struct multipart_upload *m_upload, } flb_plg_error(ctx->ins, "UploadPart request failed"); - return -1; + +cleanup: + if (original_host != NULL) { + ctx->s3_client->host = original_host; + flb_sds_destroy(presigned_host); + } + + if (uri != NULL) { + flb_sds_destroy(uri); + } + + return result; }