Skip to content

Commit e64e1ac

Browse files
committed
Explicit upgrades for special shards
1 parent eea3e59 commit e64e1ac

File tree

9 files changed

+45
-10
lines changed

9 files changed

+45
-10
lines changed

src/adapter/src/catalog/open.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -589,7 +589,7 @@ impl Catalog {
589589
);
590590
}
591591

592-
catalog.storage().await.mark_bootstrap_complete();
592+
catalog.storage().await.mark_bootstrap_complete().await;
593593

594594
Ok(OpenCatalogResult {
595595
catalog,

src/adapter/src/catalog/open/builtin_item_migration.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,7 @@ async fn migrate_builtin_collections_incompatible(
357357
shard_id,
358358
Arc::new(TableKeySchema),
359359
Arc::new(ShardIdSchema),
360-
diagnostics,
360+
diagnostics.clone(),
361361
USE_CRITICAL_SINCE_CATALOG.get(persist_client.dyncfgs()),
362362
)
363363
.await
@@ -550,6 +550,10 @@ async fn migrate_builtin_collections_incompatible(
550550
error!("Unable to remove old entries from migration shard: {e:?}");
551551
}
552552
}
553+
persist_client
554+
.upgrade_version::<TableKey, ShardId, Timestamp, StorageDiff>(shard_id, diagnostics)
555+
.await
556+
.expect("valid usage");
553557
}
554558
}
555559
.boxed();

src/catalog/src/durable.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ pub trait DurableCatalogState: ReadOnlyDurableCatalogState {
290290
fn is_savepoint(&self) -> bool;
291291

292292
/// Marks the bootstrap process as complete.
293-
fn mark_bootstrap_complete(&mut self);
293+
async fn mark_bootstrap_complete(&mut self);
294294

295295
/// Creates a new durable catalog state transaction.
296296
async fn transaction(&mut self) -> Result<Transaction, CatalogError>;

src/catalog/src/durable/persist.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1299,6 +1299,7 @@ impl UnopenedPersistCatalogState {
12991299
catalog
13001300
.increment_catalog_upgrade_shard_version(self.update_applier.organization_id)
13011301
.await;
1302+
13021303
let write_handle = catalog
13031304
.persist_client
13041305
.open_writer::<SourceData, (), Timestamp, i64>(
@@ -1711,8 +1712,14 @@ impl DurableCatalogState for PersistCatalogState {
17111712
matches!(self.mode, Mode::Savepoint)
17121713
}
17131714

1714-
fn mark_bootstrap_complete(&mut self) {
1715+
async fn mark_bootstrap_complete(&mut self) {
17151716
self.bootstrap_complete = true;
1717+
if matches!(self.mode, Mode::Writable) {
1718+
self.since_handle
1719+
.upgrade_version()
1720+
.await
1721+
.expect("invalid usage")
1722+
}
17161723
}
17171724

17181725
#[mz_ore::instrument(level = "debug")]

src/catalog/src/expr_cache.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,11 @@ impl ExpressionCache {
252252
.collect();
253253
self.durable_cache.try_set_many(&keys_to_remove).await?;
254254

255+
if remove_prior_versions {
256+
// We've purged old versions from the cache; upgrade the backing Persist version as well.
257+
self.durable_cache.upgrade_version().await;
258+
}
259+
255260
if compact_shard {
256261
let fuel = EXPRESSION_CACHE_FORCE_COMPACTION_FUEL.handle(dyncfgs);
257262
let wait = EXPRESSION_CACHE_FORCE_COMPACTION_WAIT.handle(dyncfgs);

src/durable-cache/src/lib.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -390,6 +390,15 @@ impl<C: DurableCacheCodec> DurableCache<C> {
390390
)
391391
.await
392392
}
393+
394+
/// Upgrade the version associated with the backing shard. This should only be done once
395+
/// we've durably upgraded to the new version.
396+
pub async fn upgrade_version(&self) {
397+
self.since_handle
398+
.upgrade_version()
399+
.await
400+
.expect("invalid usage")
401+
}
393402
}
394403

395404
#[cfg(test)]

src/ore/src/process.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
#[macro_export]
4242
macro_rules! halt {
4343
($($arg:expr),* $(,)?) => {{
44-
$crate::__private::tracing::warn!("halting process: {}", format!($($arg),*));
44+
$crate::__private::tracing::warn!("halting process: {}\n{:?}", format!($($arg),*), ::std::backtrace::Backtrace::capture());
4545
$crate::process::exit_thread_safe(166);
4646
}}
4747
}

src/persist-client/src/critical.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use serde::{Deserialize, Serialize};
2323
use timely::progress::{Antichain, Timestamp};
2424
use uuid::Uuid;
2525

26+
use crate::error::InvalidUsage;
2627
use crate::internal::machine::Machine;
2728
use crate::internal::state::Since;
2829
use crate::stats::SnapshotStats;
@@ -324,6 +325,19 @@ where
324325
}
325326
}
326327

328+
/// Upgrade the version associated with this shard, applying any state migrations. This
329+
/// is irrevocable and will fence out old versions: it should be run only once Materialize
330+
/// has fully committed to the new version.
331+
pub async fn upgrade_version(&self) -> Result<(), InvalidUsage<T>> {
332+
match self.machine.upgrade_version().await {
333+
Ok(maintenance) => {
334+
let () = maintenance.perform(&self.machine, &self.gc).await;
335+
Ok(())
336+
}
337+
Err(version) => Err(InvalidUsage::IncompatibleVersion { version }),
338+
}
339+
}
340+
327341
// Expiry temporarily removed.
328342
// If you'd like to stop this handle from holding back the since of the shard,
329343
// downgrade it to [].

src/persist-client/src/internal/encoding.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -206,11 +206,7 @@ pub(crate) fn assert_code_can_read_data(code_version: &Version, data_version: &V
206206
if !cfg::code_can_read_data(code_version, data_version) {
207207
// We can't catch halts, so panic in test, so we can get unit test
208208
// coverage.
209-
if cfg!(test) {
210-
panic!("code at version {code_version} cannot read data with version {data_version}");
211-
} else {
212-
halt!("code at version {code_version} cannot read data with version {data_version}");
213-
}
209+
panic!("code at version {code_version} cannot read data with version {data_version}");
214210
}
215211
}
216212

0 commit comments

Comments
 (0)