Skip to content

Commit e56a49c

Browse files
committed
Replacement materialized views
Implements the skeleton to support replacing materialized views. Specifically, the change introduces the following SQL syntax: ``` CREATE REPLACEMENT <replacement_name> FOR MATERIALIZED VIEW <view_name AS ... ``` This creates a new dataflow targeting the same shard. The dataflow selects an as-of of its inputs. The dataflow is read-only. ``` ALTER MATERIALIZED VIEW <view_name> APPLY REPLACEMENT <replacement_name> ``` Replaces the old materialized view with `<replacement_name>`. Enables writes for the replacement. The change adds convenience SQL syntax (`SHOW REPLACEMENTS`, `SHOW CREATE REPLACEMENT`), and relations to query the state of replacements. The syntax to create replacements is guarded by a feature flag that is off by default: `enable_replacement_materialized_views`. Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent 07a134e commit e56a49c

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

77 files changed

+3992
-349
lines changed

doc/user/content/sql/system-catalog/mz_internal.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1329,6 +1329,7 @@ The `mz_webhook_sources` table contains a row for each webhook source in the sys
13291329
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_prepared_statement_history -->
13301330
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_recent_sql_text -->
13311331
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_recent_sql_text_redacted -->
1332+
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_replacement_materialized_views -->
13321333
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_session_history -->
13331334
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_show_all_objects -->
13341335
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_show_clusters -->
@@ -1340,6 +1341,7 @@ The `mz_webhook_sources` table contains a row for each webhook source in the sys
13401341
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_show_indexes -->
13411342
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_show_materialized_views -->
13421343
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_show_network_policies -->
1344+
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_show_replacements -->
13431345
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_show_roles -->
13441346
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_show_schemas -->
13451347
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_show_secrets -->

