Skip to content

Commit cea6a7f

Browse files
committed
Shift the version from State to StateCollections
1 parent dfbd1b0 commit cea6a7f

File tree

5 files changed

+26
-28
lines changed

5 files changed

+26
-28
lines changed

src/persist-client/src/cli/admin.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -590,9 +590,9 @@ where
590590
// code with this logic.
591591
let safe_version_change = match (commit, expected_version) {
592592
// We never actually write out state changes, so increasing the version is okay.
593-
(false, _) => cfg.build_version >= state.applier_version,
593+
(false, _) => cfg.build_version >= state.collections.version,
594594
// If the versions match that's okay because any commits won't change it.
595-
(true, None) => cfg.build_version == state.applier_version,
595+
(true, None) => cfg.build_version == state.collections.version,
596596
// !!DANGER ZONE!!
597597
(true, Some(expected)) => {
598598
// If we're not _extremely_ careful, the persistcli could make shards unreadable by
@@ -602,15 +602,15 @@ where
602602
// We only allow a mismatch in version if we provided the expected version to the
603603
// command, and the expected version is less than the current build, which
604604
// indicates this is an old shard.
605-
state.applier_version == expected && expected <= cfg.build_version
605+
state.collections.version == expected && expected <= cfg.build_version
606606
}
607607
};
608608
if !safe_version_change {
609609
// We could add a flag to override this check, if that comes up.
610610
return Err(anyhow!(
611611
"version of this tool {} does not match version of state {} when --commit is {commit}. bailing so we don't corrupt anything",
612612
cfg.build_version,
613-
state.applier_version
613+
state.collections.version
614614
));
615615
}
616616
break;

