Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
cf3b531
Update from main branch
slvrtrn Jun 20, 2025
197ce8b
Merge remote-tracking branch 'origin' into rbwnat-insert
slvrtrn Jun 26, 2025
1b24497
Initial impl of serializer validation
slvrtrn Jun 27, 2025
dcca54a
Add insert tests, reorganize existing RBWNAT tests
slvrtrn Jun 27, 2025
e8b9783
Merge remote-tracking branch 'origin' into rbwnat-insert
slvrtrn Jul 3, 2025
ab6047e
Fix docs issues
slvrtrn Jul 3, 2025
901659e
Fix cargo fmt
slvrtrn Jul 3, 2025
8d76d18
Fix `Insert::end` ownership
slvrtrn Jul 3, 2025
8ecfbe7
Move row_metadata around, don't use header for raw RowBinary
slvrtrn Jul 3, 2025
ae9a4b1
Support wrong struct field order with inserts
slvrtrn Jul 3, 2025
a0dbeb9
Add tests for various third-party `*Map` types
slvrtrn Jul 3, 2025
2284cce
Merge remote-tracking branch 'origin' into rbwnat-insert
slvrtrn Jul 29, 2025
d9c3125
cargo fmt
slvrtrn Jul 29, 2025
36350ac
Merge remote-tracking branch 'origin' into rbwnat-insert
slvrtrn Jul 30, 2025
59be19f
Add more tests
slvrtrn Jul 30, 2025
ac2ba48
Fix tuple test
slvrtrn Jul 30, 2025
3be06b0
Update README.md
slvrtrn Jul 30, 2025
b383bee
Adjust mocked_insert benchmark, add more tests
slvrtrn Jul 30, 2025
c5699bd
Fix clippy
slvrtrn Jul 30, 2025
170a545
Allow single element loop with disabled features
slvrtrn Jul 30, 2025
300de26
Fix from_utf8_lossy import in tests
slvrtrn Sep 22, 2025
fc97158
Merge remote-tracking branch 'origin' into rbwnat-insert
slvrtrn Sep 22, 2025
664b440
Fix futures imports
slvrtrn Sep 22, 2025
08de9a4
Fix rustfmt
slvrtrn Sep 22, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ criterion = "0.6"
serde = { version = "1.0.106", features = ["derive"] }
tokio = { version = "1.0.1", features = ["full", "test-util"] }
hyper = { version = "1.1", features = ["server"] }
indexmap = { version = "2.10.0", features = ["serde"] }
linked-hash-map = { version = "0.5.6", features = ["serde_impl"] }
fxhash = { version = "0.2.1" }
serde_bytes = "0.11.4"
serde_json = "1"
serde_repr = "0.1.7"
Expand Down
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ struct MyRow {
name: String,
}

let mut insert = client.insert("some")?;
let mut insert = client.insert::<MyRow>("some").await?;
Copy link
Contributor

@abonander abonander Sep 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of `rust,ignore` we could mark these code fences as `rust,no_run`, so that the examples at least get compile-tested.

I could take that on as a task.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be done as part of finalizing #160.

insert.write(&MyRow { no: 0, name: "foo".into() }).await?;
insert.write(&MyRow { no: 1, name: "bar".into() }).await?;
insert.end().await?;
Expand All @@ -158,14 +158,14 @@ insert.end().await?;
Requires the `inserter` feature.

