Skip to content

Commit f3da090

Browse files
committed
persist: ensure batches have correct schema IDs
1 parent 535dab7 commit f3da090

File tree

1 file changed

+42
-4
lines changed

1 file changed

+42
-4
lines changed

src/persist-client/src/write.rs

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -557,7 +557,10 @@ where
557557
where
558558
D: Send + Sync,
559559
{
560-
for batch in batches.iter() {
560+
// Before we append any data, we require a registered write schema.
561+
let schema_id = self.ensure_schema_registered().await;
562+
563+
for batch in batches.iter_mut() {
561564
if self.machine.shard_id() != batch.shard_id() {
562565
return Err(InvalidUsage::BatchNotFromThisShard {
563566
batch_shard: batch.shard_id(),
@@ -574,10 +577,12 @@ where
574577
TODO: Error on very old versions once the leaked blob detector exists."
575578
)
576579
}
577-
}
578580

579-
// Before we append any data, we require a registered write schema.
580-
self.ensure_schema_registered().await;
581+
// The batch may have been written by a writer without a registered schema.
582+
// Ensure we have a schema ID in the batch metadata before we append, to avoid type
583+
// confusion later.
584+
ensure_batch_schema(batch, schema_id);
585+
}
581586

582587
let lower = expected_upper.clone();
583588
let upper = new_upper;
@@ -1091,6 +1096,39 @@ impl<K: Codec, V: Codec, T, D> Drop for WriteHandle<K, V, T, D> {
10911096
}
10921097
}
10931098

1099+
/// Ensure the given batch uses the given schema ID.
1100+
///
1101+
/// If the batch has no schema set, initialize it to the given one.
1102+
/// If the batch has a schema set, assert that it matches the given one.
1103+
fn ensure_batch_schema<K, V, T, D>(batch: &mut Batch<K, V, T, D>, schema_id: SchemaId)
1104+
where
1105+
K: Debug + Codec,
1106+
V: Debug + Codec,
1107+
T: Timestamp + Lattice + Codec64,
1108+
D: Semigroup + Codec64,
1109+
{
1110+
let shard_id = batch.shard_id();
1111+
let ensure = |id: &mut Option<SchemaId>| match id {
1112+
Some(id) => assert_eq!(*id, schema_id, "schema ID mismatch; shard={shard_id}"),
1113+
None => *id = Some(schema_id),
1114+
};
1115+
1116+
for run_meta in &mut batch.batch.run_meta {
1117+
ensure(&mut run_meta.schema);
1118+
}
1119+
for part in &mut batch.batch.parts {
1120+
match part {
1121+
RunPart::Single(BatchPart::Hollow(part)) => ensure(&mut part.schema_id),
1122+
RunPart::Single(BatchPart::Inline { schema_id, .. }) => ensure(schema_id),
1123+
RunPart::Many(_hollow_run_ref) => {
1124+
// TODO: Fetch the parts in this run and rewrite them too. Alternatively, make
1125+
// `run_meta` the only place we keep schema IDs, so rewriting parts isn't
1126+
// necessary.
1127+
}
1128+
}
1129+
}
1130+
}
1131+
10941132
#[cfg(test)]
10951133
mod tests {
10961134
use std::str::FromStr;

0 commit comments

Comments
 (0)