Skip to content

Commit 3acd41c

Browse files
committed
Replacement materialized views
Implements the skeleton to support replacing materialized views. Specifically, the change introduces the following SQL syntax: ``` CREATE REPLACEMENT <replacement_name> FOR MATERIALIZED VIEW <view_name AS ... ``` This creates a new dataflow targeting the same shard. The dataflow selects an as-of of its inputs. The dataflow is read-only. ``` ALTER MATERIALIZED VIEW <view_name> APPLY REPLACEMENT <replacement_name> ``` Replaces the old materialized view with `<replacement_name>`. Enables writes for the replacement. The change adds convenience SQL syntax (`SHOW REPLACEMENTS`, `SHOW CREATE REPLACEMENT`), and relations to query the state of replacements. The syntax to create replacements is guarded by a feature flag that is off by default: `enable_replacement_materialized_views`. Signed-off-by: Moritz Hoffmann <[email protected]> Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent a84d4e8 commit 3acd41c

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+2166
-260
lines changed

src/adapter/src/catalog/builtin_table_updates.rs

Lines changed: 87 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,18 @@ use mz_catalog::builtin::{
2626
MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES, MZ_MATERIALIZED_VIEWS, MZ_MYSQL_SOURCE_TABLES,
2727
MZ_NETWORK_POLICIES, MZ_NETWORK_POLICY_RULES, MZ_OBJECT_DEPENDENCIES, MZ_OBJECT_GLOBAL_IDS,
2828
MZ_OPERATORS, MZ_PENDING_CLUSTER_REPLICAS, MZ_POSTGRES_SOURCE_TABLES, MZ_POSTGRES_SOURCES,
29-
MZ_PSEUDO_TYPES, MZ_ROLE_AUTH, MZ_ROLE_MEMBERS, MZ_ROLE_PARAMETERS, MZ_ROLES, MZ_SCHEMAS,
30-
MZ_SECRETS, MZ_SESSIONS, MZ_SINKS, MZ_SOURCE_REFERENCES, MZ_SOURCES,
31-
MZ_SQL_SERVER_SOURCE_TABLES, MZ_SSH_TUNNEL_CONNECTIONS, MZ_STORAGE_USAGE_BY_SHARD,
32-
MZ_SUBSCRIPTIONS, MZ_SYSTEM_PRIVILEGES, MZ_TABLES, MZ_TYPE_PG_METADATA, MZ_TYPES, MZ_VIEWS,
33-
MZ_WEBHOOKS_SOURCES,
29+
MZ_PSEUDO_TYPES, MZ_REPLACEMENT_MATERIALIZED_VIEWS, MZ_ROLE_AUTH, MZ_ROLE_MEMBERS,
30+
MZ_ROLE_PARAMETERS, MZ_ROLES, MZ_SCHEMAS, MZ_SECRETS, MZ_SESSIONS, MZ_SINKS,
31+
MZ_SOURCE_REFERENCES, MZ_SOURCES, MZ_SQL_SERVER_SOURCE_TABLES, MZ_SSH_TUNNEL_CONNECTIONS,
32+
MZ_STORAGE_USAGE_BY_SHARD, MZ_SUBSCRIPTIONS, MZ_SYSTEM_PRIVILEGES, MZ_TABLES,
33+
MZ_TYPE_PG_METADATA, MZ_TYPES, MZ_VIEWS, MZ_WEBHOOKS_SOURCES,
3434
};
3535
use mz_catalog::config::AwsPrincipalContext;
3636
use mz_catalog::durable::SourceReferences;
3737
use mz_catalog::memory::error::{Error, ErrorKind};
3838
use mz_catalog::memory::objects::{
3939
CatalogEntry, CatalogItem, ClusterVariant, Connection, ContinualTask, DataSourceDesc, Func,
40-
Index, MaterializedView, Sink, Table, TableDataSource, Type, View,
40+
Index, MaterializedView, ReplacementMaterializedView, Sink, Table, TableDataSource, Type, View,
4141
};
4242
use mz_controller::clusters::{
4343
ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaLocation,
@@ -55,7 +55,7 @@ use mz_repr::adt::jsonb::Jsonb;
5555
use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
5656
use mz_repr::adt::regex;
5757
use mz_repr::network_policy_id::NetworkPolicyId;
58-
use mz_repr::refresh_schedule::RefreshEvery;
58+
use mz_repr::refresh_schedule::{RefreshEvery, RefreshSchedule};
5959
use mz_repr::role_id::RoleId;
6060
use mz_repr::{CatalogItemId, Datum, Diff, GlobalId, Row, RowPacker, SqlScalarType, Timestamp};
6161
use mz_sql::ast::{ContinualTaskStmt, CreateIndexStatement, Statement, UnresolvedItemName};
@@ -818,6 +818,10 @@ impl CatalogState {
818818
CatalogItem::ContinualTask(ct) => self.pack_continual_task_update(
819819
id, oid, schema_id, name, owner_id, privileges, ct, diff,
820820
),
821+
CatalogItem::ReplacementMaterializedView(mview) => self
822+
.pack_replacement_materialized_view_update(
823+
id, oid, schema_id, name, owner_id, privileges, mview, diff,
824+
),
821825
};
822826

823827
if !entry.item().is_temporary() {
@@ -1480,7 +1484,23 @@ impl CatalogState {
14801484
diff,
14811485
));
14821486

1483-
if let Some(refresh_schedule) = &mview.refresh_schedule {
1487+
Self::pack_refresh_strategy_update(
1488+
&id,
1489+
mview.refresh_schedule.as_ref(),
1490+
diff,
1491+
&mut updates,
1492+
);
1493+
1494+
updates
1495+
}
1496+
1497+
fn pack_refresh_strategy_update(
1498+
id: &CatalogItemId,
1499+
refresh_schedule: Option<&RefreshSchedule>,
1500+
diff: Diff,
1501+
updates: &mut Vec<BuiltinTableUpdate<&BuiltinTable>>,
1502+
) {
1503+
if let Some(refresh_schedule) = refresh_schedule {
14841504
// This can't be `ON COMMIT`, because that is represented by a `None` instead of an
14851505
// empty `RefreshSchedule`.
14861506
assert!(!refresh_schedule.is_empty());
@@ -1537,6 +1557,65 @@ impl CatalogState {
15371557
diff,
15381558
));
15391559
}
1560+
}
1561+
1562+
fn pack_replacement_materialized_view_update(
1563+
&self,
1564+
id: CatalogItemId,
1565+
oid: u32,
1566+
schema_id: &SchemaSpecifier,
1567+
name: &str,
1568+
owner_id: &RoleId,
1569+
privileges: Datum,
1570+
mview: &ReplacementMaterializedView,
1571+
diff: Diff,
1572+
) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1573+
let create_stmt = mz_sql::parse::parse(&mview.create_sql)
1574+
.unwrap_or_else(|e| {
1575+
panic!(
1576+
"create_sql cannot be invalid: `{}` --- error: `{}`",
1577+
mview.create_sql, e
1578+
)
1579+
})
1580+
.into_element()
1581+
.ast;
1582+
let query_string = match &create_stmt {
1583+
Statement::CreateReplacementMaterializedView(stmt) => {
1584+
let mut query_string = stmt.query.to_ast_string_stable();
1585+
// PostgreSQL appends a semicolon in `pg_matviews.definition`, we
1586+
// do the same for compatibility's sake.
1587+
query_string.push(';');
1588+
query_string
1589+
}
1590+
_ => unreachable!(),
1591+
};
1592+
1593+
let mut updates = Vec::new();
1594+
1595+
updates.push(BuiltinTableUpdate::row(
1596+
&*MZ_REPLACEMENT_MATERIALIZED_VIEWS,
1597+
Row::pack_slice(&[
1598+
Datum::String(&id.to_string()),
1599+
Datum::UInt32(oid),
1600+
Datum::String(&mview.replaces.to_string()),
1601+
Datum::String(&schema_id.to_string()),
1602+
Datum::String(name),
1603+
Datum::String(&mview.cluster_id.to_string()),
1604+
Datum::String(&query_string),
1605+
Datum::String(&owner_id.to_string()),
1606+
privileges,
1607+
Datum::String(&mview.create_sql),
1608+
Datum::String(&create_stmt.to_ast_string_redacted()),
1609+
]),
1610+
diff,
1611+
));
1612+
1613+
Self::pack_refresh_strategy_update(
1614+
&id,
1615+
mview.refresh_schedule.as_ref(),
1616+
diff,
1617+
&mut updates,
1618+
);
15401619

