Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 49 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ async-trait = "0.1"
ipld-core = { version = "0.4", features = ["serde"] }
serde_ipld_dagcbor = "0.6"
ucan-capabilities-object = { git = "https://github.com/tinycloudlabs/ucan-capabilities-object" }
rusqlite = { version = "0.32", features = ["bundled", "column_decltype", "hooks", "backup"] }
sqlparser = { version = "0.44", features = ["serde"] }
tracing = "0.1"

# Internal crate dependencies
tinycloud-lib = { path = "tinycloud-lib", version = "1.0.0" }
Expand Down
5 changes: 5 additions & 0 deletions src/auth_guards.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ where
.map_err(|_| Status::InternalServerError)?,
)
.respond_to(request),
InvocationOutcome::SqlResult(json) => Json(json).respond_to(request),
InvocationOutcome::SqlExport(data) => Response::build()
.header(ContentType::new("application", "x-sqlite3"))
.sized_body(data.len(), std::io::Cursor::new(data))
.ok(),
}
}
}
Expand Down
30 changes: 30 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,33 @@ pub struct SpacesConfig {
pub allowlist: Option<SpaceAllowListService>,
}

#[derive(Serialize, Deserialize, Debug, Clone, Hash, PartialEq, Eq)]
pub struct SqlStorageConfig {
#[serde(default = "default_sql_path")]
pub path: String,
pub limit: Option<ByteUnit>,
#[serde(default = "default_sql_memory_threshold")]
pub memory_threshold: ByteUnit,
}

fn default_sql_path() -> String {
"./tinycloud/sql".to_string()
}

fn default_sql_memory_threshold() -> ByteUnit {
ByteUnit::Mebibyte(10)
}

impl Default for SqlStorageConfig {
fn default() -> Self {
Self {
path: default_sql_path(),
limit: None,
memory_threshold: default_sql_memory_threshold(),
}
}
}

