Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/common/exception/src/exception_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,8 @@ build_exceptions! {
DatabaseAlreadyExists(2301),
/// Table already exists
TableAlreadyExists(2302),
/// Database version mismatch
DatabaseVersionMismatched(2303),
/// View already exists
ViewAlreadyExists(2306),
/// Create table with drop time
Expand Down
63 changes: 63 additions & 0 deletions src/meta/api/src/database_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use chrono::Utc;
use databend_common_meta_app::app_error::AppError;
use databend_common_meta_app::app_error::CreateDatabaseWithDropTime;
use databend_common_meta_app::app_error::DatabaseAlreadyExists;
use databend_common_meta_app::app_error::DatabaseVersionMismatched;
use databend_common_meta_app::app_error::UndropDbHasNoHistory;
use databend_common_meta_app::app_error::UndropDbWithNoDropTime;
use databend_common_meta_app::app_error::UnknownDatabase;
Expand All @@ -43,10 +44,13 @@ use databend_common_meta_app::schema::RenameDatabaseReply;
use databend_common_meta_app::schema::RenameDatabaseReq;
use databend_common_meta_app::schema::UndropDatabaseReply;
use databend_common_meta_app::schema::UndropDatabaseReq;
use databend_common_meta_app::schema::UpdateDatabaseOptionsReply;
use databend_common_meta_app::schema::UpdateDatabaseOptionsReq;
use databend_common_meta_app::KeyWithTenant;
use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::DirName;
use databend_common_meta_types::ConditionResult::Eq;
use databend_common_meta_types::MatchSeq;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::MetaId;
use databend_common_meta_types::SeqV;
Expand All @@ -65,6 +69,7 @@ use crate::error_util::db_has_to_not_exist;
use crate::fetch_id;
use crate::kv_app_error::KVAppError;
use crate::kv_pb_api::KVPbApi;
use crate::kv_pb_api::UpsertPB;
use crate::serialize_struct;
use crate::serialize_u64;
use crate::txn_backoff::txn_backoff;
Expand Down Expand Up @@ -494,6 +499,64 @@ where
}
}

#[logcall::logcall]
#[fastrace::trace]
async fn update_database_options(
&self,
req: UpdateDatabaseOptionsReq,
) -> Result<UpdateDatabaseOptionsReply, KVAppError> {
debug!(req :? =(&req); "SchemaApi: {}", func_name!());

let db_id = req.db_id;
let expected_seq = req.expected_meta_seq;
let new_options = req.options.clone();
let db_key = DatabaseId::new(db_id);

let seq_meta = self.get_pb(&db_key).await?;
let Some(seq_meta) = seq_meta else {
return Err(KVAppError::AppError(AppError::UnknownDatabaseId(
UnknownDatabaseId::new(db_id, "update_database_options"),
)));
};

if seq_meta.seq != expected_seq {
return Err(KVAppError::AppError(AppError::DatabaseVersionMismatched(
DatabaseVersionMismatched::new(
db_id,
MatchSeq::Exact(expected_seq),
seq_meta.seq,
"update_database_options",
),
)));
}

let mut meta = seq_meta.data;
meta.options = new_options;
meta.updated_on = Utc::now();

let upsert = UpsertPB::update_exact(db_key, SeqV::new(expected_seq, meta));
let transition = self.upsert_pb(&upsert).await?;

if !transition.is_changed() {
let curr_seq = self
.get_pb(&db_key)
.await?
.map(|v| v.seq())
.unwrap_or_default();

return Err(KVAppError::AppError(AppError::DatabaseVersionMismatched(
DatabaseVersionMismatched::new(
db_id,
MatchSeq::Exact(expected_seq),
curr_seq,
"update_database_options",
),
)));
}

Ok(UpdateDatabaseOptionsReply {})
}

#[logcall::logcall]
#[fastrace::trace]
async fn get_database(&self, req: GetDatabaseReq) -> Result<Arc<DatabaseInfo>, KVAppError> {
Expand Down
28 changes: 28 additions & 0 deletions src/meta/app/src/app_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,26 @@ impl TableVersionMismatched {
}
}

#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[error("DatabaseVersionMismatched: {db_id} expect `{expect}` but `{curr}` while `{context}`")]
pub struct DatabaseVersionMismatched {
db_id: u64,
expect: MatchSeq,
curr: u64,
context: String,
}

impl DatabaseVersionMismatched {
pub fn new(db_id: u64, expect: MatchSeq, curr: u64, context: impl Into<String>) -> Self {
Self {
db_id,
expect,
curr,
context: context.into(),
}
}
}

