diff --git a/Cargo.toml b/Cargo.toml index d5627662..f813b0fb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -126,7 +126,8 @@ clickhouse-macros = { version = "0.3.0", path = "macros" } clickhouse-types = { version = "0.1.0", path = "types" } thiserror = "2.0" -serde = "1.0.106" +serde = { version = "1.0.106", features = ["derive"] } +serde_json = "1" bytes = "1.5.0" tokio = { version = "1.0.1", features = ["rt", "macros"] } http-body-util = "0.1.2" @@ -153,18 +154,17 @@ chrono = { version = "0.4", optional = true, features = ["serde"] } bstr = { version = "1.11.0", default-features = false } quanta = { version = "0.12", optional = true } replace_with = { version = "0.1.7" } +tracing = "0.1" [dev-dependencies] clickhouse-macros = { version = "0.3.0", path = "macros" } criterion = "0.6" -serde = { version = "1.0.106", features = ["derive"] } tokio = { version = "1.0.1", features = ["full", "test-util"] } hyper = { version = "1.1", features = ["server"] } indexmap = { version = "2.10.0", features = ["serde"] } linked-hash-map = { version = "0.5.6", features = ["serde_impl"] } fxhash = { version = "0.2.1" } serde_bytes = "0.11.4" -serde_json = "1" serde_repr = "0.1.7" uuid = { version = "1", features = ["v4", "serde"] } time = { version = "0.3.17", features = ["macros", "rand", "parsing"] } diff --git a/src/insert.rs b/src/insert.rs index 89ad7ac3..cd477585 100644 --- a/src/insert.rs +++ b/src/insert.rs @@ -354,6 +354,21 @@ impl Insert { debug_assert!(matches!(self.state, InsertState::NotStarted { .. 
})); let (client, sql) = self.state.client_with_sql().unwrap(); // checked above + let span = tracing::info_span!( + "clickhouse.insert", + status = tracing::field::Empty, + otel.status_code = tracing::field::Empty, + otel.kind = "CLIENT", + db.system.name = "clickhouse", + db.query.text = sql, + db.response.returned_rows = tracing::field::Empty, + db.response.read_bytes = tracing::field::Empty, + db.response.read_rows = tracing::field::Empty, + db.response.written_bytes = tracing::field::Empty, + db.response.written_rows = tracing::field::Empty, + ) + .entered(); + let mut url = Url::parse(&client.url).map_err(|err| Error::InvalidParams(err.into()))?; let mut pairs = url.query_pairs_mut(); pairs.clear(); @@ -385,9 +400,13 @@ impl Insert { .map_err(|err| Error::InvalidParams(Box::new(err)))?; let future = client.http.request(request); + let span = span.exit(); // TODO: introduce `Executor` to allow bookkeeping of spawned tasks. - let handle = - tokio::spawn(async move { Response::new(future, Compression::None).finish().await }); + let handle = tokio::spawn(async move { + Response::new(future, Compression::None, span) + .finish() + .await + }); match self.row_metadata { None => (), // RowBinary is used, no header is required. diff --git a/src/lib.rs b/src/lib.rs index e5c56b53..36075247 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -39,6 +39,7 @@ mod response; mod row; mod row_metadata; mod rowbinary; +mod summary_header; #[cfg(feature = "inserter")] mod ticks; @@ -385,6 +386,12 @@ impl Client { query::Query::new(self, query) } + /// Starts a new SELECT/DDL query, with the `wait_end_of_query` setting enabled + /// to buffer the full query results on the server + pub fn query_buffered(&self, query: &str) -> query::Query { + query::Query::new_buffered(self, query) + } + /// Enables or disables [`Row`] data types validation against the database schema /// at the cost of performance. 
Validation is enabled by default, and in this mode, /// the client will use `RowBinaryWithNamesAndTypes` format. diff --git a/src/query.rs b/src/query.rs index ee9c4b80..fc5a3625 100644 --- a/src/query.rs +++ b/src/query.rs @@ -24,6 +24,7 @@ use crate::headers::with_authentication; pub struct Query { client: Client, sql: SqlBuilder, + wait_end_of_query: bool, } impl Query { @@ -31,6 +32,15 @@ impl Query { Self { client: client.clone(), sql: SqlBuilder::new(template), + wait_end_of_query: false, + } + } + + pub(crate) fn new_buffered(client: &Client, template: &str) -> Self { + Self { + client: client.clone(), + sql: SqlBuilder::new(template), + wait_end_of_query: true, } } @@ -154,8 +164,25 @@ impl Query { read_only: bool, default_format: Option<&str>, ) -> Result { + let query_formatted = format!("{}", self.sql_display()); let query = self.sql.finish()?; + let execution_span = tracing::info_span!( + "clickhouse.query", + status = tracing::field::Empty, + otel.status_code = tracing::field::Empty, + otel.kind = "CLIENT", + db.system.name = "clickhouse", + db.query.text = query_formatted, + db.response.returned_rows = tracing::field::Empty, + db.response.read_bytes = tracing::field::Empty, + db.response.read_rows = tracing::field::Empty, + db.response.written_bytes = tracing::field::Empty, + db.response.written_rows = tracing::field::Empty, + clickhouse.wait_end_of_query = self.wait_end_of_query, + ) + .entered(); + let mut url = Url::parse(&self.client.url).map_err(|err| Error::InvalidParams(Box::new(err)))?; let mut pairs = url.query_pairs_mut(); @@ -186,6 +213,10 @@ impl Query { pairs.append_pair("compress", "1"); } + if self.wait_end_of_query { + pairs.append_pair("wait_end_of_query", "1"); + } + for (name, value) in &self.client.options { pairs.append_pair(name, value); } @@ -206,7 +237,11 @@ impl Query { .map_err(|err| Error::InvalidParams(Box::new(err)))?; let future = self.client.http.request(request); - Ok(Response::new(future, self.client.compression)) + 
Ok(Response::new( + future, + self.client.compression, + execution_span.exit(), + )) } /// Similar to [`Client::with_option`], but for this particular query only. diff --git a/src/response.rs b/src/response.rs index 3e233809..30b8d6f7 100644 --- a/src/response.rs +++ b/src/response.rs @@ -19,63 +19,111 @@ use crate::compression::lz4::Lz4Decoder; use crate::{ compression::Compression, error::{Error, Result}, + summary_header::Summary, }; +use tracing::{Instrument, Span}; + // === Response === pub(crate) enum Response { // Headers haven't been received yet. // `Box<_>` improves performance by reducing the size of the whole future. - Waiting(ResponseFuture), + Waiting(ResponseFuture, Span), // Headers have been received, streaming the body. - Loading(Chunks), + Loading(Chunks, Span), } pub(crate) type ResponseFuture = Pin> + Send>>; impl Response { - pub(crate) fn new(response: HyperResponseFuture, compression: Compression) -> Self { - Self::Waiting(Box::pin(async move { - let response = response.await?; - - let status = response.status(); - let exception_code = response.headers().get("X-ClickHouse-Exception-Code"); - - if status == StatusCode::OK && exception_code.is_none() { - // More likely to be successful, start streaming. - // It still can fail, but we'll handle it in `DetectDbException`. - Ok(Chunks::new(response.into_body(), compression)) - } else { - // An instantly failed request. 
-                Err(collect_bad_response(
-                    status,
-                    exception_code
-                        .and_then(|value| value.to_str().ok())
-                        .map(|code| format!("Code: {code}")),
-                    response.into_body(),
-                    compression,
-                )
-                .await)
+    pub(crate) fn new(response: HyperResponseFuture, compression: Compression, span: Span) -> Self {
+        let inner_span = span.clone();
+        Self::Waiting(
+            Box::pin(async move {
+                let response = response.await?;
+                if let Some(summary_header) = response.headers().get("x-clickhouse-summary") {
+                    match serde_json::from_slice::<Summary>(summary_header.as_bytes()) {
+                        Ok(summary_header) => {
+                            if let Some(rows) = summary_header.result_rows {
+                                inner_span.record("db.response.returned_rows", rows);
+                            }
+                            if let Some(rows) = summary_header.read_rows {
+                                inner_span.record("db.response.read_rows", rows);
+                            }
+                            if let Some(rows) = summary_header.written_rows {
+                                inner_span.record("db.response.written_rows", rows);
+                            }
+                            if let Some(bytes) = summary_header.read_bytes {
+                                inner_span.record("db.response.read_bytes", bytes);
+                            }
+                            if let Some(bytes) = summary_header.written_bytes {
+                                inner_span.record("db.response.written_bytes", bytes);
+                            }
+                            tracing::debug!(
+                                read_rows = summary_header.read_rows,
+                                read_bytes = summary_header.read_bytes,
+                                written_rows = summary_header.written_rows,
+                                written_bytes = summary_header.written_bytes,
+                                total_rows_to_read = summary_header.total_rows_to_read,
+                                result_rows = summary_header.result_rows,
+                                result_bytes = summary_header.result_bytes,
+                                elapsed_ns = summary_header.elapsed_ns,
+                                "finished processing query"
+                            )
+                        }
+                        Err(e) => {
+                            tracing::warn!(
+                                error = &e as &dyn std::error::Error,
+                                ?summary_header,
+                                "invalid x-clickhouse-summary header returned",
+                            );
+                        }
+                    }
+                }
+
+                let status = response.status();
+                let exception_code = response.headers().get("X-ClickHouse-Exception-Code");
+
+                if status == StatusCode::OK && exception_code.is_none() {
+                    inner_span.record("otel.status_code", "OK");
+                    // More likely to be successful, start streaming.
+ // It still can fail, but we'll handle it in `DetectDbException`. + Ok(Chunks::new(response.into_body(), compression, inner_span)) + } else { + inner_span.record("otel.status_code", "ERROR"); + // An instantly failed request. + Err(collect_bad_response( + status, + exception_code + .and_then(|value| value.to_str().ok()) + .map(|code| format!("Code: {code}")), + response.into_body(), + compression, + ) + .await) } - })) + }), span) } pub(crate) fn into_future(self) -> ResponseFuture { match self { - Self::Waiting(future) => future, - Self::Loading(_) => panic!("response is already streaming"), + Self::Waiting(future, span) => Box::pin(future.instrument(span)), + Self::Loading(_, _) => panic!("response is already streaming"), } } pub(crate) async fn finish(&mut self) -> Result<()> { - let chunks = loop { + let (chunks, span) = loop { match self { - Self::Waiting(future) => *self = Self::Loading(future.await?), - Self::Loading(chunks) => break chunks, + Self::Waiting(future, span) => { + *self = Self::Loading(future.instrument(span.clone()).await?, span.clone()) + } + Self::Loading(chunks, span) => break (chunks, span), } }; - while chunks.try_next().await?.is_some() {} + while chunks.try_next().instrument(span.clone()).await?.is_some() {} Ok(()) } } @@ -153,23 +201,32 @@ pub(crate) struct Chunk { // * Uses `Option<_>` to make this stream fused. // * Uses `Box<_>` in order to reduce the size of cursors. 
-pub(crate) struct Chunks(Option>>>); +pub(crate) struct Chunks { + stream: Option>>>, + span: Option, +} impl Chunks { - fn new(stream: Incoming, compression: Compression) -> Self { + fn new(stream: Incoming, compression: Compression, span: Span) -> Self { let stream = IncomingStream(stream); let stream = Decompress::new(stream, compression); let stream = DetectDbException(stream); - Self(Some(Box::new(stream))) + Self { + stream: Some(Box::new(stream)), + span: Some(span), + } } pub(crate) fn empty() -> Self { - Self(None) + Self { + stream: None, + span: None, + } } #[cfg(feature = "futures03")] pub(crate) fn is_terminated(&self) -> bool { - self.0.is_none() + self.stream.is_none() } } @@ -177,16 +234,19 @@ impl Stream for Chunks { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let guard = self.span.take().map(|s| s.entered()); // We use `take()` to make the stream fused, including the case of panics. - if let Some(mut stream) = self.0.take() { + if let Some(mut stream) = self.stream.take() { let res = Pin::new(&mut stream).poll_next(cx); if matches!(res, Poll::Pending | Poll::Ready(Some(Ok(_)))) { - self.0 = Some(stream); + self.stream = Some(stream); + self.span = guard.map(|g| g.exit()); } res } else { + self.span = guard.map(|g| g.exit()); Poll::Ready(None) } } diff --git a/src/sql/mod.rs b/src/sql/mod.rs index 560fe5dd..bc60192e 100644 --- a/src/sql/mod.rs +++ b/src/sql/mod.rs @@ -1,4 +1,4 @@ -use std::fmt::{self, Display, Write}; +use std::fmt::{self, Display}; use crate::{ error::{Error, Result}, @@ -13,7 +13,7 @@ pub(crate) mod ser; #[derive(Debug, Clone)] pub(crate) enum SqlBuilder { - InProgress(Vec), + InProgress { template: String, parts: Vec }, Failed(String), } @@ -28,15 +28,7 @@ pub(crate) enum Part { impl fmt::Display for SqlBuilder { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - SqlBuilder::InProgress(parts) => { - for part in parts { - match part { - Part::Arg => 
f.write_char('?')?,
-                        Part::Fields => f.write_str("?fields")?,
-                        Part::Text(text) => f.write_str(text)?,
-                    }
-                }
-            }
+            SqlBuilder::InProgress { template, .. } => f.write_str(template)?,
             SqlBuilder::Failed(err) => f.write_str(err)?,
         }
         Ok(())
     }
@@ -69,11 +61,11 @@ impl SqlBuilder {
             parts.push(Part::Text(rest.to_string()));
         }
 
-        SqlBuilder::InProgress(parts)
+        SqlBuilder::InProgress { template: template.to_owned(), parts }
     }
 
     pub(crate) fn bind_arg(&mut self, value: impl Bind) {
-        let Self::InProgress(parts) = self else {
+        let Self::InProgress { parts , .. } = self else {
             return;
         };
 
@@ -91,7 +83,7 @@ impl SqlBuilder {
     }
 
     pub(crate) fn bind_fields(&mut self) {
-        let Self::InProgress(parts) = self else {
+        let Self::InProgress { parts , .. } = self else {
             return;
         };
 
@@ -108,7 +100,7 @@ impl SqlBuilder {
         let mut sql = String::new();
 
         match self {
-            Self::InProgress(parts) => {
+            Self::InProgress { parts, .. } => {
                 for part in parts {
                     match part {
                         Part::Text(text) => sql.push_str(&text),
diff --git a/src/summary_header.rs b/src/summary_header.rs
new file mode 100644
index 00000000..c3fedf53
--- /dev/null
+++ b/src/summary_header.rs
@@ -0,0 +1,36 @@
+use serde::de::Error;
+use serde::{Deserialize, Deserializer};
+
+#[derive(Debug, serde::Deserialize)]
+pub(crate) struct Summary {
+    #[serde(default, deserialize_with = "int_in_string")]
+    pub read_rows: Option<usize>,
+    #[serde(default, deserialize_with = "int_in_string")]
+    pub read_bytes: Option<usize>,
+    #[serde(default, deserialize_with = "int_in_string")]
+    pub written_rows: Option<usize>,
+    #[serde(default, deserialize_with = "int_in_string")]
+    pub written_bytes: Option<usize>,
+    #[serde(default, deserialize_with = "int_in_string")]
+    pub total_rows_to_read: Option<usize>,
+    #[serde(default, deserialize_with = "int_in_string")]
+    pub result_rows: Option<usize>,
+    #[serde(default, deserialize_with = "int_in_string")]
+    pub result_bytes: Option<usize>,
+    #[serde(default, deserialize_with = "int_in_string")]
+    pub elapsed_ns: Option<usize>,
+}
+
+fn int_in_string<'de, D>(deser: D) -> Result<Option<usize>, D::Error>
+where
+    D: Deserializer<'de>,
+{
+    if let Some(string) = Option::<&str>::deserialize(deser)? {
+        let value: usize = string
+            .parse()
+            .map_err(|_| D::Error::custom("invalid integer"))?;
+        Ok(Some(value))
+    } else {
+        Ok(None)
+    }
+}