Skip to content

Commit 9db9d12

Browse files
committed
Replacement materialized views
Implements the skeleton to support replacing materialized views. Specifically, the change introduces the following SQL syntax: ``` CREATE REPLACEMENT <replacement_name> FOR MATERIALIZED VIEW <view_name AS ... ``` This creates a new dataflow targeting the same shard. The dataflow selects an as-of of its inputs. The dataflow is read-only. ``` ALTER MATERIALIZED VIEW <view_name> APPLY REPLACEMENT <replacement_name> ``` Replaces the old materialized view with `<replacement_name>`. Enables writes for the replacement. The change adds convenience SQL syntax (`SHOW REPLACEMENTS`, `SHOW CREATE REPLACEMENT`), and relations to query the state of replacements. The syntax to create replacements is guarded by a feature flag that is off by default: `enable_replacement_materialized_views`. Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent 03cc66e commit 9db9d12

File tree

78 files changed

+4084
-343
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

78 files changed

+4084
-343
lines changed
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
# Replacement materialized views
2+
3+
Associated:
4+
- https://github.com/MaterializeInc/materialize/pull/34032 (MVP)
5+
- https://github.com/MaterializeInc/materialize/pull/34039 (Per-object read only mode)
6+
7+
## Problem
8+
9+
At the moment, a materialized view names a _view definition_ and the _output columns_ derived from that definition.
10+
We cannot change one or the other independently, which means that any change to a materialized view requires dropping and recreating it.
11+
This is inconvenient for users as changes need to cascade through the dependency graph.
12+
13+
We call this strong coupling between a materialized view and its definition and output columns.
14+
This design explores an alternative in which we can decouple these concepts, allowing users to change one without needing to drop and recreate the other.
15+
This would move us closer to a losely coupled system, which is easier to maintain and evolve over time.
16+
17+
## Success criteria
18+
19+
We allow users to change the definition and output columns of a materialized view without needing to drop and recreate it.
20+
We preserve existing dependencies on the materialized view when doing so, and we ensure that the system remains consistent.
21+
22+
Changing a running materialized can cause additional work for downstream consumers.
23+
While we cannot avoid this work, we aim to provide tools to quantify the amount of changed data.
24+
25+
## Solution proposal
26+
27+
We introduce the notion of a "replacement" for maintained SQL objects, starting with materialized views.
28+
A replacement allows users to stage the change of definition and output columns of a materialized view.
29+
The user can then inspect the replacement, and decide to apply or discard it.
30+
31+
We add the following SQL commands:
32+
* `CREATE REPLACEMENT replacement_name FOR MATERIALIZED VIEW mv_name AS SELECT ...`
33+
Creates a replacement for the specified materialized view with the new definition.
34+
The usual properties for materialized views apply, such as the cluster and its options.
35+
* `ALTER MATERIALIZED VIEW mv_name APPLY REPLACEMENT replacement_name`
36+
Applies the specified replacement to the materialized view.
37+
This updates the definition and output columns of the materialized view to match those of the replacement.
38+
Existing dependencies on the materialized view are preserved.
39+
* `DROP REPLACEMENT replacement_name`
40+
Discards the specified replacement without applying it.
41+
42+
When a replacement is created, we validate that the new definition is compatible with the existing materialized view.
43+
44+
### Schema evolution
45+
46+
When applying a replacement, we need to ensure that the new schema is compatible with the existing schema.
47+
We define compatibility as follows:
48+
1. The schema must be the same as the original schema,
49+
2. Or, the schema must be a superset of the original schema (i.e., it can add new columns but cannot remove existing ones).
50+
51+
## Minimal Viable Prototype
52+
53+
* Update the parser to support the above syntax.
54+
* Implement planning and sequencing for the new commands.
55+
* Support `SHOW REPLACEMENTS`, `SHOW CREATE REPLACEMENT` commands.
56+
* Add catalog relations for `replacements`: `mz_replacement_materialized_views`.
57+
* Treat a replacement as a first-class object in the catalog.
58+
* Record replacements and state transitions in the audit log.
59+
* Do not support schema evolution in the MVP.
60+
61+
The syntax allows users to create multiple replacements for the same target.
62+
This has some interesting implications:
63+
* Creating two replacements and applying them in reverse order creates versions where the more recent version has a smaller global ID than an older version.
64+
We're not relying on global ID's partial ordering for correctness, so this is acceptable.
65+
* We need to check the schema once when creating the replacement, and once when applying it.
66+
This is to ensure that the replacement is still valid at the time of application.
67+
* Alternatively, we could restrict to a single replacement per target at any time.
68+
This would simplify the implementation, but would also limit the user's ability to stage multiple changes.
69+
70+
## Future work
71+
72+
* Provide better introspection data for replacements, such as the ability to see the differences between the current and replacement definitions.
73+
* Surface metadata about the amount of staged changes (records, bytes) between the current and replacement definitions.
74+
* Introspect the actual changes.
75+
For example, which rows would be added or removed.
76+
* Automate applying a replacement once the new definition is hydrated.
77+
78+
## Alternatives
79+
80+
<!--
81+
What other solutions were considered, and why weren't they chosen?
82+
83+
This is your chance to demonstrate that you've fully discovered the problem.
84+
Alternative solutions can come from many places, like: you or your Materialize
85+
team members, our customers, our prospects, academic research, prior art, or
86+
competitive research. One of our company values is to "do the reading" and
87+
to "write things down." This is your opportunity to demonstrate both!
88+
-->
89+
90+
## Open questions
91+
92+
<!--
93+
What is left unaddressed by this design document that needs to be
94+
closed out?
95+
96+
When a design document is authored and shared, there might still be
97+
open questions that need to be explored. Through the design document
98+
process, you are responsible for getting answers to these open
99+
questions. All open questions should be answered by the time a design
100+
document is merged.
101+
-->