#[serde_as]
#[derive(Serialize, Deserialize, Debug, Clone, Hash, PartialEq, Eq)]
pub struct Storage {
Expand All @@ -95,6 +122,8 @@ pub struct Storage {
#[serde(default = "memory_db")]
pub database: String,
pub limit: Option<ByteUnit>,
#[serde(default)]
pub sql: SqlStorageConfig,
}

impl Default for Storage {
Expand All @@ -104,6 +133,7 @@ impl Default for Storage {
staging: StagingStorage::default().into(),
database: memory_db(),
limit: None,
sql: SqlStorageConfig::default(),
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use storage::{
use tinycloud_core::{
keys::{SecretsSetup, StaticSecret},
sea_orm::{ConnectOptions, Database, DatabaseConnection},
sql::SqlService,
storage::{either::Either, memory::MemoryStaging, StorageConfig},
SpaceDatabase,
};
Expand Down Expand Up @@ -93,13 +94,19 @@ pub async fn app(config: &Figment) -> Result<Rocket<Build>> {
)
.await?;

let sql_service = SqlService::new(
tinycloud_config.storage.sql.path.clone(),
tinycloud_config.storage.sql.memory_threshold.as_u64(),
);

let rocket = rocket::custom(config)
.mount("/", routes)
.attach(AdHoc::config::<Config>())
.attach(tracing::TracingFairing {
header_name: tinycloud_config.log.tracing.traceheader,
})
.manage(tinycloud)
.manage(sql_service)
.manage(tinycloud_config.storage.staging.open().await?);

if tinycloud_config.cors {
Expand Down
131 changes: 129 additions & 2 deletions src/routes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use anyhow::Result;
use rocket::{data::ToByteUnit, http::Status, serde::json::Json, State};
use serde::Serialize;
use std::collections::HashMap;
use tokio::io::AsyncReadExt;
use tokio_util::compat::TokioAsyncReadCompatExt;
use tracing::{info_span, Instrument};

Expand All @@ -14,10 +15,11 @@ use crate::{
};
use tinycloud_core::{
sea_orm::DbErr,
sql::{SqlCaveats, SqlError, SqlRequest, SqlService},
storage::{ImmutableReadStore, ImmutableStaging},
types::Resource,
util::{DelegationInfo, InvocationInfo},
TxError, TxStoreError,
InvocationOutcome, TxError, TxStoreError,
};

pub mod util;
Expand All @@ -35,7 +37,7 @@ pub fn version() -> Json<VersionInfo> {
Json(VersionInfo {
protocol: tinycloud_lib::protocol::PROTOCOL_VERSION,
version: env!("CARGO_PKG_VERSION").to_string(),
features: vec!["kv", "delegation", "sharing"],
features: vec!["kv", "delegation", "sharing", "sql"],
})
}

Expand Down Expand Up @@ -116,6 +118,7 @@ pub async fn delegate(
}

#[post("/invoke", data = "<data>")]
#[allow(clippy::too_many_arguments)]
pub async fn invoke(
i: AuthHeaderGetter<InvocationInfo>,
req_span: TracingSpan,
Expand All @@ -124,6 +127,7 @@ pub async fn invoke(
staging: &State<BlockStage>,
tinycloud: &State<TinyCloud>,
config: &State<Config>,
sql_service: &State<SqlService>,
) -> Result<DataOut<<BlockStores as ImmutableReadStore>::Readable>, (Status, String)> {
let action_label = "invocation";
let span = info_span!(parent: &req_span.0, "invoke", action = %action_label);
Expand All @@ -133,6 +137,32 @@ pub async fn invoke(
.with_label_values(&["invoke"])
.start_timer();

// Check for SQL capabilities
let sql_caps: Vec<_> = i
.0
.0
.capabilities
.iter()
.filter_map(|c| match (&c.resource, c.ability.as_ref().as_ref()) {
(Resource::TinyCloud(r), ability)
if r.service().as_str() == "sql" && ability.starts_with("tinycloud.sql/") =>
{
Some((
r.space().clone(),
r.path().map(|p| p.to_string()),
ability.to_string(),
))
}
_ => None,
})
.collect();

if !sql_caps.is_empty() {
let result = handle_sql_invoke(i, data, tinycloud, sql_service, &sql_caps).await;
timer.observe_duration();
return result;
}

let mut put_iter = i.0 .0.capabilities.iter().filter_map(|c| {
match (&c.resource, c.ability.as_ref().as_ref()) {
(Resource::TinyCloud(r), "tinycloud.kv/put")
Expand Down Expand Up @@ -229,3 +259,100 @@ pub async fn invoke(
.instrument(span)
.await
}

async fn handle_sql_invoke(
i: AuthHeaderGetter<InvocationInfo>,
data: DataIn<'_>,
tinycloud: &State<TinyCloud>,
sql_service: &State<SqlService>,
sql_caps: &[(tinycloud_lib::resource::SpaceId, Option<String>, String)],
) -> Result<DataOut<<BlockStores as ImmutableReadStore>::Readable>, (Status, String)> {
// Extract caveats from the invocation facts before consuming i
let caveats: Option<SqlCaveats> =
i.0 .0
.invocation
.payload()
.facts
.as_ref()
.and_then(|facts| {
facts.iter().find_map(|fact| {
fact.as_object()
.and_then(|obj| obj.get("sqlCaveats"))
.and_then(|v| serde_json::from_value(v.clone()).ok())
})
});

// Verify authorization by invoking with empty inputs
// SQL capabilities don't match KV patterns, so invoke just verifies auth
tinycloud
.invoke::<BlockStage>(i.0, HashMap::new())
.await
.map_err(|e| {
(
match e {
TxStoreError::Tx(TxError::SpaceNotFound) => Status::NotFound,
TxStoreError::Tx(TxError::Db(DbErr::ConnectionAcquire(_))) => {
Status::InternalServerError
}
_ => Status::Unauthorized,
},
e.to_string(),
)
})?;

// Read the request body as JSON
let body_str = match data {
DataIn::One(d) => {
let mut buf = Vec::new();
let mut reader = d.open(1u8.megabytes());
reader
.read_to_end(&mut buf)
.await
.map_err(|e| (Status::BadRequest, e.to_string()))?;
String::from_utf8(buf).map_err(|e| (Status::BadRequest, e.to_string()))?
}
_ => {
return Err((Status::BadRequest, "Expected JSON body".to_string()));
}
};

let (space, path, ability) = &sql_caps[0];
let db_name = SqlService::db_name_from_path(path.as_deref());

let sql_request: SqlRequest =
serde_json::from_str(&body_str).map_err(|e| (Status::BadRequest, e.to_string()))?;

// Handle export specially
if matches!(sql_request, SqlRequest::Export) {
let data = sql_service
.export(space, &db_name)
.await
.map_err(|e| (sql_error_to_status(&e), e.to_string()))?;
return Ok(DataOut::One(InvOut(InvocationOutcome::SqlExport(data))));
}

let response = sql_service
.execute(space, &db_name, sql_request, caveats, ability.clone())
.await
.map_err(|e| (sql_error_to_status(&e), e.to_string()))?;

let json =
serde_json::to_value(response).map_err(|e| (Status::InternalServerError, e.to_string()))?;

Ok(DataOut::One(InvOut(InvocationOutcome::SqlResult(json))))
}

fn sql_error_to_status(err: &SqlError) -> Status {
match err {
SqlError::Sqlite(_) => Status::BadRequest,
SqlError::PermissionDenied(_) => Status::Forbidden,
SqlError::DatabaseNotFound => Status::NotFound,
SqlError::ResponseTooLarge(_) => Status::new(413),
SqlError::QuotaExceeded => Status::new(429),
SqlError::InvalidStatement(_) => Status::BadRequest,
SqlError::SchemaError(_) => Status::BadRequest,
SqlError::ReadOnlyViolation => Status::Forbidden,
SqlError::ParseError(_) => Status::BadRequest,
SqlError::Internal(_) => Status::InternalServerError,
}
}
4 changes: 4 additions & 0 deletions tinycloud-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ serde_json.workspace = true
serde_ipld_dagcbor = "0.3"
ucan-capabilities-object.workspace = true
multihash-derive = "0.9"
rusqlite.workspace = true
sqlparser.workspace = true
tracing.workspace = true
tokio.workspace = true

[dev-dependencies]
sea-orm = { version = "1.1", features = ["runtime-tokio-rustls", "sqlx-sqlite"] }
Expand Down
Loading