diff --git a/.gitignore b/.gitignore index 2660de2..90bb36d 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,7 @@ Cargo.lock .DS_Store *.cblite2/ + +# Test results +test_results/ +response_to_thomas.md diff --git a/Cargo.toml b/Cargo.toml index 479d5fd..777082e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,8 @@ enum_primitive = "0.1.1" lazy_static = "1.5.0" regex = "1.11.1" serde_json = "1" +serde = { version = "1", features = ["derive"] } +chrono = "0.4" tempdir = "0.3.7" [dev-dependencies.reqwest] diff --git a/examples/README.md b/examples/README.md index 9b08267..41a19cd 100644 --- a/examples/README.md +++ b/examples/README.md @@ -42,13 +42,109 @@ Update the file `docker-conf/db-config.json` and run $ curl -XPUT -v "http://localhost:4985/my-db/" -H 'Content-Type: application/json' --data-binary @docker-conf/db-config.json ``` +## Automated Test Infrastructure + +The `tombstone_purge_test` includes comprehensive automation: + +- **Automatic Docker environment management**: Stops, rebuilds, and starts containers with correct configuration +- **Git validation**: Ensures no uncommitted changes before running +- **Timezone synchronization**: Verifies containers use same timezone as host +- **Structured reporting**: Generates comprehensive test reports in `test_results/` directory + +### Test Reports + +Each test run generates a timestamped report directory containing: +- `README.md`: Executive summary with test checkpoints and findings +- `metadata.json`: Test metadata, commit SHA, GitHub link +- `tombstone_states.json`: Full `_sync` xattr content at each checkpoint +- `test_output.log`: Complete console output +- `cbs_logs.log`: Couchbase Server container logs +- `sgw_logs.log`: Sync Gateway container logs + +**Example report path**: `test_results/test_run_2025-11-01_08-00-00_8db78d6/` + +### Important Findings + +**Tombstone Purge Behavior:** +- ✅ Tombstones are purged after 1 hour when purge interval is configured **at bucket creation** +- ❌ Configuring purge interval after tombstones are created does NOT purge existing tombstones +- ✅ Re-created documents are always treated as new (`flags=0`) even if tombstone persists + +**Reset Checkpoint Limitation:** +- ❌ Reset checkpoint alone does NOT re-push unmodified documents +- CBLite only pushes documents that changed since last successful sync +- For BC-994 scenario, documents must be modified locally before reset to trigger push + ## Running an example -As of now, there is only one example: `ticket_70596`. +### Available examples + +#### `check_cbs_config` +Utility to verify Couchbase Server bucket configuration, especially metadata purge interval. + +**Runtime: Instant** + +```shell +$ cargo run --features=enterprise --example check_cbs_config +``` + +Expected output: +``` +✓ CBS metadata purge interval (at purgeInterval): 0.04 + = 0.04 days (~1.0 hours, ~58 minutes) +``` + +#### `tombstone_quick_check` +Rapid validation test for tombstone detection via XATTRs. Verifies that tombstones are correctly identified in CBS without waiting for purge intervals. + +**Runtime: ~30 seconds** +**Output**: Clean, no warnings + +```shell +$ cargo run --features=enterprise --example tombstone_quick_check +``` + +#### `ticket_70596` +Demonstrates auto-purge behavior when documents are moved to inaccessible channels. -It can be run with the following command: ```shell $ cargo run --features=enterprise --example ticket_70596 ``` -There are utility functions available to interact with the Sync Gateway or Couchbase Server, feel free to add more if needed. +#### `tombstone_purge_test` +Complete tombstone purge test following Couchbase support recommendations (Thomas). Tests whether tombstones can be completely purged from CBS and SGW after the minimum 1-hour interval, such that re-creating a document with the same ID is treated as a new document. + +**Runtime: ~65-70 minutes** (+ ~5 minutes for Docker rebuild) +**Features**: Automatic Docker management, structured reporting + +```shell +$ cargo run --features=enterprise --example tombstone_purge_test +``` + +**What it does automatically:** +- ✅ Checks git status (fails if uncommitted changes) +- ✅ Rebuilds Docker environment (docker compose down -v && up) +- ✅ Verifies CBS purge interval configuration +- ✅ Runs complete test with checkpoints +- ✅ Generates structured report in `test_results/` +- ✅ Captures CBS and SGW logs + +**Test scenario:** +1. Create document in accessible channel and replicate +2. Delete document (creating tombstone) +3. Purge tombstone from Sync Gateway +4. Verify CBS purge interval (configured at bucket creation) +5. Wait 65 minutes +6. Compact CBS and SGW +7. Verify tombstone state (purged or persisting) +8. Re-create document with same ID and verify it's treated as new (flags=0, not flags=1) + +**Report location**: `test_results/test_run__/` + +### Utility functions + +There are utility functions available in `examples/utils/` to interact with the Sync Gateway and Couchbase Server: +- **SGW admin operations**: user management, sessions, document operations, database lifecycle +- **CBS admin operations**: bucket compaction, document queries, tombstone management, metadata purge interval configuration + +Feel free to add more if needed. diff --git a/examples/check_cbs_config.rs b/examples/check_cbs_config.rs new file mode 100644 index 0000000..0bda21c --- /dev/null +++ b/examples/check_cbs_config.rs @@ -0,0 +1,12 @@ +mod utils; + +use utils::*; + +fn main() { + println!("=== CBS Configuration Check ===\n"); + + println!("Checking current metadata purge interval configuration:"); + get_metadata_purge_interval(); + + println!("\n=== Check complete ==="); +} diff --git a/examples/docker-conf/couchbase-server-dev/configure-server.sh b/examples/docker-conf/couchbase-server-dev/configure-server.sh index a951963..371df8d 100755 --- a/examples/docker-conf/couchbase-server-dev/configure-server.sh +++ b/examples/docker-conf/couchbase-server-dev/configure-server.sh @@ -49,6 +49,30 @@ function bucketCreate() { fi } +function configureBucketCompaction() { + # Configure metadata purge interval to 1 hour (0.04 days) - CBS minimum + # This is important for tombstone purge testing with Sync Gateway + # Default is 3 days, which is too long for testing + # + # IMPORTANT: Must use REST API to configure per-bucket auto-compaction + # The couchbase-cli setting-compaction command only sets cluster-wide defaults + # + # Required parameters: + # - autoCompactionDefined=true: Enable per-bucket auto-compaction override + # - purgeInterval=0.04: Metadata purge interval (1 hour minimum) + # - parallelDBAndViewCompaction: Required parameter for auto-compaction + curl -X POST \ + -u "$COUCHBASE_ADMINISTRATOR_USERNAME:$COUCHBASE_ADMINISTRATOR_PASSWORD" \ + "http://127.0.0.1:8091/pools/default/buckets/$COUCHBASE_BUCKET" \ + -d "autoCompactionDefined=true" \ + -d "purgeInterval=0.04" \ + -d "parallelDBAndViewCompaction=false" + + if [[ $? != 0 ]]; then + return 1 + fi +} + function userSgCreate() { couchbase-cli user-manage \ -c 127.0.0.1:8091 \ @@ -101,6 +125,15 @@ function main() { echo "Creating the bucket [OK]" echo + echo "Configuring bucket compaction settings...." + retry configureBucketCompaction + if [[ $? != 0 ]]; then + echo "Bucket compaction config failed. Exiting." >&2 + exit 1 + fi + echo "Configuring bucket compaction settings [OK]" + echo + echo "Creating Sync Gateway user...." retry userSgCreate if [[ $? != 0 ]]; then diff --git a/examples/docker-conf/docker-compose.yml b/examples/docker-conf/docker-compose.yml index 202a386..1a9a9d6 100644 --- a/examples/docker-conf/docker-compose.yml +++ b/examples/docker-conf/docker-compose.yml @@ -7,6 +7,8 @@ services: - "11210:11210" # memcached port build: context: ${PWD}/couchbase-server-dev + environment: + - TZ=${TZ:-UTC} deploy: resources: limits: @@ -17,6 +19,8 @@ services: ports: - "4984:4984" - "4985:4985" + environment: + - TZ=${TZ:-UTC} deploy: resources: limits: diff --git a/examples/docker-conf/sync-function.js b/examples/docker-conf/sync-function.js index ec263c4..11bf8eb 100644 --- a/examples/docker-conf/sync-function.js +++ b/examples/docker-conf/sync-function.js @@ -7,6 +7,25 @@ function sync(doc, oldDoc, meta) { console.log("Metadata:"); console.log(meta); + // Test logic for BC-994: Handle resurrection after tombstone purge + // Detect documents resurrecting without oldDoc after tombstone expiry + if (!oldDoc && doc.updatedAt) { + var ONE_HOUR_MS = 60 * 60 * 1000; + var updatedAtTimestamp = new Date(doc.updatedAt).getTime(); + var cutoffTimestamp = Date.now() - ONE_HOUR_MS; + + if (updatedAtTimestamp < cutoffTimestamp) { + // Document is resurrecting after tombstone expired + // Route to soft_deleted channel so auto-purge will remove from cblite + console.log(">>> Soft deleting document: updatedAt is older than 1 hour"); + channel("soft_deleted"); + // Set TTL to 5 minutes for testing (production would use 6 months) + expiry(5 * 60); // 5 minutes in seconds + console.log(">>> Document routed to soft_deleted channel with 5-minute TTL"); + return; + } + } + if(doc.channels) { channel(doc.channels); } diff --git a/examples/ticket_70596.rs b/examples/ticket_70596.rs index afcfa26..6005c7c 100644 --- a/examples/ticket_70596.rs +++ b/examples/ticket_70596.rs @@ -5,7 +5,7 @@ use couchbase_lite::*; use utils::*; fn main() { - let mut db = Database::open( + let mut db_cblite = Database::open( "test1", Some(DatabaseConfiguration { directory: Path::new("./"), @@ -19,8 +19,8 @@ fn main() { let session_token = get_session("great_name"); println!("Sync gateway session token: {session_token}"); - let mut repl = - setup_replicator(db.clone(), session_token).add_document_listener(Box::new(doc_listener)); + let mut repl = setup_replicator(db_cblite.clone(), session_token) + .add_document_listener(Box::new(doc_listener)); repl.start(false); @@ -28,22 +28,22 @@ fn main() { // Auto-purge test scenario from support ticket https://support.couchbase.com/hc/en-us/requests/70596?page=1 // Testing if documents pushed to inaccessible channels get auto-purged - create_doc(&mut db, "doc1", "channel1"); - create_doc(&mut db, "doc2", "channel2"); + create_doc(&mut db_cblite, "doc1", "channel1"); + create_doc(&mut db_cblite, "doc2", "channel2"); std::thread::sleep(std::time::Duration::from_secs(10)); - assert!(get_doc(&db, "doc1").is_ok()); - assert!(get_doc(&db, "doc2").is_ok()); // This looks buggy + assert!(get_doc(&db_cblite, "doc1").is_ok()); + assert!(get_doc(&db_cblite, "doc2").is_ok()); // This looks buggy - change_channel(&mut db, "doc1", "channel2"); + change_channel(&mut db_cblite, "doc1", "channel2"); std::thread::sleep(std::time::Duration::from_secs(10)); - assert!(get_doc(&db, "doc1").is_err()); + assert!(get_doc(&db_cblite, "doc1").is_err()); repl.stop(None); } -fn create_doc(db: &mut Database, id: &str, channel: &str) { +fn create_doc(db_cblite: &mut Database, id: &str, channel: &str) { let mut doc = Document::new_with_id(id); doc.set_properties_as_json( &serde_json::json!({ @@ -52,7 +52,7 @@ fn create_doc(db: &mut Database, id: &str, channel: &str) { .to_string(), ) .unwrap(); - db.save_document(&mut doc).unwrap(); + db_cblite.save_document(&mut doc).unwrap(); println!( "Created doc {id} with content: {}", @@ -60,24 +60,24 @@ fn create_doc(db: &mut Database, id: &str, channel: &str) { ); } -fn get_doc(db: &Database, id: &str) -> Result { - db.get_document(id) +fn get_doc(db_cblite: &Database, id: &str) -> Result { + db_cblite.get_document(id) } -fn change_channel(db: &mut Database, id: &str, channel: &str) { - let mut doc = get_doc(db, id).unwrap(); +fn change_channel(db_cblite: &mut Database, id: &str, channel: &str) { + let mut doc = get_doc(db_cblite, id).unwrap(); let mut prop = doc.mutable_properties(); prop.at("channels").put_string(channel); - let _ = db.save_document(&mut doc); + let _ = db_cblite.save_document(&mut doc); println!( "Changed doc {id} with content: {}", doc.properties_as_json() ); } -fn setup_replicator(db: Database, session_token: String) -> Replicator { +fn setup_replicator(db_cblite: Database, session_token: String) -> Replicator { let repl_conf = ReplicatorConfiguration { - database: Some(db.clone()), + database: Some(db_cblite.clone()), endpoint: Endpoint::new_with_url(SYNC_GW_URL).unwrap(), replicator_type: ReplicatorType::PushAndPull, continuous: true, diff --git a/examples/tombstone_purge_test.rs b/examples/tombstone_purge_test.rs new file mode 100644 index 0000000..a86e525 --- /dev/null +++ b/examples/tombstone_purge_test.rs @@ -0,0 +1,298 @@ +mod utils; + +use couchbase_lite::*; +use std::path::Path; +use utils::*; + +#[allow(deprecated)] +fn main() { + println!("=== Tombstone Purge Test (FULL - 1 hour) ==="); + println!("This test validates complete tombstone purge following Thomas's recommendation."); + println!("Total runtime: ~65-70 minutes\n"); + + // SETUP: Check git status + println!("SETUP: Checking git status..."); + let git_info = match check_git_status() { + Ok(info) => { + println!("✓ Git status clean (commit: {})\n", info.commit_short_sha); + info + } + Err(e) => { + eprintln!("✗ Git check failed:\n{}", e); + eprintln!("\nPlease commit changes before running this test."); + std::process::exit(1); + } + }; + + // SETUP: Rebuild Docker environment + println!("SETUP: Rebuilding Docker environment with correct configuration..."); + if let Err(e) = ensure_clean_environment() { + eprintln!("✗ Docker setup failed: {}", e); + std::process::exit(1); + } + + // SETUP: Initialize test reporter + let mut reporter = match TestReporter::new("tombstone_purge_test_full", git_info) { + Ok(r) => r, + Err(e) => { + eprintln!("✗ Failed to initialize reporter: {}", e); + std::process::exit(1); + } + }; + + // SETUP: Verify CBS configuration + reporter.log("SETUP: Verifying CBS metadata purge interval configuration..."); + get_metadata_purge_interval(); + reporter.log(""); + + let mut db_cblite = Database::open( + "tombstone_test_full", + Some(DatabaseConfiguration { + directory: Path::new("./"), + #[cfg(feature = "enterprise")] + encryption_key: None, + }), + ) + .unwrap(); + + // Setup user with access to channel1 only + add_or_update_user("test_user", vec!["channel1".into()]); + let session_token = get_session("test_user"); + reporter.log(&format!("Sync gateway session token: {session_token}\n")); + + // Setup replicator with auto-purge enabled + let mut repl = setup_replicator(db_cblite.clone(), session_token) + .add_document_listener(Box::new(doc_listener)); + + repl.start(false); + std::thread::sleep(std::time::Duration::from_secs(3)); + + // STEP 1: Create document in channel1 and replicate + reporter.log("STEP 1: Creating doc1 in channel1..."); + create_doc(&mut db_cblite, "doc1", "channel1"); + std::thread::sleep(std::time::Duration::from_secs(5)); + + assert!(get_doc(&db_cblite, "doc1").is_ok()); + let state1 = get_sync_xattr("doc1"); + reporter.checkpoint( + "STEP_1_CREATED", + state1, + vec!["Document created in channel1 and replicated".to_string()], + ); + reporter.log("✓ doc1 created and replicated\n"); + + // STEP 2: Delete doc1 (creating a tombstone) + reporter.log("STEP 2: Deleting doc1 (creating tombstone)..."); + let mut doc1 = get_doc(&db_cblite, "doc1").unwrap(); + db_cblite.delete_document(&mut doc1).unwrap(); + std::thread::sleep(std::time::Duration::from_secs(5)); + + let state2 = get_sync_xattr("doc1"); + reporter.checkpoint( + "STEP_2_DELETED", + state2, + vec!["Document deleted, tombstone created".to_string()], + ); + reporter.log("✓ doc1 deleted locally\n"); + + // STEP 3: Purge tombstone from SGW + reporter.log("STEP 3: Purging tombstone from SGW..."); + let mut notes3 = vec![]; + if let Some(tombstone_rev) = get_doc_rev("doc1") { + purge_doc_from_sgw("doc1", &tombstone_rev); + notes3.push(format!( + "Tombstone purged from SGW (rev: {})", + tombstone_rev + )); + reporter.log(&format!( + "✓ Tombstone purged from SGW (rev: {tombstone_rev})\n" + )); + } else { + notes3.push("Could not get tombstone revision from SGW (404)".to_string()); + notes3.push("Tombstone may not exist in SGW or was auto-purged".to_string()); + reporter.log("⚠ Could not get tombstone revision from SGW"); + reporter + .log(" This is not blocking - tombstone may not exist in SGW or was auto-purged\n"); + } + reporter.checkpoint("STEP_3_SGW_PURGE_ATTEMPTED", None, notes3); + + // STEP 4: CBS metadata purge interval should already be configured at bucket creation + reporter.log("STEP 4: CBS metadata purge interval configuration..."); + reporter.log(" Purge interval was set to 0.04 days (1 hour) at bucket creation."); + reporter.log(" This ensures tombstones created now are eligible for purge after 1 hour.\n"); + + let state4 = get_sync_xattr("doc1"); + reporter.checkpoint( + "STEP_4_BEFORE_WAIT", + state4, + vec![ + "Tombstone state before waiting for purge interval".to_string(), + "Purge interval: 0.04 days (1 hour)".to_string(), + ], + ); + + // Check doc in CBS before waiting + reporter.log("Checking doc1 in CBS before wait..."); + check_doc_in_cbs("doc1"); + reporter.log(""); + + // STEP 5: Wait for purge interval + margin + reporter.log("STEP 5: Waiting 65 minutes for tombstone to be eligible for purge..."); + reporter.log("This is the minimum time required by CBS to purge tombstones."); + reporter.log("Progress updates every 5 minutes:\n"); + + let start_time = std::time::Instant::now(); + for minute in 1..=65 { + if minute % 5 == 0 || minute == 1 || minute == 65 { + let elapsed = start_time.elapsed().as_secs() / 60; + let remaining = 65 - minute; + reporter.log(&format!( + " [{minute}/65] {elapsed} minutes elapsed, {remaining} minutes remaining..." + )); + } + std::thread::sleep(std::time::Duration::from_secs(60)); + } + reporter.log("✓ Wait complete (65 minutes elapsed)\n"); + + // STEP 6: Compact CBS bucket + reporter.log("STEP 6: Compacting CBS bucket..."); + compact_cbs_bucket(); + std::thread::sleep(std::time::Duration::from_secs(5)); + reporter.log("✓ CBS compaction triggered\n"); + + // STEP 7: Compact SGW database + reporter.log("STEP 7: Compacting SGW database..."); + compact_sgw_database(); + std::thread::sleep(std::time::Duration::from_secs(5)); + reporter.log("✓ SGW compaction complete\n"); + + // STEP 8: Check if tombstone still exists in CBS + reporter.log("STEP 8: Checking if tombstone exists in CBS..."); + check_doc_in_cbs("doc1"); + let state8 = get_sync_xattr("doc1"); + let notes8 = if state8 + .as_ref() + .and_then(|s| s.get("flags")) + .and_then(|f| f.as_i64()) + == Some(1) + { + vec!["Tombstone still present after compaction".to_string()] + } else if state8.is_none() { + vec!["Tombstone successfully purged from CBS".to_string()] + } else { + vec!["Document is live (unexpected state)".to_string()] + }; + reporter.checkpoint("STEP_8_AFTER_COMPACTION", state8, notes8); + reporter.log(" If tombstone was purged, the query should return no results.\n"); + + // STEP 9: Re-create doc1 and verify it's treated as new + reporter.log("STEP 9: Re-creating doc1 with same ID..."); + create_doc(&mut db_cblite, "doc1", "channel1"); + std::thread::sleep(std::time::Duration::from_secs(10)); + + let state9 = get_sync_xattr("doc1"); + let notes9 = vec!["Document re-created after tombstone purge test".to_string()]; + reporter.checkpoint("STEP_9_RECREATED", state9, notes9); + + // Verify doc exists locally + if get_doc(&db_cblite, "doc1").is_ok() { + reporter.log("✓ doc1 re-created successfully"); + reporter.log("\n=== CRITICAL CHECK ==="); + reporter.log("Review the replication logs above:"); + reporter.log(" - flags=0: Document treated as NEW (tombstone successfully purged) ✓"); + reporter.log(" - flags=1: Document recognized as deleted (tombstone still exists) ✗"); + reporter.log("======================\n"); + } else { + reporter.log("✗ doc1 could not be re-created\n"); + } + + // Check final state in CBS + reporter.log("Final CBS state:"); + check_doc_in_cbs("doc1"); + + repl.stop(None); + + reporter.log("\n=== Test complete ==="); + reporter.log(&format!( + "Total runtime: ~{} minutes", + start_time.elapsed().as_secs() / 60 + )); + + // Generate report + if let Err(e) = reporter.finalize() { + eprintln!("⚠ Failed to generate report: {}", e); + } +} + +#[allow(deprecated)] +fn create_doc(db_cblite: &mut Database, id: &str, channel: &str) { + let mut doc = Document::new_with_id(id); + doc.set_properties_as_json( + &serde_json::json!({ + "channels": channel, + "test_data": "tombstone purge test", + "timestamp": std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs() + }) + .to_string(), + ) + .unwrap(); + db_cblite.save_document(&mut doc).unwrap(); + + println!( + " Created doc {id} with content: {}", + doc.properties_as_json() + ); +} + +#[allow(deprecated)] +fn get_doc(db_cblite: &Database, id: &str) -> Result { + db_cblite.get_document(id) +} + +fn setup_replicator(db_cblite: Database, session_token: String) -> Replicator { + let repl_conf = ReplicatorConfiguration { + database: Some(db_cblite.clone()), + endpoint: Endpoint::new_with_url(SYNC_GW_URL).unwrap(), + replicator_type: ReplicatorType::PushAndPull, + continuous: true, + disable_auto_purge: false, // Auto-purge ENABLED + max_attempts: 3, + max_attempt_wait_time: 1, + heartbeat: 60, + authenticator: None, + proxy: None, + headers: vec![( + "Cookie".to_string(), + format!("SyncGatewaySession={session_token}"), + )] + .into_iter() + .collect(), + pinned_server_certificate: None, + trusted_root_certificates: None, + channels: MutableArray::default(), + document_ids: MutableArray::default(), + collections: None, + accept_parent_domain_cookies: false, + #[cfg(feature = "enterprise")] + accept_only_self_signed_server_certificate: false, + }; + let repl_context = ReplicationConfigurationContext::default(); + Replicator::new(repl_conf, Box::new(repl_context)).unwrap() +} + +fn doc_listener(direction: Direction, documents: Vec) { + println!("=== Document(s) replicated ==="); + println!("Direction: {direction:?}"); + for document in documents { + println!("Document: {document:?}"); + if document.flags == 1 { + println!(" ⚠ flags=1 - Document recognized as deleted/tombstone"); + } else if document.flags == 0 { + println!(" ✓ flags=0 - Document treated as new"); + } + } + println!("===\n"); +} diff --git a/examples/tombstone_quick_check.rs b/examples/tombstone_quick_check.rs new file mode 100644 index 0000000..f9c8bd3 --- /dev/null +++ b/examples/tombstone_quick_check.rs @@ -0,0 +1,144 @@ +mod utils; + +use couchbase_lite::*; +use std::path::Path; +use utils::*; + +#[allow(deprecated)] +fn main() { + println!("=== Tombstone Quick Check (30 seconds) ==="); + println!("This is a rapid validation test for tombstone detection via XATTRs.\n"); + + let mut db_cblite = Database::open( + "tombstone_quick_check", + Some(DatabaseConfiguration { + directory: Path::new("./"), + #[cfg(feature = "enterprise")] + encryption_key: None, + }), + ) + .unwrap(); + + // Setup user with access to channel1 only + add_or_update_user("quick_test_user", vec!["channel1".into()]); + let session_token = get_session("quick_test_user"); + println!("Session token: {session_token}\n"); + + // Setup replicator with auto-purge enabled + let mut repl = setup_replicator(db_cblite.clone(), session_token) + .add_document_listener(Box::new(doc_listener)); + + repl.start(false); + std::thread::sleep(std::time::Duration::from_secs(3)); + + println!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"); + println!("TEST 1: Create document and check CBS state"); + println!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n"); + + create_doc(&mut db_cblite, "quick_doc", "channel1"); + std::thread::sleep(std::time::Duration::from_secs(3)); + + println!("\n📊 CBS State after creation:"); + check_doc_in_cbs("quick_doc"); + println!("✓ Expected: Document exists as LIVE document\n"); + + println!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"); + println!("TEST 2: Delete document and check CBS state"); + println!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n"); + + let mut doc = db_cblite.get_document("quick_doc").unwrap(); + db_cblite.delete_document(&mut doc).unwrap(); + println!("Document deleted locally"); + std::thread::sleep(std::time::Duration::from_secs(3)); + + println!("\n📊 CBS State after deletion:"); + check_doc_in_cbs("quick_doc"); + println!("✓ Expected: Document exists as TOMBSTONE (deleted: true)\n"); + + println!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"); + println!("TEST 3: Re-create document and check CBS state"); + println!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n"); + + create_doc(&mut db_cblite, "quick_doc", "channel1"); + std::thread::sleep(std::time::Duration::from_secs(3)); + + println!("\n📊 CBS State after re-creation:"); + check_doc_in_cbs("quick_doc"); + println!("✓ Expected: Document exists as LIVE document\n"); + + println!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"); + println!("TEST 4: Check replication flags"); + println!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n"); + + println!("Review the replication logs above:"); + println!(" - Initial creation: should have flags=0 (new)"); + println!(" - After deletion: should have flags=1 (deleted)"); + println!(" - After re-creation: should have flags=0 (new) ✓\n"); + + repl.stop(None); + println!("=== Quick check complete ==="); +} + +#[allow(deprecated)] +fn create_doc(db_cblite: &mut Database, id: &str, channel: &str) { + let mut doc = Document::new_with_id(id); + doc.set_properties_as_json( + &serde_json::json!({ + "channels": channel, + "test_data": "quick check", + "timestamp": std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs() + }) + .to_string(), + ) + .unwrap(); + db_cblite.save_document(&mut doc).unwrap(); + println!(" Created doc {id}"); +} + +fn setup_replicator(db_cblite: Database, session_token: String) -> Replicator { + let repl_conf = ReplicatorConfiguration { + database: Some(db_cblite.clone()), + endpoint: Endpoint::new_with_url(SYNC_GW_URL).unwrap(), + replicator_type: ReplicatorType::PushAndPull, + continuous: true, + disable_auto_purge: false, + max_attempts: 3, + max_attempt_wait_time: 1, + heartbeat: 60, + authenticator: None, + proxy: None, + headers: vec![( + "Cookie".to_string(), + format!("SyncGatewaySession={session_token}"), + )] + .into_iter() + .collect(), + pinned_server_certificate: None, + trusted_root_certificates: None, + channels: MutableArray::default(), + document_ids: MutableArray::default(), + collections: None, + accept_parent_domain_cookies: false, + #[cfg(feature = "enterprise")] + accept_only_self_signed_server_certificate: false, + }; + let repl_context = ReplicationConfigurationContext::default(); + Replicator::new(repl_conf, Box::new(repl_context)).unwrap() +} + +fn doc_listener(direction: Direction, documents: Vec) { + for document in documents { + let flag_meaning = match document.flags { + 0 => "NEW", + 1 => "DELETED", + _ => "OTHER", + }; + println!( + " 📡 Replicated [{:?}]: {} (flags={} - {})", + direction, document.id, document.flags, flag_meaning + ); + } +} diff --git a/examples/utils/cbs_admin.rs b/examples/utils/cbs_admin.rs index 4b8e3ed..a4a003d 100644 --- a/examples/utils/cbs_admin.rs +++ b/examples/utils/cbs_admin.rs @@ -40,25 +40,247 @@ pub fn compact_cbs_bucket() { } } +pub fn get_sync_xattr(doc_id: &str) -> Option { + let url = "http://localhost:8093/query/service"; + let query = format!( + "SELECT META().xattrs._sync as sync_metadata FROM `{CBS_BUCKET}` USE KEYS ['{doc_id}']" + ); + let body = serde_json::json!({"statement": query}); + + let response = reqwest::blocking::Client::new() + .post(url) + .basic_auth(CBS_ADMIN_USER, Some(CBS_ADMIN_PWD)) + .json(&body) + .send() + .ok()?; + + let text = response.text().ok()?; + let json: serde_json::Value = serde_json::from_str(&text).ok()?; + + json.get("results")? + .as_array()? + .first()? + .get("sync_metadata") + .cloned() +} + pub fn check_doc_in_cbs(doc_id: &str) { - let url = format!("{CBS_URL}:8093/query/service"); + // Use port 8093 for Query service (not 8091 which is admin/REST API) + // Query XATTRs to see tombstones in shared bucket access mode + // The _sync xattr contains Sync Gateway metadata including deleted status + // + // WARNING: Querying _sync xattr directly is UNSUPPORTED in production per Sync Gateway docs + // This is only for testing/debugging purposes. The _sync structure can change between versions. + // Reference: https://docs.couchbase.com/sync-gateway/current/shared-bucket-access.html + let url = "http://localhost:8093/query/service"; + + // Query the entire _sync xattr to see its structure + // This helps debug what fields are actually available let query = format!( - "SELECT META().id, META().deleted FROM `{CBS_BUCKET}` WHERE META().id = '{doc_id}'" + "SELECT META().id, META().xattrs._sync as sync_metadata FROM `{CBS_BUCKET}` USE KEYS ['{doc_id}']" ); let body = serde_json::json!({"statement": query}); let response = reqwest::blocking::Client::new() - .post(&url) + .post(url) .basic_auth(CBS_ADMIN_USER, Some(CBS_ADMIN_PWD)) .json(&body) .send(); match response { Ok(resp) => { + let status = resp.status(); if let Ok(text) = resp.text() { - println!("CBS check for {doc_id}: {text}"); + // Parse the response to show results more clearly + if let Ok(json) = serde_json::from_str::(&text) { + if let Some(results) = json["results"].as_array() { + if results.is_empty() { + println!( + "CBS check for {doc_id}: ✓ Document not found (completely purged)" + ); + } else { + println!("CBS check for {doc_id}: Found {} result(s)", results.len()); + for result in results { + // Display the full sync_metadata to understand its structure + if let Some(sync_meta) = result.get("sync_metadata") { + if sync_meta.is_null() { + println!( + " ⚠ sync_metadata is NULL - may lack permissions to read system xattrs" + ); + println!( + " 💡 System xattrs (starting with _) may require special RBAC roles" + ); + } else { + println!(" 📦 Full _sync xattr content:"); + println!( + "{}", + serde_json::to_string_pretty(sync_meta).unwrap() + ); + + // Detect tombstone status from _sync.flags field + // flags == 1 indicates a deleted/tombstone document + // Other indicators: tombstoned_at field, channels.*.del == true + let flags = sync_meta + .get("flags") + .and_then(|v| v.as_i64()) + .unwrap_or(0); + + let has_tombstoned_at = + sync_meta.get("tombstoned_at").is_some(); + + let is_tombstone = flags == 1 || has_tombstoned_at; + + if is_tombstone { + println!("\n ✓ Document is TOMBSTONE"); + println!(" - flags: {}", flags); + if has_tombstoned_at { + println!( + " - tombstoned_at: {}", + sync_meta["tombstoned_at"] + ); + } + } else { + println!("\n ✓ Document is LIVE"); + println!(" - flags: {}", flags); + } + } + } else { + println!(" ⚠ No sync_metadata field in result"); + println!( + " Full result: {}", + serde_json::to_string_pretty(result).unwrap() + ); + } + } + } + } else { + println!("CBS check for {doc_id}: status={status}, response={text}"); + } + } else { + println!("CBS check for {doc_id}: status={status}, response={text}"); + } + } else { + println!("CBS check for {doc_id}: status={status}, could not read response"); } } Err(e) => println!("CBS check error: {e}"), } } + +pub fn get_metadata_purge_interval() { + let url = format!("{CBS_URL}/pools/default/buckets/{CBS_BUCKET}"); + + let response = reqwest::blocking::Client::new() + .get(&url) + .basic_auth(CBS_ADMIN_USER, Some(CBS_ADMIN_PWD)) + .send(); + + match response { + Ok(resp) => { + let status = resp.status(); + if let Ok(text) = resp.text() { + if let Ok(json) = serde_json::from_str::(&text) { + // Search for purgeInterval in multiple possible locations + let locations = vec![ + ( + "autoCompactionSettings.purgeInterval", + json.get("autoCompactionSettings") + .and_then(|a| a.get("purgeInterval")), + ), + ("purgeInterval", json.get("purgeInterval")), + ]; + + let mut found = false; + for (path, value) in locations { + if let Some(purge_interval) = value { + println!( + "✓ CBS metadata purge interval (at {path}): {}", + purge_interval + ); + if let Some(days) = purge_interval.as_f64() { + println!( + " = {days} days (~{:.1} hours, ~{:.0} minutes)", + days * 24.0, + days * 24.0 * 60.0 + ); + } + found = true; + break; + } + } + + if !found { + println!("⚠ purgeInterval not found in bucket config"); + if let Some(auto_compact) = json.get("autoCompactionSettings") { + println!(" autoCompactionSettings content:"); + println!(" {}", serde_json::to_string_pretty(auto_compact).unwrap()); + } + println!("\n Searching for 'purge' related fields..."); + if let Some(obj) = json.as_object() { + for (key, value) in obj { + if key.to_lowercase().contains("purge") { + println!(" Found: {} = {}", key, value); + } + } + } + } + } else { + println!("Get metadata purge interval: status={status}, could not parse JSON"); + } + } else { + println!("Get metadata purge interval: status={status}, could not read response"); + } + } + Err(e) => println!("Get metadata purge interval error: {e}"), + } +} + +pub fn set_metadata_purge_interval(days: f64) { + const MIN_PURGE_INTERVAL_DAYS: f64 = 0.04; // 1 hour minimum per CBS spec + + if days < MIN_PURGE_INTERVAL_DAYS { + println!( + "⚠ Warning: CBS metadata purge interval minimum is {MIN_PURGE_INTERVAL_DAYS} days (1 hour)." + ); + println!( + " Requested: {days} days (~{:.1} minutes)", + days * 24.0 * 60.0 + ); + println!(" CBS may not enforce purge before the minimum interval."); + println!(" Proceeding with requested value for testing purposes...\n"); + } + + let url = format!("{CBS_URL}/pools/default/buckets/{CBS_BUCKET}"); + + // IMPORTANT: Must set autoCompactionDefined=true to enable per-bucket override + // parallelDBAndViewCompaction is also required by the API + let params = [ + ("autoCompactionDefined", "true"), + ("purgeInterval", &days.to_string()), + ("parallelDBAndViewCompaction", "false"), + ]; + + let response = reqwest::blocking::Client::new() + .post(&url) + .basic_auth(CBS_ADMIN_USER, Some(CBS_ADMIN_PWD)) + .form(¶ms) + .send(); + + match response { + Ok(resp) => { + let status = resp.status(); + if let Ok(body) = resp.text() { + println!( + "Set metadata purge interval to {days} days: status={status}, body={body}" + ); + } else { + println!("Set metadata purge interval to {days} days: status={status}"); + } + } + Err(e) => println!("Set metadata purge interval error: {e}"), + } + + // Verify the setting was applied + println!("\nVerifying configuration:"); + get_metadata_purge_interval(); +} diff --git a/examples/utils/docker_manager.rs b/examples/utils/docker_manager.rs new file mode 100644 index 0000000..5501bc1 --- /dev/null +++ b/examples/utils/docker_manager.rs @@ -0,0 +1,205 @@ +use std::path::Path; +use std::process::{Command, Stdio}; +use std::thread; +use std::time::Duration; + +const DOCKER_CONF_DIR: &str = "examples/docker-conf"; +const MAX_WAIT_SECONDS: u64 = 120; + +pub fn ensure_clean_environment() -> Result<(), String> { + println!("🐳 Managing Docker environment...\n"); + + // Check if docker and docker compose are available + check_docker_available()?; + + // Navigate to docker-conf directory + let docker_dir = Path::new(DOCKER_CONF_DIR); + if !docker_dir.exists() { + return Err(format!( + "Docker configuration directory not found: {}", + DOCKER_CONF_DIR + )); + } + + // Stop and remove containers + volumes + println!(" [1/4] Stopping and removing existing containers..."); + stop_containers()?; + + // Build/pull images + println!(" [2/4] Building/pulling Docker images..."); + build_images()?; + + // Start containers + println!(" [3/4] Starting containers..."); + start_containers()?; + + // Wait for services to be healthy + println!(" [4/4] Waiting for services to be healthy..."); + wait_for_healthy_services()?; + + // Verify timezone synchronization + println!(" [5/5] Verifying timezone synchronization..."); + verify_timezone_sync()?; + + println!("✓ Docker environment ready\n"); + Ok(()) +} + +fn check_docker_available() -> Result<(), String> { + // Check docker + let docker_check = Command::new("docker").arg("--version").output(); + + if docker_check.is_err() { + return Err( + "Docker is not installed or not available in PATH. Please install Docker.".to_string(), + ); + } + + // Check docker compose + let compose_check = Command::new("docker").args(["compose", "version"]).output(); + + if compose_check.is_err() { + return Err("Docker Compose is not available. Please install Docker Compose.".to_string()); + } + + Ok(()) +} + +fn stop_containers() -> Result<(), String> { + let output = Command::new("docker") + .args(["compose", "down", "-v"]) + .current_dir(DOCKER_CONF_DIR) + .output() + .map_err(|e| format!("Failed to stop containers: {}", e))?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + // Don't fail if containers weren't running + if !stderr.contains("No such container") && !stderr.is_empty() { + eprintln!("Warning: docker compose down had errors: {}", stderr); + } + } + + Ok(()) +} + +fn build_images() -> Result<(), String> { + let output = Command::new("docker") + .args(["compose", "build"]) + .current_dir(DOCKER_CONF_DIR) + .stdout(Stdio::null()) // Suppress verbose build output + .stderr(Stdio::inherit()) + .output() + .map_err(|e| format!("Failed to build images: {}", e))?; + + if !output.status.success() { + return Err("Docker compose build failed".to_string()); + } + + Ok(()) +} + +fn start_containers() -> Result<(), String> { + let output = Command::new("docker") + .args(["compose", "up", "-d"]) + .current_dir(DOCKER_CONF_DIR) + .output() + .map_err(|e| format!("Failed to start containers: {}", e))?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(format!("Docker compose up failed: {}", stderr)); + } + + Ok(()) +} + +fn wait_for_healthy_services() -> Result<(), String> { + let start = std::time::Instant::now(); + + loop { + if start.elapsed().as_secs() > MAX_WAIT_SECONDS { + return Err(format!( + "Services did not become healthy within {} seconds", + MAX_WAIT_SECONDS + )); + } + + // Check if Sync Gateway is responding + let sgw_ready = reqwest::blocking::get("http://localhost:4985") + .map(|r| r.status().is_success()) + .unwrap_or(false); + + // Check if CBS is responding + let cbs_ready = reqwest::blocking::get("http://localhost:8091") + .map(|r| r.status().is_success()) + .unwrap_or(false); + + if sgw_ready && cbs_ready { + // Give extra time for full initialization + thread::sleep(Duration::from_secs(5)); + return Ok(()); + } + + thread::sleep(Duration::from_secs(2)); + } +} + +pub fn get_docker_logs(service_name: &str, output_path: &Path) -> Result<(), String> { + let output = Command::new("docker") + .args(["compose", "logs", "--no-color", service_name]) + .current_dir(DOCKER_CONF_DIR) + .output() + .map_err(|e| format!("Failed to get logs for {}: {}", service_name, e))?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(format!("Failed to get logs: {}", stderr)); + } + + std::fs::write(output_path, &output.stdout) + .map_err(|e| format!("Failed to write logs to file: {}", e))?; + + Ok(()) +} + +fn verify_timezone_sync() -> Result<(), String> { + // Get local timezone + let local_tz = std::env::var("TZ").unwrap_or_else(|_| { + // Try to get system timezone + let output = Command::new("date").arg("+%Z").output(); + + if let Ok(output) = output { + String::from_utf8_lossy(&output.stdout).trim().to_string() + } else { + "UTC".to_string() + } + }); + + println!(" Local timezone: {}", local_tz); + + // Check SGW container timezone + let sgw_date = Command::new("docker") + .args(["compose", "exec", "-T", "cblr-sync-gateway", "date", "+%Z"]) + .current_dir(DOCKER_CONF_DIR) + .output(); + + if let Ok(output) = sgw_date { + let container_tz = String::from_utf8_lossy(&output.stdout).trim().to_string(); + println!(" Sync Gateway timezone: {}", container_tz); + + if container_tz.is_empty() { + println!(" ⚠ Warning: Could not determine container timezone"); + } + } else { + println!( + " ⚠ Warning: Could not check container timezone (containers may still be starting)" + ); + } + + // Note: We don't fail on timezone mismatch, just log it + // The TZ environment variable should be passed through docker-compose.yml + println!(" 💡 Tip: Set TZ environment variable before docker compose up to sync timezones"); + + Ok(()) +} diff --git a/examples/utils/git_checker.rs b/examples/utils/git_checker.rs new file mode 100644 index 0000000..61a6c28 --- /dev/null +++ b/examples/utils/git_checker.rs @@ -0,0 +1,67 @@ +use std::process::Command; + +#[derive(Debug)] +pub struct GitInfo { + pub commit_sha: String, + pub commit_short_sha: String, + pub branch: String, +} + +pub fn check_git_status() -> Result { + // Check if git is available + let git_available = Command::new("git").arg("--version").output().is_ok(); + + if !git_available { + return Err("Git is not installed or not available in PATH".to_string()); + } + + // Check for uncommitted changes + let status_output = Command::new("git") + .args(["status", "--porcelain"]) + .output() + .map_err(|e| format!("Failed to run git status: {}", e))?; + + let status_str = String::from_utf8_lossy(&status_output.stdout); + if !status_str.trim().is_empty() { + return Err(format!( + "Git working directory has uncommitted changes:\n{}\n\nPlease commit or stash changes before running the test.", + status_str + )); + } + + // Get commit SHA + let sha_output = Command::new("git") + .args(["rev-parse", "HEAD"]) + .output() + .map_err(|e| format!("Failed to get commit SHA: {}", e))?; + + let commit_sha = String::from_utf8_lossy(&sha_output.stdout) + .trim() + .to_string(); + + // Get short SHA + let short_sha_output = Command::new("git") + .args(["rev-parse", "--short", "HEAD"]) + .output() + .map_err(|e| format!("Failed to get short commit SHA: {}", e))?; + + let commit_short_sha = String::from_utf8_lossy(&short_sha_output.stdout) + .trim() + .to_string(); + + // Get current branch + let branch_output = Command::new("git") + .args(["rev-parse", "--abbrev-ref", "HEAD"]) + .output() + .map_err(|e| format!("Failed to get branch name: {}", e))?; + + let branch = String::from_utf8_lossy(&branch_output.stdout) + .trim() + .to_string(); + + Ok(GitInfo { + commit_sha, + commit_short_sha, + branch, + }) +} diff --git a/examples/utils/mod.rs b/examples/utils/mod.rs index cff434c..06e91a9 100644 --- a/examples/utils/mod.rs +++ b/examples/utils/mod.rs @@ -1,8 +1,20 @@ +#[allow(dead_code)] pub mod cbs_admin; +#[allow(dead_code)] pub mod constants; +#[allow(dead_code)] +pub mod docker_manager; +#[allow(dead_code)] +pub mod git_checker; +#[allow(dead_code)] pub mod sgw_admin; +#[allow(dead_code)] +pub mod test_reporter; // Re-export commonly used functions +pub use cbs_admin::*; pub use constants::*; +pub use docker_manager::*; +pub use git_checker::*; pub use sgw_admin::*; -pub use cbs_admin::*; +pub use test_reporter::*; diff --git a/examples/utils/sgw_admin.rs b/examples/utils/sgw_admin.rs index 16b3142..fdbbe1b 100644 --- a/examples/utils/sgw_admin.rs +++ b/examples/utils/sgw_admin.rs @@ -31,18 +31,22 @@ pub fn get_session(name: &str) -> String { } pub fn get_doc_rev(doc_id: &str) -> Option { - let url = format!("{SYNC_GW_URL_ADMIN}/{doc_id}"); + // Try to get the document, including deleted/tombstone versions + let url = format!("{SYNC_GW_URL_ADMIN}/{doc_id}?deleted=true"); let result = reqwest::blocking::Client::new().get(&url).send(); match result { Ok(response) => { - println!("Get doc revision result: {response:?}"); - if response.status().is_success() { + let status = response.status(); + println!("Get doc revision result: status={status}"); + if status.is_success() { let json: serde_json::Value = response.json().unwrap(); let rev = json["_rev"].as_str().unwrap().to_string(); - println!("get_doc_rev for {doc_id}: found rev {rev}"); + let is_deleted = json["_deleted"].as_bool().unwrap_or(false); + println!("get_doc_rev for {doc_id}: found rev {rev} (deleted: {is_deleted})"); Some(rev) } else { + println!("get_doc_rev for {doc_id}: status {status}, document not found"); None } } @@ -146,3 +150,69 @@ pub fn compact_sgw_database() { Err(e) => println!("Compact SGW database error: {e}"), } } + +pub fn delete_doc_from_central(doc_id: &str) -> bool { + if let Some(rev) = get_doc_rev(doc_id) { + let url = format!("{SYNC_GW_URL_ADMIN}/{doc_id}?rev={rev}"); + let response = reqwest::blocking::Client::new().delete(&url).send(); + + match response { + Ok(resp) => { + let status = resp.status(); + if status.is_success() { + println!("✓ Deleted {doc_id} from central (rev: {rev})"); + return true; + } else { + println!("✗ Failed to delete {doc_id} from central: status={status}"); + return false; + } + } + Err(e) => { + println!("✗ Error deleting {doc_id} from central: {e}"); + return false; + } + } + } else { + println!("✗ Could not get revision for {doc_id} to delete from central"); + false + } +} + +pub fn check_doc_exists_in_central(doc_id: &str) -> bool { + let url = format!("{SYNC_GW_URL_ADMIN}/{doc_id}"); + let result = reqwest::blocking::Client::new().get(&url).send(); + + match result { + Ok(response) => { + let status = response.status(); + if status.is_success() { + if let Ok(json) = response.json::() { + let is_deleted = json + .get("_deleted") + .and_then(|v| v.as_bool()) + .unwrap_or(false); + if is_deleted { + println!(" Document {doc_id} exists in central as DELETED/TOMBSTONE"); + false + } else { + println!(" ✓ Document {doc_id} exists in central as LIVE"); + true + } + } else { + println!(" Document {doc_id}: status={status}, could not parse JSON"); + false + } + } else if status.as_u16() == 404 { + println!(" ✓ Document {doc_id} NOT found in central (purged or never existed)"); + false + } else { + println!(" Document {doc_id}: unexpected status={status}"); + false + } + } + Err(e) => { + println!(" Error checking {doc_id} in central: {e}"); + false + } + } +} diff --git a/examples/utils/test_reporter.rs b/examples/utils/test_reporter.rs new file mode 100644 index 0000000..1c8f718 --- /dev/null +++ b/examples/utils/test_reporter.rs @@ -0,0 +1,264 @@ +use crate::utils::docker_manager; +use crate::utils::git_checker::GitInfo; +use serde::{Deserialize, Serialize}; +use std::fs; +use std::io::Write; +use std::path::PathBuf; +use std::time::{Instant, SystemTime, UNIX_EPOCH}; + +const GITHUB_REPO_URL: &str = "https://github.com/doctolib/couchbase-lite-rust"; + +#[derive(Debug, Serialize, Deserialize)] +pub struct Checkpoint { + pub step: String, + pub timestamp: String, + pub elapsed_seconds: u64, + pub tombstone_state: Option, + pub notes: Vec, +} + +pub struct TestReporter { + run_dir: PathBuf, + start_time: Instant, + start_timestamp: String, + git_info: GitInfo, + test_name: String, + checkpoints: Vec, + console_output: Vec, +} + +impl TestReporter { + pub fn new(test_name: &str, git_info: GitInfo) -> Result { + let timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(); + + let start_timestamp = chrono::DateTime::from_timestamp(timestamp as i64, 0) + .ok_or("Invalid timestamp")? + .format("%Y-%m-%d_%H-%M-%S") + .to_string(); + + let run_dir_name = format!("test_run_{}_{}", start_timestamp, git_info.commit_short_sha); + + let run_dir = PathBuf::from("test_results").join(run_dir_name); + + // Create directory + fs::create_dir_all(&run_dir) + .map_err(|e| format!("Failed to create test results directory: {}", e))?; + + println!("📊 Test results will be saved to: {}\n", run_dir.display()); + + Ok(Self { + run_dir, + start_time: Instant::now(), + start_timestamp, + git_info, + test_name: test_name.to_string(), + checkpoints: Vec::new(), + console_output: Vec::new(), + }) + } + + pub fn checkpoint( + &mut self, + step: &str, + tombstone_state: Option, + notes: Vec, + ) { + let elapsed = self.start_time.elapsed().as_secs(); + let timestamp = chrono::DateTime::from_timestamp( + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs() as i64, + 0, + ) + .unwrap() + .format("%Y-%m-%d %H:%M:%S") + .to_string(); + + let checkpoint = Checkpoint { + step: step.to_string(), + timestamp, + elapsed_seconds: elapsed, + tombstone_state, + notes, + }; + + self.checkpoints.push(checkpoint); + } + + pub fn log(&mut self, message: &str) { + self.console_output.push(message.to_string()); + println!("{}", message); + } + + pub fn finalize(&self) -> Result<(), String> { + println!("\n📝 Generating test report..."); + + // Generate all report files + self.generate_metadata()?; + self.generate_readme()?; + self.generate_tombstone_states()?; + self.generate_test_output()?; + self.extract_docker_logs()?; + + println!("✓ Test report generated in: {}\n", self.run_dir.display()); + println!("📂 Report contents:"); + println!(" - README.md: Executive summary"); + println!(" - metadata.json: Test metadata and environment"); + println!(" - tombstone_states.json: Tombstone xattr at each checkpoint"); + println!(" - test_output.log: Complete console output"); + println!(" - cbs_logs.log: Couchbase Server logs"); + println!(" - sgw_logs.log: Sync Gateway logs"); + + Ok(()) + } + + fn generate_metadata(&self) -> Result<(), String> { + let metadata = serde_json::json!({ + "test_name": self.test_name, + "start_time": self.start_timestamp, + "duration_seconds": self.start_time.elapsed().as_secs(), + "git": { + "commit_sha": self.git_info.commit_sha, + "commit_short_sha": self.git_info.commit_short_sha, + "branch": self.git_info.branch, + "github_link": format!("{}/tree/{}", GITHUB_REPO_URL, self.git_info.commit_sha), + }, + "environment": { + "couchbase_server": "7.x", + "sync_gateway": "4.0.0 EE", + "couchbase_lite": "3.2.3", + "enable_shared_bucket_access": true, + }, + }); + + let metadata_path = self.run_dir.join("metadata.json"); + let json = serde_json::to_string_pretty(&metadata) + .map_err(|e| format!("Failed to serialize metadata: {}", e))?; + + fs::write(&metadata_path, json) + .map_err(|e| format!("Failed to write metadata.json: {}", e))?; + + Ok(()) + } + + fn generate_readme(&self) -> Result<(), String> { + let github_link = format!("{}/tree/{}", GITHUB_REPO_URL, self.git_info.commit_sha); + + let mut readme = String::new(); + readme.push_str(&format!("# Test Run: {}\n\n", self.test_name)); + readme.push_str(&format!("**Date**: {}\n", self.start_timestamp)); + readme.push_str(&format!( + "**Duration**: {} seconds (~{} minutes)\n\n", + self.start_time.elapsed().as_secs(), + self.start_time.elapsed().as_secs() / 60 + )); + + readme.push_str("## Environment\n\n"); + readme.push_str(&format!( + "- **Commit**: {} ([view on GitHub]({}))\n", + self.git_info.commit_short_sha, github_link + )); + readme.push_str(&format!("- **Branch**: {}\n", self.git_info.branch)); + readme.push_str("- **Couchbase Server**: 7.x\n"); + readme.push_str("- **Sync Gateway**: 4.0.0 EE\n"); + readme.push_str("- **Couchbase Lite**: 3.2.3 (Rust)\n"); + readme.push_str("- **enable_shared_bucket_access**: true\n\n"); + + readme.push_str("## Test Checkpoints\n\n"); + for checkpoint in &self.checkpoints { + readme.push_str(&format!( + "### {} ({}s elapsed)\n", + checkpoint.step, checkpoint.elapsed_seconds + )); + readme.push_str(&format!("**Time**: {}\n\n", checkpoint.timestamp)); + + if let Some(ref state) = checkpoint.tombstone_state { + let flags = state.get("flags").and_then(|f| f.as_i64()); + let tombstoned_at = state.get("tombstoned_at"); + + match flags { + Some(1) => { + readme.push_str("**Status**: 🪦 TOMBSTONE\n"); + readme.push_str(&format!("- `flags`: 1\n")); + if let Some(ts) = tombstoned_at { + readme.push_str(&format!("- `tombstoned_at`: {}\n", ts)); + } + } + Some(0) | None => { + readme.push_str("**Status**: ✅ LIVE DOCUMENT\n"); + readme.push_str(&format!("- `flags`: {:?}\n", flags.unwrap_or(0))); + } + _ => { + readme.push_str(&format!("**Status**: ❓ UNKNOWN (flags: {:?})\n", flags)); + } + } + } else { + readme.push_str("**Status**: Document not found or not queried\n"); + } + + if !checkpoint.notes.is_empty() { + readme.push_str("\n**Notes**:\n"); + for note in &checkpoint.notes { + readme.push_str(&format!("- {}\n", note)); + } + } + + readme.push_str("\n"); + } + + readme.push_str("## Files in This Report\n\n"); + readme.push_str("- `README.md`: This file - executive summary\n"); + readme.push_str("- `metadata.json`: Test metadata (commit, timestamp, environment)\n"); + readme.push_str("- `tombstone_states.json`: Full _sync xattr content at each checkpoint\n"); + readme.push_str("- `test_output.log`: Complete console output from the test\n"); + readme.push_str("- `cbs_logs.log`: Couchbase Server container logs\n"); + readme.push_str("- `sgw_logs.log`: Sync Gateway container logs\n"); + + let readme_path = self.run_dir.join("README.md"); + fs::write(&readme_path, readme).map_err(|e| format!("Failed to write README.md: {}", e))?; + + Ok(()) + } + + fn generate_tombstone_states(&self) -> Result<(), String> { + let states_path = self.run_dir.join("tombstone_states.json"); + let json = serde_json::to_string_pretty(&self.checkpoints) + .map_err(|e| format!("Failed to serialize checkpoints: {}", e))?; + + fs::write(&states_path, json) + .map_err(|e| format!("Failed to write tombstone_states.json: {}", e))?; + + Ok(()) + } + + fn generate_test_output(&self) -> Result<(), String> { + let output_path = self.run_dir.join("test_output.log"); + let mut file = fs::File::create(&output_path) + .map_err(|e| format!("Failed to create test_output.log: {}", e))?; + + for line in &self.console_output { + writeln!(file, "{}", line) + .map_err(|e| format!("Failed to write to test_output.log: {}", e))?; + } + + Ok(()) + } + + fn extract_docker_logs(&self) -> Result<(), String> { + println!(" Extracting Docker logs..."); + + // CBS logs + let cbs_logs_path = self.run_dir.join("cbs_logs.log"); + docker_manager::get_docker_logs("cblr-couchbase-server", &cbs_logs_path)?; + + // SGW logs + let sgw_logs_path = self.run_dir.join("sgw_logs.log"); + docker_manager::get_docker_logs("cblr-sync-gateway", &sgw_logs_path)?; + + Ok(()) + } +}