Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 99 additions & 0 deletions ci/nightly/pipeline.template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,105 @@ steps:
composition: testdrive
args: [--default-size=1, --slow]

- id: testdrive-size-1-1
label: ":racing_car: testdrive with SIZE 1"
depends_on: build-aarch64
timeout_in_minutes: 360
agents:
queue: hetzner-aarch64-4cpu-8gb
plugins:
- ./ci/plugins/mzcompose:
composition: testdrive
args: [--default-size=1, --slow]

- id: testdrive-size-1-2
label: ":racing_car: testdrive with SIZE 1"
depends_on: build-aarch64
timeout_in_minutes: 360
agents:
queue: hetzner-aarch64-4cpu-8gb
plugins:
- ./ci/plugins/mzcompose:
composition: testdrive
args: [--default-size=1, --slow]

- id: testdrive-size-1-3
label: ":racing_car: testdrive with SIZE 1"
depends_on: build-aarch64
timeout_in_minutes: 360
agents:
queue: hetzner-aarch64-4cpu-8gb
plugins:
- ./ci/plugins/mzcompose:
composition: testdrive
args: [--default-size=1, --slow]

- id: testdrive-size-1-4
label: ":racing_car: testdrive with SIZE 1"
depends_on: build-aarch64
timeout_in_minutes: 360
agents:
queue: hetzner-aarch64-4cpu-8gb
plugins:
- ./ci/plugins/mzcompose:
composition: testdrive
args: [--default-size=1, --slow]

- id: testdrive-size-1-5
label: ":racing_car: testdrive with SIZE 1"
depends_on: build-aarch64
timeout_in_minutes: 360
agents:
queue: hetzner-aarch64-4cpu-8gb
plugins:
- ./ci/plugins/mzcompose:
composition: testdrive
args: [--default-size=1, --slow]

- id: testdrive-size-1-6
label: ":racing_car: testdrive with SIZE 1"
depends_on: build-aarch64
timeout_in_minutes: 360
agents:
queue: hetzner-aarch64-4cpu-8gb
plugins:
- ./ci/plugins/mzcompose:
composition: testdrive
args: [--default-size=1, --slow]

- id: testdrive-size-1-7
label: ":racing_car: testdrive with SIZE 1"
depends_on: build-aarch64
timeout_in_minutes: 360
agents:
queue: hetzner-aarch64-4cpu-8gb
plugins:
- ./ci/plugins/mzcompose:
composition: testdrive
args: [--default-size=1, --slow]

- id: testdrive-size-1-8
label: ":racing_car: testdrive with SIZE 1"
depends_on: build-aarch64
timeout_in_minutes: 360
agents:
queue: hetzner-aarch64-4cpu-8gb
plugins:
- ./ci/plugins/mzcompose:
composition: testdrive
args: [--default-size=1, --slow]

- id: testdrive-size-1-9
label: ":racing_car: testdrive with SIZE 1"
depends_on: build-aarch64
timeout_in_minutes: 360
agents:
queue: hetzner-aarch64-4cpu-8gb
plugins:
- ./ci/plugins/mzcompose:
composition: testdrive
args: [--default-size=1, --slow]

- id: testdrive-size-8
label: ":racing_car: testdrive with SIZE 8"
depends_on: build-aarch64
Expand Down
17 changes: 1 addition & 16 deletions src/persist-client/src/cli/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,6 @@ pub async fn run(command: AdminArgs) -> Result<(), anyhow::Error> {
Arc::clone(&pubsub_sender),
));

// We need a PersistClient to open a write handle so we can append an empty batch.
let persist_client = PersistClient::new(
cfg,
blob,
Expand Down Expand Up @@ -259,21 +258,7 @@ pub async fn run(command: AdminArgs) -> Result<(), anyhow::Error> {
diagnostics,
)
.await?;

if !write_handle.upper().is_empty() {
let empty_batch: Vec<(
(crate::cli::inspect::K, crate::cli::inspect::V),
u64,
i64,
)> = vec![];
let lower = write_handle.upper().clone();
let upper = Antichain::new();

let result = write_handle.append(empty_batch, lower, upper).await?;
if let Err(err) = result {
anyhow::bail!("failed to force downgrade upper, {err:?}");
}
}
write_handle.advance_upper(&Antichain::new()).await;
}

if force_downgrade_since {
Expand Down
16 changes: 16 additions & 0 deletions src/persist-client/src/internal/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,22 @@ where
})
}

/// Looks up the [SchemaId] of the given key and value schemas in the current state.
///
/// The search runs under the state's read lock and compares each entry of
/// `state.collections.schemas` (after decoding its key and value parts) against the
/// requested schemas. Returns `Some(id)` for the first entry where both match, or `None`
/// if no entry matches.
pub fn find_schema(&self, key_schema: &K::Schema, val_schema: &V::Schema) -> Option<SchemaId> {
    self.state
        .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
            // Callers most often ask about a recently added schema, so scan the
            // collection back-to-front to hit the likely match sooner.
            state
                .collections
                .schemas
                .iter()
                .rev()
                .find_map(|(id, encoded)| {
                    let is_match = K::decode_schema(&encoded.key) == *key_schema
                        && V::decode_schema(&encoded.val) == *val_schema;
                    is_match.then_some(*id)
                })
        })
}

/// Returns whether the current state's `since` and `upper` are both empty.
///
/// Due to sharing state with other handles, successive reads to this fn or any other may
Expand Down
2 changes: 0 additions & 2 deletions src/persist-client/src/internal/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@ use crate::{PersistConfig, ShardId, WriterId, cfg};
/// A key and value `Schema` of data written to a batch or shard.
#[derive(Debug)]
pub struct Schemas<K: Codec, V: Codec> {
// TODO: Remove the Option once this finishes rolling out and all shards
// have a registered schema.
/// Id under which this schema is registered in the shard's schema registry,
/// if any.
pub id: Option<SchemaId>,
Expand Down
5 changes: 5 additions & 0 deletions src/persist-client/src/internal/machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,11 @@ where
self.applier.latest_schema()
}

