Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/main/java/com/uid2/optout/Const.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@ public static class Config extends com.uid2.shared.Const.Config {
public static final String OptOutSqsQueueUrlProp = "optout_sqs_queue_url";
public static final String OptOutSqsEnabledProp = "optout_enqueue_sqs_enabled";
public static final String OptOutSqsS3FolderProp = "optout_sqs_s3_folder"; // Default: "sqs-delta" - folder within same S3 bucket as regular optout
public static final String OptOutS3BucketDroppedRequestsProp = "optout_s3_bucket_dropped_requests";
public static final String OptOutSqsMaxMessagesPerPollProp = "optout_sqs_max_messages_per_poll";
public static final String OptOutSqsVisibilityTimeoutProp = "optout_sqs_visibility_timeout";
public static final String TrafficFilterConfigPathProp = "traffic_filter_config_path";
public static final String TrafficCalcConfigPathProp = "traffic_calc_config_path";
public static final String ManualOverrideS3PathProp = "manual_override_s3_path";
}

public static class Event {
Expand Down
17 changes: 13 additions & 4 deletions src/main/java/com/uid2/optout/Main.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.uid2.optout;

import com.uid2.optout.vertx.*;
import com.uid2.optout.vertx.OptOutTrafficFilter.MalformedTrafficFilterConfigException;
import com.uid2.optout.vertx.OptOutTrafficCalculator.MalformedTrafficCalcConfigException;
import com.uid2.shared.ApplicationVersion;
import com.uid2.shared.Utils;
import com.uid2.shared.attest.AttestationResponseHandler;
Expand All @@ -27,7 +29,6 @@
import io.vertx.config.ConfigRetriever;
import io.vertx.core.*;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.impl.HttpUtils;
import io.vertx.core.json.JsonObject;
import io.vertx.micrometer.MetricsDomain;
import org.slf4j.Logger;
Expand Down Expand Up @@ -296,14 +297,22 @@ public void run(String[] args) throws IOException {
fsSqs = CloudUtils.createStorage(optoutBucket, sqsConfig);
}

// Deploy SQS log producer with its own storage instance
OptOutSqsLogProducer sqsLogProducer = new OptOutSqsLogProducer(this.config, fsSqs, sqsCs);
// Create SQS-specific cloud storage instance for dropped requests (different bucket)
String optoutBucketDroppedRequests = this.config.getString(Const.Config.OptOutS3BucketDroppedRequestsProp);
ICloudStorage fsSqsDroppedRequests = CloudUtils.createStorage(optoutBucketDroppedRequests, config);

// Deploy SQS log producer with its own storage instance
OptOutSqsLogProducer sqsLogProducer = new OptOutSqsLogProducer(config, fsSqs, fsSqsDroppedRequests, sqsCs, Const.Event.DeltaProduce, null);
futs.add(this.deploySingleInstance(sqsLogProducer));

LOGGER.info("SQS log producer deployed - bucket: {}, folder: {}",
this.config.getString(Const.Config.OptOutS3BucketProp), sqsFolder);
} catch (IOException e) {
LOGGER.error("Failed to initialize SQS log producer: " + e.getMessage(), e);
LOGGER.error("Failed to initialize SQS log producer, delta production will be disabled: " + e.getMessage(), e);
} catch (MalformedTrafficFilterConfigException e) {
LOGGER.error("The traffic filter config is malformed, refusing to process messages, delta production will be disabled: " + e.getMessage(), e);
} catch (MalformedTrafficCalcConfigException e) {
LOGGER.error("The traffic calc config is malformed, refusing to process messages, delta production will be disabled: " + e.getMessage(), e);
}
}

Expand Down
35 changes: 34 additions & 1 deletion src/main/java/com/uid2/optout/vertx/DeltaProductionResult.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
package com.uid2.optout.vertx;

import io.vertx.core.json.JsonObject;

/**
* Result object containing statistics from delta production.
*/
public class DeltaProductionResult {
private final int deltasProduced;
private final int entriesProcessed;
private final int droppedRequestFilesProduced;
private final int droppedRequestsProcessed;

public DeltaProductionResult(int deltasProduced, int entriesProcessed) {
public DeltaProductionResult(int deltasProduced, int entriesProcessed, int droppedRequestFilesProduced, int droppedRequestsProcessed) {
this.deltasProduced = deltasProduced;
this.entriesProcessed = entriesProcessed;
this.droppedRequestFilesProduced = droppedRequestFilesProduced;
this.droppedRequestsProcessed = droppedRequestsProcessed;
}

public int getDeltasProduced() {
Expand All @@ -19,5 +25,32 @@ public int getDeltasProduced() {
public int getEntriesProcessed() {
return entriesProcessed;
}

public int getDroppedRequestFilesProduced() {
return droppedRequestFilesProduced;
}

public int getDroppedRequestsProcessed() {
return droppedRequestsProcessed;
}

public JsonObject encodeSuccessResult() {
return new JsonObject()
.put("status", "success")
.put("deltas_produced", deltasProduced)
.put("entries_processed", entriesProcessed)
.put("dropped_request_files_produced", droppedRequestFilesProduced)
.put("dropped_requests_processed", droppedRequestsProcessed);
}

public static JsonObject createSkippedResult(String reason) {
return new JsonObject()
.put("status", "skipped")
.put("reason", reason)
.put("deltas_produced", 0)
.put("entries_processed", 0)
.put("dropped_request_files_produced", 0)
.put("dropped_requests_processed", 0);
}
}

Loading
Loading