diff --git a/src/persist-client/src/cli/admin.rs b/src/persist-client/src/cli/admin.rs
index e35018f109657..8d32cba8604fa 100644
--- a/src/persist-client/src/cli/admin.rs
+++ b/src/persist-client/src/cli/admin.rs
@@ -231,7 +231,6 @@ pub async fn run(command: AdminArgs) -> Result<(), anyhow::Error> {
             Arc::clone(&pubsub_sender),
         ));
 
-        // We need a PersistClient to open a write handle so we can append an empty batch.
         let persist_client = PersistClient::new(
             cfg,
             blob,
@@ -259,21 +258,7 @@ pub async fn run(command: AdminArgs) -> Result<(), anyhow::Error> {
                 diagnostics,
             )
             .await?;
-
-        if !write_handle.upper().is_empty() {
-            let empty_batch: Vec<(
-                (crate::cli::inspect::K, crate::cli::inspect::V),
-                u64,
-                i64,
-            )> = vec![];
-            let lower = write_handle.upper().clone();
-            let upper = Antichain::new();
-
-            let result = write_handle.append(empty_batch, lower, upper).await?;
-            if let Err(err) = result {
-                anyhow::bail!("failed to force downgrade upper, {err:?}");
-            }
-        }
+        write_handle.advance_upper(&Antichain::new()).await;
     }
 
     if force_downgrade_since {
diff --git a/src/persist-client/src/internal/apply.rs b/src/persist-client/src/internal/apply.rs
index ad22a43f397eb..163a9d554c201 100644
--- a/src/persist-client/src/internal/apply.rs
+++ b/src/persist-client/src/internal/apply.rs
@@ -254,6 +254,22 @@ where
         })
     }
 
+    /// Returns the ID of the given schema, if known at the current state.
+    pub fn find_schema(&self, key_schema: &K::Schema, val_schema: &V::Schema) -> Option<SchemaId> {
+        self.state
+            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
+                // The common case is that the requested schema is a recent one, so as a minor
+                // optimization, do this search in reverse order.
+                let mut schemas = state.collections.schemas.iter().rev();
+                schemas
+                    .find(|(_, x)| {
+                        K::decode_schema(&x.key) == *key_schema
+                            && V::decode_schema(&x.val) == *val_schema
+                    })
+                    .map(|(id, _)| *id)
+            })
+    }
+
     /// Returns whether the current's state `since` and `upper` are both empty.
     ///
     /// Due to sharing state with other handles, successive reads to this fn or any other may
diff --git a/src/persist-client/src/internal/encoding.rs b/src/persist-client/src/internal/encoding.rs
index 782f6702c4f72..d15f43f6d33df 100644
--- a/src/persist-client/src/internal/encoding.rs
+++ b/src/persist-client/src/internal/encoding.rs
@@ -65,8 +65,6 @@ use crate::{PersistConfig, ShardId, WriterId, cfg};
 /// A key and value `Schema` of data written to a batch or shard.
 #[derive(Debug)]
 pub struct Schemas<K: Codec, V: Codec> {
-    // TODO: Remove the Option once this finishes rolling out and all shards
-    // have a registered schema.
     /// Id under which this schema is registered in the shard's schema registry,
     /// if any.
     pub id: Option<SchemaId>,
diff --git a/src/persist-client/src/internal/machine.rs b/src/persist-client/src/internal/machine.rs
index d8220ccb76d63..562804acdcb76 100644
--- a/src/persist-client/src/internal/machine.rs
+++ b/src/persist-client/src/internal/machine.rs
@@ -700,6 +700,11 @@ where
         self.applier.latest_schema()
     }
 
+    /// Returns the ID of the given schema, if known at the current state.
+    pub fn find_schema(&self, key_schema: &K::Schema, val_schema: &V::Schema) -> Option<SchemaId> {
+        self.applier.find_schema(key_schema, val_schema)
+    }
+
     /// See [crate::PersistClient::compare_and_evolve_schema].
     ///
     /// TODO: Unify this with [Self::register_schema]?
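
A sketch (not part of this change) of how the new read-only lookup differs from registration, for a caller holding a `Machine`; it assumes `machine`, `gc`, `key_schema`, and `val_schema` are in scope, as in `open_writer` below:

// find_schema never writes to consensus; it only searches the schemas already
// registered in the shard's state (newest first, per the comment above).
let schema_id: Option<SchemaId> = machine.find_schema(&*key_schema, &*val_schema);
let schema_id = match schema_id {
    // Already registered, e.g. by an earlier writer: reuse the existing ID.
    Some(id) => id,
    // Not registered yet: fall back to an explicit registration, which may
    // append a new schema to the shard's state and return maintenance work.
    None => {
        let (id, maintenance) = machine.register_schema(&*key_schema, &*val_schema).await;
        maintenance.start_performing(&machine, &gc);
        id.expect("registering a schema on a compatible shard should succeed")
    }
};
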
diff --git a/src/persist-client/src/lib.rs b/src/persist-client/src/lib.rs
index d1b8ee3823f8c..a454461a18559 100644
--- a/src/persist-client/src/lib.rs
+++ b/src/persist-client/src/lib.rs
@@ -24,7 +24,7 @@ use differential_dataflow::lattice::Lattice;
 use itertools::Itertools;
 use mz_build_info::{BuildInfo, build_info};
 use mz_dyncfg::ConfigSet;
-use mz_ore::{instrument, soft_assert_or_log};
+use mz_ore::instrument;
 use mz_persist::location::{Blob, Consensus, ExternalError};
 use mz_persist_types::schema::SchemaId;
 use mz_persist_types::{Codec, Codec64, Opaque};
@@ -490,10 +490,6 @@ impl PersistClient {
     ///
     /// Use this to save latency and a bit of persist traffic if you're just
     /// going to immediately drop or expire the [ReadHandle].
-    ///
-    /// The `_schema` parameter is currently unused, but should be an object
-    /// that represents the schema of the data in the shard. This will be required
-    /// in the future.
     #[instrument(level = "debug", fields(shard = %shard_id))]
     pub async fn open_writer<K, V, T, D>(
         &self,
@@ -511,23 +507,11 @@ impl PersistClient {
         let machine = self.make_machine(shard_id, diagnostics.clone()).await?;
         let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));
 
-        // TODO: Because schemas are ordered, as part of the persist schema
-        // changes work, we probably want to build some way to allow persist
-        // users to control the order. For example, maybe a
-        // `PersistClient::compare_and_append_schema(current_schema_id,
-        // next_schema)`. Presumably this would then be passed in to open_writer
-        // instead of us implicitly registering it here.
-        // NB: The overwhelming common case is that this schema is already
-        // registered. In this case, the cmd breaks early and nothing is
-        // written to (or read from) CRDB.
-        let (schema_id, maintenance) = machine.register_schema(&*key_schema, &*val_schema).await;
-        maintenance.start_performing(&machine, &gc);
-        soft_assert_or_log!(
-            schema_id.is_some(),
-            "unable to register schemas {:?} {:?}",
-            key_schema,
-            val_schema,
-        );
+        // We defer registering the schema until write time, to allow opening
+        // write handles in a "read-only" mode where they don't implicitly
+        // modify persist state. But it might already be registered, in which
+        // case we can fetch its ID.
+        let schema_id = machine.find_schema(&*key_schema, &*val_schema);
 
         let writer_id = WriterId::new();
         let schemas = Schemas {
@@ -1992,7 +1976,6 @@ mod tests {
     #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
     async fn finalize_empty_shard(dyncfgs: ConfigUpdates) {
-        const EMPTY: &[(((), ()), u64, i64)] = &[];
         let persist_client = new_test_client(&dyncfgs).await;
         let shard_id = ShardId::new();
 
@@ -2006,11 +1989,7 @@
         // Advance since and upper to empty, which is a pre-requisite for
         // finalization/tombstoning.
         let () = read.downgrade_since(&Antichain::new()).await;
-        let () = write
-            .compare_and_append(EMPTY, Antichain::from_elem(0), Antichain::new())
-            .await
-            .expect("usage should be valid")
-            .expect("upper should match");
+        let () = write.advance_upper(&Antichain::new()).await;
         let mut since_handle: SinceHandle<(), (), u64, i64, u64> = persist_client
             .open_critical_since(shard_id, CRITICAL_SINCE, Diagnostics::for_tests())
             .await
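
A test-style sketch (not part of this change) of the `open_writer` behavior above: the handle no longer registers its schema on open, so a brand-new shard reports no schema ID until `ensure_schema_registered` (added to `WriteHandle` below) or a first write. The `()` codecs and `UnitSchema` mirror these tests; the surrounding setup is assumed.

let mut write = persist_client
    .open_writer::<(), (), u64, i64>(
        shard_id,
        Arc::new(UnitSchema),
        Arc::new(UnitSchema),
        Diagnostics::for_tests(),
    )
    .await
    .expect("codecs should not change");
// Opening no longer touches the schema registry: nothing is registered yet.
assert_eq!(write.schema_id(), None);
// Registration now happens on demand (or implicitly on the first append).
let id = write.ensure_schema_registered().await;
assert_eq!(write.schema_id(), Some(id));
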
@@ -2047,7 +2026,6 @@
     #[mz_persist_proc::test(tokio::test)]
     #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
     async fn finalize_shard(dyncfgs: ConfigUpdates) {
-        const EMPTY: &[(((), ()), u64, i64)] = &[];
         const DATA: &[(((), ()), u64, i64)] = &[(((), ()), 0, 1)];
 
         let persist_client = new_test_client(&dyncfgs).await;
@@ -2069,11 +2047,7 @@
         // Advance since and upper to empty, which is a pre-requisite for
         // finalization/tombstoning.
         let () = read.downgrade_since(&Antichain::new()).await;
-        let () = write
-            .compare_and_append(EMPTY, Antichain::from_elem(1), Antichain::new())
-            .await
-            .expect("usage should be valid")
-            .expect("upper should match");
+        let () = write.advance_upper(&Antichain::new()).await;
 
         let mut since_handle: SinceHandle<(), (), u64, i64, u64> = persist_client
             .open_critical_since(shard_id, CRITICAL_SINCE, Diagnostics::for_tests())
diff --git a/src/persist-client/src/schema.rs b/src/persist-client/src/schema.rs
index c7780bee1601b..9db5b7dc40c70 100644
--- a/src/persist-client/src/schema.rs
+++ b/src/persist-client/src/schema.rs
@@ -605,7 +605,7 @@
         let schema0 = StringsSchema(vec![false]);
         let schema1 = StringsSchema(vec![false, true]);
 
-        let write0 = client
+        let mut write0 = client
            .open_writer::(
                 shard_id,
                 Arc::new(schema0.clone()),
@@ -614,6 +614,8 @@
             )
             .await
             .unwrap();
+
+        write0.ensure_schema_registered().await;
         assert_eq!(write0.write_schemas.id.unwrap(), SchemaId(0));
 
         // Not backward compatible (yet... we don't support dropping a column at
diff --git a/src/persist-client/src/write.rs b/src/persist-client/src/write.rs
index 157bcf37218f5..b07b6a17a0590 100644
--- a/src/persist-client/src/write.rs
+++ b/src/persist-client/src/write.rs
@@ -221,6 +221,31 @@ where
         self.write_schemas.id
     }
 
+    /// Registers the write schema, if it isn't already registered.
+    ///
+    /// # Panics
+    ///
+    /// This method expects that either the shard doesn't yet have any schema registered, or one of
+    /// the registered schemas is the same as the write schema. If all registered schemas are
+    /// different from the write schema, it panics.
+    pub async fn ensure_schema_registered(&mut self) -> SchemaId {
+        let Schemas { id, key, val } = &self.write_schemas;
+
+        if let Some(id) = id {
+            return *id;
+        }
+
+        let (schema_id, maintenance) = self.machine.register_schema(key, val).await;
+        maintenance.start_performing(&self.machine, &self.gc);
+
+        let Some(schema_id) = schema_id else {
+            panic!("unable to register schemas: {key:?} {val:?}");
+        };
+
+        self.write_schemas.id = Some(schema_id);
+        schema_id
+    }
+
     /// A cached version of the shard-global `upper` frontier.
     ///
     /// This is the most recent upper discovered by this handle. It is
@@ -256,6 +281,50 @@ where
         &self.upper
     }
 
+    /// Advance the shard's upper to the given frontier.
+    ///
+    /// If the provided `target` is less than or equal to the shard's upper, this is a no-op.
+    ///
+    /// In contrast to the various compare-and-append methods, this method does not require the
+    /// handle's write schema to be registered with the shard. That is, it is fine to use a dummy
+    /// schema when creating a writer just to advance a shard upper.
+    pub async fn advance_upper(&mut self, target: &Antichain<T>) {
+        // We avoid `fetch_recent_upper` here, to avoid a consensus roundtrip if the known upper is
+        // already beyond the target.
+        let mut lower = self.shared_upper().clone();
+
+        while !PartialOrder::less_equal(target, &lower) {
+            let since = Antichain::from_elem(T::minimum());
+            let desc = Description::new(lower.clone(), target.clone(), since);
+            let batch = HollowBatch::empty(desc);
+
+            let heartbeat_timestamp = (self.cfg.now)();
+            let res = self
+                .machine
+                .compare_and_append(
+                    &batch,
+                    &self.writer_id,
+                    &self.debug_state,
+                    heartbeat_timestamp,
+                )
+                .await;
+
+            use CompareAndAppendRes::*;
+            let new_upper = match res {
+                Success(_seq_no, maintenance) => {
+                    maintenance.start_performing(&self.machine, &self.gc, self.compact.as_ref());
+                    batch.desc.upper().clone()
+                }
+                UpperMismatch(_seq_no, actual_upper) => actual_upper,
+                InvalidUsage(_invalid_usage) => unreachable!("batch bounds checked above"),
+                InlineBackpressure => unreachable!("batch was empty"),
+            };
+
+            self.upper.clone_from(&new_upper);
+            lower = new_upper;
+        }
+    }
+
     /// Applies `updates` to this shard and downgrades this handle's upper to
     /// `upper`.
     ///
@@ -488,6 +557,9 @@ where
         D: Send + Sync,
     {
+        // Before we append any data, we require a registered write schema.
+        let schema_id = self.ensure_schema_registered().await;
+
         for batch in batches.iter() {
             if self.machine.shard_id() != batch.shard_id() {
                 return Err(InvalidUsage::BatchNotFromThisShard {
@@ -642,8 +714,12 @@ where
             }
         }
-        let combined_batch =
+        let mut combined_batch =
             HollowBatch::new(desc.clone(), parts, num_updates, run_metas, run_splits);
+        // The batch may have been written by a writer without a registered schema.
+        // Ensure we have a schema ID in the batch metadata before we append, to avoid type
+        // confusion later.
+        ensure_batch_schema(&mut combined_batch, self.shard_id(), schema_id);
         let heartbeat_timestamp = (self.cfg.now)();
         let res = self
             .machine
             .compare_and_append(
@@ -1019,6 +1095,35 @@ impl Drop for WriteHandle {
     }
 }
 
+/// Ensure the given batch uses the given schema ID.
+///
+/// If the batch has no schema set, initialize it to the given one.
+/// If the batch has a schema set, assert that it matches the given one.
+fn ensure_batch_schema<T>(batch: &mut HollowBatch<T>, shard_id: ShardId, schema_id: SchemaId)
+where
+    T: Timestamp + Lattice + Codec64,
+{
+    let ensure = |id: &mut Option<SchemaId>| match id {
+        Some(id) => assert_eq!(*id, schema_id, "schema ID mismatch; shard={shard_id}"),
+        None => *id = Some(schema_id),
+    };
+
+    for run_meta in &mut batch.run_meta {
+        ensure(&mut run_meta.schema);
+    }
+    for part in &mut batch.parts {
+        match part {
+            RunPart::Single(BatchPart::Hollow(part)) => ensure(&mut part.schema_id),
+            RunPart::Single(BatchPart::Inline { schema_id, .. }) => ensure(schema_id),
+            RunPart::Many(_hollow_run_ref) => {
+                // TODO: Fetch the parts in this run and rewrite them too. Alternatively, make
+                // `run_meta` the only place we keep schema IDs, so rewriting parts isn't
+                // necessary.
+            }
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::str::FromStr;
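
An illustrative sketch (not part of this change) of the `advance_upper` contract, assuming a `WriteHandle<(), (), u64, i64>` named `write` on a freshly created shard (upper `[0]`), with imports as in the tests above:

// Advancing to a frontier at or below the current upper is a no-op.
write.advance_upper(&Antichain::from_elem(0)).await;
// Otherwise, empty batches are compare-and-appended until the upper reaches
// the target; concurrent writers are handled by retrying from UpperMismatch.
write.advance_upper(&Antichain::from_elem(5)).await;
assert!(PartialOrder::less_equal(&Antichain::from_elem(5), write.upper()));
// Closing the shard entirely, e.g. as a finalization prerequisite; none of
// this requires the ()/() schemas to have been registered.
write.advance_upper(&Antichain::new()).await;
assert!(write.upper().is_empty());
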
+ } + } + } +} + #[cfg(test)] mod tests { use std::str::FromStr; diff --git a/src/storage-client/src/storage_collections.rs b/src/storage-client/src/storage_collections.rs index ace20dd86e326..1c1eef7c780b9 100644 --- a/src/storage-client/src/storage_collections.rs +++ b/src/storage-client/src/storage_collections.rs @@ -3248,40 +3248,23 @@ async fn finalize_shards_task( Some(shard_id) } else { debug!(%shard_id, "finalizing shard"); - let finalize = || async move { + let finalize = || async move { // TODO: thread the global ID into the shard finalization WAL let diagnostics = Diagnostics::from_purpose("finalizing shards"); - let schemas = persist_client.latest_schema::(shard_id, diagnostics.clone()).await.expect("codecs have not changed"); - let (key_schema, val_schema) = match schemas { - Some((_, key_schema, val_schema)) => (key_schema, val_schema), - None => (RelationDesc::empty(), UnitSchema), - }; - - let empty_batch: Vec<((SourceData, ()), T, StorageDiff)> = vec![]; + // We only use the writer to advance the upper, so using a dummy schema is + // fine. let mut write_handle: WriteHandle = persist_client .open_writer( shard_id, - Arc::new(key_schema), - Arc::new(val_schema), + Arc::new(RelationDesc::empty()), + Arc::new(UnitSchema), diagnostics, ) .await .expect("invalid persist usage"); - - let upper = write_handle.upper(); - - if !upper.is_empty() { - let append = write_handle - .append(empty_batch, upper.clone(), Antichain::new()) - .await?; - - if let Err(e) = append { - warn!(%shard_id, "tried to finalize a shard with an advancing upper: {e:?}"); - return Ok(()); - } - } + write_handle.advance_upper(&Antichain::new()).await; write_handle.expire().await; if force_downgrade_since { @@ -3317,9 +3300,7 @@ async fn finalize_shards_task( .compare_and_downgrade_since(&epoch, (&epoch, &new_since)) .await; if let Err(e) = downgrade { - warn!( - "tried to finalize a shard with an advancing epoch: {e:?}" - ); + warn!("tried to finalize a shard with an advancing epoch: {e:?}"); return Ok(()); } // Not available now, so finalization is broken. diff --git a/src/txn-wal/src/txns.rs b/src/txn-wal/src/txns.rs index f5666f4394d51..41108632d8307 100644 --- a/src/txn-wal/src/txns.rs +++ b/src/txn-wal/src/txns.rs @@ -214,7 +214,15 @@ where ) -> Result { let op = &Arc::clone(&self.metrics).register; op.run(async { - let data_writes = data_writes.into_iter().collect::>(); + let mut data_writes = data_writes.into_iter().collect::>(); + + // The txns system requires that all participating data shards have a + // schema registered. Importantly, we must register a data shard's + // schema _before_ we publish it to the txns shard. + for data_write in &mut data_writes { + data_write.ensure_schema_registered().await; + } + let updates = data_writes .iter() .map(|data_write| { @@ -285,13 +293,11 @@ where } } for data_write in data_writes { - let new_schema_id = data_write.schema_id(); - // If we already have a write handle for a newer version of a table, don't replace // it! Currently we only support adding columns to tables with a default value, so // the latest/newest schema will always be the most complete. // - // TODO(alter_table): Revist when we support dropping columns. + // TODO(alter_table): Revisit when we support dropping columns. 
diff --git a/src/txn-wal/src/txns.rs b/src/txn-wal/src/txns.rs
index f5666f4394d51..41108632d8307 100644
--- a/src/txn-wal/src/txns.rs
+++ b/src/txn-wal/src/txns.rs
@@ -214,7 +214,15 @@ where
     ) -> Result {
         let op = &Arc::clone(&self.metrics).register;
         op.run(async {
-            let data_writes = data_writes.into_iter().collect::<Vec<_>>();
+            let mut data_writes = data_writes.into_iter().collect::<Vec<_>>();
+
+            // The txns system requires that all participating data shards have a
+            // schema registered. Importantly, we must register a data shard's
+            // schema _before_ we publish it to the txns shard.
+            for data_write in &mut data_writes {
+                data_write.ensure_schema_registered().await;
+            }
+
             let updates = data_writes
                 .iter()
                 .map(|data_write| {
@@ -285,13 +293,11 @@ where
                 }
             }
             for data_write in data_writes {
-                let new_schema_id = data_write.schema_id();
-
                 // If we already have a write handle for a newer version of a table, don't replace
                 // it! Currently we only support adding columns to tables with a default value, so
                 // the latest/newest schema will always be the most complete.
                 //
-                // TODO(alter_table): Revist when we support dropping columns.
+                // TODO(alter_table): Revisit when we support dropping columns.
                 match self.datas.data_write_for_commit.get(&data_write.shard_id()) {
                     None => {
                         self.datas
@@ -299,31 +305,35 @@ where
                            .insert(data_write.shard_id(), DataWriteCommit(data_write));
                     }
                     Some(previous) => {
-                        match (previous.schema_id(), new_schema_id) {
-                            (Some(previous_id), None) => {
-                                mz_ore::soft_panic_or_log!(
-                                    "tried registering a WriteHandle replacing one with a SchemaId prev_schema_id: {:?} shard_id: {:?}",
-                                    previous_id,
-                                    previous.shard_id(),
-                                );
-                            },
-                            (Some(previous_id), Some(new_id)) if previous_id > new_id => {
-                                mz_ore::soft_panic_or_log!(
-                                    "tried registering a WriteHandle with an older SchemaId prev_schema_id: {:?} new_schema_id: {:?} shard_id: {:?}",
-                                    previous_id,
-                                    new_id,
-                                    previous.shard_id(),
-                                );
-                            },
-                            (previous_schema_id, new_schema_id) => {
-                                if previous_schema_id.is_none() && new_schema_id.is_none() {
-                                    tracing::warn!("replacing WriteHandle without any SchemaIds to reason about");
-                                } else {
-                                    tracing::info!(?previous_schema_id, ?new_schema_id, shard_id = ?previous.shard_id(), "replacing WriteHandle");
-                                }
-                                self.datas.data_write_for_commit.insert(data_write.shard_id(), DataWriteCommit(data_write));
-                            }
+                        let new_schema_id = data_write.schema_id().expect("ensured above");
+
+                        if let Some(prev_schema_id) = previous.schema_id()
+                            && prev_schema_id > new_schema_id
+                        {
+                            mz_ore::soft_panic_or_log!(
+                                "tried registering a WriteHandle with an older SchemaId; \
+                                 prev_schema_id: {} new_schema_id: {} shard_id: {}",
+                                prev_schema_id,
+                                new_schema_id,
+                                previous.shard_id(),
+                            );
+                            continue;
+                        } else if previous.schema_id().is_none() {
+                            mz_ore::soft_panic_or_log!(
+                                "encountered data shard without a schema; shard_id: {}",
+                                previous.shard_id(),
+                            );
                         }
+
+                        tracing::info!(
+                            prev_schema_id = ?previous.schema_id(),
+                            ?new_schema_id,
+                            shard_id = %previous.shard_id(),
+                            "replacing WriteHandle"
+                        );
+                        self.datas
+                            .data_write_for_commit
+                            .insert(data_write.shard_id(), DataWriteCommit(data_write));
                     }
                 }
             }
@@ -756,12 +766,8 @@ where
             .expect("codecs have not changed");
         let (key_schema, val_schema) = match schemas {
             Some((_, key_schema, val_schema)) => (Arc::new(key_schema), Arc::new(val_schema)),
-            // - For new shards we will always have at least one schema
-            //   registered by the time we reach this point, because that
-            //   happens at txn-registration time.
-            // - For pre-existing shards, every txns shard will have had
-            //   open_writer called on it at least once in the previous release,
-            //   so the schema should exist.
+            // We will always have at least one schema registered by the time we reach this point,
+            // because that is ensured at txn-registration time.
             None => unreachable!("data shard {} should have a schema", data_id),
         };
         let wrapped = self
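
A distillation (not part of this change) of the replacement rule implemented for `data_write_for_commit` above; after this change every incoming handle has a schema ID, so the decision reduces to the hypothetical helper below. Callers of `register` also no longer need to pre-register data shard schemas themselves, since `register` now does it before publishing to the txns shard.

// Hypothetical helper mirroring the logic above: keep the cached handle only
// when it has a strictly newer schema than the incoming one.
fn should_replace(prev_schema_id: Option<SchemaId>, new_schema_id: SchemaId) -> bool {
    match prev_schema_id {
        // Incoming schema is older: keep the existing handle (the `continue` above).
        Some(prev) if prev > new_schema_id => false,
        // A cached handle without a schema is unexpected (soft panic above), but it
        // is still replaced, as is any handle with an equal or older schema.
        _ => true,
    }
}
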