doc/user/content/sql/system-catalog/mz_internal.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1329,6 +1329,7 @@ The `mz_webhook_sources` table contains a row for each webhook source in the sys
13291329
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_prepared_statement_history -->
13301330
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_recent_sql_text -->
13311331
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_recent_sql_text_redacted -->
1332+
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_replacement_materialized_views -->
13321333
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_session_history -->
13331334
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_show_all_objects -->
13341335
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_show_clusters -->
@@ -1340,6 +1341,7 @@ The `mz_webhook_sources` table contains a row for each webhook source in the sys
13401341
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_show_indexes -->
13411342
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_show_materialized_views -->
13421343
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_show_network_policies -->
1344+
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_show_replacements -->
13431345
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_show_roles -->
13441346
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_show_schemas -->
13451347
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_show_secrets -->

misc/python/materialize/mzcompose/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ def get_minimal_system_parameters(
108108
"enable_refresh_every_mvs": "true",
109109
"enable_repr_typecheck": "true",
110110
"enable_cluster_schedule_refresh": "true",
111+
"enable_replacement_materialized_views": "true",
111112
"enable_sql_server_source": "true",
112113
"enable_statement_lifecycle_logging": "true",
113114
"enable_compute_temporal_bucketing": "true",

src/adapter/src/catalog.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,8 @@ impl Catalog {
396396
| CatalogItemType::Func
397397
| CatalogItemType::Secret
398398
| CatalogItemType::Connection
399-
| CatalogItemType::ContinualTask => {
399+
| CatalogItemType::ContinualTask
400+
| CatalogItemType::ReplacementMaterializedView => {
400401
dependencies.extend(global_ids);
401402
}
402403
CatalogItemType::View => {
@@ -1551,6 +1552,7 @@ pub(crate) fn comment_id_to_audit_object_type(id: CommentObjectId) -> ObjectType
15511552
CommentObjectId::ClusterReplica(_) => ObjectType::ClusterReplica,
15521553
CommentObjectId::ContinualTask(_) => ObjectType::ContinualTask,
15531554
CommentObjectId::NetworkPolicy(_) => ObjectType::NetworkPolicy,
1555+
CommentObjectId::ReplacementMaterializedView(_) => ObjectType::ReplacementMaterializedView,
15541556
}
15551557
}
15561558

@@ -1582,6 +1584,9 @@ pub(crate) fn system_object_type_to_audit_object_type(
15821584
mz_sql::catalog::ObjectType::Func => ObjectType::Func,
15831585
mz_sql::catalog::ObjectType::ContinualTask => ObjectType::ContinualTask,
15841586
mz_sql::catalog::ObjectType::NetworkPolicy => ObjectType::NetworkPolicy,
1587+
mz_sql::catalog::ObjectType::ReplacementMaterializedView => {
1588+
ObjectType::ReplacementMaterializedView
1589+
}
15851590
},
15861591
SystemObjectType::System => ObjectType::System,
15871592
}

src/adapter/src/catalog/apply.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1925,7 +1925,8 @@ fn sort_updates_inner(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
19251925
| CatalogItemType::Type
19261926
| CatalogItemType::Func
19271927
| CatalogItemType::Secret
1928-
| CatalogItemType::Connection => push_update(
1928+
| CatalogItemType::Connection
1929+
| CatalogItemType::ReplacementMaterializedView => push_update(
19291930
StateUpdate {
19301931
kind: StateUpdateKind::SystemObjectMapping(builtin_item_update),
19311932
ts,
@@ -2046,7 +2047,8 @@ fn sort_updates_inner(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
20462047
CatalogItemType::Table => tables.push(update),
20472048
CatalogItemType::View
20482049
| CatalogItemType::MaterializedView
2049-
| CatalogItemType::Index => derived_items.push(update),
2050+
| CatalogItemType::Index
2051+
| CatalogItemType::ReplacementMaterializedView => derived_items.push(update),
20502052
CatalogItemType::Sink => sinks.push(update),
20512053
CatalogItemType::ContinualTask => continual_tasks.push(update),
20522054
}
@@ -2116,7 +2118,8 @@ fn sort_updates_inner(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
21162118
CatalogItemType::Table => tables.push(update),
21172119
CatalogItemType::View
21182120
| CatalogItemType::MaterializedView
2119-
| CatalogItemType::Index => derived_items.push(update),
2121+
| CatalogItemType::Index
2122+
| CatalogItemType::ReplacementMaterializedView => derived_items.push(update),
21202123
CatalogItemType::Sink => sinks.push(update),
21212124
CatalogItemType::ContinualTask => continual_tasks.push(update),
21222125
}

src/adapter/src/catalog/builtin_table_updates.rs

Lines changed: 89 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,17 +26,18 @@ use mz_catalog::builtin::{
2626
MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES, MZ_MATERIALIZED_VIEWS, MZ_MYSQL_SOURCE_TABLES,
2727
MZ_NETWORK_POLICIES, MZ_NETWORK_POLICY_RULES, MZ_OBJECT_DEPENDENCIES, MZ_OPERATORS,
2828
MZ_PENDING_CLUSTER_REPLICAS, MZ_POSTGRES_SOURCE_TABLES, MZ_POSTGRES_SOURCES, MZ_PSEUDO_TYPES,
29-
MZ_ROLE_AUTH, MZ_ROLE_MEMBERS, MZ_ROLE_PARAMETERS, MZ_ROLES, MZ_SCHEMAS, MZ_SECRETS,
30-
MZ_SESSIONS, MZ_SINKS, MZ_SOURCE_REFERENCES, MZ_SOURCES, MZ_SQL_SERVER_SOURCE_TABLES,
31-
MZ_SSH_TUNNEL_CONNECTIONS, MZ_STORAGE_USAGE_BY_SHARD, MZ_SUBSCRIPTIONS, MZ_SYSTEM_PRIVILEGES,
32-
MZ_TABLES, MZ_TYPE_PG_METADATA, MZ_TYPES, MZ_VIEWS, MZ_WEBHOOKS_SOURCES,
29+
MZ_REPLACEMENT_MATERIALIZED_VIEWS, MZ_ROLE_AUTH, MZ_ROLE_MEMBERS, MZ_ROLE_PARAMETERS, MZ_ROLES,
30+
MZ_SCHEMAS, MZ_SECRETS, MZ_SESSIONS, MZ_SINKS, MZ_SOURCE_REFERENCES, MZ_SOURCES,
31+
MZ_SQL_SERVER_SOURCE_TABLES, MZ_SSH_TUNNEL_CONNECTIONS, MZ_STORAGE_USAGE_BY_SHARD,
32+
MZ_SUBSCRIPTIONS, MZ_SYSTEM_PRIVILEGES, MZ_TABLES, MZ_TYPE_PG_METADATA, MZ_TYPES, MZ_VIEWS,
33+
MZ_WEBHOOKS_SOURCES,
3334
};
3435
use mz_catalog::config::AwsPrincipalContext;
3536
use mz_catalog::durable::SourceReferences;
3637
use mz_catalog::memory::error::{Error, ErrorKind};
3738
use mz_catalog::memory::objects::{
3839
CatalogItem, ClusterVariant, Connection, ContinualTask, DataSourceDesc, Func, Index,
39-
MaterializedView, Sink, Table, TableDataSource, Type, View,
40+
MaterializedView, ReplacementMaterializedView, Sink, Table, TableDataSource, Type, View,
4041
};
4142
use mz_controller::clusters::{
4243
ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaLocation,
@@ -54,7 +55,7 @@ use mz_repr::adt::jsonb::Jsonb;
5455
use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
5556
use mz_repr::adt::regex;
5657
use mz_repr::network_policy_id::NetworkPolicyId;
57-
use mz_repr::refresh_schedule::RefreshEvery;
58+
use mz_repr::refresh_schedule::{RefreshEvery, RefreshSchedule};
5859
use mz_repr::role_id::RoleId;
5960
use mz_repr::{CatalogItemId, Datum, Diff, GlobalId, Row, RowPacker, SqlScalarType, Timestamp};
6061
use mz_sql::ast::{ContinualTaskStmt, CreateIndexStatement, Statement, UnresolvedItemName};
@@ -817,6 +818,10 @@ impl CatalogState {
817818
CatalogItem::ContinualTask(ct) => self.pack_continual_task_update(
818819
id, oid, schema_id, name, owner_id, privileges, ct, diff,
819820
),
821+
CatalogItem::ReplacementMaterializedView(mview) => self
822+
.pack_replacement_materialized_view_update(
823+
id, oid, schema_id, name, owner_id, privileges, mview, diff,
824+
),
820825
};
821826

822827
if !entry.item().is_temporary() {
@@ -1462,7 +1467,23 @@ impl CatalogState {
14621467
diff,
14631468
));
14641469

1465-
if let Some(refresh_schedule) = &mview.refresh_schedule {
1470+
Self::pack_refresh_strategy_update(
1471+
&id,
1472+
mview.refresh_schedule.as_ref(),
1473+
diff,
1474+
&mut updates,
1475+
);
1476+
1477+
updates
1478+
}
1479+
1480+
fn pack_refresh_strategy_update(
1481+
id: &CatalogItemId,
1482+
refresh_schedule: Option<&RefreshSchedule>,
1483+
diff: Diff,
1484+
updates: &mut Vec<BuiltinTableUpdate<&BuiltinTable>>,
1485+
) {
1486+
if let Some(refresh_schedule) = refresh_schedule {
14661487
// This can't be `ON COMMIT`, because that is represented by a `None` instead of an
14671488
// empty `RefreshSchedule`.
14681489
assert!(!refresh_schedule.is_empty());
@@ -1519,6 +1540,65 @@ impl CatalogState {
15191540
diff,
15201541
));
15211542
}
1543+
}
1544+
1545+
fn pack_replacement_materialized_view_update(
1546+
&self,
1547+
id: CatalogItemId,
1548+
oid: u32,
1549+
schema_id: &SchemaSpecifier,
1550+
name: &str,
1551+
owner_id: &RoleId,
1552+
privileges: Datum,
1553+
mview: &ReplacementMaterializedView,
1554+
diff: Diff,
1555+
) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1556+
let create_stmt = mz_sql::parse::parse(&mview.create_sql)
1557+
.unwrap_or_else(|e| {
1558+
panic!(
1559+
"create_sql cannot be invalid: `{}` --- error: `{}`",
1560+
mview.create_sql, e
1561+
)
1562+
})
1563+
.into_element()
1564+
.ast;
1565+
let query_string = match &create_stmt {
1566+
Statement::CreateReplacementMaterializedView(stmt) => {
1567+
let mut query_string = stmt.query.to_ast_string_stable();
1568+
// PostgreSQL appends a semicolon in `pg_matviews.definition`, we
1569+
// do the same for compatibility's sake.
1570+
query_string.push(';');
1571+
query_string
1572+
}
1573+
_ => unreachable!(),
1574+
};
1575+
1576+
let mut updates = Vec::new();
1577+
1578+
updates.push(BuiltinTableUpdate::row(
1579+
&*MZ_REPLACEMENT_MATERIALIZED_VIEWS,
1580+
Row::pack_slice(&[
1581+
Datum::String(&id.to_string()),
1582+
Datum::UInt32(oid),
1583+
Datum::String(&mview.replaces.to_string()),
1584+
Datum::String(&schema_id.to_string()),
1585+
Datum::String(name),
1586+
Datum::String(&mview.cluster_id.to_string()),
1587+
Datum::String(&query_string),
1588+
Datum::String(&owner_id.to_string()),
1589+
privileges,
1590+
Datum::String(&mview.create_sql),
1591+
Datum::String(&create_stmt.to_ast_string_redacted()),
1592+
]),
1593+
diff,
1594+
));
1595+
1596+
Self::pack_refresh_strategy_update(
1597+
&id,
1598+
mview.refresh_schedule.as_ref(),
1599+
diff,
1600+
&mut updates,
1601+
);
15221602

15231603
updates
15241604
}
@@ -2266,7 +2346,8 @@ impl CatalogState {
22662346
| CommentObjectId::Connection(global_id)
22672347
| CommentObjectId::Secret(global_id)
22682348
| CommentObjectId::Type(global_id)
2269-
| CommentObjectId::ContinualTask(global_id) => global_id.to_string(),
2349+
| CommentObjectId::ContinualTask(global_id)
2350+
| CommentObjectId::ReplacementMaterializedView(global_id) => global_id.to_string(),
22702351
CommentObjectId::Role(role_id) => role_id.to_string(),
22712352
CommentObjectId::Database(database_id) => database_id.to_string(),
22722353
CommentObjectId::Schema((_, schema_id)) => schema_id.to_string(),

src/adapter/src/catalog/consistency.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,8 @@ impl CatalogState {
274274
| CommentObjectId::Connection(item_id)
275275
| CommentObjectId::Type(item_id)
276276
| CommentObjectId::Secret(item_id)
277-
| CommentObjectId::ContinualTask(item_id) => {
277+
| CommentObjectId::ContinualTask(item_id)
278+
| CommentObjectId::ReplacementMaterializedView(item_id) => {
278279
let entry = self.entry_by_id.get(&item_id);
279280
match entry {
280281
None => comment_inconsistencies

src/adapter/src/catalog/open.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -918,7 +918,8 @@ fn add_new_remove_old_builtin_items_migration(
918918
| CatalogItemType::Func
919919
| CatalogItemType::Secret
920920
| CatalogItemType::Connection
921-
| CatalogItemType::ContinualTask => continue,
921+
| CatalogItemType::ContinualTask
922+
| CatalogItemType::ReplacementMaterializedView => continue,
922923
};
923924
deleted_comments.insert(comment_id);
924925
}

src/adapter/src/catalog/open/builtin_item_migration.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,10 @@ pub(crate) async fn migrate_builtin_items(
9292
match &state.get_entry(&id).item() {
9393
Table(table) => Some(table.global_ids().into_element()),
9494
Source(source) => Some(source.global_id()),
95-
MaterializedView(mv) => Some(mv.global_id()),
95+
MaterializedView(mv) => Some(mv.global_id_writes()),
9696
ContinualTask(ct) => Some(ct.global_id()),
97+
// TODO(alter-mv): Do we need to migrate replacement materialized views?
98+
ReplacementMaterializedView(mv) => Some(mv.global_id()),
9799
Log(_) | Sink(_) | View(_) | Index(_) | Type(_) | Func(_) | Secret(_)
98100
| Connection(_) => None,
99101
}

0 commit comments

Comments
 (0)