misc/python/materialize/mzcompose/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ def get_minimal_system_parameters(
108108
"enable_refresh_every_mvs": "true",
109109
"enable_repr_typecheck": "true",
110110
"enable_cluster_schedule_refresh": "true",
111+
"enable_replacement_materialized_views": "true",
111112
"enable_sql_server_source": "true",
112113
"enable_statement_lifecycle_logging": "true",
113114
"enable_compute_temporal_bucketing": "true",

src/adapter/src/catalog.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,8 @@ impl Catalog {
396396
| CatalogItemType::Func
397397
| CatalogItemType::Secret
398398
| CatalogItemType::Connection
399-
| CatalogItemType::ContinualTask => {
399+
| CatalogItemType::ContinualTask
400+
| CatalogItemType::ReplacementMaterializedView => {
400401
dependencies.extend(global_ids);
401402
}
402403
CatalogItemType::View => {
@@ -1553,6 +1554,7 @@ pub(crate) fn comment_id_to_audit_object_type(id: CommentObjectId) -> ObjectType
15531554
CommentObjectId::ClusterReplica(_) => ObjectType::ClusterReplica,
15541555
CommentObjectId::ContinualTask(_) => ObjectType::ContinualTask,
15551556
CommentObjectId::NetworkPolicy(_) => ObjectType::NetworkPolicy,
1557+
CommentObjectId::ReplacementMaterializedView(_) => ObjectType::ReplacementMaterializedView,
15561558
}
15571559
}
15581560

@@ -1584,6 +1586,9 @@ pub(crate) fn system_object_type_to_audit_object_type(
15841586
mz_sql::catalog::ObjectType::Func => ObjectType::Func,
15851587
mz_sql::catalog::ObjectType::ContinualTask => ObjectType::ContinualTask,
15861588
mz_sql::catalog::ObjectType::NetworkPolicy => ObjectType::NetworkPolicy,
1589+
mz_sql::catalog::ObjectType::ReplacementMaterializedView => {
1590+
ObjectType::ReplacementMaterializedView
1591+
}
15871592
},
15881593
SystemObjectType::System => ObjectType::System,
15891594
}

src/adapter/src/catalog/apply.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1925,7 +1925,8 @@ fn sort_updates_inner(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
19251925
| CatalogItemType::Type
19261926
| CatalogItemType::Func
19271927
| CatalogItemType::Secret
1928-
| CatalogItemType::Connection => push_update(
1928+
| CatalogItemType::Connection
1929+
| CatalogItemType::ReplacementMaterializedView => push_update(
19291930
StateUpdate {
19301931
kind: StateUpdateKind::SystemObjectMapping(builtin_item_update),
19311932
ts,
@@ -2046,7 +2047,8 @@ fn sort_updates_inner(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
20462047
CatalogItemType::Table => tables.push(update),
20472048
CatalogItemType::View
20482049
| CatalogItemType::MaterializedView
2049-
| CatalogItemType::Index => derived_items.push(update),
2050+
| CatalogItemType::Index
2051+
| CatalogItemType::ReplacementMaterializedView => derived_items.push(update),
20502052
CatalogItemType::Sink => sinks.push(update),
20512053
CatalogItemType::ContinualTask => continual_tasks.push(update),
20522054
}
@@ -2116,7 +2118,8 @@ fn sort_updates_inner(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
21162118
CatalogItemType::Table => tables.push(update),
21172119
CatalogItemType::View
21182120
| CatalogItemType::MaterializedView
2119-
| CatalogItemType::Index => derived_items.push(update),
2121+
| CatalogItemType::Index
2122+
| CatalogItemType::ReplacementMaterializedView => derived_items.push(update),
21202123
CatalogItemType::Sink => sinks.push(update),
21212124
CatalogItemType::ContinualTask => continual_tasks.push(update),
21222125
}

src/adapter/src/catalog/builtin_table_updates.rs

Lines changed: 89 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,17 +26,18 @@ use mz_catalog::builtin::{
2626
MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES, MZ_MATERIALIZED_VIEWS, MZ_MYSQL_SOURCE_TABLES,
2727
MZ_NETWORK_POLICIES, MZ_NETWORK_POLICY_RULES, MZ_OBJECT_DEPENDENCIES, MZ_OPERATORS,
2828
MZ_PENDING_CLUSTER_REPLICAS, MZ_POSTGRES_SOURCE_TABLES, MZ_POSTGRES_SOURCES, MZ_PSEUDO_TYPES,
29-
MZ_ROLE_AUTH, MZ_ROLE_MEMBERS, MZ_ROLE_PARAMETERS, MZ_ROLES, MZ_SCHEMAS, MZ_SECRETS,
30-
MZ_SESSIONS, MZ_SINKS, MZ_SOURCE_REFERENCES, MZ_SOURCES, MZ_SQL_SERVER_SOURCE_TABLES,
31-
MZ_SSH_TUNNEL_CONNECTIONS, MZ_STORAGE_USAGE_BY_SHARD, MZ_SUBSCRIPTIONS, MZ_SYSTEM_PRIVILEGES,
32-
MZ_TABLES, MZ_TYPE_PG_METADATA, MZ_TYPES, MZ_VIEWS, MZ_WEBHOOKS_SOURCES,
29+
MZ_REPLACEMENT_MATERIALIZED_VIEWS, MZ_ROLE_AUTH, MZ_ROLE_MEMBERS, MZ_ROLE_PARAMETERS, MZ_ROLES,
30+
MZ_SCHEMAS, MZ_SECRETS, MZ_SESSIONS, MZ_SINKS, MZ_SOURCE_REFERENCES, MZ_SOURCES,
31+
MZ_SQL_SERVER_SOURCE_TABLES, MZ_SSH_TUNNEL_CONNECTIONS, MZ_STORAGE_USAGE_BY_SHARD,
32+
MZ_SUBSCRIPTIONS, MZ_SYSTEM_PRIVILEGES, MZ_TABLES, MZ_TYPE_PG_METADATA, MZ_TYPES, MZ_VIEWS,
33+
MZ_WEBHOOKS_SOURCES,
3334
};
3435
use mz_catalog::config::AwsPrincipalContext;
3536
use mz_catalog::durable::SourceReferences;
3637
use mz_catalog::memory::error::{Error, ErrorKind};
3738
use mz_catalog::memory::objects::{
3839
CatalogItem, ClusterVariant, Connection, ContinualTask, DataSourceDesc, Func, Index,
39-
MaterializedView, Sink, Table, TableDataSource, Type, View,
40+
MaterializedView, ReplacementMaterializedView, Sink, Table, TableDataSource, Type, View,
4041
};
4142
use mz_controller::clusters::{
4243
ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaLocation,
@@ -54,7 +55,7 @@ use mz_repr::adt::jsonb::Jsonb;
5455
use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
5556
use mz_repr::adt::regex;
5657
use mz_repr::network_policy_id::NetworkPolicyId;
57-
use mz_repr::refresh_schedule::RefreshEvery;
58+
use mz_repr::refresh_schedule::{RefreshEvery, RefreshSchedule};
5859
use mz_repr::role_id::RoleId;
5960
use mz_repr::{CatalogItemId, Datum, Diff, GlobalId, Row, RowPacker, SqlScalarType, Timestamp};
6061
use mz_sql::ast::{ContinualTaskStmt, CreateIndexStatement, Statement, UnresolvedItemName};
@@ -817,6 +818,10 @@ impl CatalogState {
817818
CatalogItem::ContinualTask(ct) => self.pack_continual_task_update(
818819
id, oid, schema_id, name, owner_id, privileges, ct, diff,
819820
),
821+
CatalogItem::ReplacementMaterializedView(mview) => self
822+
.pack_replacement_materialized_view_update(
823+
id, oid, schema_id, name, owner_id, privileges, mview, diff,
824+
),
820825
};
821826

822827
if !entry.item().is_temporary() {
@@ -1462,7 +1467,23 @@ impl CatalogState {
14621467
diff,
14631468
));
14641469

1465-
if let Some(refresh_schedule) = &mview.refresh_schedule {
1470+
Self::pack_refresh_strategy_update(
1471+
&id,
1472+
mview.refresh_schedule.as_ref(),
1473+
diff,
1474+
&mut updates,
1475+
);
1476+
1477+
updates
1478+
}
1479+
1480+
fn pack_refresh_strategy_update(
1481+
id: &CatalogItemId,
1482+
refresh_schedule: Option<&RefreshSchedule>,
1483+
diff: Diff,
1484+
updates: &mut Vec<BuiltinTableUpdate<&BuiltinTable>>,
1485+
) {
1486+
if let Some(refresh_schedule) = refresh_schedule {
14661487
// This can't be `ON COMMIT`, because that is represented by a `None` instead of an
14671488
// empty `RefreshSchedule`.
14681489
assert!(!refresh_schedule.is_empty());
@@ -1519,6 +1540,65 @@ impl CatalogState {
15191540
diff,
15201541
));
15211542
}
1543+
}
1544+
1545+
fn pack_replacement_materialized_view_update(
1546+
&self,
1547+
id: CatalogItemId,
1548+
oid: u32,
1549+
schema_id: &SchemaSpecifier,
1550+
name: &str,
1551+
owner_id: &RoleId,
1552+
privileges: Datum,
1553+
mview: &ReplacementMaterializedView,
1554+
diff: Diff,
1555+
) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1556+
let create_stmt = mz_sql::parse::parse(&mview.create_sql)
1557+
.unwrap_or_else(|e| {
1558+
panic!(
1559+
"create_sql cannot be invalid: `{}` --- error: `{}`",
1560+
mview.create_sql, e
1561+
)
1562+
})
1563+
.into_element()
1564+
.ast;
1565+
let query_string = match &create_stmt {
1566+
Statement::CreateReplacementMaterializedView(stmt) => {
1567+
let mut query_string = stmt.query.to_ast_string_stable();
1568+
// PostgreSQL appends a semicolon in `pg_matviews.definition`, we
1569+
// do the same for compatibility's sake.
1570+
query_string.push(';');
1571+
query_string
1572+
}
1573+
_ => unreachable!(),
1574+
};
1575+
1576+
let mut updates = Vec::new();
1577+
1578+
updates.push(BuiltinTableUpdate::row(
1579+
&*MZ_REPLACEMENT_MATERIALIZED_VIEWS,
1580+
Row::pack_slice(&[
1581+
Datum::String(&id.to_string()),
1582+
Datum::UInt32(oid),
1583+
Datum::String(&mview.replaces.to_string()),
1584+
Datum::String(&schema_id.to_string()),
1585+
Datum::String(name),
1586+
Datum::String(&mview.cluster_id.to_string()),
1587+
Datum::String(&query_string),
1588+
Datum::String(&owner_id.to_string()),
1589+
privileges,
1590+
Datum::String(&mview.create_sql),
1591+
Datum::String(&create_stmt.to_ast_string_redacted()),
1592+
]),
1593+
diff,
1594+
));
1595+
1596+
Self::pack_refresh_strategy_update(
1597+
&id,
1598+
mview.refresh_schedule.as_ref(),
1599+
diff,
1600+
&mut updates,
1601+
);
15221602

15231603
updates
15241604
}
@@ -2266,7 +2346,8 @@ impl CatalogState {
22662346
| CommentObjectId::Connection(global_id)
22672347
| CommentObjectId::Secret(global_id)
22682348
| CommentObjectId::Type(global_id)
2269-
| CommentObjectId::ContinualTask(global_id) => global_id.to_string(),
2349+
| CommentObjectId::ContinualTask(global_id)
2350+
| CommentObjectId::ReplacementMaterializedView(global_id) => global_id.to_string(),
22702351
CommentObjectId::Role(role_id) => role_id.to_string(),
22712352
CommentObjectId::Database(database_id) => database_id.to_string(),
22722353
CommentObjectId::Schema((_, schema_id)) => schema_id.to_string(),

src/adapter/src/catalog/consistency.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,8 @@ impl CatalogState {
274274
| CommentObjectId::Connection(item_id)
275275
| CommentObjectId::Type(item_id)
276276
| CommentObjectId::Secret(item_id)
277-
| CommentObjectId::ContinualTask(item_id) => {
277+
| CommentObjectId::ContinualTask(item_id)
278+
| CommentObjectId::ReplacementMaterializedView(item_id) => {
278279
let entry = self.entry_by_id.get(&item_id);
279280
match entry {
280281
None => comment_inconsistencies

src/adapter/src/catalog/open.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -887,7 +887,8 @@ fn add_new_remove_old_builtin_items_migration(
887887
| CatalogItemType::Func
888888
| CatalogItemType::Secret
889889
| CatalogItemType::Connection
890-
| CatalogItemType::ContinualTask => continue,
890+
| CatalogItemType::ContinualTask
891+
| CatalogItemType::ReplacementMaterializedView => continue,
891892
};
892893
deleted_comments.insert(comment_id);
893894
}

0 commit comments

Comments
 (0)