#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[error("StreamAlreadyExists: {name} while {context}")]
pub struct StreamAlreadyExists {
Expand Down Expand Up @@ -1010,6 +1030,9 @@ pub enum AppError {
#[error(transparent)]
TableVersionMismatched(#[from] TableVersionMismatched),

#[error(transparent)]
DatabaseVersionMismatched(#[from] DatabaseVersionMismatched),

#[error(transparent)]
DuplicatedUpsertFiles(#[from] DuplicatedUpsertFiles),

Expand Down Expand Up @@ -1293,6 +1316,8 @@ impl AppErrorMessage for UnknownDatabaseId {}

impl AppErrorMessage for TableVersionMismatched {}

impl AppErrorMessage for DatabaseVersionMismatched {}

impl AppErrorMessage for StreamAlreadyExists {
fn message(&self) -> String {
format!("'{}' as stream Already Exists", self.name)
Expand Down Expand Up @@ -1653,6 +1678,9 @@ impl From<AppError> for ErrorCode {
AppError::UndropTableHasNoHistory(err) => {
ErrorCode::UndropTableHasNoHistory(err.message())
}
AppError::DatabaseVersionMismatched(err) => {
ErrorCode::DatabaseVersionMismatched(err.message())
}
AppError::TableVersionMismatched(err) => {
ErrorCode::TableVersionMismatched(err.message())
}
Expand Down
23 changes: 23 additions & 0 deletions src/meta/app/src/schema/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,29 @@ pub struct DropDatabaseReply {
pub db_id: u64,
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct UpdateDatabaseOptionsReq {
pub db_id: u64,
/// The database meta sequence the caller observed. Used for CAS semantics.
pub expected_meta_seq: u64,
/// The complete option map that should replace the existing options when the
/// expected meta sequence still matches.
pub options: BTreeMap<String, String>,
}

impl Display for UpdateDatabaseOptionsReq {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
write!(
f,
"update_db_options:{}@{}={:?}",
self.db_id, self.expected_meta_seq, self.options
)
}
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct UpdateDatabaseOptionsReply {}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct UndropDatabaseReq {
pub name_ident: DatabaseNameIdent,
Expand Down
2 changes: 2 additions & 0 deletions src/meta/app/src/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ pub use database::RenameDatabaseReq;
pub use database::ShareDbId;
pub use database::UndropDatabaseReply;
pub use database::UndropDatabaseReq;
pub use database::UpdateDatabaseOptionsReply;
pub use database::UpdateDatabaseOptionsReq;
pub use database_id::DatabaseId;
pub use database_id_history_ident::DatabaseIdHistoryIdent;
pub use dictionary::*;
Expand Down
23 changes: 22 additions & 1 deletion src/query/ast/src/ast/statements/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,17 @@ impl Display for CreateDatabaseStmt {
write!(f, " ENGINE = {engine}")?;
}

// TODO(leiysky): display rest information
if !self.options.is_empty() {
write!(f, " OPTIONS (")?;
for (i, option) in self.options.iter().enumerate() {
if i > 0 {
write!(f, ", ")?;
}
write!(f, "{} = '{}'", option.name, option.value)?;
}
write!(f, ")")?;
}

Ok(())
}
}
Expand Down Expand Up @@ -169,6 +179,16 @@ impl Display for AlterDatabaseStmt {
AlterDatabaseAction::RefreshDatabaseCache => {
write!(f, " REFRESH CACHE")?;
}
AlterDatabaseAction::SetOptions { options } => {
write!(f, " SET OPTIONS (")?;
for (i, option) in options.iter().enumerate() {
if i > 0 {
write!(f, ", ")?;
}
write!(f, "{} = '{}'", option.name, option.value)?;
}
write!(f, ")")?;
}
}

Ok(())
Expand All @@ -179,6 +199,7 @@ impl Display for AlterDatabaseStmt {
pub enum AlterDatabaseAction {
RenameDatabase { new_db: Identifier },
RefreshDatabaseCache,
SetOptions { options: Vec<SQLProperty> },
}

#[derive(Debug, Clone, PartialEq, Eq, Drive, DriveMut)]
Expand Down
65 changes: 46 additions & 19 deletions src/query/ast/src/parser/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ pub type ShareDatabaseParams = (ShareNameIdent, Identifier);
#[derive(Clone)]
pub enum CreateDatabaseOption {
DatabaseEngine(DatabaseEngine),
Options(Vec<SQLProperty>),
}

fn procedure_type_name(i: Input) -> IResult<Vec<TypeName>> {
Expand Down Expand Up @@ -796,28 +797,24 @@ pub fn statement_body(i: Input) -> IResult<Statement> {
~ ( DATABASE | SCHEMA )
~ ( IF ~ ^NOT ~ ^EXISTS )?
~ #database_ref
~ #create_database_option?
~ ( ENGINE ~ ^"=" ~ ^#database_engine )?
~ ( OPTIONS ~ ^"(" ~ ^#sql_property_list ~ ^")" )?
},
|(_, opt_or_replace, _, opt_if_not_exists, database, create_database_option)| {
|(_, opt_or_replace, _, opt_if_not_exists, database, engine_opt, options_opt)| {
let create_option =
parse_create_option(opt_or_replace.is_some(), opt_if_not_exists.is_some())?;

let statement = match create_database_option {
Some(CreateDatabaseOption::DatabaseEngine(engine)) => {
Statement::CreateDatabase(CreateDatabaseStmt {
create_option,
database,
engine: Some(engine),
options: vec![],
})
}
None => Statement::CreateDatabase(CreateDatabaseStmt {
create_option,
database,
engine: None,
options: vec![],
}),
};
let engine = engine_opt.map(|(_, _, engine)| engine);
let options = options_opt
.map(|(_, _, options, _)| options)
.unwrap_or_default();

let statement = Statement::CreateDatabase(CreateDatabaseStmt {
create_option,
database,
engine,
options,
});

Ok(statement)
},
Expand Down Expand Up @@ -4133,9 +4130,17 @@ pub fn alter_database_action(i: Input) -> IResult<AlterDatabaseAction> {
|(_, _)| AlterDatabaseAction::RefreshDatabaseCache,
);

let set_options = map(
rule! {
SET ~ OPTIONS ~ "(" ~ #sql_property_list ~ ")"
},
|(_, _, _, options, _)| AlterDatabaseAction::SetOptions { options },
);

rule!(
#rename_database
| #refresh_cache
| #set_options
)(i)
}

Expand Down Expand Up @@ -5061,18 +5066,40 @@ pub fn database_engine(i: Input) -> IResult<DatabaseEngine> {
}

pub fn create_database_option(i: Input) -> IResult<CreateDatabaseOption> {
let mut create_db_engine = map(
let create_db_engine = map(
rule! {
ENGINE ~ ^"=" ~ ^#database_engine
},
|(_, _, option)| CreateDatabaseOption::DatabaseEngine(option),
);

let create_db_options = map(
rule! {
OPTIONS ~ "(" ~ #sql_property_list ~ ")"
},
|(_, _, options, _)| CreateDatabaseOption::Options(options),
);

rule!(
#create_db_engine
| #create_db_options
)(i)
}

pub fn sql_property_list(i: Input) -> IResult<Vec<SQLProperty>> {
let property = map(
rule! {
#ident ~ "=" ~ #option_to_string
},
|(name, _, value)| SQLProperty {
name: name.name,
value,
},
);

comma_separated_list1(property)(i)
}

pub fn catalog_type(i: Input) -> IResult<CatalogType> {
alt((
value(CatalogType::Default, rule! { DEFAULT }),
Expand Down
3 changes: 3 additions & 0 deletions src/query/ast/tests/it/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ fn test_statement() {
r#"create database if not exists a;"#,
r#"create database ctl.t engine = Default;"#,
r#"create database t engine = Default;"#,
r#"create database test_db OPTIONS (DEFAULT_STORAGE_CONNECTION = 'my_conn', DEFAULT_STORAGE_PATH = 's3://bucket/path');"#,
r#"create database mydb ENGINE = DEFAULT OPTIONS (DEFAULT_STORAGE_CONNECTION = 'test_conn', DEFAULT_STORAGE_PATH = 's3://test/path');"#,
r#"CREATE TABLE `t3`(a int not null, b int not null, c int not null) bloom_index_columns='a,b,c' COMPRESSION='zstd' STORAGE_FORMAT='native';"#,
r#"create or replace database a;"#,
r#"drop database ctl.t;"#,
Expand All @@ -168,6 +170,7 @@ fn test_statement() {
r#"create view v1(c1) as select number % 3 as a from numbers(1000);"#,
r#"create or replace view v1(c1) as select number % 3 as a from numbers(1000);"#,
r#"alter view v1(c2) as select number % 3 as a from numbers(1000);"#,
r#"alter database test_db SET OPTIONS (DEFAULT_STORAGE_CONNECTION = 'updated_conn');"#,
r#"show views"#,
r#"show views format TabSeparatedWithNamesAndTypes;"#,
r#"show full views"#,
Expand Down
2 changes: 1 addition & 1 deletion src/query/ast/tests/it/testdata/stmt-error.txt
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ error:
--> SQL:1:23
|
1 | alter database system x rename to db
| ----- ^ unexpected `x`, expecting `RENAME`, `REFRESH`, or `.`
| ----- ^ unexpected `x`, expecting `RENAME`, `REFRESH`, `SET`, or `.`
| |
| while parsing `ALTER DATABASE [IF EXISTS] <action>`

Expand Down
Loading