diff --git a/examples/clickpipe/object_storage_s3_sqs_iam_role/README.md b/examples/clickpipe/object_storage_s3_sqs_iam_role/README.md new file mode 100644 index 00000000..37d2a01f --- /dev/null +++ b/examples/clickpipe/object_storage_s3_sqs_iam_role/README.md @@ -0,0 +1,9 @@ +## ClickPipe Object Storage: Unordered Mode with IAM role example + +This example demonstrates how to deploy a ClickPipe with S3 continuous ingestion using unordered mode (event-based via SQS) with IAM role authentication. + +## How to run + +- Rename `variables.sample.tfvars` to `variables.tfvars` and fill in all needed data. +- Run `terraform init` +- Run `terraform apply -var-file=variables.tfvars` diff --git a/examples/clickpipe/object_storage_s3_sqs_iam_role/main.tf b/examples/clickpipe/object_storage_s3_sqs_iam_role/main.tf new file mode 100644 index 00000000..0e822dc5 --- /dev/null +++ b/examples/clickpipe/object_storage_s3_sqs_iam_role/main.tf @@ -0,0 +1,106 @@ +variable "organization_id" {} +variable "token_key" {} +variable "token_secret" {} + +variable "service_id" { + description = "ClickHouse Cloud service ID" +} + +variable "bucket_url" { + description = "S3 bucket URL pattern (e.g., s3://my-bucket/path/*.json)" +} + +variable "sqs_queue_url" { + description = "SQS queue URL for S3 event notifications (e.g., https://sqs.us-east-1.amazonaws.com/123456789012/my-queue)" +} + +variable "iam_role" { + description = "ARN of the IAM role with permissions to read from S3 and receive SQS messages" + sensitive = true +} + +# S3 ClickPipe with continuous ingestion using SQS event notifications +# This example demonstrates event-based continuous ingestion where new files +# are detected via S3 event notifications sent to an SQS queue, rather than +# polling S3 for new files in lexicographical order. 
+resource "clickhouse_clickpipe" "s3_sqs_continuous" { + name = "S3 Continuous ClickPipe with SQS (IAM Role)" + service_id = var.service_id + + source = { + object_storage = { + type = "s3" + format = "JSONEachRow" + url = var.bucket_url + + # Enable continuous ingestion with event-based processing + is_continuous = true + queue_url = var.sqs_queue_url + + # IAM role authentication - recommended for AWS services + authentication = "IAM_ROLE" + iam_role = var.iam_role + } + } + + destination = { + table = "s3_events_data" + managed_table = true + + table_definition = { + engine = { + type = "MergeTree" + } + + sorting_key = ["timestamp"] + } + + columns = [ + { + name = "id" + type = "String" + }, + { + name = "timestamp" + type = "DateTime64(3)" + }, + { + name = "event_type" + type = "String" + }, + { + name = "data" + type = "String" + } + ] + } + + field_mappings = [ + { + source_field = "id" + destination_field = "id" + }, + { + source_field = "timestamp" + destination_field = "timestamp" + }, + { + source_field = "event_type" + destination_field = "event_type" + }, + { + source_field = "data" + destination_field = "data" + } + ] +} + +output "clickpipe_id" { + value = clickhouse_clickpipe.s3_sqs_continuous.id + description = "The ID of the created ClickPipe" +} + +output "clickpipe_state" { + value = clickhouse_clickpipe.s3_sqs_continuous.state + description = "The current state of the ClickPipe" +} diff --git a/examples/clickpipe/object_storage_s3_sqs_iam_role/provider.tf b/examples/clickpipe/object_storage_s3_sqs_iam_role/provider.tf new file mode 100644 index 00000000..cf47d1da --- /dev/null +++ b/examples/clickpipe/object_storage_s3_sqs_iam_role/provider.tf @@ -0,0 +1,13 @@ +terraform { + required_providers { + clickhouse = { + source = "ClickHouse/clickhouse" + } + } +} + +provider "clickhouse" { + organization_id = var.organization_id + token_key = var.token_key + token_secret = var.token_secret +} diff --git 
a/examples/clickpipe/object_storage_s3_sqs_iam_role/provider.tf.template.alpha b/examples/clickpipe/object_storage_s3_sqs_iam_role/provider.tf.template.alpha new file mode 100644 index 00000000..f9e76961 --- /dev/null +++ b/examples/clickpipe/object_storage_s3_sqs_iam_role/provider.tf.template.alpha @@ -0,0 +1,14 @@ +terraform { + required_providers { + clickhouse = { + version = "${CLICKHOUSE_TERRAFORM_PROVIDER_VERSION}" + source = "ClickHouse/clickhouse" + } + } +} + +provider "clickhouse" { + organization_id = var.organization_id + token_key = var.token_key + token_secret = var.token_secret +} diff --git a/examples/clickpipe/object_storage_s3_sqs_iam_role/variables.sample.tfvars b/examples/clickpipe/object_storage_s3_sqs_iam_role/variables.sample.tfvars new file mode 100644 index 00000000..7200ed43 --- /dev/null +++ b/examples/clickpipe/object_storage_s3_sqs_iam_role/variables.sample.tfvars @@ -0,0 +1,13 @@ +# These keys are for example only and won't work when pointed to a deployed ClickHouse OpenAPI server +organization_id = "aee076c1-3f83-4637-95b1-ad5a0a825b71" +token_key = "avhj1U5QCdWAE9CA9" +token_secret = "4b1dROiHQEuSXJHlV8zHFd0S7WQj7CGxz5kGJeJnca" +service_id = "aee076c1-3f83-4637-95b1-ad5a0a825b71" + +bucket_url = "s3://mybucket/path/*.json" + +# SQS queue URL must follow the format: https://sqs.{region}.amazonaws.com/{account-id}/{queue-name} +sqs_queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/my-clickpipe-queue" + +# IAM role ARN with permissions to read S3 and receive SQS messages +iam_role = "arn:aws:iam::123456789012:role/ClickPipeRole" diff --git a/examples/clickpipe/object_storage_s3_sqs_iam_user/README.md b/examples/clickpipe/object_storage_s3_sqs_iam_user/README.md new file mode 100644 index 00000000..329099ab --- /dev/null +++ b/examples/clickpipe/object_storage_s3_sqs_iam_user/README.md @@ -0,0 +1,9 @@ +## ClickPipe Object Storage: Unordered Mode with IAM user example + +This example demonstrates how to deploy a ClickPipe 
with S3 continuous ingestion using unordered mode (event-based via SQS) with IAM user authentication. + +## How to run + +- Rename `variables.sample.tfvars` to `variables.tfvars` and fill in all needed data. +- Run `terraform init` +- Run `terraform apply -var-file=variables.tfvars` diff --git a/examples/clickpipe/object_storage_s3_sqs_iam_user/main.tf b/examples/clickpipe/object_storage_s3_sqs_iam_user/main.tf new file mode 100644 index 00000000..80efe77d --- /dev/null +++ b/examples/clickpipe/object_storage_s3_sqs_iam_user/main.tf @@ -0,0 +1,114 @@ +variable "organization_id" {} +variable "token_key" {} +variable "token_secret" {} + +variable "service_id" { + description = "ClickHouse Cloud service ID" +} + +variable "bucket_url" { + description = "S3 bucket URL pattern (e.g., s3://my-bucket/path/*.json)" +} + +variable "sqs_queue_url" { + description = "SQS queue URL for S3 event notifications (e.g., https://sqs.us-east-1.amazonaws.com/123456789012/my-queue)" +} + +variable "iam_access_key_id" { + description = "AWS IAM access key ID with permissions to read from S3 and receive SQS messages" + sensitive = true +} + +variable "iam_secret_key" { + description = "AWS IAM secret access key" + sensitive = true +} + +# S3 ClickPipe with continuous ingestion using SQS event notifications +# This example demonstrates event-based continuous ingestion where new files +# are detected via S3 event notifications sent to an SQS queue, rather than +# polling S3 for new files in lexicographical order. 
+resource "clickhouse_clickpipe" "s3_sqs_continuous" { + name = "S3 Continuous ClickPipe with SQS (IAM User)" + service_id = var.service_id + + source = { + object_storage = { + type = "s3" + format = "JSONEachRow" + url = var.bucket_url + + # Enable continuous ingestion with event-based processing + is_continuous = true + queue_url = var.sqs_queue_url + + # IAM user authentication + authentication = "IAM_USER" + access_key = { + access_key_id = var.iam_access_key_id + secret_key = var.iam_secret_key + } + } + } + + destination = { + table = "s3_events_data" + managed_table = true + + table_definition = { + engine = { + type = "MergeTree" + } + + sorting_key = ["timestamp"] + } + + columns = [ + { + name = "id" + type = "String" + }, + { + name = "timestamp" + type = "DateTime64(3)" + }, + { + name = "event_type" + type = "String" + }, + { + name = "data" + type = "String" + } + ] + } + + field_mappings = [ + { + source_field = "id" + destination_field = "id" + }, + { + source_field = "timestamp" + destination_field = "timestamp" + }, + { + source_field = "event_type" + destination_field = "event_type" + }, + { + source_field = "data" + destination_field = "data" + } + ] +} + +output "clickpipe_id" { + value = clickhouse_clickpipe.s3_sqs_continuous.id + description = "The ID of the created ClickPipe" +} + +output "clickpipe_state" { + value = clickhouse_clickpipe.s3_sqs_continuous.state + description = "The current state of the ClickPipe" +} diff --git a/examples/clickpipe/object_storage_s3_sqs_iam_user/provider.tf b/examples/clickpipe/object_storage_s3_sqs_iam_user/provider.tf new file mode 100644 index 00000000..cf47d1da --- /dev/null +++ b/examples/clickpipe/object_storage_s3_sqs_iam_user/provider.tf @@ -0,0 +1,13 @@ +terraform { + required_providers { + clickhouse = { + source = "ClickHouse/clickhouse" + } + } +} + +provider "clickhouse" { + organization_id = var.organization_id + token_key = var.token_key + token_secret = var.token_secret +} diff --git 
a/examples/clickpipe/object_storage_s3_sqs_iam_user/provider.tf.template.alpha b/examples/clickpipe/object_storage_s3_sqs_iam_user/provider.tf.template.alpha new file mode 100644 index 00000000..f9e76961 --- /dev/null +++ b/examples/clickpipe/object_storage_s3_sqs_iam_user/provider.tf.template.alpha @@ -0,0 +1,14 @@ +terraform { + required_providers { + clickhouse = { + version = "${CLICKHOUSE_TERRAFORM_PROVIDER_VERSION}" + source = "ClickHouse/clickhouse" + } + } +} + +provider "clickhouse" { + organization_id = var.organization_id + token_key = var.token_key + token_secret = var.token_secret +} diff --git a/examples/clickpipe/object_storage_s3_sqs_iam_user/variables.sample.tfvars b/examples/clickpipe/object_storage_s3_sqs_iam_user/variables.sample.tfvars new file mode 100644 index 00000000..b54f35c1 --- /dev/null +++ b/examples/clickpipe/object_storage_s3_sqs_iam_user/variables.sample.tfvars @@ -0,0 +1,13 @@ +# These keys are for example only and won't work when pointed to a deployed ClickHouse OpenAPI server +organization_id = "aee076c1-3f83-4637-95b1-ad5a0a825b71" +token_key = "avhj1U5QCdWAE9CA9" +token_secret = "4b1dROiHQEuSXJHlV8zHFd0S7WQj7CGxz5kGJeJnca" +service_id = "aee076c1-3f83-4637-95b1-ad5a0a825b71" + +bucket_url = "s3://mybucket/path/*.json" + +# SQS queue URL must follow the format: https://sqs.{region}.amazonaws.com/{account-id}/{queue-name} +sqs_queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/my-clickpipe-queue" + +iam_access_key_id = "AKIAIOSFODNN7EXAMPLE" +iam_secret_key = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" diff --git a/pkg/internal/api/clickpipe_models.go b/pkg/internal/api/clickpipe_models.go index 1b7c1b66..f55eded3 100644 --- a/pkg/internal/api/clickpipe_models.go +++ b/pkg/internal/api/clickpipe_models.go @@ -114,7 +114,8 @@ type ClickPipeObjectStorageSource struct { Delimiter *string `json:"delimiter,omitempty"` Compression *string `json:"compression,omitempty"` - IsContinuous bool `json:"isContinuous"` + IsContinuous 
bool `json:"isContinuous"` + QueueURL *string `json:"queueUrl,omitempty"` Authentication *string `json:"authentication,omitempty"` AccessKey *ClickPipeSourceAccessKey `json:"accessKey,omitempty"` diff --git a/pkg/resource/clickpipe.go b/pkg/resource/clickpipe.go index 411dfa9e..3e0ebbbc 100644 --- a/pkg/resource/clickpipe.go +++ b/pkg/resource/clickpipe.go @@ -5,6 +5,7 @@ package resource import ( "context" "fmt" + "regexp" "strings" "time" @@ -401,6 +402,19 @@ func (c *ClickPipeResource) Schema(_ context.Context, _ resource.SchemaRequest, boolplanmodifier.RequiresReplace(), }, }, + "queue_url": schema.StringAttribute{ + MarkdownDescription: "SQS queue URL for event-based continuous ingestion. When provided, files are ingested based on S3 event notifications rather than lexicographical order. Only applicable when `is_continuous` is `true`, storage type is `s3`, and authentication is provided. Format: `https://sqs.{region}.amazonaws.com/{account-id}/{queue-name}`", + Optional: true, + Validators: []validator.String{ + stringvalidator.RegexMatches( + regexp.MustCompile(`^https://sqs\.[a-z0-9.-]+\.amazonaws\.com/\d{12}/[a-zA-Z0-9._-]+$`), + "must be a valid SQS URL in the format https://sqs.{region}.amazonaws.com/{12-digit-account-id}/{queue-name}", + ), + }, + PlanModifiers: []planmodifier.String{ + stringplanmodifier.RequiresReplace(), + }, + }, "authentication": schema.StringAttribute{ MarkdownDescription: "CONNECTION_STRING is for Azure Blob Storage. IAM_ROLE and IAM_USER are for AWS S3/GCS/DigitalOcean. If not provided, no authentication is used", Optional: true, @@ -796,6 +810,44 @@ func (c *ClickPipeResource) ModifyPlan(ctx context.Context, request resource.Mod } } + // Validate queue_url configuration for object storage + if !plan.Source.IsNull() { + sourceModel := models.ClickPipeSourceModel{} + response.Diagnostics.Append(plan.Source.As(ctx, &sourceModel, basetypes.ObjectAsOptions{})...) 
+ + if !sourceModel.ObjectStorage.IsNull() { + objectStorageModel := models.ClickPipeObjectStorageSourceModel{} + response.Diagnostics.Append(sourceModel.ObjectStorage.As(ctx, &objectStorageModel, basetypes.ObjectAsOptions{})...) + + // Validate queue_url is only provided when is_continuous is true + if !objectStorageModel.QueueURL.IsNull() && !objectStorageModel.QueueURL.IsUnknown() && objectStorageModel.QueueURL.ValueString() != "" { + if !objectStorageModel.IsContinuous.ValueBool() { + response.Diagnostics.AddError( + "Invalid Configuration", + "queue_url can only be provided when is_continuous is true", + ) + } + + // Validate queue_url is only used with S3 storage type + if objectStorageModel.Type.ValueString() != api.ClickPipeObjectStorageS3Type { + response.Diagnostics.AddError( + "Invalid Configuration", + "queue_url is only supported for S3 object storage", + ) + } + + // Validate queue_url requires IAM authentication + authType := objectStorageModel.Authentication.ValueString() + if authType != api.ClickPipeAuthenticationIAMUser && authType != api.ClickPipeAuthenticationIAMRole { + response.Diagnostics.AddError( + "Invalid Configuration", + "queue_url requires authentication type to be either IAM_USER or IAM_ROLE", + ) + } + } + } + } + if !request.State.Raw.IsNull() && !state.State.IsNull() { currentState := state.State.ValueString() @@ -1184,6 +1236,7 @@ func (c *ClickPipeResource) extractSourceFromPlan(ctx context.Context, diagnosti Delimiter: objectStorageModel.Delimiter.ValueStringPointer(), Compression: objectStorageModel.Compression.ValueStringPointer(), IsContinuous: objectStorageModel.IsContinuous.ValueBool(), + QueueURL: objectStorageModel.QueueURL.ValueStringPointer(), Authentication: objectStorageModel.Authentication.ValueStringPointer(), AccessKey: accessKey, IAMRole: objectStorageModel.IAMRole.ValueStringPointer(), @@ -1402,6 +1455,7 @@ func (c *ClickPipeResource) syncClickPipeState(ctx context.Context, state *model Delimiter: 
types.StringPointerValue(clickPipe.Source.ObjectStorage.Delimiter), Compression: types.StringPointerValue(clickPipe.Source.ObjectStorage.Compression), IsContinuous: types.BoolValue(clickPipe.Source.ObjectStorage.IsContinuous), + QueueURL: types.StringPointerValue(clickPipe.Source.ObjectStorage.QueueURL), IAMRole: types.StringPointerValue(clickPipe.Source.ObjectStorage.IAMRole), } diff --git a/pkg/resource/models/clickpipe_resource.go b/pkg/resource/models/clickpipe_resource.go index 97fa8819..95e3cb76 100644 --- a/pkg/resource/models/clickpipe_resource.go +++ b/pkg/resource/models/clickpipe_resource.go @@ -257,6 +257,7 @@ type ClickPipeObjectStorageSourceModel struct { Delimiter types.String `tfsdk:"delimiter"` Compression types.String `tfsdk:"compression"` IsContinuous types.Bool `tfsdk:"is_continuous"` + QueueURL types.String `tfsdk:"queue_url"` Authentication types.String `tfsdk:"authentication"` AccessKey types.Object `tfsdk:"access_key"` IAMRole types.String `tfsdk:"iam_role"` @@ -276,6 +277,7 @@ func (m ClickPipeObjectStorageSourceModel) ObjectType() types.ObjectType { "delimiter": types.StringType, "compression": types.StringType, "is_continuous": types.BoolType, + "queue_url": types.StringType, "authentication": types.StringType, "access_key": ClickPipeSourceAccessKeyModel{}.ObjectType(), "iam_role": types.StringType, @@ -294,6 +296,7 @@ func (m ClickPipeObjectStorageSourceModel) ObjectValue() types.Object { "delimiter": m.Delimiter, "compression": m.Compression, "is_continuous": m.IsContinuous, + "queue_url": m.QueueURL, "authentication": m.Authentication, "access_key": m.AccessKey, "iam_role": m.IAMRole,