15411620
updates
15421621
}

src/adapter/src/catalog/state.rs

Lines changed: 87 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ use mz_catalog::memory::error::{Error, ErrorKind};
3232
use mz_catalog::memory::objects::{
3333
CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, CommentsMap,
3434
Connection, DataSourceDesc, Database, DefaultPrivileges, Index, MaterializedView,
35-
NetworkPolicy, Role, RoleAuth, Schema, Secret, Sink, Source, SourceReferences, Table,
36-
TableDataSource, Type, View,
35+
NetworkPolicy, ReplacementMaterializedView, Role, RoleAuth, Schema, Secret, Sink, Source,
36+
SourceReferences, Table, TableDataSource, Type, View,
3737
};
3838
use mz_controller::clusters::{
3939
ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaAllocation, ReplicaLocation,
@@ -76,9 +76,9 @@ use mz_sql::names::{
7676
ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
7777
};
7878
use mz_sql::plan::{
79-
CreateConnectionPlan, CreateIndexPlan, CreateMaterializedViewPlan, CreateSecretPlan,
80-
CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan, CreateViewPlan, Params,
81-
Plan, PlanContext,
79+
CreateConnectionPlan, CreateIndexPlan, CreateMaterializedViewPlan,
80+
CreateReplacementMaterializedViewPlan, CreateSecretPlan, CreateSinkPlan, CreateSourcePlan,
81+
CreateTablePlan, CreateTypePlan, CreateViewPlan, Params, Plan, PlanContext,
8282
};
8383
use mz_sql::rbac;
8484
use mz_sql::session::metadata::SessionMetadata;
@@ -423,7 +423,8 @@ impl CatalogState {
423423
item @ (CatalogItem::View(_)
424424
| CatalogItem::MaterializedView(_)
425425
| CatalogItem::Connection(_)
426-
| CatalogItem::ContinualTask(_)) => {
426+
| CatalogItem::ContinualTask(_)
427+
| CatalogItem::ReplacementMaterializedView(_)) => {
427428
// TODO(jkosh44) Unclear if this table wants to include all uses or only references.
428429
for item_id in item.references().items() {
429430
self.introspection_dependencies_inner(*item_id, out);
@@ -1452,6 +1453,86 @@ impl CatalogState {
14521453
details,
14531454
resolved_ids,
14541455
}),
1456+
Plan::CreateReplacementMaterializedView(CreateReplacementMaterializedViewPlan {
1457+
materialized_view,
1458+
replaces,
1459+
..
1460+
}) => {
1461+
// Collect optimizer parameters.
1462+
let optimizer_config =
1463+
optimize::OptimizerConfig::from(session_catalog.system_vars());
1464+
let previous_exprs = previous_item.map(|item| match item {
1465+
CatalogItem::ReplacementMaterializedView(materialized_view) => {
1466+
(materialized_view.raw_expr, materialized_view.optimized_expr)
1467+
}
1468+
item => {
1469+
unreachable!("expected replacement materialized view, found: {item:#?}")
1470+
}
1471+
});
1472+
1473+
let (raw_expr, optimized_expr) = match (cached_expr, previous_exprs) {
1474+
(Some(local_expr), _)
1475+
if local_expr.optimizer_features == optimizer_config.features =>
1476+
{
1477+
debug!("local expression cache hit for {global_id:?}");
1478+
(
1479+
Arc::new(materialized_view.expr),
1480+
Arc::new(local_expr.local_mir),
1481+
)
1482+
}
1483+
// If the new expr is equivalent to the old expr, then we don't need to re-optimize.
1484+
(_, Some((raw_expr, optimized_expr)))
1485+
if *raw_expr == materialized_view.expr =>
1486+
{
1487+
(Arc::clone(&raw_expr), Arc::clone(&optimized_expr))
1488+
}
1489+
(cached_expr, _) => {
1490+
let optimizer_features = optimizer_config.features.clone();
1491+
// TODO(aalexandrov): ideally this should be a materialized_view::Optimizer.
1492+
let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
1493+
1494+
let raw_expr = materialized_view.expr;
1495+
let optimized_expr = match optimizer.optimize(raw_expr.clone()) {
1496+
Ok(optimized_expr) => optimized_expr,
1497+
Err(err) => return Err((err.into(), cached_expr)),
1498+
};
1499+
1500+
uncached_expr = Some((optimized_expr.clone(), optimizer_features));
1501+
1502+
(Arc::new(raw_expr), Arc::new(optimized_expr))
1503+
}
1504+
};
1505+
let mut typ = optimized_expr.typ();
1506+
for &i in &materialized_view.non_null_assertions {
1507+
typ.column_types[i].nullable = false;
1508+
}
1509+
let desc = RelationDesc::new(typ, materialized_view.column_names);
1510+
1511+
let initial_as_of = materialized_view.as_of.map(Antichain::from_elem);
1512+
1513+
// Resolve all item dependencies from the HIR expression.
1514+
let dependencies = raw_expr
1515+
.depends_on()
1516+
.into_iter()
1517+
.map(|gid| self.get_entry_by_global_id(&gid).id())
1518+
.collect();
1519+
1520+
CatalogItem::ReplacementMaterializedView(ReplacementMaterializedView {
1521+
create_sql: materialized_view.create_sql,
1522+
global_id,
1523+
replaces,
1524+
raw_expr,
1525+
optimized_expr,
1526+
desc,
1527+
resolved_ids,
1528+
dependencies,
1529+
cluster_id: materialized_view.cluster_id,
1530+
non_null_assertions: materialized_view.non_null_assertions,
1531+
custom_logical_compaction_window: materialized_view.compaction_window,
1532+
refresh_schedule: materialized_view.refresh_schedule,
1533+
initial_as_of,
1534+
})
1535+
}
14551536
_ => {
14561537
return Err((
14571538
Error::new(ErrorKind::Corruption {

src/adapter/src/catalog/timeline.rs

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@
1212
use std::collections::{BTreeMap, BTreeSet};
1313

1414
use itertools::Itertools;
15-
use mz_catalog::memory::objects::{CatalogItem, ContinualTask, MaterializedView, View};
15+
use mz_catalog::memory::objects::{
16+
CatalogItem, ContinualTask, MaterializedView, ReplacementMaterializedView, View,
17+
};
1618
use mz_expr::CollectionPlan;
1719
use mz_ore::collections::CollectionExt;
1820
use mz_repr::{CatalogItemId, GlobalId};
@@ -82,7 +84,10 @@ impl Catalog {
8284
id_bundle.storage_ids.insert(source.global_id());
8385
}
8486
CatalogItem::MaterializedView(mv) => {
85-
id_bundle.storage_ids.insert(mv.global_id_writes());
87+
id_bundle.storage_ids.extend(mv.global_ids());
88+
}
89+
CatalogItem::ReplacementMaterializedView(mv) => {
90+
id_bundle.storage_ids.insert(mv.global_id());
8691
}
8792
CatalogItem::ContinualTask(ct) => {
8893
id_bundle.storage_ids.insert(ct.global_id());
@@ -218,6 +223,23 @@ impl Catalog {
218223
.map(|gid| self.resolve_item_id(&gid));
219224
ids.extend(item_ids);
220225
}
226+
CatalogItem::ReplacementMaterializedView(ReplacementMaterializedView {
227+
optimized_expr,
228+
..
229+
}) => {
230+
// In some cases the timestamp selected may not affect the answer to a
231+
// query, but it may affect our ability to query the materialized view.
232+
// Materialized views must durably materialize the result of a query, even
233+
// for constant queries. If we choose a timestamp larger than the upper,
234+
// which represents the current progress of the view, then the query will
235+
// need to block and wait for the materialized view to advance.
236+
timelines.insert(TimelineContext::TimestampDependent);
237+
let item_ids = optimized_expr
238+
.depends_on()
239+
.into_iter()
240+
.map(|gid| self.resolve_item_id(&gid));
241+
ids.extend(item_ids);
242+
}
221243
CatalogItem::ContinualTask(ContinualTask { raw_expr, .. }) => {
222244
// See comment in MaterializedView
223245
timelines.insert(TimelineContext::TimestampDependent);

0 commit comments

Comments
 (0)