/// Returns the ID of the given key/value schema pair, if known at the current state.
///
/// This is a thin forwarding wrapper: the lookup itself is performed by
/// `self.applier.find_schema`, and its result is returned unchanged. `None`
/// means no matching schema was found.
pub fn find_schema(&self, key_schema: &K::Schema, val_schema: &V::Schema) -> Option<SchemaId> {
self.applier.find_schema(key_schema, val_schema)
}

/// See [crate::PersistClient::compare_and_evolve_schema].
///
/// TODO: Unify this with [Self::register_schema]?
Expand Down
42 changes: 8 additions & 34 deletions src/persist-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use differential_dataflow::lattice::Lattice;
use itertools::Itertools;
use mz_build_info::{BuildInfo, build_info};
use mz_dyncfg::ConfigSet;
use mz_ore::{instrument, soft_assert_or_log};
use mz_ore::instrument;
use mz_persist::location::{Blob, Consensus, ExternalError};
use mz_persist_types::schema::SchemaId;
use mz_persist_types::{Codec, Codec64, Opaque};
Expand Down Expand Up @@ -490,10 +490,6 @@ impl PersistClient {
///
/// Use this to save latency and a bit of persist traffic if you're just
/// going to immediately drop or expire the [ReadHandle].
///
/// The `_schema` parameter is currently unused, but should be an object
/// that represents the schema of the data in the shard. This will be required
/// in the future.
#[instrument(level = "debug", fields(shard = %shard_id))]
pub async fn open_writer<K, V, T, D>(
&self,
Expand All @@ -511,23 +507,11 @@ impl PersistClient {
let machine = self.make_machine(shard_id, diagnostics.clone()).await?;
let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));

// TODO: Because schemas are ordered, as part of the persist schema
// changes work, we probably want to build some way to allow persist
// users to control the order. For example, maybe a
// `PersistClient::compare_and_append_schema(current_schema_id,
// next_schema)`. Presumably this would then be passed in to open_writer
// instead of us implicitly registering it here.
// NB: The overwhelming common case is that this schema is already
// registered. In this case, the cmd breaks early and nothing is
// written to (or read from) CRDB.
let (schema_id, maintenance) = machine.register_schema(&*key_schema, &*val_schema).await;
maintenance.start_performing(&machine, &gc);
soft_assert_or_log!(
schema_id.is_some(),
"unable to register schemas {:?} {:?}",
key_schema,
val_schema,
);
// We defer registering the schema until write time, to allow opening
// write handles in a "read-only" mode where they don't implicitly
// modify persist state. But it might already be registered, in which
// case we can fetch its ID.
let schema_id = machine.find_schema(&*key_schema, &*val_schema);

let writer_id = WriterId::new();
let schemas = Schemas {
Expand Down Expand Up @@ -1992,7 +1976,6 @@ mod tests {
#[mz_persist_proc::test(tokio::test)]
#[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
async fn finalize_empty_shard(dyncfgs: ConfigUpdates) {
const EMPTY: &[(((), ()), u64, i64)] = &[];
let persist_client = new_test_client(&dyncfgs).await;

let shard_id = ShardId::new();
Expand All @@ -2006,11 +1989,7 @@ mod tests {
// Advance since and upper to empty, which is a pre-requisite for
// finalization/tombstoning.
let () = read.downgrade_since(&Antichain::new()).await;
let () = write
.compare_and_append(EMPTY, Antichain::from_elem(0), Antichain::new())
.await
.expect("usage should be valid")
.expect("upper should match");
let () = write.advance_upper(&Antichain::new()).await;

let mut since_handle: SinceHandle<(), (), u64, i64, u64> = persist_client
.open_critical_since(shard_id, CRITICAL_SINCE, Diagnostics::for_tests())
Expand Down Expand Up @@ -2047,7 +2026,6 @@ mod tests {
#[mz_persist_proc::test(tokio::test)]
#[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
async fn finalize_shard(dyncfgs: ConfigUpdates) {
const EMPTY: &[(((), ()), u64, i64)] = &[];
const DATA: &[(((), ()), u64, i64)] = &[(((), ()), 0, 1)];
let persist_client = new_test_client(&dyncfgs).await;

Expand All @@ -2069,11 +2047,7 @@ mod tests {
// Advance since and upper to empty, which is a pre-requisite for
// finalization/tombstoning.
let () = read.downgrade_since(&Antichain::new()).await;
let () = write
.compare_and_append(EMPTY, Antichain::from_elem(1), Antichain::new())
.await
.expect("usage should be valid")
.expect("upper should match");
let () = write.advance_upper(&Antichain::new()).await;

let mut since_handle: SinceHandle<(), (), u64, i64, u64> = persist_client
.open_critical_since(shard_id, CRITICAL_SINCE, Diagnostics::for_tests())
Expand Down
4 changes: 3 additions & 1 deletion src/persist-client/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ mod tests {
let schema0 = StringsSchema(vec![false]);
let schema1 = StringsSchema(vec![false, true]);

let write0 = client
let mut write0 = client
.open_writer::<Strings, (), u64, i64>(
shard_id,
Arc::new(schema0.clone()),
Expand All @@ -614,6 +614,8 @@ mod tests {
)
.await
.unwrap();

write0.ensure_schema_registered().await;
assert_eq!(write0.write_schemas.id.unwrap(), SchemaId(0));

// Not backward compatible (yet... we don't support dropping a column at
Expand Down
Loading