src/persist-client/src/internal/encoding.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -736,7 +736,7 @@ impl<T: Timestamp + Lattice + Codec64> UntypedState<T> {
736736
let state = Rollup::from_proto(proto)
737737
.expect("internal error: invalid encoded state")
738738
.state;
739-
check_data_version(build_version, &state.state.applier_version);
739+
check_data_version(build_version, &state.state.collections.version);
740740
state
741741
}
742742
}
@@ -864,7 +864,7 @@ impl RustType<ProtoInlinedDiffs> for InlinedDiffs {
864864
impl<T: Timestamp + Lattice + Codec64> RustType<ProtoRollup> for Rollup<T> {
865865
fn into_proto(&self) -> ProtoRollup {
866866
ProtoRollup {
867-
applier_version: self.state.state.applier_version.to_string(),
867+
applier_version: self.state.state.collections.version.to_string(),
868868
shard_id: self.state.state.shard_id.into_proto(),
869869
seqno: self.state.state.seqno.into_proto(),
870870
walltime_ms: self.state.state.walltime_ms.into_proto(),
@@ -972,6 +972,7 @@ impl<T: Timestamp + Lattice + Codec64> RustType<ProtoRollup> for Rollup<T> {
972972
.transpose()?;
973973
let active_gc = x.active_gc.map(|gc| gc.into_rust()).transpose()?;
974974
let collections = StateCollections {
975+
version: applier_version.clone(),
975976
rollups,
976977
active_rollup,
977978
active_gc,
@@ -983,7 +984,6 @@ impl<T: Timestamp + Lattice + Codec64> RustType<ProtoRollup> for Rollup<T> {
983984
trace: x.trace.into_rust_if_some("trace")?,
984985
};
985986
let state = State {
986-
applier_version,
987987
shard_id: x.shard_id.into_rust()?,
988988
seqno: x.seqno.into_rust()?,
989989
walltime_ms: x.walltime_ms,

src/persist-client/src/internal/state.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1304,6 +1304,11 @@ pub struct NoOpStateTransition<T>(pub T);
13041304
#[derive(Debug, Clone)]
13051305
#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
13061306
pub struct StateCollections<T> {
1307+
/// The version of this state. This is typically identical to the version of the code
1308+
/// that wrote it, but may diverge during 0dt upgrades and similar operations when a
1309+
/// new version of code is intentionally interoperating with an older state format.
1310+
pub(crate) version: Version,
1311+
13071312
// - Invariant: `<= all reader.since`
13081313
// - Invariant: Doesn't regress across state versions.
13091314
pub(crate) last_gc_req: SeqNo,
@@ -2221,10 +2226,6 @@ where
22212226
#[derive(Debug)]
22222227
#[cfg_attr(any(test, debug_assertions), derive(Clone, PartialEq))]
22232228
pub struct State<T> {
2224-
/// The version of this state. This is typically identical to the version of the code
2225-
/// that wrote it, but may diverge during 0dt upgrades and similar operations when a
2226-
/// new version of code is intentionally interoperating with an older state format.
2227-
pub(crate) applier_version: semver::Version,
22282229
pub(crate) shard_id: ShardId,
22292230

22302231
pub(crate) seqno: SeqNo,
@@ -2254,10 +2255,9 @@ pub struct TypedState<K, V, T, D> {
22542255

22552256
impl<K, V, T: Clone, D> TypedState<K, V, T, D> {
22562257
#[cfg(any(test, debug_assertions))]
2257-
pub(crate) fn clone(&self, applier_version: Version, hostname: String) -> Self {
2258+
pub(crate) fn clone(&self, hostname: String) -> Self {
22582259
TypedState {
22592260
state: State {
2260-
applier_version,
22612261
shard_id: self.shard_id.clone(),
22622262
seqno: self.seqno.clone(),
22632263
walltime_ms: self.walltime_ms,
@@ -2271,7 +2271,6 @@ impl<K, V, T: Clone, D> TypedState<K, V, T, D> {
22712271
pub(crate) fn clone_for_rollup(&self) -> Self {
22722272
TypedState {
22732273
state: State {
2274-
applier_version: self.applier_version.clone(),
22752274
shard_id: self.shard_id.clone(),
22762275
seqno: self.seqno.clone(),
22772276
walltime_ms: self.walltime_ms,
@@ -2338,12 +2337,12 @@ where
23382337
walltime_ms: u64,
23392338
) -> Self {
23402339
let state = State {
2341-
applier_version,
23422340
shard_id,
23432341
seqno: SeqNo::minimum(),
23442342
walltime_ms,
23452343
hostname,
23462344
collections: StateCollections {
2345+
version: applier_version,
23472346
last_gc_req: SeqNo::minimum(),
23482347
rollups: BTreeMap::new(),
23492348
active_rollup: None,
@@ -2373,15 +2372,16 @@ where
23732372
// each version of state with the _max_ version of code that has ever
23742373
// contributed to it. Otherwise, we'd erroneously allow rolling back an
23752374
// arbitrary number of versions if they were done one-by-one.
2376-
let new_applier_version = std::cmp::max(&self.applier_version, &cfg.build_version);
2375+
let new_applier_version = std::cmp::max(&self.collections.version, &cfg.build_version);
23772376
let mut new_state = State {
2378-
applier_version: new_applier_version.clone(),
23792377
shard_id: self.shard_id,
23802378
seqno: self.seqno.next(),
23812379
walltime_ms: (cfg.now)(),
23822380
hostname: cfg.hostname.clone(),
23832381
collections: self.collections.clone(),
23842382
};
2383+
new_state.collections.version = new_applier_version.clone();
2384+
23852385
// Make sure walltime_ms is strictly increasing, in case clocks are
23862386
// offset.
23872387
if new_state.walltime_ms <= self.walltime_ms {
@@ -2752,13 +2752,13 @@ fn serialize_diffs_sum<S: Serializer>(val: &Option<[u8; 8]>, s: S) -> Result<S::
27522752
impl<T: Serialize + Timestamp + Lattice> Serialize for State<T> {
27532753
fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
27542754
let State {
2755-
applier_version,
27562755
shard_id,
27572756
seqno,
27582757
walltime_ms,
27592758
hostname,
27602759
collections:
27612760
StateCollections {
2761+
version: applier_version,
27622762
last_gc_req,
27632763
rollups,
27642764
active_rollup,
@@ -3140,12 +3140,12 @@ pub(crate) mod tests {
31403140
(shard_id, seqno, walltime_ms, hostname, last_gc_req, rollups, active_rollup),
31413141
(active_gc, leased_readers, critical_readers, writers, schemas, trace),
31423142
)| State {
3143-
applier_version: semver::Version::new(1, 2, 3),
31443143
shard_id,
31453144
seqno,
31463145
walltime_ms,
31473146
hostname,
31483147
collections: StateCollections {
3148+
version: Version::new(1, 2, 3),
31493149
last_gc_req,
31503150
rollups,
31513151
active_rollup,

src/persist-client/src/internal/state_diff.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -146,13 +146,13 @@ impl<T: Timestamp + Lattice + Codec64> StateDiff<T> {
146146
// Deconstruct from and to so we get a compile failure if new
147147
// fields are added.
148148
let State {
149-
applier_version: _,
150149
shard_id: from_shard_id,
151150
seqno: from_seqno,
152151
hostname: from_hostname,
153152
walltime_ms: _, // Intentionally unused
154153
collections:
155154
StateCollections {
155+
version: _,
156156
last_gc_req: from_last_gc_req,
157157
rollups: from_rollups,
158158
active_rollup: from_active_rollup,
@@ -165,13 +165,13 @@ impl<T: Timestamp + Lattice + Codec64> StateDiff<T> {
165165
},
166166
} = from;
167167
let State {
168-
applier_version: to_applier_version,
169168
shard_id: to_shard_id,
170169
seqno: to_seqno,
171170
walltime_ms: to_walltime_ms,
172171
hostname: to_hostname,
173172
collections:
174173
StateCollections {
174+
version: to_applier_version,
175175
last_gc_req: to_last_gc_req,
176176
rollups: to_rollups,
177177
active_rollup: to_active_rollup,
@@ -319,10 +319,7 @@ impl<T: Timestamp + Lattice + Codec64> StateDiff<T> {
319319

320320
use crate::internal::state::ProtoStateDiff;
321321

322-
let mut roundtrip_state = from_state.clone(
323-
from_state.applier_version.clone(),
324-
from_state.hostname.clone(),
325-
);
322+
let mut roundtrip_state = from_state.clone(from_state.hostname.clone());
326323
roundtrip_state.apply_diff(metrics, diff.clone())?;
327324

328325
if &roundtrip_state != to_state {
@@ -397,7 +394,8 @@ impl<T: Timestamp + Lattice + Codec64> State<T> {
397394
// issues that may arise from diff application. We pass along the original
398395
// Bytes it decoded from just so we can decode in this error path, while
399396
// avoiding any extraneous clones in the expected Ok path.
400-
let diff = StateDiff::<T>::decode(&self.applier_version, data);
397+
// FIXME: this passes the state version but the method requires the build version.
398+
let diff = StateDiff::<T>::decode(&self.collections.version, data);
401399
panic!(
402400
"state diff should apply cleanly: {} diff {:?} state {:?}",
403401
err, diff, self
@@ -446,7 +444,6 @@ impl<T: Timestamp + Lattice + Codec64> State<T> {
446444
));
447445
}
448446
self.seqno = diff_seqno_to;
449-
self.applier_version = diff_applier_version;
450447
self.walltime_ms = diff_walltime_ms;
451448
force_apply_diffs_single(
452449
&self.shard_id,
@@ -460,6 +457,7 @@ impl<T: Timestamp + Lattice + Codec64> State<T> {
460457
// Deconstruct collections so we get a compile failure if new fields are
461458
// added.
462459
let StateCollections {
460+
version,
463461
last_gc_req,
464462
rollups,
465463
active_rollup,
@@ -471,6 +469,7 @@ impl<T: Timestamp + Lattice + Codec64> State<T> {
471469
trace,
472470
} = &mut self.collections;
473471

472+
*version = diff_applier_version;
474473
apply_diffs_map("rollups", diff_rollups, rollups)?;
475474
apply_diffs_single("last_gc_req", diff_last_gc_req, last_gc_req)?;
476475
apply_diffs_single_option("active_rollup", diff_active_rollup, active_rollup)?;

src/persist-client/src/internal/state_versions.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1074,7 +1074,6 @@ impl<T: Timestamp + Lattice + Codec64> StateVersionsIter<T> {
10741074
pub fn into_rollup_proto_without_diffs(&self) -> impl serde::Serialize + use<T> {
10751075
Rollup::from_state_without_diffs(
10761076
State {
1077-
applier_version: self.state.applier_version.clone(),
10781077
shard_id: self.state.shard_id.clone(),
10791078
seqno: self.state.seqno.clone(),
10801079
walltime_ms: self.state.walltime_ms.clone(),

0 commit comments

Comments
 (0)