```rust,ignore
let mut inserter = client.inserter("some")?
let mut inserter = client.inserter::<MyRow>("some")?
.with_timeouts(Some(Duration::from_secs(5)), Some(Duration::from_secs(20)))
.with_max_bytes(50_000_000)
.with_max_rows(750_000)
.with_period(Some(Duration::from_secs(15)));

inserter.write(&MyRow { no: 0, name: "foo".into() })?;
inserter.write(&MyRow { no: 1, name: "bar".into() })?;
inserter.write(&MyRow { no: 0, name: "foo".into() }).await?;
inserter.write(&MyRow { no: 1, name: "bar".into() }).await?;
let stats = inserter.commit().await?;
if stats.rows > 0 {
println!(
Expand Down
116 changes: 80 additions & 36 deletions benches/mocked_insert.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,57 @@
use bytes::Bytes;
use clickhouse::{error::Result, Client, Compression, Row};
use clickhouse_types::{Column, DataTypeNode};
use criterion::{criterion_group, criterion_main, Criterion, Throughput};
use http_body_util::Empty;
use futures_util::stream;
use http_body_util::StreamBody;
use hyper::body::{Body, Frame};
use hyper::{body::Incoming, Request, Response};
use serde::Serialize;
use std::convert::Infallible;
use std::hint::black_box;
use std::net::SocketAddr;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::{
future::Future,
mem,
time::{Duration, Instant},
};

use clickhouse::{error::Result, Client, Compression, Row};

mod common;

async fn serve(request: Request<Incoming>) -> Response<Empty<Bytes>> {
async fn serve(
request: Request<Incoming>,
compression: Compression,
with_validation: bool,
) -> Response<impl Body<Data = Bytes, Error = Infallible>> {
common::skip_incoming(request).await;
Response::new(Empty::new())

let bytes = if with_validation {
let schema = vec![
Column::new("a".to_string(), DataTypeNode::UInt64),
Column::new("b".to_string(), DataTypeNode::Int64),
Column::new("c".to_string(), DataTypeNode::Int32),
Column::new("d".to_string(), DataTypeNode::UInt32),
Column::new("e".to_string(), DataTypeNode::UInt64),
Column::new("f".to_string(), DataTypeNode::UInt32),
Column::new("g".to_string(), DataTypeNode::UInt64),
Column::new("h".to_string(), DataTypeNode::Int64),
];

let mut buffer = Vec::new();
clickhouse_types::put_rbwnat_columns_header(&schema, &mut buffer).unwrap();

match compression {
Compression::None => Bytes::from(buffer),
#[cfg(feature = "lz4")]
Compression::Lz4 => clickhouse::_priv::lz4_compress(&buffer).unwrap(),
_ => unreachable!(),
}
} else {
Bytes::new()
};

let stream = StreamBody::new(stream::once(async { Ok(Frame::data(bytes)) }));
Response::new(stream)
}

#[derive(Row, Serialize)]
Expand Down Expand Up @@ -47,11 +81,18 @@ impl SomeRow {
}
}

async fn run_insert(client: Client, addr: SocketAddr, iters: u64) -> Result<Duration> {
let _server = common::start_server(addr, serve).await;
const ADDR: SocketAddr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 6524));

async fn run_insert(
client: Client,
iters: u64,
compression: Compression,
validation: bool,
) -> Result<Duration> {
let _server = common::start_server(ADDR, move |req| serve(req, compression, validation)).await;

let start = Instant::now();
let mut insert = client.insert::<SomeRow>("table")?;
let mut insert = client.insert::<SomeRow>("table").await?;

for _ in 0..iters {
insert.write(&SomeRow::sample()).await?;
Expand All @@ -64,65 +105,68 @@ async fn run_insert(client: Client, addr: SocketAddr, iters: u64) -> Result<Dura
#[cfg(feature = "inserter")]
async fn run_inserter<const WITH_PERIOD: bool>(
client: Client,
addr: SocketAddr,
iters: u64,
compression: Compression,
validation: bool,
) -> Result<Duration> {
let _server = common::start_server(addr, serve).await;
let _server = common::start_server(ADDR, move |req| serve(req, compression, validation)).await;

let start = Instant::now();
let mut inserter = client.inserter::<SomeRow>("table")?.with_max_rows(iters);
let mut inserter = client.inserter::<SomeRow>("table").with_max_rows(iters);

if WITH_PERIOD {
// Just to measure overhead, not to actually use it.
inserter = inserter.with_period(Some(Duration::from_secs(1000)));
}

for _ in 0..iters {
inserter.write(&SomeRow::sample())?;
inserter.write(&SomeRow::sample()).await?;
inserter.commit().await?;
}

inserter.end().await?;
Ok(start.elapsed())
}

fn run<F>(c: &mut Criterion, name: &str, port: u16, f: impl Fn(Client, SocketAddr, u64) -> F)
fn run<F>(c: &mut Criterion, name: &str, f: impl Fn(Client, u64, Compression, bool) -> F)
where
F: Future<Output = Result<Duration>> + Send + 'static,
{
let addr: SocketAddr = format!("127.0.0.1:{port}").parse().unwrap();
let runner = common::start_runner();

let mut group = c.benchmark_group(name);
group.throughput(Throughput::Bytes(mem::size_of::<SomeRow>() as u64));
group.bench_function("uncompressed", |b| {
b.iter_custom(|iters| {
let client = Client::default()
.with_url(format!("http://{addr}"))
.with_compression(Compression::None);
runner.run((f)(client, addr, iters))
})
});
#[cfg(feature = "lz4")]
group.bench_function("lz4", |b| {
b.iter_custom(|iters| {
let client = Client::default()
.with_url(format!("http://{addr}"))
.with_compression(Compression::Lz4);
runner.run((f)(client, addr, iters))
})
});
for validation in [true, false] {
#[allow(clippy::single_element_loop)]
for compression in [
Compression::None,
#[cfg(feature = "lz4")]
Compression::Lz4,
] {
group.bench_function(
format!("validation={validation}/compression={compression:?}"),
|b| {
b.iter_custom(|iters| {
let client = Client::default()
.with_url(format!("http://{ADDR}"))
.with_compression(compression)
.with_validation(validation);
runner.run((f)(client, iters, compression, validation))
})
},
);
}
}
group.finish();
}

fn insert(c: &mut Criterion) {
run(c, "insert", 6543, run_insert);
run(c, "insert", run_insert);
}

#[cfg(feature = "inserter")]
fn inserter(c: &mut Criterion) {
run(c, "inserter", 6544, run_inserter::<false>);
run(c, "inserter-period", 6545, run_inserter::<true>);
run(c, "inserter", run_inserter::<false>);
run(c, "inserter-period", run_inserter::<true>);
}

#[cfg(not(feature = "inserter"))]
Expand Down
8 changes: 4 additions & 4 deletions benches/mocked_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ mod common;
async fn serve(
request: Request<Incoming>,
compression: Compression,
use_rbwnat: bool,
with_validation: bool,
) -> Response<impl Body<Data = Bytes, Error = Infallible>> {
common::skip_incoming(request).await;

let maybe_schema = if use_rbwnat {
let maybe_schema = if with_validation {
let schema = vec![
Column::new("a".to_string(), DataTypeNode::UInt64),
Column::new("b".to_string(), DataTypeNode::Int64),
Expand Down Expand Up @@ -79,8 +79,8 @@ fn prepare_chunk() -> Bytes {
const ADDR: SocketAddr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 6523));

fn select(c: &mut Criterion) {
async fn start_server(compression: Compression, use_rbwnat: bool) -> common::ServerHandle {
common::start_server(ADDR, move |req| serve(req, compression, use_rbwnat)).await
async fn start_server(compression: Compression, with_validation: bool) -> common::ServerHandle {
common::start_server(ADDR, move |req| serve(req, compression, with_validation)).await
}

let runner = common::start_runner();
Expand Down
32 changes: 16 additions & 16 deletions benches/select_market_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,21 @@ async fn prepare_data() {
client
.query(
r#"
CREATE TABLE IF NOT EXISTS l2_book_log
(
`instrument_id` UInt32 CODEC(T64, Default),
`received_time` DateTime64(9) CODEC(DoubleDelta, Default),
`exchange_time` Nullable(DateTime64(9)) CODEC(DoubleDelta, Default),
`sequence_no` Nullable(UInt64) CODEC(DoubleDelta, Default),
`trace_id` UInt64 CODEC(DoubleDelta, Default),
`side` Enum8('Bid' = 0, 'Ask' = 1),
`price` Float64,
`amount` Float64,
`is_eot` Bool
)
ENGINE = MergeTree
PRIMARY KEY (instrument_id, received_time)
"#,
CREATE TABLE IF NOT EXISTS l2_book_log
(
`instrument_id` UInt32 CODEC(T64, Default),
`received_time` DateTime64(9) CODEC(DoubleDelta, Default),
`exchange_time` Nullable(DateTime64(9)) CODEC(DoubleDelta, Default),
`sequence_no` Nullable(UInt64) CODEC(DoubleDelta, Default),
`trace_id` UInt64 CODEC(DoubleDelta, Default),
`side` Enum8('Bid' = 0, 'Ask' = 1),
`price` Float64,
`amount` Float64,
`is_eot` Bool
)
ENGINE = MergeTree
PRIMARY KEY (instrument_id, received_time)
"#,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using raw string literals is also idiomatic in SQLx, though strictly speaking it is only necessary when you need to quote identifiers, as it avoids the need for escapes: https://docs.rs/sqlx/latest/sqlx/macro.query.html#type-overrides-output-columns

Since it appears ClickHouse uses backticks instead of double quotes for quoted identifiers, there's not much benefit to using raw string literals here.

)
.execute()
.await
Expand All @@ -71,7 +71,7 @@ async fn prepare_data() {
return;
}

let mut insert = client.insert::<L2Update>("l2_book_log").unwrap();
let mut insert = client.insert::<L2Update>("l2_book_log").await.unwrap();

for i in 0..10_000_000 {
insert
Expand Down
2 changes: 1 addition & 1 deletion examples/async_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ async fn main() -> Result<()> {
.execute()
.await?;

let mut insert = client.insert::<Event>(table_name)?;
let mut insert = client.insert::<Event>(table_name).await?;
insert
.write(&Event {
timestamp: now(),
Expand Down
2 changes: 1 addition & 1 deletion examples/clickhouse_cloud.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ async fn main() -> clickhouse::error::Result<()> {
.execute()
.await?;

let mut insert = client.insert::<Data>(table_name)?;
let mut insert = client.insert::<Data>(table_name).await?;
insert
.write(&Data {
id: 42,
Expand Down
2 changes: 1 addition & 1 deletion examples/data_types_derive_containers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ async fn main() -> Result<()> {
.execute()
.await?;

let mut insert = client.insert::<Row>(table_name)?;
let mut insert = client.insert::<Row>(table_name).await?;
insert.write(&Row::new()).await?;
insert.end().await?;

Expand Down
2 changes: 1 addition & 1 deletion examples/data_types_derive_simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ async fn main() -> Result<()> {
.execute()
.await?;

let mut insert = client.insert::<Row>(table_name)?;
let mut insert = client.insert::<Row>(table_name).await?;
insert.write(&Row::new()).await?;
insert.end().await?;

Expand Down
2 changes: 1 addition & 1 deletion examples/data_types_new_json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ async fn main() -> Result<()> {
.to_string(),
};

let mut insert = client.insert::<Row>(table_name)?;
let mut insert = client.insert::<Row>(table_name).await?;
insert.write(&row).await?;
insert.end().await?;

Expand Down
2 changes: 1 addition & 1 deletion examples/data_types_variant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ async fn main() -> Result<()> {
.execute()
.await?;

let mut insert = client.insert::<MyRow>(table_name)?;
let mut insert = client.insert::<MyRow>(table_name).await?;
let rows_to_insert = get_rows();
for row in rows_to_insert {
insert.write(&row).await?;
Expand Down
2 changes: 1 addition & 1 deletion examples/enums.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ async fn main() -> Result<()> {
Error = 4,
}

let mut insert = client.insert::<Event>("event_log")?;
let mut insert = client.insert::<Event>("event_log").await?;
insert
.write(&Event {
timestamp: now(),
Expand Down
8 changes: 4 additions & 4 deletions examples/inserter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ struct MyRow {
// In other words, this pattern is applicable for ETL-like tasks.
async fn dense(client: &Client, mut rx: Receiver<u32>) -> Result<()> {
let mut inserter = client
.inserter::<MyRow>(TABLE_NAME)?
.inserter::<MyRow>(TABLE_NAME)
// We limit the number of rows to be inserted in a single `INSERT` statement.
// We use small value (100) for the example only.
// See documentation of `with_max_rows` for details.
Expand All @@ -32,7 +32,7 @@ async fn dense(client: &Client, mut rx: Receiver<u32>) -> Result<()> {
.with_max_bytes(1_048_576);

while let Some(no) = rx.recv().await {
inserter.write(&MyRow { no })?;
inserter.write(&MyRow { no }).await?;
inserter.commit().await?;
}

Expand All @@ -47,7 +47,7 @@ async fn dense(client: &Client, mut rx: Receiver<u32>) -> Result<()> {
// Some rows are arriving one by one with delay, some batched.
async fn sparse(client: &Client, mut rx: Receiver<u32>) -> Result<()> {
let mut inserter = client
.inserter::<MyRow>(TABLE_NAME)?
.inserter::<MyRow>(TABLE_NAME)
// Slice the stream into chunks (one `INSERT` per chunk) by time.
// See documentation of `with_period` for details.
.with_period(Some(Duration::from_millis(100)))
Expand Down Expand Up @@ -85,7 +85,7 @@ async fn sparse(client: &Client, mut rx: Receiver<u32>) -> Result<()> {
Err(TryRecvError::Disconnected) => break,
};

inserter.write(&MyRow { no })?;
inserter.write(&MyRow { no }).await?;
inserter.commit().await?;

// You can use result of `commit()` to get the number of rows inserted.
Expand Down
2 changes: 1 addition & 1 deletion examples/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ async fn make_select(client: &Client) -> Result<Vec<SomeRow>> {
}

async fn make_insert(client: &Client, data: &[SomeRow]) -> Result<()> {
let mut insert = client.insert::<SomeRow>("who cares")?;
let mut insert = client.insert::<SomeRow>("who cares").await?;
for row in data {
insert.write(row).await?;
}
Expand Down
Loading